spoonwep/dtm-client-php

A PHP 协程客户端,用于分布式事务管理器 DTM。分布式事务管理器 DTM 的 PHP 协程客户端

dev-master / 1.0.x-dev 2022-12-15 14:05 UTC

This package is auto-updated.

Last update: 2024-09-15 18:19:29 UTC


README

英文 | 中文

DTM Logo

Stable Version Php Version dtm-client License

PHPUnit for dtm-client Total Downloads Monthly Downloads

这是 https://github.com/dtm-php/dtm-client 的一个分支版本,适用于低于 8.0 版本的 PHP

简介

dtm/dtm-client 是分布式事务管理器 DTM 的 PHP 客户端。它支持分布式事务模式,包括 TCC 模式、Saga 模式、XA 模式和两阶段消息模式。在通信协议方面,它支持通过 HTTP 协议或 gRPC 协议与 DTM 服务器通信。此外,客户端可以安全地在 PHP-FPM 和 Swoole 协程环境中运行,并且它还为 Hyperf 框架提供了更简单的支持。

关于 DTM

DTM 是一个基于 Go 语言的开源分布式事务管理器,它提供了跨语言和存储引擎结合事务的强大功能。DTM 精妙地解决了分布式事务问题,如接口幂等性、空补偿和事务挂起,并提供了一种易于使用、高性能且易于水平扩展的分布式事务解决方案。

优势

  • 易于启动
    • 零配置启动服务,提供一个非常简单且清晰的 HTTP 接口,极大地降低了分布式事务入门的难度
  • 跨编程语言
    • 适用于具有多个语言堆栈的公司。它可以在 Go、Python、PHP、NodeJs、Ruby、C# 等各种语言中使用。
  • 易于使用
    • 开发者不再需要担心事务挂起、空补偿、接口幂等性问题,第一子事务屏障技术为您处理这些问题
  • 易于部署和扩展
    • 仅依赖于 MySQL/Redis,易于部署,易于集群,且易于水平扩展
  • 支持多种分布式事务协议
    • TCC、SAGA、XA、两阶段消息,一站式解决各种分布式事务问题

比较

在非 Java 语言中,除了 DTM 之外,还没有成熟的分布式事务管理器,因此这里将 DTM 与 Java 中最成熟的开源项目 Seata 进行比较

从上述比较的特征来看,DTM 在许多方面都有很大的优势。如果您考虑多语言支持和多存储引擎支持,那么 DTM 无疑是您的首选。

安装

通过 Composer 安装 dtm-client 非常方便

composer require dtm/dtm-client
  • 在使用之前,不要忘记启动 DTM 服务器

配置

配置文件

如果您使用的是 Hyperf 框架,在安装组件后,可以使用以下 vendor:publish 命令将配置文件发布到 ./config/autoload/dtm.php

php bin/hyperf.php vendor:publish dtm/dtm-client

如果您使用的是非 Hyperf 框架,请将 ./vendor/dtm/dtm-client/publish/dtm.php 文件复制到相应的配置目录。

use DtmClient\Constants\Protocol;
use DtmClient\Constants\DbType;

return [
    // The communication protocol between the client and the DTM Server, supports Protocol::HTTP and Protocol::GRPC
    'protocol' => Protocol::HTTP,
    // DTM Server address
    'server' => '127.0.0.1',
    // DTM Server port
    'port' => [
        'http' => 36789,
        'grpc' => 36790,
    ],
    // Sub-transaction barrier
    'barrier' => [
        // Subtransaction barrier configuration in DB mode 
        'db' => [
            'type' => DbType::MySQL
        ],
        // Subtransaction barrier configuration in Redis mode
        'redis' => [
            // Timeout for subtransaction barrier records
            'expire_seconds' => 7 * 86400,
        ],
        // Classes that apply sub-transaction barriers in non-Hyperf frameworks or without annotation usage
        'apply' => [],
    ],
    // Options of Guzzle client under HTTP protocol
    'guzzle' => [
        'options' => [],
    ],
];

配置中间件

在使用它之前,您需要将 DtmClient\Middleware\DtmMiddleware 中间件配置为服务器的全局中间件。该中间件支持PSR-15规范,适用于所有支持此规范的框架。关于Hyperf中的中间件配置,请参阅Hyperf文档 - 中间件章节。

用法

dtm-client的用法非常简单,我们提供了一个示例项目dtm-php/dtm-sample,以帮助您更好地理解和调试。在使用此组件之前,强烈建议您阅读DTM官方文档,以获得更详细的理解。

TCC模式

