gdx / p-service-bus
PServiceBus
Requires
- php: >=8.2
- ext-json: *
- ext-pcntl: *
- doctrine/instantiator: ^1.4 || ^2.0
- prewk/result: ^3.1.0
- psr/log: ^3.0|^2.0|^1.0
- ramsey/uuid: ^4.5
- symfony/console: *
Requires (Dev)
- alexeyshockov/guzzle-psalm-plugin: ^v1.0.0
- aws/aws-sdk-php: ^3.285.0
- bunny/bunny: ^0.5.0
- doctrine/orm: ^2.10.1
- enqueue/dsn: ^0.10.8
- guzzlehttp/guzzle: ^7.3
- php-standard-library/psalm-plugin: ^2.2.1
- phpunit/phpunit: ^10.2
- react/async: ^v3.0 || ^4.0
- rector/rector: ^1.0.0
- roave/security-advisories: dev-master
- symfony/cache: *
- symfony/expression-language: *
- vimeo/psalm: ^5.12
Suggests
- aws/aws-sdk-php: Allows to use SQS adn/or SNS
- bunny/bunny: Allows to use RabbitMq as transport
- doctrine/orm: If you want to use it with Doctrine.
- enqueue/dsn: Allows to use SQS adn/or SNS
- react/async: Allows to use RabbitMq as transport
- symfony/expression-language: Allows to use custom VisibilityTimeout SQS
- dev-master
- 1.13.0
- 1.12.1
- 1.12.0
- 1.11.0
- 1.10.1
- 1.10.0
- 1.9.0
- 1.8.0
- 1.7.5
- 1.7.4
- 1.7.3
- 1.7.2
- 1.7.1
- 1.7.0
- 1.6.4
- 1.6.3
- 1.6.2
- 1.6.1
- 1.6.0
- 1.5.1
- 1.5.0
- 1.4.1
- 1.4.0
- 1.3.0
- 1.2.0
- 1.1.1
- 1.1.0
- 1.0.0
- 0.17.0
- 0.16.2
- 0.16.1
- 0.16.0
- 0.15.0
- 0.14.3
- 0.14.2
- 0.14.1
- 0.14.0
- 0.13.3
- 0.13.2
- 0.13.1
- 0.13.0
- 0.12.0
- 0.11.2
- 0.11.1
- 0.11.0
- 0.10.0
- 0.9.0
- 0.8.5
- 0.8.4
- 0.8.3
- 0.8.2
- 0.8.1
- 0.8.0
- 0.7.4
- 0.7.3
- 0.7.2
- 0.7.1
- 0.7.0
- 0.6.5
- 0.6.4
- 0.6.3
- 0.6.2
- 0.6.1
- 0.6.0
- 0.5.6
- 0.5.4
- 0.5.3
- 0.5.2
- 0.5.1
- 0.5.0
- 0.4.16
- 0.4.15
- 0.4.14
- 0.4.13
- 0.4.12
- 0.4.11
- 0.4.10
- 0.4.9
- 0.4.8
- 0.4.7
- 0.4.6
- 0.4.5
- 0.4.4
- 0.4.3
- 0.4.2
- 0.4.1
- 0.4.0
- 0.3.10
- 0.3.9
- 0.3.8
- 0.3.7
- 0.3.6
- 0.3.5
- 0.3.4
- 0.3.2
- 0.3.1
- 0.3.0
- 0.2.0
- dev-chore/assume-token
- dev-dapr
This package is auto-updated.
Last update: 2024-09-18 08:50:40 UTC
README
PHP的Service Bus,灵感来源于 NServiceBus。
您可以从他们的文档中了解使用原则和为什么需要它:)
文档很糟糕。如有需要,请在问题中寻求帮助。
Telegram群组: https://t.me/PServiceBus
Symfony: https://packagist.org.cn/packages/gdx/p-service-bus-symfony-bundle
Laravel: https://packagist.org.cn/packages/gdx/p-service-bus-laravel-package
简单概念
扩展概念
安装
composer require gdx/p-service-bus
使用
到目前为止没有很好的例子
请查看 https://gitlab.com/GDXbsv/pservicebus/-/tree/master/TestApp 中的示例
或在 symfony 扩展 https://packagist.org.cn/packages/gdx/p-service-bus-symfony-bundle
如何在您的项目中开始使用它
- 初始化示例 https://gitlab.com/GDXbsv/pservicebus/-/blob/master/src/Setup.php
- 我在测试中如何做 https://gitlab.com/GDXbsv/pservicebus/-/blob/master/tests/Integration/IntegrationTestCase.php#L45
特性
- Saga/Aggregate 消费命令/事件,产生事件。
- Bus 允许发送命令或发布事件。
- CoroutineBus 允许发送多个命令或发布多个事件
- Doctrine 集成(Saga 持久化,事务性消息(OutBox 模式),onlyOnce 控制)
- 自动为您初始化所有资源
- ServiceBus 作为主要入口点
初始化
p-service-bus:init
发送/发布命令/事件
\GDXbsv\PServiceBus\Bus\ServiceBus
实现了所有 Bus 接口。
如果您有一组消息,请使用 Bus
#command
$bus->send(new TestCommand());
#event
$bus->publish(new TestEvent());
如果您有许多消息,请使用 CoroutineBus
#command
$coroutine = $coroutineBus->sendCoroutine();
$coroutine->send(new Message(new TestCommand(1), CommandOptions::record()));
$coroutine->send(new Message(new TestCommand(2), CommandOptions::record()));
$coroutine->send(null);
#event
$coroutine = $coroutineBus->publishCoroutine();
$coroutine->send(new Message(new TestEvent(1), EventOptions::record()));
$coroutine->send(new Message(new TestEvent(2), EventOptions::record()));
$coroutine->send(null);
消费
要开始消费,请运行命令
p-service-bus:transport:consume memory
其中
- memory 是传输名称
处理器
您可以将任何方法作为处理器,只需使用 PHP 属性即可。您可以将
- 重试次数 设置为消息进入死信队列前的重试次数(SQS 不支持,对于 sqs,请使用 DSN 字符串配置队列本身)
- 超时秒数 消息第一次被处理前的初始延迟。(SQS 最大为 15 分钟)
- 重试超时表达式 用于计算每次重试延迟的定制公式。我们传递
retries_count
。语法请参阅这里 https://symfony.com.cn/doc/current/reference/formats/expression_language.html
您可以使用 MessageOption 设置所有这些选项。处理器将覆盖它们。不推荐这样做,因此没有记录。
<?php declare(strict_types=1);
use GDXbsv\PServiceBus\Bus\Handling\Handle;
use GDXbsv\PServiceBus\Bus\Handling\MessageHandleContext;
/**
* @internal
*/
final class Handlers
{
public string $result = '';
#[Handle('memory')]
public function handleCommand(TestMultiHandlersCommand $command, MessageHandleContext $context): void
{
$this->result .= '||' . $command->name;
}
/**
* 5 tries = initial try + 4 retries
*
* Retry no | Delay
* --------- | -------------
* 1 | 0 h 5 min
* 2 | 0 h 25 min
* 3 | 2 h 5 min
* 4 | 10 h 25 min
*
* after all retries -> push to DLQ after 10s
*/
#[Handle(transportName: 'memory', retries: 5, timeoutSec: 100, retriesTimeoutExpression: '(retries_count > 4) ? 10 : (60 * (5 ** retries_count))')]
public function anyNameFunction(Test1Event $event, MessageHandleContext $context): void
{
$this->result .= '||' . $event->name;
}
}
外部事件
发送到订阅的客户端外部(例如从 SNS)。或从外部接收我们已订阅的内容(例如到 SNS)。
<?php
declare(strict_types=1);
use GDXbsv\PServiceBus\Message\ExternalIn;
use GDXbsv\PServiceBus\Message\ExternalOut;
/**
* @internal
* @immutable
* @psalm-immutable
*/
#[ExternalOut(transportName: 'memory-external', externalName: 'test.external_1_event')]
final class ExternalOutEvent
{
}
#[ExternalIn(transportName: 'memory-external', externalName: 'test.external_1_event')]
final class ExternalInEvent
{
}
重放
有时某些事情出了问题,您想重放某些事件。为此,请使用重放注释。
<?php
declare(strict_types=1);
use GDXbsv\PServiceBus\Message\EventOptions;
use GDXbsv\PServiceBus\Message\Message;
use GDXbsv\PServiceBus\Message\Replay\Replay;
use GDXbsv\PServiceBusTestApp\Handling\Test1Event;
/**
* @internal
* @immutable
* @psalm-immutable
*
* @psalm-import-type ReplayOutput from \GDXbsv\PServiceBus\Message\Replay\Replaying
*/
final class ReplayForEvent
{
/**
* @return ReplayOutput
*/
#[Replay(replayName: 'testReplay')]
public function anyName(): \Traversable {
for ($i=1; $i<=5; ++$i) {
yield new Message(new Test1Event(), EventOptions::record());
}
}
}
然后使用命令开始重放
p-service-bus:message:replay testReplay "\GDXbsv\PServiceBusTestApp\Handling\Handlers::handle2Event1" memory
其中
- testReplay 是属性中的重放名称
- "\GDXbsv\PServiceBusTestApp\Handling\Handlers::handle2Event1" 是 className + :: + methodName
- memory 是传输名称
Saga
灵感来源:https://docs.particular.net/nservicebus/sagas/
这是一个长期运行的过程,当你需要响应多个事件来做出某些决策时。或者当你的数据和消息需要通过outbox模式进行事务性绑定时。
使用doctrine的示例
<?php declare(strict_types=1);
use GDXbsv\PServiceBus\Bus\Handling\Handle;
use GDXbsv\PServiceBus\Id;
use GDXbsv\PServiceBus\Message\TimeSpan;
use GDXbsv\PServiceBus\Saga\MessageSagaContext;
use GDXbsv\PServiceBus\Saga\Saga;
use GDXbsv\PServiceBus\Saga\SagaContext;
use GDXbsv\PServiceBus\Saga\SagaPropertyMapper;
use Doctrine\ORM\Mapping as ORM;
use GDXbsv\PServiceBusTestApp\Saga\TestSagaCommand;
use GDXbsv\PServiceBusTestApp\Saga\TestSagaMapStringCommand;
use GDXbsv\PServiceBusTestApp\Saga\TestsSagaInEvent;
use GDXbsv\PServiceBusTestApp\Saga\TestsSagaOutputEvent;
/**
* @final
*/
#[ORM\Entity]
final class TestSaga extends Saga
{
#[ORM\Column(type: 'id', nullable: false)]
#[ORM\Id]
private Id $id;
#[ORM\Column(type: 'string', nullable: false)]
private ?string $string;
#[ORM\Column(type: 'string', nullable: true)]
private ?string $value;
/**
* @param Id<static> $id
*/
private function __construct(Id $id, string $string)
{
$this->id = $id;
$this->string = $string;
}
public static function configureHowToCreateSaga(SagaCreateMapper $mapper): void
{
$mapper
->toMessage(
// do not forget to create handling function in a case if saga exists and to let saga know that we wait this message
function (TestSagaCreateCommand $command, MessageSagaContext $context) {
return new self(new Id($command->id), $command->string);
}
);
}
public static function configureHowToFindSaga(SagaPropertyMapper $mapper): void
{
$mapper
// Find saga by id
->mapSaga(new \ReflectionProperty(TestSaga::class, 'id'))
->toMessage(
function (TestSagaCommand $command, MessageSagaContext $context) {
return new Id($command->id);
}
)
->toMessage(
function (TestsSagaInEvent $message, MessageSagaContext $context) {
return new Id($message->string);
}
);
$mapper
// Find saga by string propery
->mapSaga(new \ReflectionProperty(TestSaga::class, 'string'))
->toMessage(
function (TestSagaMapStringCommand $command, MessageSagaContext $context) {
return $command->string;
}
);
}
/** We have to tell saga we wait this message, or saga could already exist */
#[Handle('memory', 3)]
public function testSagaCreateCommand(TestSagaCreateCommand $command, SagaContext $context)
{
$this->string = $command->string;
}
/** We can remove saga after all */
#[Handle('memory', 3)]
public function testRemove(TestSagaRemoveCommand $command, SagaContext $context)
{
$this->markAsComplete();
}
#[Handle('memory', 3)]
public function testHandlerFunction(TestSagaCommand $command, SagaContext $context)
{
$this->string = $command->string;
$context->timeout(new TestsSagaOutputEvent('testHandlerFunction'), TimeSpan::fromSeconds(0));
}
#[Handle('memory', 3)]
public function testListeningFunction(TestsSagaInEvent $event, SagaContext $context)
{
$this->string = $event->string;
$this->value = $event->value;
$context->publish(new TestsSagaOutputEvent('testListeningFunction'));
}
#[Handle('memory', 3)]
public function handleTestSagaMapStringCommand(
TestSagaMapStringCommand $command,
SagaContext $context
) {
$context->publish(new TestsSagaOutputEvent($this->id->toString()));
}
}
其中
configureHowToCreateSaga
描述了如何创建saga(警告:所有创建消息都应该有处理器)configureHowToFindSaga
描述了如何查找saga#[Handle('memory', 3)]
将方法设置为处理器
您可以通过使用 #[SagaFind]
属性创建自定义查找器,例如
<?php declare(strict_types=1);
use Doctrine\ORM\EntityManager;
use GDXbsv\PServiceBus\Message\MessageOptions;
use GDXbsv\PServiceBus\Saga\SagaFind;
use GDXbsv\PServiceBusTestApp\Saga\CustomDoctrineSearchEvent;
use GDXbsv\PServiceBusTestApp\Saga\TestSaga;
/**
* @internal
* @immutable
* @psalm-immutable
*/
final class CustomDoctrineSagaFinder
{
public function __construct(
private EntityManager $em
) {
}
#[SagaFind]
public function findByMultipleFields(
CustomDoctrineSearchEvent $event,
MessageOptions $messageOptions
): TestSaga {
$qb = $this->em->createQueryBuilder();
$qb
->from(TestSaga::class, 'saga')
->select('saga')
->where($qb->expr()->eq('saga.string', ':propertyValue'))
->setParameter(':propertyValue', $event->string);
$saga = $qb->getQuery()->getSingleResult();
return $saga;
}
}
传输
到目前为止,仅实现了InMemoryTransport、RabbitMq(BunnyTransport)和SQS以及SNS。但您可以通过实现2个接口来适配自己的任何传输方式。
interface Transport
{
/**
* @return \Generator<int, void, Envelope|null, void>
*/
public function sending(): \Generator;
/**
* @return \Generator<int, Envelope, Result\Ok<null, mixed>|Result\Err<mixed, \Exception>, void>
*/
public function receive(int $limit = 0): \Generator;
public function stop(): void;
}
interface TransportSynchronisation
{
public function sync(): void;
}
请注意,如果希望外部总线的 sync()
方法自动发生,则必须在外部消息名称上订阅您。
SQS传输
示例DSN或配置 sqs+http://key:secret@aws:4100/123456789012?region=eu-west-1&retries=3&visibilityTimeout=30&waitSeconds=20&waitBetweenLoopsSeconds=40&messagesBatch=10&preload=true&tags[name]=value&tags[name2]=value2&assume=arn%3Aaws%3Aiam%3A%3A123456789012%3Arole%2Fxaccounts3access&queue=QueueName"
选项
- region - AWS区域
- retries - 在创建期间,在死信队列之前重试的次数
- visibilityTimeout - 消费消息将被阻止多长时间才能进行下一次尝试
- waitSeconds - 长轮询期间,AWS将等待多长时间来收集一批消息
- waitBetweenLoopsSeconds - 如果没有消息,我们将睡眠多长时间
- messagesBatch - 我们将使用一个请求从AWS消费多少条消息
- preload - 在处理前一条消息的同时异步加载下一条消息
- tags[name] - 在创建时向队列添加标签
- assume - 我们想要承担的AWS角色
- queue - 实际的队列名称
bunny传输
bunny传输内部和外部传输不同。外部使用交换和发布/订阅。
请参阅 https://gitlab.com/GDXbsv/pservicebus/-/tree/master/src/Transport/Bunny
SQS-SNS传输
对于SQS-SNS传输,SNS仅用于外部消息。
请参阅 https://gitlab.com/GDXbsv/pservicebus/-/tree/master/src/Transport/Sqs 请参阅 https://gitlab.com/GDXbsv/pservicebus/-/tree/master/src/Transport/Sns