chocofamilyme/laravel-pubsub

为Laravel提供发布和消费消息的AMQP包装器


README

Laravel pub/sub库允许您发布、消费和处理RabbitMQ事件。

安装

composer require chocofamilyme/laravel-pubsub

从v6升级到v7

阅读升级指南这里

从v5升级到v6

阅读升级指南这里

从v3升级到v4

阅读升级指南这里

发布配置和迁移

php artisan vendor:publish --provider="Chocofamilyme\LaravelPubSub\Providers\PubSubServiceProvider"

配置

AMQP (RabbitMQ) 配置

  • 设置环境变量 BROADCAST_DRIVER = rabbitmq
  • 将rabbitmq驱动添加到config/broadcasting.php
...
'connections' => [
        'rabbitmq' => [
            'driver' => 'rabbitmq',
        ],
...
]
  • AMQP配置应插入到config/queue.php
'sync' => [  
...
],  
  
'database' => [  
...
],  
  
'beanstalkd' => [  
...
],

// Insert into your config/queue.php
'rabbitmq' => [  
    'driver' => 'rabbitmq',  
    'queue' => env('RABBITMQ_QUEUE', 'default'),  
    'connection' => PhpAmqpLib\Connection\AMQPSocketConnection::class,  
    'worker' => env('RABBITMQ_WORKER', Chocofamilyme\LaravelPubSub\Queue\RabbitMQQueue::class),  
  
    'hosts' => [  
        [
        'host' => env('SERVICE_RABBITMQ_HOST', '127.0.0.1'),  
        'port' => env('SERVICE_RABBITMQ_PORT', 5672),  
        'user' => env('SERVICE_RABBITMQ_USER', 'guest'),  
        'password' => env('SERVICE_RABBITMQ_PASSWORD', 'guest'),  
        'vhost' => env('SERVICE_RABBITMQ_VHOST', '/'),  
        ],
     ],  
  
    'options' => [  
        'ssl_options' => [  
            'cafile' => env('RABBITMQ_SSL_CAFILE', null),  
            'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),  
            'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),  
            'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),  
            'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),  
        ],
    
        'heartbeat' => 60,  
        'message-ttl' => 60000000,  
  
        'publisher' => [  
            'queue' => [  
                'declare' => false,  
                'bind' => false,
            ],  
            'exchange' => [  
                'declare' => false,
                'name' => 'exchange-name',  
            ],  
        ],
    ],
]

参数

事件路由配置

事件路由配置文件位于config/pubsub.php中,包含EventRouting和将事件存储在数据库中的配置。

<?php

return [
    /*
    |--------------------------------------------------------------------------
    | Listen for events
    |--------------------------------------------------------------------------
    |
    | Define event name and it's listeners. Please notice that one event name may have multiple listeners
    |
    | Example:
    |
    | listen => [
    |     'UserNotified' => [
    |         'durable' => true,
    |         'listeners' => [
    |             NotifyAboutDeviceChangeListener::class,
    |         ],
    |     ]
    | ],
    |
    */
    'listen' => [

    ],
    
    /**
     * Define database tables for storing data (publishing events, incoming events, etc.)
     */
    'tables' => [
        'events' => 'pubsub_events'
    ]
];  

用法

您可以监听RabbitMQ事件并发布它们。

如果您想监听来自rabbitmq队列的跨项目事件,请使用--job=laravel标志。

php artisan event:listen --job=laravel

对于跨微服务通信,请使用--job=laravel标志。

php artisan event:listen --job=external

示例

单个事件

php artisan event:listen gateway.user.authenticated --job=external

将在默认交换和队列名称中监听单个事件"gateway.user.authenticated"。在config/pubsub.php中配置内部事件路由。事件从负载中提取,当您发布事件时,它会自动附加事件名称。

通配符事件

php artisan event:listen gateway.user.# --exchange=gateway --queue=guardqueue --job=external

将监听交换机gateway和队列名称"guardqueue"中的所有"gateway.user.*"事件。

跨项目通信

php artisan event:listen

将监听默认Laravel事件,默认情况下已设置--job=laravel

php artisan event:listen 标志和参数

