solcloud / consumer
RabbitMQ 基础工作队列消费
v3.1
2022-11-22 12:42 UTC
Requires
- php: >= 7.1
- php-amqplib/php-amqplib: ^3.1
- psr/log: *
README
Consumer 需要 AMQPChannel 通道作为唯一依赖。
通道设置
如果你的项目中已经有了通道实例,则可以跳过此步骤,否则请设置 rabbitmq 连接,我们建议使用容器进行此操作。
$config = new \Solcloud\Consumer\QueueConfig(); $config ->setHost('solcloud_rabbitmq') ->setVhost('/') #->setHeartbeatSec(5) ->setUsername('dev') ->setPassword('dev') ; $connectionFactory = new \Solcloud\Consumer\QueueConnectionFactory($config); $connection = $connectionFactory->createSocketConnection(); #(new \PhpAmqpLib\Connection\Heartbeat\PCNTLHeartbeatSender($connection))->register(); // if heartbeat and pcntl_async_signals() is available
从连接创建通道或使用自己的通道
/** @var \PhpAmqpLib\Channel\AMQPChannel $channel */ $channel = $connection->channel();
工作器
为你的业务逻辑创建 worker(消费者)类,并注入 $channel 依赖。你可以扩展 AbstractConsumer 以实现轻量级抽象,或使用 "solcloud 标准" BaseConsumer。在这个例子中我们将使用 BaseConsumer
$worker = new class($channel) extends \Solcloud\Consumer\BaseConsumer { protected function run(): void { // Your hard work here echo "Processing message: " . $this->data->id . PHP_EOL; } };
使用阻塞方法 wait 从队列开始消费消息
$worker->consume($consumeQueueName); while ($worker->hasCallback()) { try { // While we have callback lets enter event loop with some timeout $worker->wait(rand(8, 11)); } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $ex) { echo $ex->getMessage() . PHP_EOL; } } $worker->closeChannel();
消息发布
对于消息发布,你可以直接使用 $worker 或使用 rabbitmq 管理插件或不同的脚本
$worker->publishMessage( $worker->createMessageHelper([], ["id" => 1]), '', $consumeQueueName ); // OR open rabbitmq management and publish: {"meta":[],"data":{"id":1}}
日志记录
工作器可以将日志记录到与 Psr\Log\LoggerInterface
兼容的记录器。
$worker->setLogger(new YourPsrLogger()); $worker->getLogger()->info('Something');
示例
有关此 README 的完整示例,请参阅 example.php