solcloud/consumer

RabbitMQ 基础工作队列消费

v3.1 2022-11-22 12:42 UTC

This package is auto-updated.

Last update: 2024-09-30 01:47:11 UTC


README

Consumer 需要 AMQPChannel 通道作为唯一依赖。

通道设置

如果你的项目中已经有了通道实例,则可以跳过此步骤,否则请设置 rabbitmq 连接,我们建议使用容器进行此操作。

$config = new \Solcloud\Consumer\QueueConfig();
$config
    ->setHost('solcloud_rabbitmq')
    ->setVhost('/')
    #->setHeartbeatSec(5)
    ->setUsername('dev')
    ->setPassword('dev')
;
$connectionFactory = new \Solcloud\Consumer\QueueConnectionFactory($config);
$connection = $connectionFactory->createSocketConnection();
#(new \PhpAmqpLib\Connection\Heartbeat\PCNTLHeartbeatSender($connection))->register(); // if heartbeat and pcntl_async_signals() is available

从连接创建通道或使用自己的通道

/** @var \PhpAmqpLib\Channel\AMQPChannel $channel */
$channel = $connection->channel();

工作器

为你的业务逻辑创建 worker(消费者)类,并注入 $channel 依赖。你可以扩展 AbstractConsumer 以实现轻量级抽象,或使用 "solcloud 标准" BaseConsumer。在这个例子中我们将使用 BaseConsumer

$worker = new class($channel) extends \Solcloud\Consumer\BaseConsumer {

    protected function run(): void
    {
        // Your hard work here
        echo "Processing message: " . $this->data->id . PHP_EOL;
    }

};

使用阻塞方法 wait 从队列开始消费消息

$worker->consume($consumeQueueName);
while ($worker->hasCallback()) {
    try {
        // While we have callback lets enter event loop with some timeout
        $worker->wait(rand(8, 11));
    } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $ex) {
        echo $ex->getMessage() . PHP_EOL;
    }
}
$worker->closeChannel();

消息发布

对于消息发布,你可以直接使用 $worker 或使用 rabbitmq 管理插件或不同的脚本

$worker->publishMessage(
    $worker->createMessageHelper([], ["id" => 1]),
    '',
    $consumeQueueName
); // OR open rabbitmq management and publish: {"meta":[],"data":{"id":1}}

日志记录

工作器可以将日志记录到与 Psr\Log\LoggerInterface 兼容的记录器。

$worker->setLogger(new YourPsrLogger());
$worker->getLogger()->info('Something');

示例

有关此 README 的完整示例,请参阅 example.php