dtm / dtm-client
分布式事务管理器 DTM 的 PHP 协程客户端。一个用于 DTM 的 PHP 协程客户端
Requires
- php: >=8.0
- ext-json: *
- guzzlehttp/guzzle: ^7.4
- hyperf/context: ^2.2|^3.0
- psr/http-server-middleware: ^1.0
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.0
- mockery/mockery: ^1.0
- phpstan/phpstan: ^1.0
- phpunit/phpunit: ^9.5
Suggests
- ext-openssl: Required to use HTTPS.
- ext-pdo: Required to use MySQL Client.
- ext-pdo_mysql: Required to use MySQL Client.
- ext-redis: Required to use Redis Client.
- hyperf/config: ^2.2|^3.0
- hyperf/db: ^2,2|^3.0
- hyperf/di: ^2.2|^3.0
- hyperf/grpc-client: ^2.2|^3.0
- hyperf/json-rpc: ^2.2|^3.0
- hyperf/redis: ^2,2|^3.0
- hyperf/rpc-client: ^2.2|^3.0
README
英文 | 中文
介绍
dtm/dtm-client 是分布式事务管理器 DTM 的 PHP 客户端。它支持 TCC 模式、 Saga 模式、 XA 模式和两阶段消息模式的分布式事务模式。在通信协议方面,它支持通过 HTTP 协议或 gRPC 协议与 DTM 服务器进行通信。此外,客户端可以安全地在 PHP-FPM 和 Swoole 协程环境中运行,并且还简化了 Hyperf 框架的支持。
关于 DTM
DTM 是一个基于 Go 语言的分布式事务管理器开源项目,它提供了跨语言和存储引擎组合事务的强大功能。DTM 精美地解决了分布式事务问题,如接口幂等性、空补偿和事务悬挂,并提供了一种易于使用、高性能且易于水平扩展的分布式事务解决方案。
优势
- 易于启动
- 零配置启动服务,提供非常简单、清晰的 HTTP 接口,极大地降低了分布式事务入门的难度
- 跨编程语言
- 适用于拥有多种语言堆栈的公司。它可以在 Go、Python、PHP、NodeJs、Ruby、C# 等各种语言中使用。
- 易于使用
- 开发者不再需要担心事务悬挂、空补偿、接口幂等性等问题,第一子事务屏障技术将为您处理这些问题
- 易于部署和扩展
- 仅依赖于 MySQL/Redis,易于部署,易于集群,易于水平扩展
- 支持多种分布式事务协议
- TCC、SAGA、XA、两阶段消息,一站式解决各种分布式事务问题
比较
在非 Java 语言中,除了 DTM 之外,还没有成熟的分布式事务管理器,因此这里将 DTM 与 Java 中最成熟的开源项目 Seata 进行比较
从上述比较的特点来看,DTM 在许多方面具有明显优势。如果您考虑多语言支持和多存储引擎支持,那么 DTM 无疑是您的首选。
安装
通过 Composer 安装 dtm-client 非常方便
composer require dtm/dtm-client
- 在开始使用之前,请务必启动 DTM 服务器
配置
配置文件
如果您正在使用 Hyperf 框架,在安装组件后,可以使用以下 vendor:publish
命令将配置文件发布到 ./config/autoload/dtm.php
php bin/hyperf.php vendor:publish dtm/dtm-client
如果您使用的是非 Hyperf 框架,请将 ./vendor/dtm/dtm-client/publish/dtm.php
文件复制到相应的配置目录。
use DtmClient\Constants\Protocol; use DtmClient\Constants\DbType; return [ // The communication protocol between the client and the DTM Server, supports Protocol::HTTP and Protocol::GRPC 'protocol' => Protocol::HTTP, // DTM Server address 'server' => '127.0.0.1', // DTM Server port 'port' => [ 'http' => 36789, 'grpc' => 36790, ], // Sub-transaction barrier 'barrier' => [ // Subtransaction barrier configuration in DB mode 'db' => [ 'type' => DbType::MySQL ], // Subtransaction barrier configuration in Redis mode 'redis' => [ // Timeout for subtransaction barrier records 'expire_seconds' => 7 * 86400, ], // Classes that apply sub-transaction barriers in non-Hyperf frameworks or without annotation usage 'apply' => [], ], // Options of Guzzle client under HTTP protocol 'guzzle' => [ 'options' => [], ], ];
配置中间件
在使用之前,您需要将 DtmClient\Middleware\DtmMiddleware
中间件配置为服务器的全局中间件。此中间件支持 PSR-15 规范,适用于支持此规范的所有框架。有关 Hyperf 中中间件配置的详细信息,请参阅 Hyperf 文档 - 中间件 章节。
用法
dtm-client 的使用非常简单,我们提供了一个示例项目 dtm-php/dtm-sample,以帮助您更好地理解和调试。在使用此组件之前,强烈建议您阅读 DTM 官方文档,以获得更详细的了解。
TCC 模式
TCC 模式是一个非常流行的灵活分布式事务解决方案。TCC 的概念由三个词的首字母组成:Try-Confirm-Cancel。它首次在 2007 年 Pat Helland 发表的一篇名为 Life beyond Distributed Transactions:an Apostate’s Opinion 的论文中提出。
TCC 的三个阶段
尝试阶段:尝试执行,完成所有业务检查(一致性),预留必要的业务资源(预隔离)。确认阶段:如果所有 Try 分支都成功,则进入确认阶段。确认实际上执行业务而不进行任何业务检查,并且仅使用 Try 阶段中预留的业务资源。取消阶段:如果所有分支的其中一个 Try 失败,则进入取消阶段。释放 Try 阶段中预留的业务资源。
如果我们想执行类似银行间转账的业务,转出(TransOut)和转入(TransIn)在不同的微服务中,一个成功完成的 TCC 事务的典型序列图如下
示例
以下展示了如何在 Hyperf 框架中使用它,其他框架类似
<?php namespace App\Controller; use DtmClient\TCC; use DtmClient\TransContext; use Hyperf\Di\Annotation\Inject; use Hyperf\HttpServer\Annotation\Controller; use Hyperf\HttpServer\Annotation\GetMapping; use Throwable; #[Controller(prefix: '/tcc')] class TccController { protected string $serviceUri = 'http://127.0.0.1:9501'; #[Inject] protected TCC $tcc; #[GetMapping(path: 'successCase')] public function successCase() { try { $this->tcc->globalTransaction(function (TCC $tcc) { // Create call data for subtransaction A $tcc->callBranch( // Arguments for calling the Try method ['amount' => 30], // URL of Try stage $this->serviceUri . '/tcc/transA/try', // URL of Confirm stage $this->serviceUri . '/tcc/transA/confirm', // URL of Cancel stage $this->serviceUri . '/tcc/transA/cancel' ); // Create call data for subtransaction B, and so on $tcc->callBranch( ['amount' => 30], $this->serviceUri . '/tcc/transB/try', $this->serviceUri . '/tcc/transB/confirm', $this->serviceUri . '/tcc/transB/cancel' ); }); } catch (Throwable $e) { var_dump($e->getMessage(), $e->getTraceAsString()); } // Get the global transaction ID through TransContext::getGid() and return it to the client return TransContext::getGid(); } }
Saga 模式
Saga 模式是分布式事务领域中广为人知的一种解决方案,并且在主要系统中也非常流行。它首次出现在 1987 年由 Hector Garcaa-Molrna 和 Kenneth Salem 发表的论文 SAGAS 中。
Saga 是一种最终一致性事务,也是一种灵活事务,也称为长运行事务。Saga 由一系列本地事务组成。在每个本地事务更新数据库后,它将发布一个消息或事件来触发 Saga 全球事务中下一个本地事务的执行。如果一个本地事务由于某些业务规则无法满足而失败,Saga 将对所有在失败事务之前成功提交的事务执行补偿操作。因此,当将 Saga 模式与 TCC 模式进行比较时,由于缺乏资源预留步骤,通常实现回滚逻辑会更麻烦。
Saga 的子事务拆分
例如,我们想要执行类似银行间转账的业务,将 30 美元从账户 A 转入账户 B。根据 Saga 事务的原则,我们将整个全局事务拆分为以下服务
- 转出(TransOut)服务,账户 A 将扣除 30 美元
- 转出补偿(TransOutCompensate)服务,回滚上述转出操作,即增加账户 A 30 美元
- 转入(TransIn)服务,账户 B 将增加 30 美元
- 转入补偿(TransInCompensate)服务,回滚上述转入操作,即账户 B 减少美元 30 美元
整个事务的逻辑是
执行转出成功 => 执行转入成功 => 全球事务完成
如果在中间发生错误,例如将资金转入账户 B 时出现错误,则将调用已执行的分支的补偿操作,即
执行转出成功 => 执行转入失败 => 执行转入补偿成功 => 执行转出补偿成功 => 全球事务回滚完成
以下是一个成功完成的 SAGA 事务的典型序列图
示例
以下展示了如何在 Hyperf 框架中使用它,其他框架类似
namespace App\Controller; use DtmClient\Saga; use DtmClient\TransContext; use Hyperf\Di\Annotation\Inject; use Hyperf\HttpServer\Annotation\Controller; use Hyperf\HttpServer\Annotation\GetMapping; #[Controller(prefix: '/saga')] class SagaController { protected string $serviceUri = 'http://127.0.0.1:9501'; #[Inject] protected Saga $saga; #[GetMapping(path: 'successCase')] public function successCase(): string { $payload = ['amount' => 50]; // Init Saga global transaction $this->saga->init(); // Add TransOut sub-transaction $this->saga->add( $this->serviceUri . '/saga/transOut', $this->serviceUri . '/saga/transOutCompensate', $payload ); // Add TransIn sub-transaction $this->saga->add( $this->serviceUri . '/saga/transIn', $this->serviceUri . '/saga/transInCompensate', $payload ); // Submit Saga global transaction $this->saga->submit(); // Get the global transaction ID through TransContext::getGid() and return it to the client return TransContext::getGid(); } }
XA 模式
XA 是由 X/Open 组织提出的一种分布式事务规范。X/Open 分布式事务处理(DTP)模型设想了三个软件组件
应用程序(AP)定义事务边界并指定构成事务的操作。
资源管理器(RM,如数据库或文件访问系统)提供对共享资源的访问。
一个名为事务管理器(TM)的独立组件为事务分配标识符,监控其进度,并负责事务完成和故障恢复。
以下图展示了 X/Open DTP 模型定义的接口。
XA 分为两个阶段。
第一阶段(准备):所有参与的资源管理器(RM)准备执行其事务并锁定所需的资源。当每个参与者准备就绪时,它向 TM 报告。
第二阶段(提交/回滚):当事务管理器(TM)收到所有参与者(RM)准备就绪的信号时,它向所有参与者发送提交命令。否则,它向所有参与者发送回滚命令。
目前,几乎所有流行的数据库都支持 XA 事务,包括 MySQL、Oracle、SqlServer 和 Postgres。
示例代码
以下是在 Hyperf 框架中展示的代码,与其他框架类似
<?php namespace App\Controller; use App\Grpc\GrpcClient; use DtmClient\DbTransaction\DBTransactionInterface; use DtmClient\TransContext; use DtmClient\XA; use Hyperf\Contract\ConfigInterface; use Hyperf\Di\Annotation\Inject; use Hyperf\HttpServer\Annotation\Controller; use Hyperf\HttpServer\Annotation\GetMapping; use Hyperf\HttpServer\Annotation\RequestMapping; use Hyperf\HttpServer\Contract\RequestInterface; use Psr\Http\Message\ResponseInterface; #[Controller(prefix: '/xa')] class XAController { private GrpcClient $grpcClient; protected string $serviceUri = 'http://127.0.0.1:9502'; public function __construct( private XA $xa, protected ConfigInterface $config, ) { $server = $this->config->get('dtm.server', '127.0.0.1'); $port = $this->config->get('dtm.port.grpc', 36790); $hostname = $server . ':' . $port; $this->grpcClient = new GrpcClient($hostname); } #[GetMapping(path: 'successCase')] public function successCase(): string { $payload = ['amount' => 50]; // Open the Xa, the global thing $gid = $this->xa->generateGid(); $this->xa->globalTransaction($gid, function () use ($payload) { // Call the subthings interface $respone = $this->xa->callBranch($this->serviceUri . '/xa/api/transIn', $payload); // Get subthings return structure in XA http mode /* @var ResponseInterface $respone */ $respone->getBody()->getContents(); // Call the subthings interface $payload = ['amount' => 10]; $this->xa->callBranch($this->serviceUri . '/xa/api/transOut', $payload); }); // Return the global transaction ID via TransContext:: getGid() return TransContext::getGid(); } #[RequestMapping(methods: ["GET", "POST", "PUT"], path: 'api/transIn')] public function transIn(RequestInterface $request): array { $content = $request->post('amount'); $amount = $content['amount'] ?? 50; // The transIn method under the simulated distributed system $this->xa->localTransaction(function (DBTransactionInterface $dbTransaction) use ($amount) { // Please use the DBTransactionInterface to handle the local Mysql things $dbTransaction->xaExecute('UPDATE `order` set `amount` = `amount` + ? where id = 1', [$amount]); }); return ['status' => 0, 'message' => 'ok']; } /** * @param RequestInterface $request * @return array */ #[RequestMapping(methods: ["GET", "POST", "PUT"], path: 'api/transOut')] public function transOut(RequestInterface $request): array { $content = $request->post('amount'); $amount = $content['amount'] ?? 10; // The transOut method under the simulated distributed system $this->xa->localTransaction(function (DBTransactionInterface $dbTransaction) use ($amount) { // Please use the DBTransactionInterface to handle the local Mysql things $dbTransaction->xaExecute('UPDATE `order` set `amount` = `amount` - ? where id = 2', [$amount]); }); return ['status' => 0, 'message' => 'ok']; } }
上述代码首先注册了一个全局 XA 事务,然后调用两个子事务 TransOut 和 TransIn。所有子事务执行成功后,全局 XA 事务提交给 DTM。DTM 接收到 XA 全局事务的提交,然后调用所有子事务的 XA 提交,并最终将全局事务的状态更改为成功。