chocofamilyme / laravel-pubsub
为Laravel提供发布和消费消息的AMQP包装器
10.1.3
2024-09-06 08:37 UTC
Requires
- php: ^8.0
- ext-json: *
- vladimir-yuldashev/laravel-queue-rabbitmq: ^13.0
Requires (Dev)
- orchestra/testbench: ^7.3.0
- phpunit/phpunit: ^9.3
- squizlabs/php_codesniffer: ^3.5
- vimeo/psalm: ^4.2
- dev-master
- 10.1.3
- 10.1.2
- 10.1.1
- 10.0.2
- 10.0.1
- 10.0.0
- 9.0.1
- 9.0.0
- 8.0.1
- 8.0.0
- 7.1.0
- 7.0.2
- 7.0.1
- 7.0.0
- 6.0.2
- 6.0.1
- 6.0.0
- 5.1.0
- 5.0.2
- 5.0.1
- 5.0.0
- 4.1.0
- 4.0.1
- 3.1.0
- 3.0.3
- 3.0.2
- 3.0.1
- 3.0.0
- 2.0.3
- 2.0.2
- 2.0.1
- 2.0.0
- 1.0.4
- 1.0.3
- 1.0.2
- 1.0.1
- 1.0
- dev-restart-worker
- dev-report
- dev-feat.headers_in_payload
- dev-SP-472
- dev-aidyn.php8_support
- dev-restflag
- dev-CT-2883
- dev-aidyn.check_event_for_exist
- dev-aidyn.refactor_durable_flag
- dev-v6-alfa
- dev-CT-2598
- dev-aidyn.changelog
- dev-aidyn.add_sub_logging
- dev-CT-2598-0
- dev-v4.changelog
- dev-CT-2038
- dev-aidyn.set_listener_job
- dev-laravel7
- dev-CT-1739
- dev-listener.ack
- dev-CT-1802
- dev-CT-1802-test
- dev-CT-1619
- dev-CT-1740
- dev-amqpextendet
This package is auto-updated.
Last update: 2024-09-06 08:40:00 UTC
README
Laravel pub/sub库允许您发布、消费和处理RabbitMQ事件。
安装
composer require chocofamilyme/laravel-pubsub
从v6升级到v7
阅读升级指南这里
从v5升级到v6
阅读升级指南这里
从v3升级到v4
阅读升级指南这里
发布配置和迁移
php artisan vendor:publish --provider="Chocofamilyme\LaravelPubSub\Providers\PubSubServiceProvider"
配置
AMQP (RabbitMQ) 配置
- 设置环境变量 BROADCAST_DRIVER = rabbitmq
- 将rabbitmq驱动添加到config/broadcasting.php
... 'connections' => [ 'rabbitmq' => [ 'driver' => 'rabbitmq', ], ... ]
- AMQP配置应插入到config/queue.php
'sync' => [ ... ], 'database' => [ ... ], 'beanstalkd' => [ ... ], // Insert into your config/queue.php 'rabbitmq' => [ 'driver' => 'rabbitmq', 'queue' => env('RABBITMQ_QUEUE', 'default'), 'connection' => PhpAmqpLib\Connection\AMQPSocketConnection::class, 'worker' => env('RABBITMQ_WORKER', Chocofamilyme\LaravelPubSub\Queue\RabbitMQQueue::class), 'hosts' => [ [ 'host' => env('SERVICE_RABBITMQ_HOST', '127.0.0.1'), 'port' => env('SERVICE_RABBITMQ_PORT', 5672), 'user' => env('SERVICE_RABBITMQ_USER', 'guest'), 'password' => env('SERVICE_RABBITMQ_PASSWORD', 'guest'), 'vhost' => env('SERVICE_RABBITMQ_VHOST', '/'), ], ], 'options' => [ 'ssl_options' => [ 'cafile' => env('RABBITMQ_SSL_CAFILE', null), 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), ], 'heartbeat' => 60, 'message-ttl' => 60000000, 'publisher' => [ 'queue' => [ 'declare' => false, 'bind' => false, ], 'exchange' => [ 'declare' => false, 'name' => 'exchange-name', ], ], ], ]
参数
事件路由配置
事件路由配置文件位于config/pubsub.php中,包含EventRouting和将事件存储在数据库中的配置。
<?php return [ /* |-------------------------------------------------------------------------- | Listen for events |-------------------------------------------------------------------------- | | Define event name and it's listeners. Please notice that one event name may have multiple listeners | | Example: | | listen => [ | 'UserNotified' => [ | 'durable' => true, | 'listeners' => [ | NotifyAboutDeviceChangeListener::class, | ], | ] | ], | */ 'listen' => [ ], /** * Define database tables for storing data (publishing events, incoming events, etc.) */ 'tables' => [ 'events' => 'pubsub_events' ] ];
用法
您可以监听RabbitMQ事件并发布它们。
如果您想监听来自rabbitmq队列的跨项目事件,请使用--job=laravel标志。
php artisan event:listen --job=laravel
对于跨微服务通信,请使用--job=laravel标志。
php artisan event:listen --job=external
示例
单个事件
php artisan event:listen gateway.user.authenticated --job=external
将在默认交换和队列名称中监听单个事件"gateway.user.authenticated"。在config/pubsub.php
中配置内部事件路由。事件从负载中提取,当您发布事件时,它会自动附加事件名称。
通配符事件
php artisan event:listen gateway.user.# --exchange=gateway --queue=guardqueue --job=external
将监听交换机gateway和队列名称"guardqueue"中的所有"gateway.user.*"事件。
跨项目通信
php artisan event:listen
将监听默认Laravel事件,默认情况下已设置--job=laravel
php artisan event:listen 标志和参数
connection=rabbitmq : The name of the queue connection to work
--queue= : The names of the queues to work
--exchange= : Optional, specifies exchange which should be listened [for default value see app/config/queue.php]
--exchange_type=topic : Optional, specifies exchange which should be listened [for default value see app/config/queue.php] [RabbitMQ Doc](https://rabbitmq.cn/tutorials/amqp-concepts.html)
--once : Only process the next job on the queue
--job=laravel : Handler for internal or external message
--stop-when-empty : Stop when the queue is empty
--delay=0 : The number of seconds to delay failed jobs
--force : Force the worker to run even in maintenance mode
--memory=128 : The memory limit in megabytes
--sleep=3 : Number of seconds to sleep when no job is available
--timeout=0 : The number of seconds a child process can run
--tries=1 : Number of times to attempt a job before logging it failed
--exclusive=0 : used by only one connection and the queue will be deleted when that connection close
--consumer_exclusive=0 : request exclusive consumer access, meaning only this consumer can access the queue
--wait_non_blocking=0 : non-blocking actions
--exchange_passive=0 : If set, the server will reply with Declare-Ok if the exchange already exists with the same name, and raise an error if not [RabbitMQ Doc](https://rabbitmq.cn/amqp-0-9-1-reference.html#exchange.declare.passive)
--exchange_durable=1 : If set when creating a new exchange, the exchange will be marked as durable [RabbitMQ Doc](https://rabbitmq.cn/amqp-0-9-1-reference.html#exchange.declare.durable)
--exchange_auto_delete=0 : If set, the exchange is deleted when all queues have finished using it [RabbitMQ Doc](https://rabbitmq.cn/amqp-0-9-1-reference.html#exchange.declare.auto-delete)
--consumer-tag :
--prefetch-size=0 :
--prefetch-count=1 : [RabbitMQ Doc](https://rabbitmq.cn/consumer-prefetch.html) |
如何发布事件
有关Laravel默认方式,请参阅Laravel文档
如果您想将事件发布到RabbitMQ
我们已尽量使其尽可能简单,请看它是如何工作的
- 使用
php artisan make:event
创建事件,请注意,您的事件类名称将是消息负载中的事件名称。它随后用于内部路由器config/pubsub.php
- 打开新创建的事件,并从
Chocofamilyme\LaravelPubSub\Events\PublishEvent
扩展 - 您需要覆盖一些方法的常量值,如
EXCHANGE_NAME
和ROUTING_KEY
。这些常量告诉分发器应使用哪个交换机以及哪个路由键。看?它非常直观。 - 由于您扩展了
PublishEvent
类,因此您可以覆盖更多方法,使事件更加精确,请参阅该类内部。 - 事件准备就绪后,我们现在可以以Laravel的方式发布它
event(new UserUpdatedEvent(1, 'Josh'));
由于此事件扩展了PublishEvent类并实现了SendToRabbitMQInterface,因此它将自动发送到rabbitmq。
PS:请注意,您需要重写toPayload()方法,返回用作消息负载的数组。
示例事件类
<?php namespace App\Events; use Chocofamilyme\LaravelPubSub\Events\PublishEvent; class UserUpdatedEvent extends PublishEvent { public int $id; public string $name; public const EXCHANGE_NAME = 'exchangeName'; public const ROUTING_KEY = 'user.updated'; /** * Create a new event instance. * * @param int $id * @param string $name */ public function __construct(int $id, string $name) { $this->id = $id; $this->name = $name; } }
在config/pubsub.php中设置示例listen
... 'listen' => [ 'App\Events\UserUpdatedEvent' => [ 'durable' => true, 'listeners' => [ UserChangeListener::class, ], ], ], ...
要保存失败的任务,您需要在queue.php中进行以下更改,并在failed_jobs表中设置uuid列可为空
... 'failed' => [ 'driver' => env('QUEUE_FAILED_DRIVER', 'database-uuids'), 'database' => env('DB_CONNECTION', 'mysql'), 'table' => 'failed_jobs', ], ...
... 'failed' => [ // 'driver' => env('QUEUE_FAILED_DRIVER', 'database-uuids'), 'database' => env('DB_CONNECTION', 'mysql'), 'table' => 'failed_jobs', ], ...