hweihwang/laravel-rabbitmq-pubsub

为 Laravel 提供发布和消费消息的 AMQP 包装器


README

Laravel pub/sub 库允许您发布、消费和处理 rabbit 事件。

安装

composer require chocofamilyme/laravel-pubsub

从 v6 升级到 v7

阅读升级指南 这里

从 v5 升级到 v6

阅读升级指南 这里

从 v3 升级到 v4

阅读升级指南 这里

发布配置和迁移

php artisan vendor:publish --provider="Chocofamilyme\LaravelPubSub\Providers\PubSubServiceProvider"

配置

AMQP (RabbitMQ) 配置

  • 设置环境变量 BROADCAST_DRIVER = rabbitmq
  • 将 rabbitmq 驱动添加到 config/broadcasting.php
...
'connections' => [
        'rabbitmq' => [
            'driver' => 'rabbitmq',
        ],
...
]
  • AMQP 配置应插入到 config/queue.php
'sync' => [  
...
],  
  
'database' => [  
...
],  
  
'beanstalkd' => [  
...
],

// Insert into your config/queue.php
'rabbitmq' => [  
    'driver' => 'rabbitmq',  
    'queue' => env('RABBITMQ_QUEUE', 'default'),  
    'connection' => PhpAmqpLib\Connection\AMQPSocketConnection::class,  
    'worker' => env('RABBITMQ_WORKER', Chocofamilyme\LaravelPubSub\Queue\RabbitMQQueue::class),  
  
    'hosts' => [  
        [
        'host' => env('SERVICE_RABBITMQ_HOST', '127.0.0.1'),  
        'port' => env('SERVICE_RABBITMQ_PORT', 5672),  
        'user' => env('SERVICE_RABBITMQ_USER', 'guest'),  
        'password' => env('SERVICE_RABBITMQ_PASSWORD', 'guest'),  
        'vhost' => env('SERVICE_RABBITMQ_VHOST', '/'),  
        ],
     ],  
  
    'options' => [  
        'ssl_options' => [  
            'cafile' => env('RABBITMQ_SSL_CAFILE', null),  
            'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),  
            'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),  
            'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),  
            'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),  
        ],
    
        'heartbeat' => 60,  
        'message-ttl' => 60000000,  
  
        'publisher' => [  
            'queue' => [  
                'declare' => false,  
                'bind' => false,
            ],  
            'exchange' => [  
                'declare' => false,
                'name' => 'exchange-name',  
            ],  
        ],
    ],
]

参数

事件路由配置

事件路由配置文件位于 config/pubsub.php 中,包含 EventRouting 和将事件存储在数据库中的配置。

<?php

return [
    /*
    |--------------------------------------------------------------------------
    | Listen for events
    |--------------------------------------------------------------------------
    |
    | Define event name and it's listeners. Please notice that one event name may have multiple listeners
    |
    | Example:
    |
    | listen => [
    |     'UserNotified' => [
    |         'durable' => true,
    |         'listeners' => [
    |             NotifyAboutDeviceChangeListener::class,
    |         ],
    |     ]
    | ],
    |
    */
    'listen' => [

    ],
    
    /**
     * Define database tables for storing data (publishing events, incoming events, etc.)
     */
    'tables' => [
        'events' => 'pubsub_events'
    ]
];  

使用方法

您可以监听 RabbitMQ 事件并将它们发布。

如果您想监听来自 rabbitmq 队列的项目间事件,请使用 --job=laravel 标志。

php artisan event:listen --job=laravel

用于项目间通信时,请使用 --job=laravel 标志。

php artisan event:listen --job=external

示例

单个事件

php artisan event:listen gateway.user.authenticated --job=external

将在默认交换机和队列名称中监听单个事件 "gateway.user.authenticated"。在 config/pubsub.php 中配置内部事件路由。事件从有效负载中获取,当发布事件时,会自动将事件名称附加到那里。

通配符事件

php artisan event:listen gateway.user.# --exchange=gateway --queue=guardqueue --job=external

将在交换机 gateway 和队列名称 "guardqueue" 中监听所有 "gateway.user.*" 事件。

项目间通信

php artisan event:listen

将监听默认的 Laravel 事件,在默认情况下,--job=laravel 已默认设置。

php artisan event:listen 标志和参数