TCC模式是一个非常流行的灵活分布式事务解决方案。TCC的概念由三个词的缩写组成:Try-Confirm-Cancel。它首次发表在2007年Pat Helland发表的一篇名为超越分布式事务:一个异端者的观点的论文中。

TCC的三个阶段

Try阶段:尝试执行,完成所有业务检查(一致性),预留必要业务资源(预隔离)。Confirm阶段:如果Try的所有分支都成功,则进入Confirm阶段。Confirm实际上执行业务,无需任何业务检查,仅使用Try阶段预留的业务资源。Cancel阶段:如果Try的任何一个分支失败,则进入Cancel阶段。释放Try阶段预留的业务资源。

如果我们想进行类似于银行间转账的业务,转出(TransOut)和转入(TransIn)在不同的微服务中,一个成功完成TCC事务的典型序列图如下

示例

以下展示了在Hyperf框架中如何使用它,其他框架类似

<?php
namespace App\Controller;

use DtmClient\TCC;
use DtmClient\TransContext;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\GetMapping;
use Throwable;

#[Controller(prefix: '/tcc')]
class TccController
{

    protected string $serviceUri = 'http://127.0.0.1:9501';

    #[Inject]
    protected TCC $tcc;

    #[GetMapping(path: 'successCase')]
    public function successCase()
    {
        try {
            
            $this->tcc->globalTransaction(function (TCC $tcc) {
                // Create call data for subtransaction A
                $tcc->callBranch(
                    // Arguments for calling the Try method
                    ['amount' => 30],
                    // URL of Try stage
                    $this->serviceUri . '/tcc/transA/try',
                    // URL of Confirm stage
                    $this->serviceUri . '/tcc/transA/confirm',
                    // URL of Cancel stage
                    $this->serviceUri . '/tcc/transA/cancel'
                );
                // Create call data for subtransaction B, and so on
                $tcc->callBranch(
                    ['amount' => 30],
                    $this->serviceUri . '/tcc/transB/try',
                    $this->serviceUri . '/tcc/transB/confirm',
                    $this->serviceUri . '/tcc/transB/cancel'
                );
            });
        } catch (Throwable $e) {
            var_dump($e->getMessage(), $e->getTraceAsString());
        }
        // Get the global transaction ID through TransContext::getGid() and return it to the client
        return TransContext::getGid();
    }
}

Saga模式

Saga模式是分布式事务领域最著名的解决方案之一,它也在主要系统中非常受欢迎。它首次出现在Hector Garcaa-Molrna & Kenneth Salem在1987年发表的SAGAS论文中。

Saga是一个最终一致性事务,也是一个灵活事务,也称为长运行事务。Saga由一系列本地事务组成。在每次本地事务更新数据库后,它将发布一个消息或事件来触发下一个本地事务在全局事务中的执行。如果本地事务由于某些业务规则无法满足而失败,则Saga对所有在失败事务之前成功提交的事务执行补偿操作。因此,当将Saga模式与TCC模式进行比较时,由于缺乏资源预留步骤,通常实施回滚逻辑会更加麻烦。

Saga子事务拆分

例如,我们想要进行类似于银行间转账的业务,并将账户A中的30美元转入账户B。根据Saga事务的原则,我们将整个全局事务拆分为以下服务

  • 转出(TransOut)服务,账户A将扣除30美元
  • 转出补偿(TransOutCompensate)服务,回滚上述转出操作,即增加账户A的30美元
  • 转入(TransIn)服务,账户B将增加30美元
  • 转入补偿(TransInCompensate)服务,回滚上述转入操作,即账户B减少30美元

整个事务的逻辑如下

执行转出成功 => 执行转入成功 => 全局事务完成

如果在中间发生错误,例如从A账户转移出错,则会调用已执行分支的补偿操作,即

执行转账出成功 => 执行转账入失败 => 执行转账入补偿成功 => 执行转账出补偿成功 => 全局事务回滚完成

以下是一个成功完成的SAGA事务的典型序列图

示例

以下展示了在Hyperf框架中如何使用它,其他框架类似

namespace App\Controller;

use DtmClient\Saga;
use DtmClient\TransContext;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\GetMapping;

#[Controller(prefix: '/saga')]
class SagaController
{

    protected string $serviceUri = 'http://127.0.0.1:9501';
    
    #[Inject]
    protected Saga $saga;

    #[GetMapping(path: 'successCase')]
    public function successCase(): string
    {
        $payload = ['amount' => 50];
        // Init Saga global transaction
        $this->saga->init();
        // Add TransOut sub-transaction
        $this->saga->add(
            $this->serviceUri . '/saga/transOut', 
            $this->serviceUri . '/saga/transOutCompensate', 
            $payload
        );
        // Add TransIn sub-transaction
        $this->saga->add(
            $this->serviceUri . '/saga/transIn', 
            $this->serviceUri . '/saga/transInCompensate', 
            $payload
        );
        // Submit Saga global transaction
        $this->saga->submit();
        // Get the global transaction ID through TransContext::getGid() and return it to the client
        return TransContext::getGid();
    }
}

