superbalist / php-pubsub-kafka
该软件包已被废弃,不再维护。未建议替代软件包。
php-pubsub 软件包的 Kafka 适配器
2.0.0
2017-05-18 12:48 UTC
Requires
- php: >=5.6.0
- ext-rdkafka: *
- superbalist/php-pubsub: ^2.0
Requires (Dev)
- mockery/mockery: ^0.9.5
- phpunit/phpunit: ^5.5
This package is auto-updated.
Last update: 2024-08-11 23:57:26 UTC
README
php-pubsub 软件包的 Kafka 适配器。
安装
-
$ cd /tmp $ mkdir librdkafka $ cd librdkafka $ git clone https://github.com/edenhill/librdkafka.git . $ ./configure $ make $ make install
-
$ pecl install rdkafka
-
将以下内容添加到您的 php.ini 文件中,以启用 php-rdkafka 扩展:
extension=rdkafka.so
-
composer require superbalist/php-pubsub-kafka
用法
// create consumer $topicConf = new \RdKafka\TopicConf(); $topicConf->set('auto.offset.reset', 'largest'); $conf = new \RdKafka\Conf(); $conf->set('group.id', 'php-pubsub'); $conf->set('metadata.broker.list', '127.0.0.1'); $conf->set('enable.auto.commit', 'false'); $conf->set('offset.store.method', 'broker'); $conf->set('socket.blocking.max.ms', 50); $conf->setDefaultTopicConf($topicConf); $consumer = new \RdKafka\KafkaConsumer($conf); // create producer $conf = new \RdKafka\Conf(); $conf->set('socket.blocking.max.ms', 50); $conf->set('queue.buffering.max.ms', 20); $producer = new \RdKafka\Producer($conf); $producer->addBrokers('127.0.0.1'); $adapter = new \Superbalist\PubSub\Kafka\KafkaPubSubAdapter($producer, $consumer); // consume messages // note: this is a blocking call $adapter->subscribe('my_channel', function ($message) { var_dump($message); }); // publish messages $adapter->publish('my_channel', 'HELLO WORLD'); $adapter->publish('my_channel', ['hello' => 'world']); $adapter->publish('my_channel', 1); $adapter->publish('my_channel', false); // publish multiple messages $messages = [ ['hello' => 'world'], 'lorem ipsum', ]; $adapter->publishBatch('my_channel', $messages);
示例
该库包含适配器的 示例 和运行示例脚本的 Dockerfile。
运行 make up
。
您将进入 bash
提示符,位于 /opt/php-pubsub
目录。
如果您需要另一个 shell 来向阻塞消费者发布消息,可以运行 make shell
运行示例
$ php examples/KafkaConsumerExample.php $ php examples/KafkaPublishExample.php (in a separate shell)