connection=rabbitmq                        : The name of the queue connection to work
--queue=                                   : The names of the queues to work
--exchange=                                : Optional, specifies exchange which should be listened [for default value see app/config/queue.php]
--exchange_type=topic                      : Optional, specifies exchange which should be listened [for default value see app/config/queue.php] [RabbitMQ Doc](https://rabbitmq.cn/tutorials/amqp-concepts.html)
--once                                     : Only process the next job on the queue
--job=laravel                              : Handler for internal or external message
--stop-when-empty                          : Stop when the queue is empty
--delay=0                                  : The number of seconds to delay failed jobs
--force                                    : Force the worker to run even in maintenance mode
--memory=128                               : The memory limit in megabytes
--sleep=3                                  : Number of seconds to sleep when no job is available
--timeout=0                                : The number of seconds a child process can run
--tries=1                                  : Number of times to attempt a job before logging it failed
--exclusive=0                              : used by only one connection and the queue will be deleted when that connection close
--consumer_exclusive=0                     : request exclusive consumer access, meaning only this consumer can access the queue
--wait_non_blocking=0                      : non-blocking actions
--exchange_passive=0                       : If set, the server will reply with Declare-Ok if the exchange already exists with the same name, and raise an error if not [RabbitMQ Doc](https://rabbitmq.cn/amqp-0-9-1-reference.html#exchange.declare.passive)
--exchange_durable=1                       : If set when creating a new exchange, the exchange will be marked as durable [RabbitMQ Doc](https://rabbitmq.cn/amqp-0-9-1-reference.html#exchange.declare.durable)
--exchange_auto_delete=0                   : If set, the exchange is deleted when all queues have finished using it [RabbitMQ Doc](https://rabbitmq.cn/amqp-0-9-1-reference.html#exchange.declare.auto-delete)
--consumer-tag                             :
--prefetch-size=0                          :
--prefetch-count=1                         : [RabbitMQ Doc](https://rabbitmq.cn/consumer-prefetch.html)  |

如何发布事件

有关 Laravel 默认方式,请参阅 Laravel 文档

https://laravel.net.cn/

如果您想将事件发布到 RabbitMQ

我们已尽力使其尽可能简单,请看它是如何工作的

  1. 使用 php artisan make:event 创建事件,请注意,您的事件类名称将是消息有效负载中的事件名称。它随后用于内部路由 config/pubsub.php
  2. 打开新创建的事件,并从 Chocofamilyme\LaravelPubSub\Events\PublishEvent 继承
  3. 您必须覆盖一些方法的常量值,例如 EXCHANGE_NAMEROUTING_KEY。这些常量告诉分发器应使用哪个交换机以及哪个路由键。看到了吗?这相当直观。
  4. 由于您从 PublishEvent 类继承,因此可以覆盖更多方法,这可以使事件更精确,有关详细信息,请参阅该类内部。
  5. 我们的事件准备就绪后,现在我们可以以 Laravel 的方式发布它
event(new UserUpdatedEvent(1, 'Josh'));

由于此事件从 PublishEvent 类继承并实现了 SendToRabbitMQInterface,因此它将自动发送到 rabbitmq。

PS:请注意,您需要覆盖 toPayload() 方法,返回用作消息有效负载的数组。

示例事件类

<?php

namespace App\Events;

use Chocofamilyme\LaravelPubSub\Events\PublishEvent;

class UserUpdatedEvent extends PublishEvent
{
    public int $id;
    public string $name;

    public const EXCHANGE_NAME = 'exchangeName';
    public const ROUTING_KEY   = 'user.updated';

    /**
     * Create a new event instance.
     *
     * @param int $id
     * @param string $name
     */
    public function __construct(int $id, string $name)
    {
        $this->id = $id;
        $this->name = $name;
    }
}

在 config/pubsub.php 中设置示例 listen

...
  'listen' => [
      'App\Events\UserUpdatedEvent' => [
          'durable' => true,
          'listeners' => [
               UserChangeListener::class,
          ],
      ],
  ],
...

要保存失败的任务,您需要在 queue.php 中进行以下更改,并将 failed_jobs 表中的 uuid 列设置为可空。

...
      'failed' => [
        'driver' => env('QUEUE_FAILED_DRIVER', 'database-uuids'),
        'database' => env('DB_CONNECTION', 'mysql'),
        'table' => 'failed_jobs',
    ],
...
...
      'failed' => [
//        'driver' => env('QUEUE_FAILED_DRIVER', 'database-uuids'),
        'database' => env('DB_CONNECTION', 'mysql'),
        'table' => 'failed_jobs',
    ],
...