loetfi/laravel-service-bus

laravel 的服务总线

v0.1.0 2024-05-24 07:58 UTC

This package is auto-updated.

Last update: 2024-09-24 08:38:23 UTC


README

这个库提供了多个服务之间的通信。支持同步和异步通信。

此库使用

基础逻辑

基础逻辑基于包结构。这种结构非常类似于 .Net MassTransit 包。包包含

  • 包元数据
  • 消息类型
  • 消息

算法

  • 发送者创建带有消息的包
  • 发送者将包提交给消费者
  • 消费者通过消息类型定义请求的包并反序列化包
  • 消费者使用接收到的包执行逻辑

同步通信

  • 发送者创建带有消息的包
  • 发送者以同步方式将包提交给消费者
    • 库将包序列化为 json
    • 库向消费者主机提交 http 请求
    • 消费者处理接收到的包
    • 消费者返回带有响应的新包
  • 发送者从消费者接收响应包

异步通信

异步通信由 RabbitMQ 提供

  • 发送者创建带有消息的包
  • 发送者以异步方式将包提交给消费者
    • 库将包序列化为 json
    • 库将包提交到 RabbitMQ 交换机
    • RabbitMQ 将接收到的包提交到队列
  • 消费者监听 RabbitMQ 队列
  • 消费者从队列接收包
  • 消费者处理接收到的包

安装

  • composer require smskin/laravel-service-bus
  • php artisan vendor:publish --provider="SMSkin\ServiceBus\Providers\ServiceProvider"
  • service-bus.php 配置文件中配置 RabbitMQ 连接
  • 运行 php artisan service-bus:setup 以初始化 RabbitMQ 交换机和队列
  • 将服务总线消费者命令添加到监督配置中

监督配置以提供服务总线消费者

[program:laravel-service-bus]
process_name=%(program_name)s
command=php /var/www/html/artisan service-bus:supervisor
autostart=true
autorestart=true
user=www-data
group=www-data
redirect_stderr=true
numprocs=1
stderr_logfile=/dev/stderr
stdout_logfile=/dev/stdout

Artisan 命令

  • service-bus:setup - 初始化 RabbitMQ 交换机和队列的命令
  • service-bus:supervisor - 服务总线监督器。启动从 RabbitMQ 队列消费包的进程
  • service-bus:delete-all - 从 RabbitMQ 删除交换机和队列的命令
  • service-bus:list - 列出交换机和队列的命令
  • service-bus:publish - 将测试消息提交到 RabbitMQ 交换机的命令(用于测试)
  • service-bus:consume - 用于从 RabbitMQ 队列接收包的消费者命令(由监督命令使用)

服务总线监督器

监督命令为每个已注册的消费者运行消费者进程(配置文件 service-bus.phpsupervisor 部分)

覆盖默认枚举

您可以在 service-bus.php 配置文件中创建新枚举并覆盖调用它。这提供了扩展默认消费者和交换机的能力。

例如

  • 创建交换枚举
  • 从 SMSkin\ServiceBus\Enums\Exchanges 扩展它
  • 更改 items 方法以提供新的交换机
  • service-bus.php 配置文件中替换默认枚举(第 14 行)

  • 包 - 基础交换数据包
  • 消息 - 包有效载荷(DTO)
  • 处理器 - 在收到传入包时运行的类。在同步交换模式下可以返回新的包。

交换机

在交换枚举中注册交换机(SMSkin\ServiceBus\Enums\Exchanges - 可以覆盖)

  • id - 内部 ID
  • connection - 连接 ID(SMSkin\ServiceBus\Enums\Connections)
  • rabbitMqName - RabbitMQ 交换机名称
  • attributes - RabbitMQ 交换机属性
    • exchangeType (exchange_type)
    • 被动
    • 持久
    • autoDelete (auto_delete)
    • 内部
    • nowait
    • throwExceptionOnRedeclare (throw_exception_on_redeclare)
    • throwExceptionOnBindFail (throw_exception_on_bind_fail)

队列

在队列枚举中注册队列(SMSkin\ServiceBus\Enums\Queues - 可覆盖)

  • id - 内部 ID
  • connection - 连接 ID(SMSkin\ServiceBus\Enums\Connections)
  • rabbitMqName - 队列的RabbitMQ名称
  • 属性
    • 被动
    • 持久
    • autoDelete (auto_delete)
    • 内部
    • nowait
    • exclusive
    • bind - 绑定数组
      • exchange - 交换机的id(SMSkin\ServiceBus\Enums\Exchanges)
      • routing_key
    • arguments
      • x-max-priority - 最大消息优先级

发布者

在发布者枚举中注册发布者(SMSkin\ServiceBus\Enums\Publishers - 可覆盖)

  • id - 内部 ID
  • exchange - 交换机的id(SMSkin\ServiceBus\Enums\Exchanges)

消费者

在消费者枚举中注册消费者(SMSkin\ServiceBus\Enums\Consumers - 可覆盖)

  • id - 内部 ID
  • queue - 队列的id(SMSkin\ServiceBus\Enums\Queues)
  • prefetchCount

