hauptmedia / rdkafka
该包已被废弃,不再维护。未建议替代包。
dev-master
2018-08-19 21:43 UTC
Requires
- php: >=5.3
- ext-rdkafka: *
This package is auto-updated.
Last update: 2022-12-15 23:59:47 UTC
README
php-rdkafka 扩展的 PHP 模板。
参见https://github.com/arnaud-lb/php-rdkafka
命名空间 Kafka 中的包装/外观对象
您可以使用命名空间 Kafka 中提供的包装对象,以获得对 RdKafka 对象的更 PHP 风格的接口。
生产者示例
include_once('vendor/autoload.php'); use Kafka\Configuration\ProducerConfiguration; use Kafka\Manager; $producerConfiguration = (new ProducerConfiguration()) ->setRequestRequiredAcks(0); $manager = new Manager(['hostname']); $topic = $manager->createProducerTopicFacade("test", $producerConfiguration); for($i=1;$i<=1000;$i++) { $topic->produce(0 /* partition */, "test" . time()); }
消费者示例
实现 \Kafka\ConsumerInterface
并将该类添加到 ConsumerTopicFacade 对象中。
include_once('../vendor/autoload.php'); class MyConsumer implements \Kafka\ConsumerInterface { /** * @param string $topic Topic name * @param int $partition Partition * @param int $offset Message offset * @param string $key Optional message key * @param string $payload Message payload * @return mixed */ public function consume($topic, $partition, $offset, $key, $payload) { echo "Received message with payload " . $payload; } } $consumerConfiguration = (new ConsumerConfiguration()) ->setAutoCommitIntervalMs(1000) ->setOffsetStoreSyncIntervalMs(60); $manager = new Manager(['hostname']); $consumerTopic = $manager->createConsumerTopicFacade("test", $consumerConfiguration); $consumerTopic->addConsumer(new MyConsumer()); $consumerTopic->consumeStart(0 /* partition */, 0 /* offset */); $consumerTopic->consume(0 /* partition */, 1000 /* timeout in ms */); $consumerTopic->consumeStop(0 /* partition */);