gdx/p-service-bus

PServiceBus

1.13.0 2024-09-10 16:58 UTC

README

PHP的Service Bus,灵感来源于 NServiceBus

您可以从他们的文档中了解使用原则和为什么需要它:)

文档很糟糕。如有需要,请在问题中寻求帮助。

Telegram群组: https://t.me/PServiceBus

Symfony: https://packagist.org.cn/packages/gdx/p-service-bus-symfony-bundle

Laravel: https://packagist.org.cn/packages/gdx/p-service-bus-laravel-package

简单概念

PServiceBus Simple

扩展概念

PServiceBus

安装

composer require gdx/p-service-bus

使用

到目前为止没有很好的例子

请查看 https://gitlab.com/GDXbsv/pservicebus/-/tree/master/TestApp 中的示例

或在 symfony 扩展 https://packagist.org.cn/packages/gdx/p-service-bus-symfony-bundle

如何在您的项目中开始使用它

特性

  • Saga/Aggregate 消费命令/事件,产生事件。
  • Bus 允许发送命令或发布事件。
  • CoroutineBus 允许发送多个命令或发布多个事件
  • Doctrine 集成(Saga 持久化,事务性消息(OutBox 模式),onlyOnce 控制)
  • 自动为您初始化所有资源
  • ServiceBus 作为主要入口点

初始化

p-service-bus:init

发送/发布命令/事件

\GDXbsv\PServiceBus\Bus\ServiceBus 实现了所有 Bus 接口。

如果您有一组消息,请使用 Bus

#command
$bus->send(new TestCommand());
#event
$bus->publish(new TestEvent());

如果您有许多消息,请使用 CoroutineBus

#command
$coroutine = $coroutineBus->sendCoroutine();
$coroutine->send(new Message(new TestCommand(1), CommandOptions::record()));
$coroutine->send(new Message(new TestCommand(2), CommandOptions::record()));
$coroutine->send(null);
#event
$coroutine = $coroutineBus->publishCoroutine();
$coroutine->send(new Message(new TestEvent(1), EventOptions::record()));
$coroutine->send(new Message(new TestEvent(2), EventOptions::record()));
$coroutine->send(null);

消费

要开始消费,请运行命令

p-service-bus:transport:consume  memory

其中

  • memory 是传输名称

处理器

您可以将任何方法作为处理器,只需使用 PHP 属性即可。您可以将

  • 重试次数 设置为消息进入死信队列前的重试次数(SQS 不支持,对于 sqs,请使用 DSN 字符串配置队列本身)
  • 超时秒数 消息第一次被处理前的初始延迟。(SQS 最大为 15 分钟)
  • 重试超时表达式 用于计算每次重试延迟的定制公式。我们传递 retries_count。语法请参阅这里 https://symfony.com.cn/doc/current/reference/formats/expression_language.html

您可以使用 MessageOption 设置所有这些选项。处理器将覆盖它们。不推荐这样做,因此没有记录。

<?php declare(strict_types=1);

use GDXbsv\PServiceBus\Bus\Handling\Handle;
use GDXbsv\PServiceBus\Bus\Handling\MessageHandleContext;

/**
 * @internal
 */
final class Handlers
{
    public string $result = '';

    #[Handle('memory')]
    public function handleCommand(TestMultiHandlersCommand $command, MessageHandleContext $context): void
    {
        $this->result .= '||' . $command->name;
    }

    /**
     * 5 tries = initial try + 4 retries
     *
     * Retry no  | Delay
     * --------- | -------------
     *      1    |  0 h  5 min
     *      2    |  0 h 25 min
     *      3    |  2 h  5 min
     *      4    | 10 h 25 min
     *
     * after all retries -> push to DLQ after 10s
     */
    #[Handle(transportName: 'memory', retries: 5, timeoutSec: 100, retriesTimeoutExpression: '(retries_count > 4) ? 10 : (60 * (5 ** retries_count))')]
    public function anyNameFunction(Test1Event $event, MessageHandleContext $context): void
    {
        $this->result .= '||' . $event->name;
    }
}

外部事件

发送到订阅的客户端外部(例如从 SNS)。或从外部接收我们已订阅的内容(例如到 SNS)。

<?php
declare(strict_types=1);

use GDXbsv\PServiceBus\Message\ExternalIn;
use GDXbsv\PServiceBus\Message\ExternalOut;

/**
 * @internal
 * @immutable
 * @psalm-immutable
 */
#[ExternalOut(transportName: 'memory-external', externalName: 'test.external_1_event')]
final class ExternalOutEvent
{
}

