danikdantist / queue-wrapper
Kafka wrapper
2.0.3
2024-01-18 16:34 UTC
Requires
- php: >=8.0
README
Installation
composer require danikdantist/queue-wrapper
To use Kafka, install the librdkafka library (https://github.com/edenhill/librdkafka) and the rdkafka extension for PHP (https://github.com/arnaud-lb/php-rdkafka).
A Dockerfile is available for building a Docker image with librdkafka and the PHP Kafka extension already installed: https://github.com/danikdantist/Dockerfiles/blob/master/php/php5-fpm-nginx-kafka/Dockerfile
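Before wiring up a consumer or producer, it can help to confirm that the rdkafka extension is visible to the PHP runtime. The following is a minimal sketch using only PHP built-ins; the function name `rdkafkaStatus` is a hypothetical helper for illustration and is not part of queue-wrapper.

```php
<?php
// Hypothetical helper (not part of queue-wrapper): reports whether the
// rdkafka PHP extension is available in the current runtime.
function rdkafkaStatus(): string
{
    if (!extension_loaded('rdkafka')) {
        return 'rdkafka extension is NOT loaded';
    }

    // phpversion() returns the extension's version string, or false if unknown.
    $version = phpversion('rdkafka');

    return 'rdkafka extension is loaded'
        . ($version !== false ? " (version $version)" : '');
}

echo rdkafkaStatus(), "\n";
```

Run it with the same PHP binary and configuration (CLI or FPM) that will run the consumer, since extensions can be enabled for one and not the other.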
Consumer usage
<?php

class Receiver implements DanikDantist\QueueWrapper\Interfaces\iReceivable
{
    public function receiveMessage(DanikDantist\QueueWrapper\Message $message)
    {
        echo $message->toString()."\n";
    }
}

class EchoLogger implements DanikDantist\QueueWrapper\Interfaces\iLogable
{
    public function info($info)
    {
        echo 'Info: '.$info."\n";
    }

    public function error($error)
    {
        echo 'Error: '.$error."\n";
    }
}

$config = new DanikDantist\QueueWrapper\Drivers\Kafka\Config();
$config
    ->setGroup('my_group')
    ->addBroker('172.17.0.31:9092')
    ->addTopic('my-test-topic')
    ->addTopic('my-test-topic-2')
;

$demon = new DanikDantist\QueueWrapper\Manager(
    new DanikDantist\QueueWrapper\Drivers\Kafka\Connector($config, new EchoLogger())
);
$demon->addReceiver(new Receiver());
$demon->listenMessage();
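The receiver above simply echoes `$message->toString()`. If your messages carry JSON (an assumption; the library does not prescribe a payload format), a receiver would typically decode that string before acting on it. The sketch below shows one way to do that defensively; `decodePayload` is a hypothetical helper, not part of queue-wrapper.

```php
<?php
// Hypothetical helper, not part of queue-wrapper: decodes a raw payload
// string (for example the result of $message->toString()) as JSON.
// Returns null when the payload is not valid JSON or is not an object/array.
function decodePayload(string $raw): ?array
{
    try {
        $data = json_decode($raw, true, 512, JSON_THROW_ON_ERROR);
    } catch (JsonException $e) {
        // Malformed payload: let the caller decide whether to log or skip it.
        return null;
    }

    return is_array($data) ? $data : null;
}

var_dump(decodePayload('{"id": 7}'));       // array with 'id' => 7
var_dump(decodePayload('My test message')); // NULL
```

Inside `receiveMessage()`, this could be called as `decodePayload($message->toString())`, skipping or logging the message when the result is null so that a single malformed payload does not stop the listener.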
Producer usage
<?php

class EchoLogger implements DanikDantist\QueueWrapper\Interfaces\iLogable
{
    public function info($info)
    {
        echo 'Info: '.$info."\n";
    }

    public function error($error)
    {
        echo 'Error: '.$error."\n";
    }
}

$config = new DanikDantist\QueueWrapper\Drivers\Kafka\Config();
$config->addBroker('172.17.0.31:9092');

$demon = new DanikDantist\QueueWrapper\Manager(
    new DanikDantist\QueueWrapper\Drivers\Kafka\Connector($config, new EchoLogger())
);
$demon->sendMessage(new DanikDantist\QueueWrapper\Message('My test message', 'my-test-topic'));