dtm/dtm-client

分布式事务管理器 DTM 的 PHP 协程客户端。一个用于 DTM 的 PHP 协程客户端

v0.3.5 2023-07-11 02:01 UTC

README

英文 | 中文

DTM Logo

Stable Version Php Version dtm-client License

PHPUnit for dtm-client Total Downloads Monthly Downloads

介绍

dtm/dtm-client 是分布式事务管理器 DTM 的 PHP 客户端。它支持 TCC 模式、 Saga 模式、 XA 模式和两阶段消息模式的分布式事务模式。在通信协议方面,它支持通过 HTTP 协议或 gRPC 协议与 DTM 服务器进行通信。此外,客户端可以安全地在 PHP-FPM 和 Swoole 协程环境中运行,并且还简化了 Hyperf 框架的支持。

关于 DTM

DTM 是一个基于 Go 语言的分布式事务管理器开源项目,它提供了跨语言和存储引擎组合事务的强大功能。DTM 精美地解决了分布式事务问题,如接口幂等性、空补偿和事务悬挂,并提供了一种易于使用、高性能且易于水平扩展的分布式事务解决方案。

优势

  • 易于启动
    • 零配置启动服务,提供非常简单、清晰的 HTTP 接口,极大地降低了分布式事务入门的难度
  • 跨编程语言
    • 适用于拥有多种语言堆栈的公司。它可以在 Go、Python、PHP、NodeJs、Ruby、C# 等各种语言中使用。
  • 易于使用
    • 开发者不再需要担心事务悬挂、空补偿、接口幂等性等问题,第一子事务屏障技术将为您处理这些问题
  • 易于部署和扩展
    • 仅依赖于 MySQL/Redis,易于部署,易于集群,易于水平扩展
  • 支持多种分布式事务协议
    • TCC、SAGA、XA、两阶段消息,一站式解决各种分布式事务问题

比较

在非 Java 语言中,除了 DTM 之外,还没有成熟的分布式事务管理器,因此这里将 DTM 与 Java 中最成熟的开源项目 Seata 进行比较

从上述比较的特点来看,DTM 在许多方面具有明显优势。如果您考虑多语言支持和多存储引擎支持,那么 DTM 无疑是您的首选。

安装

通过 Composer 安装 dtm-client 非常方便

composer require dtm/dtm-client
  • 在开始使用之前,请务必启动 DTM 服务器

配置

配置文件

如果您正在使用 Hyperf 框架,在安装组件后,可以使用以下 vendor:publish 命令将配置文件发布到 ./config/autoload/dtm.php

php bin/hyperf.php vendor:publish dtm/dtm-client

如果您使用的是非 Hyperf 框架,请将 ./vendor/dtm/dtm-client/publish/dtm.php 文件复制到相应的配置目录。

use DtmClient\Constants\Protocol;
use DtmClient\Constants\DbType;

return [
    // The communication protocol between the client and the DTM Server, supports Protocol::HTTP and Protocol::GRPC
    'protocol' => Protocol::HTTP,
    // DTM Server address
    'server' => '127.0.0.1',
    // DTM Server port
    'port' => [
        'http' => 36789,
        'grpc' => 36790,
    ],
    // Sub-transaction barrier
    'barrier' => [
        // Subtransaction barrier configuration in DB mode 
        'db' => [
            'type' => DbType::MySQL
        ],
        // Subtransaction barrier configuration in Redis mode
        'redis' => [
            // Timeout for subtransaction barrier records
            'expire_seconds' => 7 * 86400,
        ],
        // Classes that apply sub-transaction barriers in non-Hyperf frameworks or without annotation usage
        'apply' => [],
    ],
    // Options of Guzzle client under HTTP protocol
    'guzzle' => [
        'options' => [],
    ],
];

配置中间件

在使用之前,您需要将 DtmClient\Middleware\DtmMiddleware 中间件配置为服务器的全局中间件。此中间件支持 PSR-15 规范,适用于支持此规范的所有框架。有关 Hyperf 中中间件配置的详细信息,请参阅 Hyperf 文档 - 中间件 章节。

用法

dtm-client 的使用非常简单,我们提供了一个示例项目 dtm-php/dtm-sample,以帮助您更好地理解和调试。在使用此组件之前,强烈建议您阅读 DTM 官方文档,以获得更详细的了解。

TCC 模式

TCC 模式是一个非常流行的灵活分布式事务解决方案。TCC 的概念由三个词的首字母组成:Try-Confirm-Cancel。它首次在 2007 年 Pat Helland 发表的一篇名为 Life beyond Distributed Transactions:an Apostate’s Opinion 的论文中提出。

TCC 的三个阶段

