dada-amater / nette-rabbitmq
v2.1.0
2018-07-09 08:12 UTC
Requires
- php: >=7.1
- bunny/bunny: ^0.2.4 || ^0.3 || ^0.4
- symfony/console: ~3.3 || ^4.0
Requires (Dev)
- mockery/mockery: ^1.1
- nette/neon: ^2.4
- nette/tester: ^2.0
README
Nette RabbitMQ
Nette extension for RabbitMQ (using the composer package jakubkulhan/bunny)
Example setup
Download the composer package
composer require gamee/nette-rabbitmq
Extension registration
config.neon
extensions:
    rabbitmq: Gamee\RabbitMQ\DI\RabbitMQExtension
Example configuration
services:
    - TestConsumer

rabbitmq:
    connections:
        default:
            user: guest
            password: guest
            host: localhost
            port: 5672

    queues:
        testQueue:
            connection: default

    exchanges:
        testExchange:
            connection: default
            type: fanout
            queueBindings:
                testQueue:
                    routingKey: testRoutingKey

    producers:
        testProducer:
            exchange: testExchange
            # queue: testQueue
            contentType: application/json
            deliveryMode: 2 # Producer::DELIVERY_MODE_PERSISTENT

    consumers:
        testConsumer:
            queue: testQueue
            callback: [@TestConsumer, consume]
            qos:
                prefetchSize: 0
                prefetchCount: 5
Publishing messages
Note: the queue is created automatically once the first message has been published.
services.neon
services:
    - TestQueue(@Gamee\RabbitMQ\Client::getProducer(testProducer))
TestQueue.php
<?php declare(strict_types=1);

use Gamee\RabbitMQ\Producer\Producer;

final class TestQueue
{

    /**
     * @var Producer
     */
    private $testProducer;

    public function __construct(Producer $testProducer)
    {
        $this->testProducer = $testProducer;
    }

    public function publish(string $message): void
    {
        $json = json_encode(['message' => $message]);
        $headers = [];

        $this->testProducer->publish($json, $headers);
    }

}
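One detail of the publish() method above is worth a sketch: json_encode() returns false when encoding fails (for example on invalid UTF-8), and with strict_types enabled that false would be passed on to a string parameter. The helper below is hypothetical and not part of gamee/nette-rabbitmq; the name encodePayload is an assumption made for illustration. It only shows one way to fail early with a readable error, and it sticks to functions available in PHP 7.1.

```php
<?php declare(strict_types=1);

/**
 * Hypothetical helper (not part of gamee/nette-rabbitmq): encodes a payload
 * to JSON and throws instead of silently returning false.
 */
function encodePayload(array $payload): string
{
    $json = json_encode($payload);

    if ($json === false) {
        // json_last_error_msg() describes why the last encode call failed
        throw new InvalidArgumentException('Cannot encode payload: ' . json_last_error_msg());
    }

    return $json;
}

// Possible use inside TestQueue::publish():
//
//     $this->testProducer->publish(encodePayload(['message' => $message]), []);
```

Throwing before the producer is called keeps a broken payload out of the queue, at the cost of the caller having to handle the exception.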
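Consuming messages
Your consumer callback must return a status that tells the library how the message was handled: IConsumer::MESSAGE_ACK to acknowledge it, or IConsumer::MESSAGE_NACK / IConsumer::MESSAGE_REJECT otherwise.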
TestConsumer.php
<?php declare(strict_types=1);

use Bunny\Message;
use Gamee\RabbitMQ\Consumer\IConsumer;

final class TestConsumer implements IConsumer
{

    public function consume(Message $message): int
    {
        $messageData = json_decode($message->content);

        $headers = $message->headers;

        /**
         * @todo Some logic here...
         */

        return IConsumer::MESSAGE_ACK; // Or ::MESSAGE_NACK || ::MESSAGE_REJECT
    }

}
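The choice between the return statuses usually starts with whether the message body can be decoded at all. The sketch below assumes JSON payloads shaped like the ones produced by TestQueue; decodeBody is a hypothetical helper, not something the library provides. Whether a rejected or nacked message is requeued depends on the library version and the broker setup, so treat the status used in the comment as one possible policy rather than a recommendation.

```php
<?php declare(strict_types=1);

/**
 * Hypothetical helper (not part of gamee/nette-rabbitmq): decodes a JSON
 * message body and returns null when it is not a JSON object or array.
 */
function decodeBody(string $content): ?array
{
    $data = json_decode($content, true);

    // json_decode() returns null for malformed input; scalars such as "42"
    // decode successfully but are not the structure this consumer expects.
    return is_array($data) ? $data : null;
}

// Possible use inside TestConsumer::consume():
//
//     $data = decodeBody($message->content);
//     if ($data === null) {
//         return IConsumer::MESSAGE_REJECT; // malformed payload
//     }
//     // ... process $data ...
//     return IConsumer::MESSAGE_ACK;
```

Keeping the decoding in a small pure function makes the accept/reject decision easy to unit test without a running broker.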
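Running consumers via CLI
Two consumer commands are provided.
rabbitmq:consumer
Consumes messages for a given amount of time (in seconds). The following command consumes messages for one hour: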
php index.php rabbitmq:consumer testConsumer 3600
rabbitmq:staticConsumer
Consumes a given number of messages. The following example consumes 20 messages:
php index.php rabbitmq:staticConsumer testConsumer 20