English Document
Kafka-php 使用纯粹的 PHP 编写的 kafka 客户端,目前支持 0.8.x 以上版本的 Kafka,该项目 v0.2.x 和 v0.1.x 不兼容,如果使用原有的 v0.1.x 的可以参照文档 Kafka PHP v0.1.x Document , 不过建议切换到 v0.2.x 上。v0.2.x 使用 PHP 异步执行的方式来和 kafka broker 交互,较 v0.1.x 更加稳定高效, 由于使用 PHP 语言编写所以不用编译任何的扩展就可以使用,降低了接入与维护成本
添加 composer 依赖
到项目的
- nmred/kafka-php
文件中即可,如:
- composer.json
- {
- "require": {
- "nmred/kafka-php": "0.2.*"
- }
- }
- <?php
- require '../vendor/autoload.php';
- date_default_timezone_set('PRC');
- use Monolog\Logger;
- use Monolog\Handler\StdoutHandler;
- // Create the logger
- $logger = new Logger('my_logger');
- // Now add some handlers
- $logger->pushHandler(new StdoutHandler());
- // 设置生产相关配置,具体配置参数见 [Configuration](Configuration.md)
- $config = \Kafka\ProducerConfig::getInstance();
- $config->setMetadataRefreshIntervalMs(10000);
- $config->setMetadataBrokerList('10.13.4.159:9192');
- $config->setBrokerVersion('0.9.0.1');
- $config->setRequiredAck(1);
- $config->setIsAsyn(false);
- $config->setProduceInterval(500);
- $producer = new \Kafka\Producer(function() {
- return array(
- array(
- 'topic' => 'test',
- 'value' => 'test....message.',
- 'key' => 'testkey',
- ),
- );
- });
- $producer->setLogger($logger);
- $producer->success(function($result) {
- var_dump($result);
- });
- $producer->error(function($errorCode, $context) {
- var_dump($errorCode);
- });
- $producer->send();
- <?php
- require '../vendor/autoload.php';
- date_default_timezone_set('PRC');
- use Monolog\Logger;
- use Monolog\Handler\StdoutHandler;
- // Create the logger
- $logger = new Logger('my_logger');
- // Now add some handlers
- $logger->pushHandler(new StdoutHandler());
- $config = \Kafka\ConsumerConfig::getInstance();
- $config->setMetadataRefreshIntervalMs(10000);
- $config->setMetadataBrokerList('10.13.4.159:9192');
- $config->setGroupId('test');
- $config->setBrokerVersion('0.9.0.1');
- $config->setTopics(array('test'));
- //$config->setOffsetReset('earliest');
- $consumer = new \Kafka\Consumer();
- $consumer->setLogger($logger);
- $consumer->start(function($topic, $part, $message) {
- var_dump($message);
- });
基础协议 API 调用方式见 Example
来源: http://www.tuicool.com/articles/MZFFzeN