#[ExternalIn(transportName: 'memory-external', externalName: 'test.external_1_event')]
final class ExternalInEvent
{
}

重放

有时某些事情出了问题,您想重放某些事件。为此,请使用重放注释。

<?php
declare(strict_types=1);

use GDXbsv\PServiceBus\Message\EventOptions;
use GDXbsv\PServiceBus\Message\Message;
use GDXbsv\PServiceBus\Message\Replay\Replay;
use GDXbsv\PServiceBusTestApp\Handling\Test1Event;

/**
 * @internal
 * @immutable
 * @psalm-immutable
 *
 * @psalm-import-type ReplayOutput from \GDXbsv\PServiceBus\Message\Replay\Replaying
 */
final class ReplayForEvent
{
    /**
     * @return ReplayOutput
     */
    #[Replay(replayName: 'testReplay')]
    public function anyName(): \Traversable {
        for ($i=1; $i<=5; ++$i) {
            yield new Message(new Test1Event(), EventOptions::record());
        }
    }
}

然后使用命令开始重放

p-service-bus:message:replay testReplay "\GDXbsv\PServiceBusTestApp\Handling\Handlers::handle2Event1" memory

其中

  • testReplay 是属性中的重放名称
  • "\GDXbsv\PServiceBusTestApp\Handling\Handlers::handle2Event1" 是 className + :: + methodName
  • memory 是传输名称

Saga

灵感来源:https://docs.particular.net/nservicebus/sagas/

这是一个长期运行的过程,当你需要响应多个事件来做出某些决策时。或者当你的数据和消息需要通过outbox模式进行事务性绑定时。

使用doctrine的示例

<?php declare(strict_types=1);

use GDXbsv\PServiceBus\Bus\Handling\Handle;
use GDXbsv\PServiceBus\Id;
use GDXbsv\PServiceBus\Message\TimeSpan;
use GDXbsv\PServiceBus\Saga\MessageSagaContext;
use GDXbsv\PServiceBus\Saga\Saga;
use GDXbsv\PServiceBus\Saga\SagaContext;
use GDXbsv\PServiceBus\Saga\SagaPropertyMapper;
use Doctrine\ORM\Mapping as ORM;
use GDXbsv\PServiceBusTestApp\Saga\TestSagaCommand;
use GDXbsv\PServiceBusTestApp\Saga\TestSagaMapStringCommand;
use GDXbsv\PServiceBusTestApp\Saga\TestsSagaInEvent;
use GDXbsv\PServiceBusTestApp\Saga\TestsSagaOutputEvent;


/**
 * @final
 */
 #[ORM\Entity]
final class TestSaga extends Saga
{
    #[ORM\Column(type: 'id', nullable: false)]
    #[ORM\Id]
    private Id $id;
    #[ORM\Column(type: 'string', nullable: false)]
    private ?string $string;
    #[ORM\Column(type: 'string', nullable: true)]
    private ?string $value;

    /**
     * @param Id<static> $id
     */
    private function __construct(Id $id, string $string)
    {
        $this->id = $id;
        $this->string = $string;
    }
    
    public static function configureHowToCreateSaga(SagaCreateMapper $mapper): void
    {
        $mapper
            ->toMessage(
                // do not forget to create handling function in a case if saga exists and to let saga know that we wait this message
                function (TestSagaCreateCommand $command, MessageSagaContext $context) {
                    return new self(new Id($command->id), $command->string);
                }
            );
    }

    public static function configureHowToFindSaga(SagaPropertyMapper $mapper): void
    {
        $mapper
            // Find saga by id
            ->mapSaga(new \ReflectionProperty(TestSaga::class, 'id'))
            ->toMessage(
                function (TestSagaCommand $command, MessageSagaContext $context) {
                    return new Id($command->id);
                }
            )
            ->toMessage(
                function (TestsSagaInEvent $message, MessageSagaContext $context) {
                    return new Id($message->string);
                }
            );
        $mapper
            // Find saga by string propery
            ->mapSaga(new \ReflectionProperty(TestSaga::class, 'string'))
            ->toMessage(
                function (TestSagaMapStringCommand $command, MessageSagaContext $context) {
                    return $command->string;
                }
            );
    }
    
    /** We have to tell saga we wait this message, or saga could already exist */
    #[Handle('memory', 3)]
    public function testSagaCreateCommand(TestSagaCreateCommand $command, SagaContext $context)
    {
        $this->string = $command->string;
    }
    
    /** We can remove saga after all */
    #[Handle('memory', 3)]
    public function testRemove(TestSagaRemoveCommand $command, SagaContext $context)
    {
        $this->markAsComplete();
    }

