comlaude/laravel-amqp

Laravel和Lumen的AMQP包装器,用于发布和消费消息

2.0.5 2024-09-23 09:34 UTC

README

用于与RabbitMQ交互的简单PhpAmqpLib包装器

Build Status Latest Stable Version License

安装

Composer

将以下内容添加到composer.json文件中的require部分

"comlaude/laravel-amqp": "^1.0.0"
$ php composer update

$ php composer require comlaude/laravel-amqp

集成

Lumen

在Lumen应用的根目录下创建一个config文件夹,并将vendor/comlaude/laravel-amqp/config/amqp.php的内容复制到config/amqp.php。

根据需要调整属性。

return [

    'use' => 'production',

    'properties' => [

        'production' => [
            'host'                  => 'localhost',
            'port'                  =>  5672,
            'username'              => '',
            'password'              => '',
            'vhost'                 => '/',
            'connect_options'       => [],
            'ssl_options'           => [],

            'exchange'              => 'amq.topic',
            'exchange_type'         => 'topic',
            'exchange_passive'      => false,
            'exchange_durable'      => true,
            'exchange_auto_delete'  => false,
            'exchange_internal'     => false,
            'exchange_nowait'       => false,
            'exchange_properties'   => [],

            'queue_force_declare'   => false,
            'queue_passive'         => false,
            'queue_durable'         => true,          // only change when not using quorum queues
            'queue_exclusive'       => false,
            'queue_auto_delete'     => false,         // only change when not using quorum queues
            'queue_nowait'          => false,
            'queue_properties'      => [
                'x-ha-policy' => ['S', 'all'],
                'x-queue-type' => ['S', 'quorum'],
                // 'x-dead-letter-exchange' => ['S', 'amq.topic-dlx'], // if provided an exchange and queue (queue_name-dlx) will be automatically declared
                // 'x-delivery-limit' => ['I', 5],                     // the delivery limit will be set on the relevant queue but not the DLX queue itself
            ],
            'queue_acknowledge_is_final' => true,     // if important work is done inside a consumer after the acknowledge call, this should be false
            'queue_reject_is_final'      => true,     // if important work is done inside a consumer after the reject call, this should be false

            'consumer_tag'              => '',
            'consumer_no_local'         => false,
            'consumer_no_ack'           => false,
            'consumer_exclusive'        => false,
            'consumer_nowait'           => false,
            'timeout'                   => 0,        // seconds
            'persistent'                => false,
            'persistent_restart_period' => 0,        // seconds
            'request_accepted_timeout'  => 0.5,      // seconds in decimal accepted
            'request_handled_timeout'   => 5,        // seconds in decimal accepted

            'qos'                   => true,
            'qos_prefetch_size'     => 0,
            'qos_prefetch_count'    => 1,
            'qos_a_global'          => false,

            /*
            |--------------------------------------------------------------------------
            | An example binding set up when declaring exchange and queues
            |--------------------------------------------------------------------------
            |'bindings' => [
            |    [
            |        'queue'    => 'example',
            |        'routing'  => 'example.route.key',
            |    ],
            |],
            */
        ],

    ],

];

在bootstrap/app.php中注册Lumen服务提供者

/*
|--------------------------------------------------------------------------
| Register Service Providers
|--------------------------------------------------------------------------
*/

//...

$app->configure('amqp');
$app->register(ComLaude\Amqp\LumenServiceProvider::class);

//...

为Lumen 5.2+添加Facade支持

//...
$app->withFacades(true, [
    'ComLaude\Amqp\Facades\Amqp' => 'Amqp',
]);
//...

Laravel

打开config/app.php并添加服务提供者和别名

'ComLaude\Amqp\AmqpServiceProvider',
'Amqp' => 'ComLaude\Amqp\Facades\Amqp',

发布消息

使用路由键推送消息

Amqp::publish('routing-key', 'message');

使用路由键和覆盖属性推送消息

Amqp::publish('routing-key', 'message' , ['exchange' => 'amq.direct']);

消费消息

永久消费消息

Amqp::consume(function ($message) {

    var_dump($message->body);

    Amqp::acknowledge($message);

});

使用自定义设置消费消息

Amqp::consume(function ($message) {

   var_dump($message->body);

   Amqp::acknowledge($message);

}, [
    'timeout' => 2,
    'vhost'   => 'vhost3',
    'queue'   => 'queue-name',
    'persistent' => true // required if you want to listen forever
]);

扇出示例

发布消息

Amqp::publish('', 'message' , [
    'exchange_type' => 'fanout',
    'exchange' => 'amq.fanout',
]);

禁用发布

这在开发和同步需求中很有用,如果您使用观察者或事件触发AMQP消息,您可能希望暂时禁用消息的发布。当关闭发布时,publish方法将静默丢弃消息并返回。

检查状态

if(Amqp::isEnabled()) {
    // It is going to publish
}

禁用

Amqp::disable();

启用

Amqp::enable();

远程过程调用服务器和客户端

RPC在微服务世界中可能是一个反模式,因此不要随意使用它,尽管有时您确实需要那种请求-响应行为,并且愿意接受其局限性。只需在消费者处理程序中返回响应,如果消息是来自客户端的请求,则响应将自动路由到正确的请求者。有两个可配置的超时值,以防止无限阻塞等待。

request_accepted_timeout - 等待服务器确认工作正在进行的超时时间,这是一个检查是否有人正在监听的所有检查之一,应该相当小

request_handled_timeout - 等待完整请求完成(所有消息)的超时时间,请务必确保这足够大,如果您的作业持续时间较长或要处理的邮件数量较大

服务器

Amqp::consume(function ($message) {

   Amqp::acknowledge($message);

   return "I handled this message " . $message->getBody();

});

客户端

Amqp::request('example.routing.key', [

    'message1',
    'message2',

], function ($message) {

   echo("The remote server said " . $message->getBody());

});

// Or for single message requests you can just do
$response = Amqp::requestWithResponse('example.routing.key', 'quickly');
// $response is already the message content as a string "I handled this message quickly"

使用配置了死信交换的消息进行消费

当在队列属性中使用x-dead-letter-exchange参数时,此包将声明额外的

  • 声明-dlx队列
  • 声明交换本身

当消费者失败或重新排队消息5次时,消息将通过死信交换路由到这个新队列。

Amqp::consume(function ($message) {

   var_dump($message->body);

   Amqp::acknowledge($message);

}, [
    'timeout' => 2,
    'vhost'   => 'vhost3',
    'queue'   => 'my-example-queue',
    'persistent' => true // required if you want to listen forever
    'queue_properties'      => [
        'x-ha-policy' => ['S', 'all'],
        'x-queue-type' => ['S', 'quorum'],
        'x-dead-letter-exchange' => ['S', 'amq.topic-dlx'], // will auto-declare queue named my-example-queue-dlx
        'x-delivery-limit' => ['I', 5], // after 5 deliveries the message is routed to my-example-queue-dlx
    ],
]);

鸣谢

许可

此软件包是开源软件,受MIT许可许可