XA模式

XA是X/Open组织提出的分布式事务规范。X/Open分布式事务处理(DTP)模型设想了三个软件组件

应用程序(AP)定义事务边界并指定构成事务的操作。

资源管理器(RM,例如数据库或文件访问系统)提供对共享资源的访问。

一个称为事务管理器(TM)的独立组件为事务分配标识符,监控它们的进度,并负责事务完成和故障恢复。

以下图展示了X/Open DTP模型定义的接口。

XA分为两个阶段。

第一阶段(准备):所有参与RM准备执行其事务并锁定所需资源。当每个参与者准备就绪时,它会向TM报告。

第二阶段(提交/回滚):当事务管理器(TM)收到所有参与者(RM)都准备就绪的消息时,它会向所有参与者发送提交命令。否则,它会向所有参与者发送回滚命令。

目前,几乎所有流行的数据库都支持XA事务,包括Mysql、Oracle、SqlServer和Postgres。

示例代码

以下是在Hyperf框架中展示的,与其他类似

<?php

namespace App\Controller;

use App\Grpc\GrpcClient;
use DtmClient\DbTransaction\DBTransactionInterface;
use DtmClient\TransContext;
use DtmClient\XA;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\GetMapping;
use Hyperf\HttpServer\Annotation\RequestMapping;
use Hyperf\HttpServer\Contract\RequestInterface;
use Psr\Http\Message\ResponseInterface;

#[Controller(prefix: '/xa')]
class XAController
{

    private GrpcClient $grpcClient;

    protected string $serviceUri = 'http://127.0.0.1:9502';

    public function __construct(
        private XA $xa,
        protected ConfigInterface $config,
    ) {
        $server = $this->config->get('dtm.server', '127.0.0.1');
        $port = $this->config->get('dtm.port.grpc', 36790);
        $hostname = $server . ':' . $port;
        $this->grpcClient = new GrpcClient($hostname);
    }


    #[GetMapping(path: 'successCase')]
    public function successCase(): string
    {
        $payload = ['amount' => 50];
        // Open the Xa, the global thing
        $gid = $this->xa->generateGid();
        $this->xa->globalTransaction($gid, function () use ($payload) {
            // Call the subthings interface
            $respone = $this->xa->callBranch($this->serviceUri . '/xa/api/transIn', $payload);
            // Get subthings return structure in XA http mode
            /* @var ResponseInterface $respone */
            $respone->getBody()->getContents();
            // Call the subthings interface
            $payload = ['amount' => 10];
            $this->xa->callBranch($this->serviceUri . '/xa/api/transOut', $payload);
        });
        // Return the global transaction ID via TransContext:: getGid()
        return TransContext::getGid();
    }

    #[RequestMapping(methods: ["GET", "POST", "PUT"], path: 'api/transIn')]
    public function transIn(RequestInterface $request): array
    {
        $content = $request->post('amount');
        $amount = $content['amount'] ?? 50;
        // The transIn method under the simulated distributed system
        $this->xa->localTransaction(function (DBTransactionInterface $dbTransaction) use ($amount) {
            // Please use the DBTransactionInterface to handle the local Mysql things
            $dbTransaction->xaExecute('UPDATE `order` set `amount` = `amount` + ? where id = 1', [$amount]);
        });

        return ['status' => 0, 'message' => 'ok'];
    }

    /**
     * @param RequestInterface $request
     * @return array
     */
    #[RequestMapping(methods: ["GET", "POST", "PUT"], path: 'api/transOut')]
    public function transOut(RequestInterface $request): array
    {
        $content = $request->post('amount');
        $amount = $content['amount'] ?? 10;
        // The transOut method under the simulated distributed system
        $this->xa->localTransaction(function (DBTransactionInterface $dbTransaction) use ($amount) {
            // Please use the DBTransactionInterface to handle the local Mysql things
            $dbTransaction->xaExecute('UPDATE `order` set `amount` = `amount` - ? where id = 2', [$amount]);
        });

        return ['status' => 0, 'message' => 'ok'];
    }
}

上述代码首先注册一个全局XA事务,然后调用两个子事务TransOut、TransIn。所有子事务执行成功后,全局XA事务提交给DTM。DTM收到XA全局事务的提交,然后调用所有子事务的XA提交,最后将全局事务的状态更改为成功。