    #[Handle('memory', 3)]
    public function testHandlerFunction(TestSagaCommand $command, SagaContext $context)
    {
        $this->string = $command->string;
        $context->timeout(new TestsSagaOutputEvent('testHandlerFunction'), TimeSpan::fromSeconds(0));
    }

    #[Handle('memory', 3)]
    public function testListeningFunction(TestsSagaInEvent $event, SagaContext $context)
    {
        $this->string = $event->string;
        $this->value = $event->value;
        $context->publish(new TestsSagaOutputEvent('testListeningFunction'));
    }

    #[Handle('memory', 3)]
    public function handleTestSagaMapStringCommand(
        TestSagaMapStringCommand $command,
        SagaContext $context
    ) {
        $context->publish(new TestsSagaOutputEvent($this->id->toString()));
    }
}

其中

  • configureHowToCreateSaga 描述了如何创建saga(警告:所有创建消息都应该有处理器)
  • configureHowToFindSaga 描述了如何查找saga
  • #[Handle('memory', 3)] 将方法设置为处理器

您可以通过使用 #[SagaFind] 属性创建自定义查找器,例如

<?php declare(strict_types=1);

use Doctrine\ORM\EntityManager;
use GDXbsv\PServiceBus\Message\MessageOptions;
use GDXbsv\PServiceBus\Saga\SagaFind;
use GDXbsv\PServiceBusTestApp\Saga\CustomDoctrineSearchEvent;
use GDXbsv\PServiceBusTestApp\Saga\TestSaga;

/**
 * @internal
 * @immutable
 * @psalm-immutable
 */
final class CustomDoctrineSagaFinder
{
    public function __construct(
        private EntityManager $em
    ) {
    }

    #[SagaFind]
    public function findByMultipleFields(
        CustomDoctrineSearchEvent $event,
        MessageOptions $messageOptions
    ): TestSaga {
        $qb = $this->em->createQueryBuilder();
        $qb
            ->from(TestSaga::class, 'saga')
            ->select('saga')
            ->where($qb->expr()->eq('saga.string', ':propertyValue'))
            ->setParameter(':propertyValue', $event->string);
        $saga = $qb->getQuery()->getSingleResult();

        return $saga;
    }
}

传输

到目前为止,仅实现了InMemoryTransport、RabbitMq(BunnyTransport)和SQS以及SNS。但您可以通过实现2个接口来适配自己的任何传输方式。

interface Transport
{
    /**
     * @return \Generator<int, void, Envelope|null, void>
     */
    public function sending(): \Generator;

    /**
     * @return \Generator<int, Envelope, Result\Ok<null, mixed>|Result\Err<mixed, \Exception>, void>
     */
    public function receive(int $limit = 0): \Generator;

    public function stop(): void;
}
interface TransportSynchronisation
{
    public function sync(): void;
}

请注意,如果希望外部总线的 sync() 方法自动发生,则必须在外部消息名称上订阅您。

SQS传输

示例DSN或配置 sqs+http://key:secret@aws:4100/123456789012?region=eu-west-1&retries=3&visibilityTimeout=30&waitSeconds=20&waitBetweenLoopsSeconds=40&messagesBatch=10&preload=true&tags[name]=value&tags[name2]=value2&assume=arn%3Aaws%3Aiam%3A%3A123456789012%3Arole%2Fxaccounts3access&queue=QueueName"

选项

  • region - AWS区域
  • retries - 在创建期间,在死信队列之前重试的次数
  • visibilityTimeout - 消费消息将被阻止多长时间才能进行下一次尝试
  • waitSeconds - 长轮询期间,AWS将等待多长时间来收集一批消息
  • waitBetweenLoopsSeconds - 如果没有消息,我们将睡眠多长时间
  • messagesBatch - 我们将使用一个请求从AWS消费多少条消息
  • preload - 在处理前一条消息的同时异步加载下一条消息
  • tags[name] - 在创建时向队列添加标签
  • assume - 我们想要承担的AWS角色
  • queue - 实际的队列名称

bunny传输

bunny传输内部和外部传输不同。外部使用交换和发布/订阅。

请参阅 https://gitlab.com/GDXbsv/pservicebus/-/tree/master/src/Transport/Bunny

SQS-SNS传输

对于SQS-SNS传输,SNS仅用于外部消息。

请参阅 https://gitlab.com/GDXbsv/pservicebus/-/tree/master/src/Transport/Sqs 请参阅 https://gitlab.com/GDXbsv/pservicebus/-/tree/master/src/Transport/Sns