phpsagas/orchestrator

主组件控制并协调saga参与者

0.0.0 2021-06-01 21:44 UTC

This package is not auto-updated.

Last update: 2024-09-18 13:32:21 UTC


README

codecov

目录

简介

A saga 是一种用于分布式系统(例如基于微服务应用程序)的数据一致性维护机制,每个系统都有自己的数据库,这使得无法使用ACID。
Saga模式表示通过异步消息通信执行本地事务序列。
它可以看作是 2PC协议 的替代方案,它保留了重要的微服务架构优势,例如为具体微服务性质使用适当的存储的可能性,以及异步消息用于高可用性和松耦合。它还通过避免参与者阻塞,增加了灵活性和可伸缩性。

有两种方法可以协调saga执行

  • 编舞 - 服务发布和订阅领域事件;
  • 编排 - 执行由特殊服务管理,该服务控制事务序列以及它们必须执行的操作。此框架实现了这种方法。

有关saga的更多详细信息,您可以在 Chris Richardson网站 或他的伟大著作 Microservices Patterns 中找到。

要求

关于包

该组件是 phpsagas框架 的核心,负责协调每个saga参与者的本地事务执行。实施灵感来自 eventuate-tram-sagas框架。您可以通过以下方法之一使用orchestrator:

  • 作为您的项目的一部分(只是一个供应商包),通过将其包含在一个服务中来实现,该服务是分布式业务事务的所有者;
  • 作为一个独立项目,拥有自己的数据库。

此选择完全取决于您的偏好。每种选择都有其优势和缺点,例如将orchestrator用作独立服务可以提供使用其他数据库的可能性,以及在高性能硬件上的部署。然而,也有一些缺点,例如不希望控制逻辑集中化,以及单点故障。
将orchestrator用作项目包更简单,允许减少消息(使用当前服务事务作为本地命令)。

安装

您可以使用 Composer 安装该包

composer require phpsagas/orchestrator

入门

配置

您必须实现(或使用现有实现)一些接口

接下来,需要配置基本编排服务 - Saga CreatorSagaReplyHandler。您可以使用您喜欢的服务容器(symfony 自动注入pimplePHP-DI 等)或手动(见下文)。
之后,编排器就准备好使用了。让我们看看它是如何工作的。

Saga创建

Saga 必须通过提供由步骤定义的类型来实现 SagaInterface,每个分布式事务参与者都必须执行这些步骤。例如,让我们考虑 Travel Tour Service。旅游购买可能由某些服务分布式执行以下阶段

  • 酒店预订 // 酒店服务
  • 票务预订 // 票务服务
  • 签证获取 // 签证服务
  • 旅游购买 // 带编排器的旅游服务

因此,BuyTourSaga 定义可能如下所示

class BuyTourSaga
{
    // ...
    public function getSagaDefinition(): SagaDefinition
    {
        $steps = $this
            ->step()
            ->localCommand($buyTourCommand) // <-- compensatable transaction
            ->withCompensation($rejectTourCommand) // <-- compensating transaction
            ->step()
            ->remoteCommand($bookTicketsCommand)
            ->withCompensation($rejectTicketsBookingCommand)
            ->step()
            ->remoteCommand($bookHotelCommand)
            ->withCompensation($rejectHotelBookingCommand)
            ->step()
            ->remoteCommand($obtainVisaCommand) // <-- pivot transaction
            ->onReply($obtainVisaReplyHandler)
            ->step()
            ->remoteCommand($confirmHotelBookingCommand) // <-- retryable transaction
            ->step()
            ->remoteCommand($confirmTicketsBookingCommand)
            ->step()
            ->localCommand($confirmTourCommand);
        ;

        return $steps->build();
    }

    public function onFinished(string $sagaId, SagaDataInterface $data): void
    {
        // notify request initiator about successful outcome
    }

    public function getSagaType(): string
    {
        return 'buy_tour_saga';
    }
    
    private function step(): StepBuilder
    {
        return new StepBuilder(new SagaDefinitionBuilder());
    }
}

状态机 Saga 表示:Buy tour saga

Saga 执行
Buy tour saga

在创建 saga 定义步骤序列时请务必小心!
假设 Visa 服务是仅提供 VisaObtain API 的第三方项目(没有取消的可能性)。在这种情况下,所有可补偿的命令都应放置在签证获取命令之前,作为一个定义 saga 结果的 枢纽事务关于事务类别的更多详细信息)。
此外,每个 saga 生命周期包括创建、成功执行或失败,这可以包括某些逻辑实现(例如,使用 事件调度器)作为 saga 完成后请求发起者的通知。
Saga 可以执行本地(与编排器相同的项目)和远程命令。命令的主要目的是将业务逻辑执行委托给应用程序服务,并更新 saga 数据(对于本地命令)或为另一个微服务的逻辑执行提供数据(对于远程命令)。

本地命令 示例

class BuyTourCommand implements LocalCommandInterface
{
    private $tourService;

    // inject your project service
    public function __construct(TourService $tourService)
    {
        $this->tourService = $tourService;
    }

    /**
     * @param SagaDataInterface|BuyTourSagaData $sagaData
     */
    public function execute(SagaDataInterface $sagaData): void
    {
        // buyTour logic incapsulated behind project service, not here
        $tour = $this->tourService->buyTour(
            $sagaData->getCountry(),
            $sagaData->getCity(),
            $sagaData->getDateFrom(),
            $sagaData->getDateTill()
        );
        // set created tour id to saga data for next commands usage
        $sagaData->setTourId($tour->getId());
    }

    public function getSagaDataType(): string
    {
        return BuyTourSagaData::class;
    }
}

远程命令 示例

class BookTicketsCommand implements RemoteCommandInterface
{
    public function getCommandType(): string
    {
        return 'book_tickets_command';
    }

    public function getSagaDataClassName(): string
    {
        return BuyTourSagaData::class;
    }

    /**
     * Returns data using by another application services.
     *
     * @param SagaDataInterface|BuyTourSagaData $sagaData
     *
     * @return CommandDataInterface
     */
    public function getCommandData(SagaDataInterface $sagaData): CommandDataInterface
    {
        return new BookTicketsData(
            $sagaData->getCountry(),
            $sagaData->getCountry(),
            $sagaData->getDateFrom(),
            $sagaData->getDateTill()
        );
}

本地命令结果的处理可以在当前应用程序服务调用后立即进行(见上文)。为了处理远程命令执行结果,您必须使用 ReplyHandler

class BookHotelReplyHandler implements ReplyHandlerInterface
{
    /**
     * @param ReplyMessage                      $message
     * @param SagaDataInterface|BuyTourSagaData $sagaData
     */
    public function handle(ReplyMessage $message, SagaDataInterface $sagaData): void
    {
        if ($message->isSuccess()) {
            $payload = json_decode($message->getPayload(), true);
            $sagaData->setHotelBookingId($payload['hotelBookingId']);
        }
    }
}

内部

编排器主要有三个部分

  • BuildEngine - 负责Saga定义,表示执行步骤的状态;
  • InstantiationEngine - 提供创建 saga 和 saga 实例的方法;
  • ExecutionEngine - 控制 saga 执行,管理 saga 状态更改;
    • SagaCreator - 开始 saga 执行;
    • SagaReplyHandler - 执行远程命令处理(由消费者使用);
    • SagaActionsProcessor - 控制 saga 执行、更新和保存 saga 状态。

Saga 执行顺序:saga internal 更多 Saga 使用细节可在 测试包 中找到。

许可证

Saga Orchestrator 采用 MIT 许可协议 发布。