connection=rabbitmq                        : The name of the queue connection to work
--queue=                                   : The names of the queues to work
--exchange=                                : Optional, specifies exchange which should be listened [for default value see app/config/queue.php]
--exchange_type=topic                      : Optional, specifies exchange which should be listened [for default value see app/config/queue.php] [RabbitMQ Doc](https://rabbitmq.cn/tutorials/amqp-concepts.html)
--once                                     : Only process the next job on the queue
--job=laravel                              : Handler for internal or external message
--stop-when-empty                          : Stop when the queue is empty
--delay=0                                  : The number of seconds to delay failed jobs
--force                                    : Force the worker to run even in maintenance mode
--memory=128                               : The memory limit in megabytes
--sleep=3                                  : Number of seconds to sleep when no job is available
--timeout=0                                : The number of seconds a child process can run
--tries=1                                  : Number of times to attempt a job before logging it failed
--exclusive=0                              : used by only one connection and the queue will be deleted when that connection close
--consumer_exclusive=0                     : request exclusive consumer access, meaning only this consumer can access the queue
--wait_non_blocking=0                      : non-blocking actions
--exchange_passive=0                       : If set, the server will reply with Declare-Ok if the exchange already exists with the same name, and raise an error if not [RabbitMQ Doc](https://rabbitmq.cn/amqp-0-9-1-reference.html#exchange.declare.passive)
--exchange_durable=1                       : If set when creating a new exchange, the exchange will be marked as durable [RabbitMQ Doc](https://rabbitmq.cn/amqp-0-9-1-reference.html#exchange.declare.durable)
--exchange_auto_delete=0                   : If set, the exchange is deleted when all queues have finished using it [RabbitMQ Doc](https://rabbitmq.cn/amqp-0-9-1-reference.html#exchange.declare.auto-delete)
--consumer-tag                             :
--prefetch-size=0                          :
--prefetch-count=1                         : [RabbitMQ Doc](https://rabbitmq.cn/consumer-prefetch.html)  |

如何发布事件

有关Laravel默认方式,请参阅Laravel文档

https://laravel.net.cn/

如果您想将事件发布到RabbitMQ

我们已尽量使其尽可能简单,请看它是如何工作的

  1. 使用php artisan make:event创建事件,请注意,您的事件类名称将是消息负载中的事件名称。它随后用于内部路由器config/pubsub.php
  2. 打开新创建的事件,并从Chocofamilyme\LaravelPubSub\Events\PublishEvent扩展
  3. 您需要覆盖一些方法的常量值,如EXCHANGE_NAMEROUTING_KEY。这些常量告诉分发器应使用哪个交换机以及哪个路由键。看?它非常直观。
  4. 由于您扩展了PublishEvent类,因此您可以覆盖更多方法,使事件更加精确,请参阅该类内部。
  5. 事件准备就绪后,我们现在可以以Laravel的方式发布它
event(new UserUpdatedEvent(1, 'Josh'));

由于此事件扩展了PublishEvent类并实现了SendToRabbitMQInterface,因此它将自动发送到rabbitmq。

PS:请注意,您需要重写toPayload()方法,返回用作消息负载的数组。

示例事件类

<?php

namespace App\Events;

use Chocofamilyme\LaravelPubSub\Events\PublishEvent;

class UserUpdatedEvent extends PublishEvent
{
    public int $id;
    public string $name;

    public const EXCHANGE_NAME = 'exchangeName';
    public const ROUTING_KEY   = 'user.updated';

    /**
     * Create a new event instance.
     *
     * @param int $id
     * @param string $name
     */
    public function __construct(int $id, string $name)
    {
        $this->id = $id;
        $this->name = $name;
    }
}

在config/pubsub.php中设置示例listen

...
  'listen' => [
      'App\Events\UserUpdatedEvent' => [
          'durable' => true,
          'listeners' => [
               UserChangeListener::class,
          ],
      ],
  ],
...

要保存失败的任务,您需要在queue.php中进行以下更改,并在failed_jobs表中设置uuid列可为空

...
      'failed' => [
        'driver' => env('QUEUE_FAILED_DRIVER', 'database-uuids'),
        'database' => env('DB_CONNECTION', 'mysql'),
        'table' => 'failed_jobs',
    ],
...
...
      'failed' => [
//        'driver' => env('QUEUE_FAILED_DRIVER', 'database-uuids'),
        'database' => env('DB_CONNECTION', 'mysql'),
        'table' => 'failed_jobs',
    ],
...