尝试阶段:尝试执行,完成所有业务检查(一致性),预留必要的业务资源(预隔离)。确认阶段:如果所有 Try 分支都成功,则进入确认阶段。确认实际上执行业务而不进行任何业务检查,并且仅使用 Try 阶段中预留的业务资源。取消阶段:如果所有分支的其中一个 Try 失败,则进入取消阶段。释放 Try 阶段中预留的业务资源。

如果我们想执行类似银行间转账的业务,转出(TransOut)和转入(TransIn)在不同的微服务中,一个成功完成的 TCC 事务的典型序列图如下

示例

以下展示了如何在 Hyperf 框架中使用它,其他框架类似

<?php
namespace App\Controller;

use DtmClient\TCC;
use DtmClient\TransContext;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\GetMapping;
use Throwable;

#[Controller(prefix: '/tcc')]
class TccController
{

    protected string $serviceUri = 'http://127.0.0.1:9501';

    #[Inject]
    protected TCC $tcc;

    #[GetMapping(path: 'successCase')]
    public function successCase()
    {
        try {
            
            $this->tcc->globalTransaction(function (TCC $tcc) {
                // Create call data for subtransaction A
                $tcc->callBranch(
                    // Arguments for calling the Try method
                    ['amount' => 30],
                    // URL of Try stage
                    $this->serviceUri . '/tcc/transA/try',
                    // URL of Confirm stage
                    $this->serviceUri . '/tcc/transA/confirm',
                    // URL of Cancel stage
                    $this->serviceUri . '/tcc/transA/cancel'
                );
                // Create call data for subtransaction B, and so on
                $tcc->callBranch(
                    ['amount' => 30],
                    $this->serviceUri . '/tcc/transB/try',
                    $this->serviceUri . '/tcc/transB/confirm',
                    $this->serviceUri . '/tcc/transB/cancel'
                );
            });
        } catch (Throwable $e) {
            var_dump($e->getMessage(), $e->getTraceAsString());
        }
        // Get the global transaction ID through TransContext::getGid() and return it to the client
        return TransContext::getGid();
    }
}

Saga 模式

Saga 模式是分布式事务领域中广为人知的一种解决方案,并且在主要系统中也非常流行。它首次出现在 1987 年由 Hector Garcaa-Molrna 和 Kenneth Salem 发表的论文 SAGAS 中。

Saga 是一种最终一致性事务,也是一种灵活事务,也称为长运行事务。Saga 由一系列本地事务组成。在每个本地事务更新数据库后,它将发布一个消息或事件来触发 Saga 全球事务中下一个本地事务的执行。如果一个本地事务由于某些业务规则无法满足而失败,Saga 将对所有在失败事务之前成功提交的事务执行补偿操作。因此,当将 Saga 模式与 TCC 模式进行比较时,由于缺乏资源预留步骤,通常实现回滚逻辑会更麻烦。

Saga 的子事务拆分

例如,我们想要执行类似银行间转账的业务,将 30 美元从账户 A 转入账户 B。根据 Saga 事务的原则,我们将整个全局事务拆分为以下服务

  • 转出(TransOut)服务,账户 A 将扣除 30 美元
  • 转出补偿(TransOutCompensate)服务,回滚上述转出操作,即增加账户 A 30 美元
  • 转入(TransIn)服务,账户 B 将增加 30 美元
  • 转入补偿(TransInCompensate)服务,回滚上述转入操作,即账户 B 减少美元 30 美元

整个事务的逻辑是

执行转出成功 => 执行转入成功 => 全球事务完成

如果在中间发生错误,例如将资金转入账户 B 时出现错误,则将调用已执行的分支的补偿操作,即

执行转出成功 => 执行转入失败 => 执行转入补偿成功 => 执行转出补偿成功 => 全球事务回滚完成

以下是一个成功完成的 SAGA 事务的典型序列图

示例

以下展示了如何在 Hyperf 框架中使用它,其他框架类似

namespace App\Controller;

use DtmClient\Saga;
use DtmClient\TransContext;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\GetMapping;

#[Controller(prefix: '/saga')]
class SagaController
{

    protected string $serviceUri = 'http://127.0.0.1:9501';
    
    #[Inject]
    protected Saga $saga;

    #[GetMapping(path: 'successCase')]
    public function successCase(): string
    {
        $payload = ['amount' => 50];
        // Init Saga global transaction
        $this->saga->init();
        // Add TransOut sub-transaction
        $this->saga->add(
            $this->serviceUri . '/saga/transOut', 
            $this->serviceUri . '/saga/transOutCompensate', 
            $payload
        );
        // Add TransIn sub-transaction
        $this->saga->add(
            $this->serviceUri . '/saga/transIn', 
            $this->serviceUri . '/saga/transInCompensate', 
            $payload
        );
        // Submit Saga global transaction
        $this->saga->submit();
        // Get the global transaction ID through TransContext::getGid() and return it to the client
        return TransContext::getGid();
    }
}

