phpsagas / orchestrator
主组件控制并协调saga参与者
Requires
- php: ^7.1 || ^8.0
- ext-json: *
- phpsagas/contracts: ^0.0
- psr/log: ^1.1
Requires (Dev)
- codeception/codeception: ^4.0
- codeception/mockery-module: ^0.4.0
- codeception/module-asserts: ^1.3
- roave/security-advisories: dev-master
- squizlabs/php_codesniffer: ^3.3
This package is not auto-updated.
Last update: 2024-09-18 13:32:21 UTC
README
目录
简介
A saga 是一种用于分布式系统(例如基于微服务应用程序)的数据一致性维护机制,每个系统都有自己的数据库,这使得无法使用ACID。
Saga模式表示通过异步消息通信执行本地事务序列。
它可以看作是 2PC协议 的替代方案,它保留了重要的微服务架构优势,例如为具体微服务性质使用适当的存储的可能性,以及异步消息用于高可用性和松耦合。它还通过避免参与者阻塞,增加了灵活性和可伸缩性。
有两种方法可以协调saga执行
- 编舞 - 服务发布和订阅领域事件;
- 编排 - 执行由特殊服务管理,该服务控制事务序列以及它们必须执行的操作。此框架实现了这种方法。
有关saga的更多详细信息,您可以在 Chris Richardson网站 或他的伟大著作 Microservices Patterns 中找到。
要求
- php: >= 7.1
- ext-json
- phpsagas/contracts
- psr/log: ^1.1
关于包
该组件是 phpsagas框架 的核心,负责协调每个saga参与者的本地事务执行。实施灵感来自 eventuate-tram-sagas框架。您可以通过以下方法之一使用orchestrator:
- 作为您的项目的一部分(只是一个供应商包),通过将其包含在一个服务中来实现,该服务是分布式业务事务的所有者;
- 作为一个独立项目,拥有自己的数据库。
此选择完全取决于您的偏好。每种选择都有其优势和缺点,例如将orchestrator用作独立服务可以提供使用其他数据库的可能性,以及在高性能硬件上的部署。然而,也有一些缺点,例如不希望控制逻辑集中化,以及单点故障。
将orchestrator用作项目包更简单,允许减少消息(使用当前服务事务作为本地命令)。
安装
您可以使用 Composer 安装该包
composer require phpsagas/orchestrator
入门
配置
您必须实现(或使用现有实现)一些接口
SagaFactoryInterface
- 用于创建saga(测试示例)SagaInstanceRepositoryInterface
- saga存储库(doctrine实现)MessagePayloadSerializerInterface
- 序列化参与者命令数据消息(symfony/serializer 实现)SagaSerializerInterface
- 序列化 Saga 数据(symfony/serializer 实现)MessageProducerInterface
- 消息发送者(symfony/messenger 实现)MessageIdGeneratorInterface
- 生成消息 ID(uuid 实现)。
接下来,需要配置基本编排服务 - Saga Creator
和 SagaReplyHandler
。您可以使用您喜欢的服务容器(symfony 自动注入、pimple、PHP-DI 等)或手动(见下文)。
之后,编排器就准备好使用了。让我们看看它是如何工作的。
Saga创建
Saga 必须通过提供由步骤定义的类型来实现 SagaInterface
,每个分布式事务参与者都必须执行这些步骤。例如,让我们考虑 Travel Tour Service。旅游购买可能由某些服务分布式执行以下阶段
- 酒店预订 // 酒店服务
- 票务预订 // 票务服务
- 签证获取 // 签证服务
- 旅游购买 // 带编排器的旅游服务
因此,BuyTourSaga
定义可能如下所示
class BuyTourSaga { // ... public function getSagaDefinition(): SagaDefinition { $steps = $this ->step() ->localCommand($buyTourCommand) // <-- compensatable transaction ->withCompensation($rejectTourCommand) // <-- compensating transaction ->step() ->remoteCommand($bookTicketsCommand) ->withCompensation($rejectTicketsBookingCommand) ->step() ->remoteCommand($bookHotelCommand) ->withCompensation($rejectHotelBookingCommand) ->step() ->remoteCommand($obtainVisaCommand) // <-- pivot transaction ->onReply($obtainVisaReplyHandler) ->step() ->remoteCommand($confirmHotelBookingCommand) // <-- retryable transaction ->step() ->remoteCommand($confirmTicketsBookingCommand) ->step() ->localCommand($confirmTourCommand); ; return $steps->build(); } public function onFinished(string $sagaId, SagaDataInterface $data): void { // notify request initiator about successful outcome } public function getSagaType(): string { return 'buy_tour_saga'; } private function step(): StepBuilder { return new StepBuilder(new SagaDefinitionBuilder()); } }
在创建 saga 定义步骤序列时请务必小心!
假设 Visa 服务是仅提供 VisaObtain API 的第三方项目(没有取消的可能性)。在这种情况下,所有可补偿的命令都应放置在签证获取命令之前,作为一个定义 saga 结果的 枢纽事务(关于事务类别的更多详细信息)。
此外,每个 saga 生命周期包括创建、成功执行或失败,这可以包括某些逻辑实现(例如,使用 事件调度器)作为 saga 完成后请求发起者的通知。
Saga 可以执行本地(与编排器相同的项目)和远程命令。命令的主要目的是将业务逻辑执行委托给应用程序服务,并更新 saga 数据(对于本地命令)或为另一个微服务的逻辑执行提供数据(对于远程命令)。
本地命令 示例
class BuyTourCommand implements LocalCommandInterface { private $tourService; // inject your project service public function __construct(TourService $tourService) { $this->tourService = $tourService; } /** * @param SagaDataInterface|BuyTourSagaData $sagaData */ public function execute(SagaDataInterface $sagaData): void { // buyTour logic incapsulated behind project service, not here $tour = $this->tourService->buyTour( $sagaData->getCountry(), $sagaData->getCity(), $sagaData->getDateFrom(), $sagaData->getDateTill() ); // set created tour id to saga data for next commands usage $sagaData->setTourId($tour->getId()); } public function getSagaDataType(): string { return BuyTourSagaData::class; } }
远程命令 示例
class BookTicketsCommand implements RemoteCommandInterface { public function getCommandType(): string { return 'book_tickets_command'; } public function getSagaDataClassName(): string { return BuyTourSagaData::class; } /** * Returns data using by another application services. * * @param SagaDataInterface|BuyTourSagaData $sagaData * * @return CommandDataInterface */ public function getCommandData(SagaDataInterface $sagaData): CommandDataInterface { return new BookTicketsData( $sagaData->getCountry(), $sagaData->getCountry(), $sagaData->getDateFrom(), $sagaData->getDateTill() ); }
本地命令结果的处理可以在当前应用程序服务调用后立即进行(见上文)。为了处理远程命令执行结果,您必须使用 ReplyHandler
class BookHotelReplyHandler implements ReplyHandlerInterface { /** * @param ReplyMessage $message * @param SagaDataInterface|BuyTourSagaData $sagaData */ public function handle(ReplyMessage $message, SagaDataInterface $sagaData): void { if ($message->isSuccess()) { $payload = json_decode($message->getPayload(), true); $sagaData->setHotelBookingId($payload['hotelBookingId']); } } }
内部
编排器主要有三个部分
BuildEngine
- 负责Saga定义,表示执行步骤的状态;InstantiationEngine
- 提供创建 saga 和 saga 实例的方法;ExecutionEngine
- 控制 saga 执行,管理 saga 状态更改;SagaCreator
- 开始 saga 执行;SagaReplyHandler
- 执行远程命令处理(由消费者使用);SagaActionsProcessor
- 控制 saga 执行、更新和保存 saga 状态。
Saga 执行顺序: 更多 Saga 使用细节可在 测试包 中找到。
许可证
Saga Orchestrator 采用 MIT 许可协议 发布。