主机

在主机枚举中注册主机(SMSkin\ServiceBus\Enums\Hosts - 可覆盖)

  • id - 内部 ID
  • host - 消费者主机的url(用于提供同步服务总线)

交换机包

包必须提供处理器和消息类的链接。包在Packages枚举中注册( SMSkin\ServiceBus\Enums\Packages - 可覆盖)。交换机包具有非常简单的结构,支持在任何语言中重现此逻辑。

序列化包示例

{
    "messageId": "e5e471ae-62e3-46c5-93fd-6dd589c32748",
    "correlationId": "2712f617-4d55-4851-ad2c-5c841224e251",
    "conversationId": null,
    "initiatorId": null,
    "sourceAddress": null,
    "destinationAddress": null,
    "messageType": [
        "urn:message:TEST_ASYNC_LOCAL"
    ],
    "message": {
        "name": "Sergey"
    },
    "sentTime": "2022-06-07T10:19:15.152620Z",
    "host": null
}

包示例

<?php

namespace App\Modules\ServiceBus\Packages;

use App\Modules\ServiceBus\Enums\Packages;
use App\Modules\ServiceBus\Packages\Messages\TestMessage;
use App\Modules\ServiceBus\Packages\Processors\TestMessageProcessor;
use SMSkin\ServiceBus\Packages\BasePackage;

class TestMessagePackage extends BasePackage
{
    public function package(): string
    {
        return Packages::TEST_SYNC;
    }

    protected function messageClass(): string
    {
        return TestMessage::class;
    }

    public function getProcessorClass(): string
    {
        return TestMessageProcessor::class;
    }
}

处理器类示例

<?php

namespace App\Modules\ServiceBus\Packages\Processors;

use App\Modules\ServiceBus\Packages\TestMessagePackage;
use Illuminate\Support\Facades\Log;
use SMSkin\ServiceBus\Packages\BasePackage;
use SMSkin\ServiceBus\Packages\Processors\BaseProcessor;

class TestMessageProcessor extends BaseProcessor
{
    public function __construct(protected TestMessagePackage|BasePackage $package)
    {
        parent::__construct($package);
    }

    public function execute(): ?BasePackage
    {
        Log::debug('Received package', $this->package->toArray());
        return null;
    }
}

消息类示例

<?php

namespace App\Modules\ServiceBus\Packages\Messages;

use SMSkin\ServiceBus\Packages\Messages\BaseMessage;

class TestMessage extends BaseMessage
{
    public string $name;

    public function fromArray(array $data): static
    {
        $this->name = $data['name'];
        return $this;
    }

    public function toArray(): array
    {
        return [
            'name' => $this->name
        ];
    }

    /**
     * @param string $name
     * @return TestMessage
     */
    public function setName(string $name): TestMessage
    {
        $this->name = $name;
        return $this;
    }
}

在Packages枚举中注册注册包的示例

<?php

namespace App\Modules\ServiceBus\Enums;

use App\Modules\ServiceBus\Packages\TestMessagePackage;
use Illuminate\Support\Collection;
use SMSkin\ServiceBus\Enums\Models\PackageItem;

class Packages extends \SMSkin\ServiceBus\Enums\Packages
{
    public const TEST_ASYNC_LOCAL = 'TEST_ASYNC_LOCAL';

    /**
     * @return Collection<PackageItem>
     */
    protected static function getItems(): Collection
    {
        return parent::getItems()->merge([
            (new PackageItem)
                ->setId(self::TEST_ASYNC_LOCAL)
                ->setClass(TestMessagePackage::class)
        ]);
    }
}

提交

同步命令提交示例

$result = (new ServiceBus)->syncPublish(
            (new SyncPublishRequest)
                ->setHost(Hosts::LOCALHOST)
                ->setPackage((new TestSyncMessagePackage)
                    ->setMessageType(Packages::TEST_ASYNC_LOCAL)
                    ->setMessageId(Str::uuid()->toString())
                    ->setCorrelationId(Str::uuid()->toString())
                    ->setSentTime(now())
                    ->setMessage(
                        (new TestMessage)
                            ->setString1('a1')
                            ->setString2('b2')
                    ))
        );
dd($result);

异步命令提交示例

(new ServiceBus)->asyncPublish(
            (new AsyncPublishRequest)
                ->setPublisher(\App\Modules\ServiceBus\Enums\Publishers::TEST_LOCAL)
                ->setRoutingKey('*')
                ->setPackage(
                    (new TestMessagePackage())
                        ->setPackage(\App\Modules\ServiceBus\Enums\Packages::TEST_ASYNC_LOCAL)
                        ->setMessageId(Str::uuid()->toString())
                        ->setCorrelationId(Str::uuid()->toString())
                        ->setSentTime(now())
                        ->setMessage(
                            (new \App\Modules\ServiceBus\Packages\Messages\TestMessage())
                                ->setName('Sergey')
                        )
                )
                ->setProperties([
                    'priority' => 0
                ])
        );
  • priority - 消息的优先级