medeiroz / laravel-amqp-toolkit
Laravel AMQP Toolkit 用于发布和消费 RabbitMQ 消息
Requires
- php: ^8.1
- ext-mbstring: *
- ext-sockets: *
- illuminate/contracts: ^10.0|^11.0
- php-amqplib/php-amqplib: ^3.6
- spatie/laravel-package-tools: ^1.14.0
Requires (Dev)
- larastan/larastan: ^2.0.1
- laravel/pint: ^1.0
- nunomaduro/collision: ^7.8|^8.0
- orchestra/testbench: ^8.8|^9.0
- pestphp/pest: ^2.20
- pestphp/pest-plugin-arch: ^2.0
- pestphp/pest-plugin-laravel: ^2.0
- phpstan/extension-installer: ^1.1
- phpstan/phpstan-deprecation-rules: ^1.0
- phpstan/phpstan-phpunit: ^1.0
- spatie/laravel-ray: ^1.26
README
此包旨在简化 Laravel 应用程序与 RabbitMQ 的集成,提供消费和发布消息的功能,同时还通过架构迁移的方式(灵感来源于 Laravel 的数据库迁移)提供了一种简单的方式来管理 AMQP / RabbitMQ 基础设施。
如果您需要从 RabbitMQ 队列中消费消息、在交换中发布消息或管理 RabbitMQ 基础设施,此包适合您。
如果您在管理 RabbitMQ 架构(包括队列、交换和推土机)方面遇到问题,此包也适合您。
主要功能
- 架构迁移: 使用架构迁移来创建、删除和管理 RabbitMQ 中的队列、交换和推土机,类似于 Laravel 为数据库提供的服务。
- 消费队列: 以简单高效的方式从 RabbitMQ 队列中消费消息的能力。
- 发布消息: 在交换中或直接在 RabbitMQ 队列中发布消息的能力。
- 事件监听器: 监听从 AMQP / RabbitMQ 队列和交换接收到的消息的能力,类似于 Laravel 的事件监听器。
要求
- PHP >= 8.1
- Laravel >= 10
安装
按照以下步骤安装包
- 使用 composer 安装
medeiroz/laravel-amqp-toolkit
包 - 发布并运行迁移
- 发布配置文件
- 环境变量 .env
- 配置 AMQP 队列和 Laravel 监听器
1. 安装包
运行以下命令通过 composer 安装包
composer require medeiroz/laravel-amqp-toolkit
2. 发布迁移
您必须使用以下命令发布数据库迁移
php artisan vendor:publish --tag="amqp-toolkit-migrations"
然后运行以下命令在数据库中创建表。
php artisan migrate
3. 发布配置文件
您可以使用以下命令发布配置文件
php artisan vendor:publish --tag="amqp-toolkit-config"
这是已发布的配置文件的内容
<?php // config for Medeiroz/AmqpToolkit return [ 'schemas' => base_path('amqp-toolkit-schemas'), 'table_name' => env('AMQP_TABLE_NAME', 'amqp_schemas'), 'max-attempts' => env('AMQP_MAX_ATTEMPTS', 10), 'heartbeat' => env('AMQP_HEARTBEAT', 30), 'keepalive' => env('AMQP_KEEPALIVE', true), /** * The default connection to use when no connection is provided to the AMQP client. */ 'connection' => env('AMQP_CONNECTION', 'rabbitmq'), /** * The default logging channel to use when no channel is provided to the AMQP client. * You can use the same channels as the Laravel logging configuration * Like as 'stack', 'single', 'daily' etc... */ 'logging-channel' => env('AMQP_LOG_CHANNEL', env('LOG_CHANNEL')), /** * The queues to be consumed by the consumer command without arguments. */ 'consumer-queues' => [ // 'payment-received' => \App\Listeners\PaymentReceivedListener::class, ], 'connections' => [ 'rabbitmq' => [ 'host' => env('AMQP_HOST', 'localhost'), 'port' => env('AMQP_PORT', 5672), 'api-port' => env('AMQP_API_PORT', 15672), 'user' => env('AMQP_USER', 'guest'), 'password' => env('AMQP_PASSWORD', ''), 'vhost' => env('AMQP_VHOST', '/'), ], ], ];
4. 环境变量 .env
编辑 .env
文件并添加您的 AMQP / RabbitMQ 服务器环境变量。
AMQP_HOST=your-amqp-host AMQP_PORT=5672 AMQP_API_PORT=15672 AMQP_USER=user AMQP_PASSWORD=password AMQP_VHOST=/
请根据您的环境替换值。
5. 配置 AMQP 队列和 Laravel 监听器
编辑 config/amqp-toolkit.php
文件并添加您想要消费的队列和当收到消息时将被执行的监听器。
数组键是队列名称,值是当收到消息时将被执行的监听器类。
示例:将监听器 PaymentReceivedListener
绑定到队列 payment-received
/** * The queues to be consumed by the consumer command without arguments. */ 'consumer-queues' => [ 'payment-received' => \App\Listeners\PaymentReceivedListener::class, ],
请参阅配置文件以获取更多详细信息。
使用方法
架构迁移
允许的架构类型包括
- 队列
- 交换
- 推土机
创建架构
php artisan amqp:make-schema {type} {name}
示例
php artisan amqp:make-schema queue my-first-queue php artisan amqp:make-schema exchange my-exchange php artisan amqp:make-schema shovel my-shovel
运行迁移
php artisan amqp:migrate
回滚迁移
php artisan amqp:migrate --rollback --step=1
回滚所有迁移
php artisan amqp:migrate --refresh
向队列或交换发布消息
use Medeiroz\AmqpToolkit\Facades\AmqpPublisher; AmqpPublisher::onQueue(['say' => 'Hello World'], 'my-queue-name'); AmqpPublisher::onExchange(['say' => 'Hello World'], 'my-exchange-name'); AmqpPublisher::onExchange(['say' => 'Hello World with routing key'], 'my-exchange-name', 'my-routing-key');
开始消费 AMQP / RabbitMQ 队列
运行所有队列的消费者
要开始消费所有队列,必须运行以下 artisan 命令
php artisan amqp:consumer
要开始消费特定队列,必须运行以下 artisan 命令
其中 my-first-queue
和 payment-received
是您想要消费的队列名称。
php artisan amqp:consumer my-first-queue payment-received
监听消息
本包提供了一种监听从 AMQP / RabbitMQ 队列和交换机接收到的消息的方法,类似于 Laravel 的事件监听器。
1. 自动监听注册
当在 config/amqp-toolkit.php
文件中配置了队列和监听器时,监听器将在执行消费者时自动注册。
2. 手动监听注册
编辑 app/Providers/EventServiceProvider.php
文件并添加您想监听的事件。
事件名称应为 amqp.QUEUE_NAME
,其中 QUEUE_NAME
是您想监听的队列名称。
app/Providers/EventServiceProvider.php
public function boot() { Event::listen( 'amqp:payment-received', \App\Listeners\PaymentReceivedListener::class, ); Event::listen( 'amqp:my-queue', \App\Listeners\MyQueueListener::class, ); Event::listen( 'amqp:*', \App\Listeners\AllListener::class, ); }
注意:amqp:* 事件是一个特殊的事件,它监听所有队列接收到的消息。只有当消费者正在执行时才会调用队列事件。
创建事件监听器
要为要监听的事件创建监听器,请运行以下命令
php artisan make:listener PaymentReceivedListener
此包提供了一个事件对象 Medeiroz\AmqpToolkit\Events\AmqpReceivedMessageEvent
,它包含队列名称和消息体。
<?php namespace App\Listeners; use Illuminate\Contracts\Queue\ShouldQueue; use Medeiroz\AmqpToolkit\Events\AmqpReceivedMessageEvent; class PaymentReceivedListener { public function handle(AmqpReceivedMessageEvent $event): void { \Log::debug('Queue' . $event->queue); \Log::debug('Message Body', $event->messageBody); } }
如果您想使用 Laravel Horizon
异步执行事件,可以使用 ShouldQueue
接口。
<?php namespace App\Listeners; use Illuminate\Contracts\Queue\ShouldQueue; use Medeiroz\AmqpToolkit\Events\AmqpReceivedMessageEvent; class PaymentReceivedListener implements ShouldQueue { /* ... */ }
有关 AMQP 迁移架构的更多信息
创建交换机架构
使用以下命令创建一个新的交换机架构
php artisan amqp:make-schema exchange my-exchange
生成的文件
<?php use Medeiroz\AmqpToolkit\SchemaMigration\SchemaMigration; return new class extends SchemaMigration { private const NAME = 'my-exchange'; public function up(): void { $this->createExchangeIfNonExists(self::NAME); } public function down(): void { $this->deleteExchangeIfExists(self::NAME); } };
创建队列架构
使用以下命令创建一个新的队列架构
php artisan amqp:make-schema queue payment-received
生成的文件
<?php use Medeiroz\AmqpToolkit\SchemaMigration\SchemaMigration; return new class extends SchemaMigration { private const NAME = 'payment-received'; public function up(): void { $this->createQueueIfNonExists(self::NAME) ->withRetry() ->withTtl(seconds: 5) ->withDlq(); } public function down(): void { $this->deleteQueueIfExists(self::NAME); $this->deleteQueueIfExists(self::NAME . '.retry'); $this->deleteQueueIfExists(self::NAME . '.dlq'); } };
此架构创建了一个名为 payment-received
的队列,具有重试、TTL 和 DLQ。RabbitMQ 将自动创建 payment-received.retry
和 payment-received.dlq
队列。
您可以通过调用 bind
方法将队列绑定到交换机。
// ... public function up(): void { $this->createQueue('my-queue') ->bind('my-exchange', 'my-route-key'); } // or public function up(): void { $this->createQueue('my-queue'); $this->bind('my-queue', 'my-exchange', 'my-route-key'); }
创建 Shovel 架构
使用以下命令创建一个新的 Shovel 架构
php artisan amqp:make-schema shovel my-shovel
生成的文件
<?php use Medeiroz\AmqpToolkit\SchemaMigration\SchemaMigration; use Medeiroz\AmqpToolkit\SchemaMigration\Shovel\Resource0dot9; use Medeiroz\AmqpToolkit\SchemaMigration\Shovel\Resource1dot0; return new class extends SchemaMigration { private const NAME = 'my-shovel'; public function up(): void { $this->createShovelIfNonExists( name: self::NAME, source: new Resource0dot9( type: 'queue', uri: 'amqp://', queue: 'my-queue-on-amqp-0-9', autoDelete: 'never', addForwardingHeaders: 'No', ), destination: new Resource1dot0( uri: 'amqps://user:password@my-host.servicebus.windows.net:5671/?verify=verify_none', address: 'my-topic-on-service-bus', ), ); } public function down(): void { $this->deleteShovelIfExists(self::NAME); } };
您可以使用 createShovel
或 createShovelIfNonExists
方法创建 Shovel,传入 Shovel 名称、源和目标资源。
source
和 destination
是表示消息 broker
资源的对象。
可用资源
- Resource0dot9
- AMQP 0-9-1 的消息代理资源,与 RabbitMQ、Qpid 等相同。
- Resource1dot0
- AMQP 1.0 的消息代理资源,与 Azure Service Bus、ActiveMQ 等相同。
例如,您创建一个 Shovel 从 Broker A 将消息移动到 Broker B。
- 将消息从 RabbitMQ 移动到 Azure Service Bus
- 将消息从 Azure Service Bus 移动到 RabbitMQ
- 将 AWS Cloud 中的 RabbitMQ 消息移动到 Azure Cloud 中的 RabbitMQ
- 将 Azure Service Bus 中的消息移动到 Azure Service Bus 中的另一个订阅
有关 Shovel 配置的更多信息,请参阅 RabbitMQ Shovel 文档。
注意:Shovel 架构仅适用于 RabbitMQ 3.8.0 或更高版本。
架构迁移中所有可用的方法
createQueue(string $name): Medeiroz\AmqpToolkit\SchemaMigration\Queue
- 创建一个新的队列
createQueueIfNonExists(string $name): Medeiroz\AmqpToolkit\SchemaMigration\Queue
- 如果不存在则创建一个新的队列
deleteQueue(string $name): void
- 删除一个队列
deleteQueueIfExists(string $name): void
- 如果存在则删除一个队列
createExchange(string $name): Medeiroz\AmqpToolkit\SchemaMigration\Exchange
- 创建一个新的交换机
createExchangeIfNonExists(string $name): Medeiroz\AmqpToolkit\SchemaMigration\Exchange
- 如果不存在则创建一个新的交换机
deleteExchange(string $name): void
- 删除一个交换机
deleteExchangeIfExists(string $name): void
- 如果存在则删除一个交换机
bind(string $queue, string $exchange, ?string $routingKey = null): Medeiroz\AmqpToolkit\SchemaMigration\Bind
- 使用路由键将队列绑定到交换机
bindIfExist(string $queue, string $exchange, ?string $routingKey = null): Medeiroz\AmqpToolkit\SchemaMigration\Bind
- 如果队列存在,则将其绑定到具有路由键的交换机
unbind(string $queue, string $exchange, ?string $routingKey = null): void
- 解除队列与交换机及其路由键的绑定
unbindIfExists(string $queue, string $exchange, ?string $routingKey = null): void
- 如果队列存在,则解除队列与交换机及其路由键的绑定
createShovel(string $name): Medeiroz\AmqpToolkit\SchemaMigration\Shovel
- 创建一个新的shovel
createShovelIfNonExists(string $name): Medeiroz\AmqpToolkit\SchemaMigration\Shovel
- 如果shovel不存在,则创建一个新的shovel
deleteShovel(string $name): void
- 删除一个shovel
deleteShovelIfExists(string $name): void
- 如果shovel存在,则删除它
关于Shovel的更多信息
shovel是一个功能,允许你在不同的代理之间移动消息,例如,从RabbitMQ到Azure Service Bus,从Azure Service Bus到RabbitMQ等。
如果你的消息源是Azure Service Bus,而你使用PHP/Laravel编写应用程序代码,你不能直接从Azure Service Bus消费消息。你可以创建一个shovel,将消息从Azure Service Bus移动到RabbitMQ,然后从RabbitMQ消费消息。
从
Azure Service Bus
转发消息到RabbitMQ
你需要为你的shovel创建特定的资源,例如,你下面有这些资源
- 在Azure Service Bus上
- 订阅A
(源)
- 订阅A
- 在RabbitMQ上
- 交换机
(目标)
- 队列A(供你的PHP应用程序消费消息)
- Shovel "My First Shovel"
- 交换机
从
RabbitMQ
转发消息到Azure Service Bus
你需要为你的shovel创建特定的资源,例如,你下面有这些资源
- 在RabbitMQ上
- 交换机
- 队列D
(源)
- Shovel "My Second Shovel"
- 在Azure Service Bus上
- 主题
(目标)
- 主题
测试
composer test
变更日志
有关最近更改的更多信息,请参阅CHANGELOG。
贡献
有关详细信息,请参阅CONTRIBUTING。
安全漏洞
有关如何报告安全漏洞的详细信息,请参阅我们的安全策略。
鸣谢
许可
MIT许可(MIT)。有关更多信息,请参阅许可文件。