comlaude / laravel-amqp
Laravel和Lumen的AMQP包装器,用于发布和消费消息
Requires
- php: ^7.4|^8.0
- php-amqplib/php-amqplib: >=3.2
Requires (Dev)
- comlaude/php-formatter: ^1.0
- illuminate/support: >=v5.5.28
- mockery/mockery: ^1.0
- php-mock/php-mock-phpunit: ^2.6
- phpunit/phpunit: ^9.2
- tightenco/tlint: ^6.0
README
用于与RabbitMQ交互的简单PhpAmqpLib包装器
安装
Composer
将以下内容添加到composer.json文件中的require部分
"comlaude/laravel-amqp": "^1.0.0"
$ php composer update
或
$ php composer require comlaude/laravel-amqp
集成
Lumen
在Lumen应用的根目录下创建一个config文件夹,并将vendor/comlaude/laravel-amqp/config/amqp.php的内容复制到config/amqp.php。
根据需要调整属性。
return [ 'use' => 'production', 'properties' => [ 'production' => [ 'host' => 'localhost', 'port' => 5672, 'username' => '', 'password' => '', 'vhost' => '/', 'connect_options' => [], 'ssl_options' => [], 'exchange' => 'amq.topic', 'exchange_type' => 'topic', 'exchange_passive' => false, 'exchange_durable' => true, 'exchange_auto_delete' => false, 'exchange_internal' => false, 'exchange_nowait' => false, 'exchange_properties' => [], 'queue_force_declare' => false, 'queue_passive' => false, 'queue_durable' => true, // only change when not using quorum queues 'queue_exclusive' => false, 'queue_auto_delete' => false, // only change when not using quorum queues 'queue_nowait' => false, 'queue_properties' => [ 'x-ha-policy' => ['S', 'all'], 'x-queue-type' => ['S', 'quorum'], // 'x-dead-letter-exchange' => ['S', 'amq.topic-dlx'], // if provided an exchange and queue (queue_name-dlx) will be automatically declared // 'x-delivery-limit' => ['I', 5], // the delivery limit will be set on the relevant queue but not the DLX queue itself ], 'queue_acknowledge_is_final' => true, // if important work is done inside a consumer after the acknowledge call, this should be false 'queue_reject_is_final' => true, // if important work is done inside a consumer after the reject call, this should be false 'consumer_tag' => '', 'consumer_no_local' => false, 'consumer_no_ack' => false, 'consumer_exclusive' => false, 'consumer_nowait' => false, 'timeout' => 0, // seconds 'persistent' => false, 'persistent_restart_period' => 0, // seconds 'request_accepted_timeout' => 0.5, // seconds in decimal accepted 'request_handled_timeout' => 5, // seconds in decimal accepted 'qos' => true, 'qos_prefetch_size' => 0, 'qos_prefetch_count' => 1, 'qos_a_global' => false, /* |-------------------------------------------------------------------------- | An example binding set up when declaring exchange and queues |-------------------------------------------------------------------------- |'bindings' => [ | [ | 'queue' => 'example', | 'routing' => 'example.route.key', | ], |], */ ], ], ];
在bootstrap/app.php中注册Lumen服务提供者
/* |-------------------------------------------------------------------------- | Register Service Providers |-------------------------------------------------------------------------- */ //... $app->configure('amqp'); $app->register(ComLaude\Amqp\LumenServiceProvider::class); //...
为Lumen 5.2+添加Facade支持
//... $app->withFacades(true, [ 'ComLaude\Amqp\Facades\Amqp' => 'Amqp', ]); //...
Laravel
打开config/app.php并添加服务提供者和别名
'ComLaude\Amqp\AmqpServiceProvider',
'Amqp' => 'ComLaude\Amqp\Facades\Amqp',
发布消息
使用路由键推送消息
Amqp::publish('routing-key', 'message');
使用路由键和覆盖属性推送消息
Amqp::publish('routing-key', 'message' , ['exchange' => 'amq.direct']);
消费消息
永久消费消息
Amqp::consume(function ($message) { var_dump($message->body); Amqp::acknowledge($message); });
使用自定义设置消费消息
Amqp::consume(function ($message) { var_dump($message->body); Amqp::acknowledge($message); }, [ 'timeout' => 2, 'vhost' => 'vhost3', 'queue' => 'queue-name', 'persistent' => true // required if you want to listen forever ]);
扇出示例
发布消息
Amqp::publish('', 'message' , [ 'exchange_type' => 'fanout', 'exchange' => 'amq.fanout', ]);
禁用发布
这在开发和同步需求中很有用,如果您使用观察者或事件触发AMQP消息,您可能希望暂时禁用消息的发布。当关闭发布时,publish方法将静默丢弃消息并返回。
检查状态
if(Amqp::isEnabled()) { // It is going to publish }
禁用
Amqp::disable();
启用
Amqp::enable();
远程过程调用服务器和客户端
RPC在微服务世界中可能是一个反模式,因此不要随意使用它,尽管有时您确实需要那种请求-响应行为,并且愿意接受其局限性。只需在消费者处理程序中返回响应,如果消息是来自客户端的请求,则响应将自动路由到正确的请求者。有两个可配置的超时值,以防止无限阻塞等待。
request_accepted_timeout
- 等待服务器确认工作正在进行的超时时间,这是一个检查是否有人正在监听的所有检查之一,应该相当小
request_handled_timeout
- 等待完整请求完成(所有消息)的超时时间,请务必确保这足够大,如果您的作业持续时间较长或要处理的邮件数量较大
服务器
Amqp::consume(function ($message) { Amqp::acknowledge($message); return "I handled this message " . $message->getBody(); });
客户端
Amqp::request('example.routing.key', [ 'message1', 'message2', ], function ($message) { echo("The remote server said " . $message->getBody()); }); // Or for single message requests you can just do $response = Amqp::requestWithResponse('example.routing.key', 'quickly'); // $response is already the message content as a string "I handled this message quickly"
使用配置了死信交换的消息进行消费
当在队列属性中使用x-dead-letter-exchange
参数时,此包将声明额外的
- 声明
-dlx队列 - 声明交换本身
当消费者失败或重新排队消息5次时,消息将通过死信交换路由到这个新队列。
Amqp::consume(function ($message) { var_dump($message->body); Amqp::acknowledge($message); }, [ 'timeout' => 2, 'vhost' => 'vhost3', 'queue' => 'my-example-queue', 'persistent' => true // required if you want to listen forever 'queue_properties' => [ 'x-ha-policy' => ['S', 'all'], 'x-queue-type' => ['S', 'quorum'], 'x-dead-letter-exchange' => ['S', 'amq.topic-dlx'], // will auto-declare queue named my-example-queue-dlx 'x-delivery-limit' => ['I', 5], // after 5 deliveries the message is routed to my-example-queue-dlx ], ]);
鸣谢
许可
此软件包是开源软件,受MIT许可许可