avto-dev / amqp-rabbit-manager
RabbitMQ 管理器
Requires
- php: ^8.1
- ext-amqp: *
- enqueue/amqp-ext: ^0.10.19
- illuminate/console: ~10.0 || ~11.0
- illuminate/events: ~10.0 || ~11.0
- illuminate/support: ~10.0 || ~11.0
- queue-interop/queue-interop: ^0.8
- symfony/console: ~6.0 || ~7.0
Requires (Dev)
- laravel/laravel: ~10.0 || ~11.0
- mockery/mockery: ^1.6
- phpstan/phpstan: ^1.10
- phpunit/phpunit: ^10.5
README
Laravel 应用程序的 RabbitMQ 管理器
此包可用于轻松访问 RabbitMQ 实体,如连接或队列。
需要安装 php 扩展
ext-amqp
。安装步骤可以在 Dockerfile 中找到。
安装
使用以下命令通过 composer 安装此包
$ composer require avto-dev/amqp-rabbit-manager "^2.0"
需要安装
composer
(如何安装 composer)。
您需要修复包的主要版本。
之后,您应使用以下命令“发布”包配置文件
$ php ./artisan vendor:publish --provider='AvtoDev\AmqpRabbitManager\ServiceProvider'
并在文件 ./config/rabbitmq.php
中进行配置。
用法
首先,您应执行命令 rabbit:setup
以在 RabbitMQ 服务器上创建所有队列和交换。
然后,在应用程序的任何部分,您都可以解析连接或队列/交换工厂。例如,在 artisan 命令中
<?php namespace App\Console\Commands; use AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface; class SomeCommand extends \Illuminate\Console\Command { /** * The console command name. * * @var string */ protected $name = 'some:command'; /** * Execute the console command. * * @param ConnectionsFactoryInterface $connections * * @return void */ public function handle(ConnectionsFactoryInterface $connections): void { $connections->default(); // Get the default RabbitMQ connection instance } }
手动创建队列
声明队列操作在代理端创建队列(使用命令 rabbit:setup
代替此操作)
<?php /** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */ /** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */ $exchange = $connections ->default() ->declareQueue($queues->make('some-queue-id'));
手动创建交换
声明交换操作在代理端创建主题(使用命令 rabbit:setup
代替此操作)
<?php /** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */ /** @var \AvtoDev\AmqpRabbitManager\ExchangesFactoryInterface $exchanges */ $exchange = $connections ->default() ->declareTopic($exchanges->make('some-exchange-id'));
将队列绑定到交换
将队列连接到交换。因此,来自该主题的消息将进入队列并可以被处理(使用命令 rabbit:setup
事件 \AvtoDev\AmqpRabbitManager\Commands\Events\*
代替此操作)
<?php /** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */ /** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */ /** @var \AvtoDev\AmqpRabbitManager\ExchangesFactoryInterface $exchanges */ $connections ->default() ->bind(new \Interop\Amqp\Impl\AmqpBind( $exchanges->make('some-exchange-id'), $queues->make('some-queue-id') ));
向交换发送消息
创建消息并将其发送到交换
<?php /** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */ /** @var \AvtoDev\AmqpRabbitManager\ExchangesFactoryInterface $exchanges */ $connection = $connections->default(); $message = $connection->createMessage('Hello world!'); $connection ->createProducer() ->send($exchanges->make('some-exchange-id'), $message);
向队列发送消息
创建消息并将其发送到队列
<?php /** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */ /** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */ $connection = $connections->default(); $message = $connection->createMessage('Hello world!'); $connection ->createProducer() ->send($queues->make('some-queue-id'), $message);
发送优先消息
消息优先级用于消息排序
<?php /** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */ /** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */ $connection = $connections->default(); $message = $connection->createMessage('Hello world!'); $connection ->createProducer() ->setPriority(10) // ... ->send($queues->make('some-queue-id'), $message);
发送过期消息
也称为消息 TTL
<?php /** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */ /** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */ $connection = $connections->default(); $message = $connection->createMessage('Hello world!'); $connection ->createProducer() ->setTimeToLive(60000) // 60 sec // ... ->send($queues->make('some-queue-id'), $message);
发送延迟消息
如果可能,请避免使用 enqueue/amqp-tools
延迟策略。如果您手动操作,您可以完全控制它。
获取(消费)单个消息
获取一条消息并继续脚本执行
<?php /** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */ /** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */ $consumer = $connections->default()->createConsumer($queues->make('some-queue-id')); $message = $consumer->receive(); try { // .. process a message .. $consumer->acknowledge($message); } catch (\Exception $e) { // .. process exception .. $consumer->reject($message); }
订阅消费者
启动(几乎是)无限循环以处理消息(您可以在同一时间启动多个消费者,只需调用即可)
<?php /** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */ /** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */ $connection = $connections->default(); $queue = $queues->make('some-queue-id'); $consumer = $connection->createConsumer($queue); $subscriber = $connection->createSubscriptionConsumer(); $subscriber->subscribe( $consumer, function(\Interop\Amqp\AmqpMessage $message, \Enqueue\AmqpExt\AmqpConsumer $consumer): bool { try { // .. process a message .. $consumer->acknowledge($message); } catch (\Exception $e) { // .. process exception .. $consumer->reject($message); return false; // Subscription will be cancelled } return true; // Subscription will be continued } ); $subscriber->consume(); // You can pass timeout in milliseconds
清除队列消息
移除队列中的所有消息
<?php /** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */ /** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */ $connection = $connections->default(); $connection->purgeQueue($queues->make('some-queue-id'));
测试
对于包测试,我们使用 phpunit
框架和 docker-ce
+ docker-compose
作为开发环境。因此,在克隆存储库后,只需在终端中写入
$ make build $ make latest # or 'make lowest' $ make test
变更日志
变更日志可以在 此处找到。
支持
如果您在此包中发现任何错误,请在此存储库中 创建一个问题。
许可
这是一个开源软件,受 MIT 许可证 许可。