XA 模式

XA 是由 X/Open 组织提出的一种分布式事务规范。X/Open 分布式事务处理(DTP)模型设想了三个软件组件

应用程序(AP)定义事务边界并指定构成事务的操作。

资源管理器(RM,如数据库或文件访问系统)提供对共享资源的访问。

一个名为事务管理器(TM)的独立组件为事务分配标识符,监控其进度,并负责事务完成和故障恢复。

以下图展示了 X/Open DTP 模型定义的接口。

XA 分为两个阶段。

第一阶段(准备):所有参与的资源管理器(RM)准备执行其事务并锁定所需的资源。当每个参与者准备就绪时,它向 TM 报告。

第二阶段(提交/回滚):当事务管理器(TM)收到所有参与者(RM)准备就绪的信号时,它向所有参与者发送提交命令。否则,它向所有参与者发送回滚命令。

目前,几乎所有流行的数据库都支持 XA 事务,包括 MySQL、Oracle、SqlServer 和 Postgres。

示例代码

以下是在 Hyperf 框架中展示的代码,与其他框架类似

<?php

namespace App\Controller;

use App\Grpc\GrpcClient;
use DtmClient\DbTransaction\DBTransactionInterface;
use DtmClient\TransContext;
use DtmClient\XA;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\GetMapping;
use Hyperf\HttpServer\Annotation\RequestMapping;
use Hyperf\HttpServer\Contract\RequestInterface;
use Psr\Http\Message\ResponseInterface;

#[Controller(prefix: '/xa')]
class XAController
{

    private GrpcClient $grpcClient;

    protected string $serviceUri = 'http://127.0.0.1:9502';

    public function __construct(
        private XA $xa,
        protected ConfigInterface $config,
    ) {
        $server = $this->config->get('dtm.server', '127.0.0.1');
        $port = $this->config->get('dtm.port.grpc', 36790);
        $hostname = $server . ':' . $port;
        $this->grpcClient = new GrpcClient($hostname);
    }


    #[GetMapping(path: 'successCase')]
    public function successCase(): string
    {
        $payload = ['amount' => 50];
        // Open the Xa, the global thing
        $gid = $this->xa->generateGid();
        $this->xa->globalTransaction($gid, function () use ($payload) {
            // Call the subthings interface
            $respone = $this->xa->callBranch($this->serviceUri . '/xa/api/transIn', $payload);
            // Get subthings return structure in XA http mode
            /* @var ResponseInterface $respone */
            $respone->getBody()->getContents();
            // Call the subthings interface
            $payload = ['amount' => 10];
            $this->xa->callBranch($this->serviceUri . '/xa/api/transOut', $payload);
        });
        // Return the global transaction ID via TransContext:: getGid()
        return TransContext::getGid();
    }

    #[RequestMapping(methods: ["GET", "POST", "PUT"], path: 'api/transIn')]
    public function transIn(RequestInterface $request): array
    {
        $content = $request->post('amount');
        $amount = $content['amount'] ?? 50;
        // The transIn method under the simulated distributed system
        $this->xa->localTransaction(function (DBTransactionInterface $dbTransaction) use ($amount) {
            // Please use the DBTransactionInterface to handle the local Mysql things
            $dbTransaction->xaExecute('UPDATE `order` set `amount` = `amount` + ? where id = 1', [$amount]);
        });

        return ['status' => 0, 'message' => 'ok'];
    }

    /**
     * @param RequestInterface $request
     * @return array
     */
    #[RequestMapping(methods: ["GET", "POST", "PUT"], path: 'api/transOut')]
    public function transOut(RequestInterface $request): array
    {
        $content = $request->post('amount');
        $amount = $content['amount'] ?? 10;
        // The transOut method under the simulated distributed system
        $this->xa->localTransaction(function (DBTransactionInterface $dbTransaction) use ($amount) {
            // Please use the DBTransactionInterface to handle the local Mysql things
            $dbTransaction->xaExecute('UPDATE `order` set `amount` = `amount` - ? where id = 2', [$amount]);
        });

        return ['status' => 0, 'message' => 'ok'];
    }
}

上述代码首先注册了一个全局 XA 事务,然后调用两个子事务 TransOut 和 TransIn。所有子事务执行成功后,全局 XA 事务提交给 DTM。DTM 接收到 XA 全局事务的提交,然后调用所有子事务的 XA 提交,并最终将全局事务的状态更改为成功。