loetfi / laravel-service-bus
laravel 的服务总线
v0.1.0
2024-05-24 07:58 UTC
Requires
- php: ^8.1
- ext-pcntl: *
- ext-sockets: *
- guzzlehttp/guzzle: ^7.0
- laravel/framework: ^8 || ^9 || ^10
- needle-project/laravel-rabbitmq: ^0.5.0
- smskin/laravel-support: ^2.0
README
这个库提供了多个服务之间的通信。支持同步和异步通信。
此库使用
- https://github.com/needle-project/laravel-rabbitmq - 支持AMPQ的库。一些类被重写以支持当前库的逻辑。
- https://github.com/laravel/horizon - 监督部分的代码库
基础逻辑
基础逻辑基于包结构。这种结构非常类似于 .Net MassTransit 包。包包含
- 包元数据
- 消息类型
- 消息
算法
- 发送者创建带有消息的包
- 发送者将包提交给消费者
- 消费者通过消息类型定义请求的包并反序列化包
- 消费者使用接收到的包执行逻辑
同步通信
- 发送者创建带有消息的包
- 发送者以同步方式将包提交给消费者
- 库将包序列化为 json
- 库向消费者主机提交 http 请求
- 消费者处理接收到的包
- 消费者返回带有响应的新包
- 发送者从消费者接收响应包
异步通信
异步通信由 RabbitMQ 提供
- 发送者创建带有消息的包
- 发送者以异步方式将包提交给消费者
- 库将包序列化为 json
- 库将包提交到 RabbitMQ 交换机
- RabbitMQ 将接收到的包提交到队列
- 消费者监听 RabbitMQ 队列
- 消费者从队列接收包
- 消费者处理接收到的包
安装
composer require smskin/laravel-service-bus
php artisan vendor:publish --provider="SMSkin\ServiceBus\Providers\ServiceProvider"
- 在
service-bus.php
配置文件中配置 RabbitMQ 连接 - 运行
php artisan service-bus:setup
以初始化 RabbitMQ 交换机和队列 - 将服务总线消费者命令添加到监督配置中
监督配置以提供服务总线消费者
[program:laravel-service-bus]
process_name=%(program_name)s
command=php /var/www/html/artisan service-bus:supervisor
autostart=true
autorestart=true
user=www-data
group=www-data
redirect_stderr=true
numprocs=1
stderr_logfile=/dev/stderr
stdout_logfile=/dev/stdout
Artisan 命令
service-bus:setup
- 初始化 RabbitMQ 交换机和队列的命令service-bus:supervisor
- 服务总线监督器。启动从 RabbitMQ 队列消费包的进程service-bus:delete-all
- 从 RabbitMQ 删除交换机和队列的命令service-bus:list
- 列出交换机和队列的命令service-bus:publish
- 将测试消息提交到 RabbitMQ 交换机的命令(用于测试)service-bus:consume
- 用于从 RabbitMQ 队列接收包的消费者命令(由监督命令使用)
服务总线监督器
监督命令为每个已注册的消费者运行消费者进程(配置文件 service-bus.php
,supervisor
部分)
覆盖默认枚举
您可以在 service-bus.php
配置文件中创建新枚举并覆盖调用它。这提供了扩展默认消费者和交换机的能力。
例如
- 创建交换枚举
- 从 SMSkin\ServiceBus\Enums\Exchanges 扩展它
- 更改
items
方法以提供新的交换机 - 在
service-bus.php
配置文件中替换默认枚举(第 14 行)
类
- 包 - 基础交换数据包
- 消息 - 包有效载荷(DTO)
- 处理器 - 在收到传入包时运行的类。在同步交换模式下可以返回新的包。
交换机
在交换枚举中注册交换机(SMSkin\ServiceBus\Enums\Exchanges - 可以覆盖)
- id - 内部 ID
- connection - 连接 ID(SMSkin\ServiceBus\Enums\Connections)
- rabbitMqName - RabbitMQ 交换机名称
- attributes - RabbitMQ 交换机属性
- exchangeType (exchange_type)
- 被动
- 持久
- autoDelete (auto_delete)
- 内部
- nowait
- throwExceptionOnRedeclare (throw_exception_on_redeclare)
- throwExceptionOnBindFail (throw_exception_on_bind_fail)
队列
在队列枚举中注册队列(SMSkin\ServiceBus\Enums\Queues - 可覆盖)
- id - 内部 ID
- connection - 连接 ID(SMSkin\ServiceBus\Enums\Connections)
- rabbitMqName - 队列的RabbitMQ名称
- 属性
- 被动
- 持久
- autoDelete (auto_delete)
- 内部
- nowait
- exclusive
- bind - 绑定数组
- exchange - 交换机的id(SMSkin\ServiceBus\Enums\Exchanges)
- routing_key
- arguments
- x-max-priority - 最大消息优先级
发布者
在发布者枚举中注册发布者(SMSkin\ServiceBus\Enums\Publishers - 可覆盖)
- id - 内部 ID
- exchange - 交换机的id(SMSkin\ServiceBus\Enums\Exchanges)
消费者
在消费者枚举中注册消费者(SMSkin\ServiceBus\Enums\Consumers - 可覆盖)
- id - 内部 ID
- queue - 队列的id(SMSkin\ServiceBus\Enums\Queues)
- prefetchCount
主机
在主机枚举中注册主机(SMSkin\ServiceBus\Enums\Hosts - 可覆盖)
- id - 内部 ID
- host - 消费者主机的url(用于提供同步服务总线)
交换机包
包必须提供处理器和消息类的链接。包在Packages枚举中注册( SMSkin\ServiceBus\Enums\Packages - 可覆盖)。交换机包具有非常简单的结构,支持在任何语言中重现此逻辑。
序列化包示例
{ "messageId": "e5e471ae-62e3-46c5-93fd-6dd589c32748", "correlationId": "2712f617-4d55-4851-ad2c-5c841224e251", "conversationId": null, "initiatorId": null, "sourceAddress": null, "destinationAddress": null, "messageType": [ "urn:message:TEST_ASYNC_LOCAL" ], "message": { "name": "Sergey" }, "sentTime": "2022-06-07T10:19:15.152620Z", "host": null }
包示例
<?php namespace App\Modules\ServiceBus\Packages; use App\Modules\ServiceBus\Enums\Packages; use App\Modules\ServiceBus\Packages\Messages\TestMessage; use App\Modules\ServiceBus\Packages\Processors\TestMessageProcessor; use SMSkin\ServiceBus\Packages\BasePackage; class TestMessagePackage extends BasePackage { public function package(): string { return Packages::TEST_SYNC; } protected function messageClass(): string { return TestMessage::class; } public function getProcessorClass(): string { return TestMessageProcessor::class; } }
处理器类示例
<?php namespace App\Modules\ServiceBus\Packages\Processors; use App\Modules\ServiceBus\Packages\TestMessagePackage; use Illuminate\Support\Facades\Log; use SMSkin\ServiceBus\Packages\BasePackage; use SMSkin\ServiceBus\Packages\Processors\BaseProcessor; class TestMessageProcessor extends BaseProcessor { public function __construct(protected TestMessagePackage|BasePackage $package) { parent::__construct($package); } public function execute(): ?BasePackage { Log::debug('Received package', $this->package->toArray()); return null; } }
消息类示例
<?php namespace App\Modules\ServiceBus\Packages\Messages; use SMSkin\ServiceBus\Packages\Messages\BaseMessage; class TestMessage extends BaseMessage { public string $name; public function fromArray(array $data): static { $this->name = $data['name']; return $this; } public function toArray(): array { return [ 'name' => $this->name ]; } /** * @param string $name * @return TestMessage */ public function setName(string $name): TestMessage { $this->name = $name; return $this; } }
在Packages枚举中注册注册包的示例
<?php namespace App\Modules\ServiceBus\Enums; use App\Modules\ServiceBus\Packages\TestMessagePackage; use Illuminate\Support\Collection; use SMSkin\ServiceBus\Enums\Models\PackageItem; class Packages extends \SMSkin\ServiceBus\Enums\Packages { public const TEST_ASYNC_LOCAL = 'TEST_ASYNC_LOCAL'; /** * @return Collection<PackageItem> */ protected static function getItems(): Collection { return parent::getItems()->merge([ (new PackageItem) ->setId(self::TEST_ASYNC_LOCAL) ->setClass(TestMessagePackage::class) ]); } }
提交
同步命令提交示例
$result = (new ServiceBus)->syncPublish( (new SyncPublishRequest) ->setHost(Hosts::LOCALHOST) ->setPackage((new TestSyncMessagePackage) ->setMessageType(Packages::TEST_ASYNC_LOCAL) ->setMessageId(Str::uuid()->toString()) ->setCorrelationId(Str::uuid()->toString()) ->setSentTime(now()) ->setMessage( (new TestMessage) ->setString1('a1') ->setString2('b2') )) ); dd($result);
异步命令提交示例
(new ServiceBus)->asyncPublish( (new AsyncPublishRequest) ->setPublisher(\App\Modules\ServiceBus\Enums\Publishers::TEST_LOCAL) ->setRoutingKey('*') ->setPackage( (new TestMessagePackage()) ->setPackage(\App\Modules\ServiceBus\Enums\Packages::TEST_ASYNC_LOCAL) ->setMessageId(Str::uuid()->toString()) ->setCorrelationId(Str::uuid()->toString()) ->setSentTime(now()) ->setMessage( (new \App\Modules\ServiceBus\Packages\Messages\TestMessage()) ->setName('Sergey') ) ) ->setProperties([ 'priority' => 0 ]) );
- priority - 消息的优先级