wyi / dtm-client
A PHP coroutine client for distributed transaction manager DTM. 分布式事务管理器 DTM 的 PHP 协程客户端
Requires
- php: >=7.2
- ext-json: *
- hyperf/context: ^2.2|^3.0
- hyperf/guzzle: ^2.2|^3.0
- psr/http-server-middleware: ^1.0
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.0
- mockery/mockery: ^1.0
- phpstan/phpstan: ^1.0
- phpunit/phpunit: >=7.0
Suggests
- ext-openssl: Required to use HTTPS.
- ext-pdo: Required to use MySQL Client.
- ext-pdo_mysql: Required to use MySQL Client.
- ext-redis: Required to use Redis Client.
- hyperf/config: ^2.2|^3.0
- hyperf/db: ^2,2|^3.0
- hyperf/di: ^2.2|^3.0
- hyperf/grpc-client: ^2.2|^3.0
- hyperf/json-rpc: ^2.2|^3.0
- hyperf/redis: ^2,2|^3.0
- hyperf/rpc-client: ^2.2|^3.0
This package is auto-updated.
Last update: 2024-09-16 17:29:34 UTC
README
英文 | 中文
简介
dtm/dtm-client is the PHP client of Distributed Transaction Manager DTM. It has supported distributed transaction patterns of TCC pattern, Saga pattern, XA pattern, and two-phase message pattern. In communicate protocol it has supported communicate with DTM Server through HTTP protocol or gRPC protocol. Also the client can safely run in PHP-FPM and Swoole coroutine environment, and it has also make support more easier for Hyperf framework.
About DTM
DTM is an open source distributed transaction manager based on Go language, which provides the powerful function of combining transactions across languages and storage engines. DTM elegantly solves distributed transaction problems such as interface idempotent, null compensation, and transaction suspension, and also provides a distributed transaction solutions that are easy to use, high performance, and easy to scale horizontally.
优势
- 易于启动
- 使用零配置启动服务,并提供一个非常简单且清晰的 HTTP 接口,这大大降低了开始分布式事务的难度
- 跨编程语言
- 适用于具有多种语言堆栈的公司。它可以在 Go、Python、PHP、NodeJs、Ruby、C# 等各种语言中使用
- 简单易用
- 开发者不再需要担心事务悬挂、空补偿、接口幂等化等问题,第一个子事务屏障技术会为您处理这些问题
- 易于部署和扩展
- 仅依赖于 MySQL/Redis,易于部署、易于集群和易于水平扩展
- 支持多种分布式事务协议
- TCC、SAGA、XA、两阶段消息,一站式解决各种分布式事务问题
比较
在非 Java 语言中,除了 DTM 之外,还没有其他成熟的分布式事务管理器,因此这里对比了 DTM 和 Java 中最成熟的开源项目 Seata
从上面的比较特征来看,DTM 在许多方面都具有很大的优势。如果您考虑多语言支持和多存储引擎支持,那么 DTM 无疑是您的首选。
安装
通过 Composer 安装 dtm-client 非常方便
composer require dtm/dtm-client
- 在使用它之前,请务必启动 DTM Server
配置
配置文件
如果您正在使用 Hyperf 框架,在安装组件后,可以使用以下 vendor:publish
命令将配置文件发布到 ./config/autoload/dtm.php
php bin/hyperf.php vendor:publish dtm/dtm-client
如果您正在使用非 Hyperf 框架,请将 ./vendor/dtm/dtm-client/publish/dtm.php
文件复制到相应的配置目录。
use DtmClient\Constants\Protocol; use DtmClient\Constants\DbType; return [ // The communication protocol between the client and the DTM Server, supports Protocol::HTTP and Protocol::GRPC 'protocol' => Protocol::HTTP, // DTM Server address 'server' => '127.0.0.1', // DTM Server port 'port' => [ 'http' => 36789, 'grpc' => 36790, ], // Sub-transaction barrier 'barrier' => [ // Subtransaction barrier configuration in DB mode 'db' => [ 'type' => DbType::MySQL ], // Subtransaction barrier configuration in Redis mode 'redis' => [ // Timeout for subtransaction barrier records 'expire_seconds' => 7 * 86400, ], // Classes that apply sub-transaction barriers in non-Hyperf frameworks or without annotation usage 'apply' => [], ], // Options of Guzzle client under HTTP protocol 'guzzle' => [ 'options' => [], ], ];
配置中间件
在使用之前,您需要将 DtmClient\Middleware\DtmMiddleware
中间件配置为服务器的全局中间件。此中间件支持 PSR-15 规范,适用于所有支持此规范的所有框架。有关 Hyperf 中的中间件配置,请参阅 Hyperf 文档 - 中间件 章节。
使用方法
dtm-client 的使用非常简单,我们提供了一个示例项目 dtm-php/dtm-sample,帮助您更好地理解和调试。在使用此组件之前,强烈建议您阅读 DTM 官方文档,以获得更详细的理解。
TCC 模式
TCC 模式是一个非常流行的灵活分布式事务解决方案。TCC 的概念由三个词的首字母缩写组成:Try(尝试)- Confirm(确认)- Cancel(取消)。它首次在 2007 年 Pat Helland 发表的论文《Life beyond Distributed Transactions:an Apostate’s Opinion》中提出,论文链接为 Life beyond Distributed Transactions:an Apostate’s Opinion。
TCC 的三个阶段
尝试阶段:尝试执行,完成所有业务检查(一致性),预留必要的业务资源(预隔离)确认阶段:如果所有分支的尝试都成功,进入确认阶段。确认实际上执行业务而不进行任何业务检查,仅使用在尝试阶段预留的业务资源。取消阶段:如果所有分支中的任何一个尝试失败,进入取消阶段。释放在尝试阶段预留的业务资源。
如果我们想执行类似于银行之间转账的业务,转出(TransOut)和转入(TransIn)在不同的微服务中,一个成功完成的 TCC 事务的典型顺序图如下
示例
以下展示了如何在 Hyperf 框架中使用它,其他框架类似
<?php namespace App\Controller; use DtmClient\TCC; use DtmClient\TransContext; use Hyperf\Di\Annotation\Inject; use Hyperf\HttpServer\Annotation\Controller; use Hyperf\HttpServer\Annotation\GetMapping; use Throwable; #[Controller(prefix: '/tcc')] class TccController { protected string $serviceUri = 'http://127.0.0.1:9501'; #[Inject] protected TCC $tcc; #[GetMapping(path: 'successCase')] public function successCase() { try { $this->tcc->globalTransaction(function (TCC $tcc) { // Create call data for subtransaction A $tcc->callBranch( // Arguments for calling the Try method ['amount' => 30], // URL of Try stage $this->serviceUri . '/tcc/transA/try', // URL of Confirm stage $this->serviceUri . '/tcc/transA/confirm', // URL of Cancel stage $this->serviceUri . '/tcc/transA/cancel' ); // Create call data for subtransaction B, and so on $tcc->callBranch( ['amount' => 30], $this->serviceUri . '/tcc/transB/try', $this->serviceUri . '/tcc/transB/confirm', $this->serviceUri . '/tcc/transB/cancel' ); }); } catch (Throwable $e) { var_dump($e->getMessage(), $e->getTraceAsString()); } // Get the global transaction ID through TransContext::getGid() and return it to the client return TransContext::getGid(); } }
Saga 模式
Saga 模式是分布式事务领域最著名的解决方案之一,它在主要系统中也非常受欢迎。它首次出现在 Hector Garcaa-Molrna 和 Kenneth Salem 在 1987 年发表的论文 SAGAS 中。
Saga 是一个最终一致性事务,也是一个灵活的事务,也称为长事务。Saga 由一系列本地事务组成。每个本地事务更新数据库后,都会发布一条消息或事件以触发 Saga 全局事务中下一个本地事务的执行。如果本地事务由于某些业务规则无法满足而失败,Saga 将对在失败事务之前成功提交的所有事务执行补偿操作。因此,当将 Saga 模式与 TCC 模式进行比较时,由于缺少资源预留步骤,实现回滚逻辑通常会更加麻烦。
Saga 的子事务分割
例如,我们想要执行类似于银行之间转账的业务,将 30 美元从账户 A 转入账户 B。根据 Saga 事务的原则,我们将整个全局事务分割成以下服务
- 转出(TransOut)服务,账户 A 将扣除 30 美元
- 转出补偿(TransOutCompensate)服务,回滚上述转出操作,即增加账户 A 30 美元
- 转入(TransIn)服务,账户 B 将增加 30 美元
- 转入补偿(TransInCompensate)服务,回滚上述转入操作,即账户 B 减少 30 美元
整个事务的逻辑是
执行转出成功 => 执行转入成功 => 全局事务完成
如果在中间发生错误,例如 B 账户转账错误,将调用已执行的分支的补偿操作,即
执行转出成功 => 执行转入失败 => 执行转入补偿成功 => 执行转出补偿成功 => 全局事务回滚完成
以下是成功完成的SAGA事务的典型序列图
示例
以下展示了如何在 Hyperf 框架中使用它,其他框架类似
namespace App\Controller; use DtmClient\Saga; use DtmClient\TransContext; use Hyperf\Di\Annotation\Inject; use Hyperf\HttpServer\Annotation\Controller; use Hyperf\HttpServer\Annotation\GetMapping; #[Controller(prefix: '/saga')] class SagaController { protected string $serviceUri = 'http://127.0.0.1:9501'; #[Inject] protected Saga $saga; #[GetMapping(path: 'successCase')] public function successCase(): string { $payload = ['amount' => 50]; // Init Saga global transaction $this->saga->init(); // Add TransOut sub-transaction $this->saga->add( $this->serviceUri . '/saga/transOut', $this->serviceUri . '/saga/transOutCompensate', $payload ); // Add TransIn sub-transaction $this->saga->add( $this->serviceUri . '/saga/transIn', $this->serviceUri . '/saga/transInCompensate', $payload ); // Submit Saga global transaction $this->saga->submit(); // Get the global transaction ID through TransContext::getGid() and return it to the client return TransContext::getGid(); } }
XA模式
XA是由X/Open组织提出的分布式事务规范。X/Open分布式事务处理(DTP)模型设想了三个软件组件
应用程序(AP)定义事务边界并指定构成事务的操作。
资源管理器(RM,如数据库或文件访问系统)提供对共享资源的访问。
一个称为事务管理器(TM)的独立组件为事务分配标识符,监控其进度,并负责事务完成和故障恢复。
以下图展示了X/Open DTP模型定义的接口。
XA分为两个阶段。
第一阶段(准备):所有参与RM准备执行其事务并锁定所需的资源。当每个参与者准备就绪时,它向TM报告。
第二阶段(提交/回滚):当事务管理器(TM)收到所有参与者(RM)都已准备就绪的消息时,它向所有参与者发送提交命令。否则,它向所有参与者发送回滚命令。
目前,几乎所有流行的数据库都支持XA事务,包括MySQL、Oracle、SQLServer和Postgres。
示例代码
以下是在Hyperf框架中展示的,与其他框架类似
<?php namespace App\Controller; use App\Grpc\GrpcClient; use DtmClient\DbTransaction\DBTransactionInterface; use DtmClient\TransContext; use DtmClient\XA; use Hyperf\Contract\ConfigInterface; use Hyperf\Di\Annotation\Inject; use Hyperf\HttpServer\Annotation\Controller; use Hyperf\HttpServer\Annotation\GetMapping; use Hyperf\HttpServer\Annotation\RequestMapping; use Hyperf\HttpServer\Contract\RequestInterface; use Psr\Http\Message\ResponseInterface; #[Controller(prefix: '/xa')] class XAController { private GrpcClient $grpcClient; protected string $serviceUri = 'http://127.0.0.1:9502'; public function __construct( private XA $xa, protected ConfigInterface $config, ) { $server = $this->config->get('dtm.server', '127.0.0.1'); $port = $this->config->get('dtm.port.grpc', 36790); $hostname = $server . ':' . $port; $this->grpcClient = new GrpcClient($hostname); } #[GetMapping(path: 'successCase')] public function successCase(): string { $payload = ['amount' => 50]; // Open the Xa, the global thing $gid = $this->xa->generateGid(); $this->xa->globalTransaction($gid, function () use ($payload) { // Call the subthings interface $respone = $this->xa->callBranch($this->serviceUri . '/xa/api/transIn', $payload); // Get subthings return structure in XA http mode /* @var ResponseInterface $respone */ $respone->getBody()->getContents(); // Call the subthings interface $payload = ['amount' => 10]; $this->xa->callBranch($this->serviceUri . '/xa/api/transOut', $payload); }); // Return the global transaction ID via TransContext:: getGid() return TransContext::getGid(); } #[RequestMapping(methods: ["GET", "POST", "PUT"], path: 'api/transIn')] public function transIn(RequestInterface $request): array { $content = $request->post('amount'); $amount = $content['amount'] ?? 50; // The transIn method under the simulated distributed system $this->xa->localTransaction(function (DBTransactionInterface $dbTransaction) use ($amount) { // Please use the DBTransactionInterface to handle the local Mysql things $dbTransaction->xaExecute('UPDATE `order` set `amount` = `amount` + ? where id = 1', [$amount]); }); return ['status' => 0, 'message' => 'ok']; } /** * @param RequestInterface $request * @return array */ #[RequestMapping(methods: ["GET", "POST", "PUT"], path: 'api/transOut')] public function transOut(RequestInterface $request): array { $content = $request->post('amount'); $amount = $content['amount'] ?? 10; // The transOut method under the simulated distributed system $this->xa->localTransaction(function (DBTransactionInterface $dbTransaction) use ($amount) { // Please use the DBTransactionInterface to handle the local Mysql things $dbTransaction->xaExecute('UPDATE `order` set `amount` = `amount` - ? where id = 2', [$amount]); }); return ['status' => 0, 'message' => 'ok']; } }
上述代码首先注册一个全局XA事务,然后调用两个子事务TransOut和TransIn。所有子事务执行成功后,全局XA事务提交给DTM。DTM收到XA全局事务的提交后,调用所有子事务的XA提交,并最终将全局事务状态改为成功。