profi-tech/tsqm-php

简单可靠的任务执行器

v0.7.0 2024-09-11 05:46 UTC

README

CI

什么是TSQM?

TSQM是一个低级PHP库,用于涉及外部调用、服务请求、数据库查询等的代码的事务性和可靠执行。如果发生错误,代码可以被重试,如果重试不可行,则可以进行“补偿”。

“低级”有多低?

对该库的主要要求之一是它能够从7.4版本开始集成到任何PHP代码库中。TSQM提供了一些基本类和方法,可以将它们嵌入到几乎任何项目或框架中。主要类包括

  • Task:PHP代码的类包装器,允许指定重试策略、参数和其他执行选项。
  • TSQM引擎:调度、执行、重试和错误处理任务。

⚠️ 注意! TSQM不是现成的;它需要一个集成和配置的过程。

基本用法

1. 安装

composer require profi-tech/tsqm-php

2. 数据库初始化

TSQM需要MySQL或SQLite数据库中的一个表。您可以使用以下SQL获取创建此表的SQL语句

vendor/bin/tsqm-db

或者通过指定供应商和表名

vendor/bin/tsqm-db mysql my_tsqm_table
vendor/bin/tsqm-db sqlite my_tsqm_table

在您希望TSQM工作的数据库上运行生成的SQL。

3. 配置TSQM引擎

$dsn = "<PDO dsn of database where you created a table>";
$username = "<your username>";
$password = "<your password>";
$pdo = new PDO($dsn, $username, $password);

$tsqm = new Tsqm\Tsqm($pdo);
...

您可以通过向构造函数传递一个Tsqm\Options实例来调整TSQM引擎。

$tsqm = new Tsqm\Tsqm(
  $pdo,
  (new Tsqm\Options())
    ->setTable("my_tsqm_table") // Name of the table where tasks are stored
    ->setLogger(new MyLogger()) // PSR-3 compatible logger
    ->setContainer(new MyContainer()) // DI container
    ->setQueue(new MyQueue()) // Queue implementation
    ->setForceSyncRuns(true) // Force synchronous runs for debugging and unit testing
    ->setMaxNestingLevel(10) // Maximum number of nested transactions
    ->setMaxGeneratorTasks(10) // Maximum number of tasks in a generator
);

4. 创建任务

要创建任务,您需要创建一个新的Task对象并设置必要的字段

$task = (new Tsqm\Task())
  ->setCallable("greet")
  ->setArgs("John Doe");

setCallable的参数可以是

  • 具有__invoke方法的类的可调用对象(推荐)。
  • 静态方法的名称及其类名,例如MyClass::myMethod
  • 全局函数的名称,例如MyGlobalFunction(强烈不推荐)。

⚠️ 如果您使用可调用对象,则需要为TSQM引擎设置一个实现Tsqm\Container\ContainerInterface的DI容器。可调用对象必须在容器中通过其类名可访问。

任务支持以下选项

  • setScheduledFor — DateTime对象,表示计划执行的时间。
  • setWaitInterval — 在开始任务之前等待的时间间隔。
  • setIsSecret — 如果为true,则任务参数和结果将被记录为机密。
  • setTrace — 跟踪对象以通过日志跟踪任务执行。

您还可以通过setRetryPolicyRetryPolicy对象指定重试策略

  • setMaxRetries — 最大重试次数。
  • setMinInterval — 重试之间的最小间隔(以毫秒为单位或可以由DateTime::modify()解析的字符串)。
  • setBackoffFactor — 乘以重试之间间隔的系数。
  • setUseJitter — 如果为true,则在重试之间的间隔中添加一个随机值。

示例

class Greeter {
  public function __invoke(string $name): string {
    return "Hello, $name!";
  }
}

class MyContainer implements Tsqm\Container\ContainerInterface {
  ...
}

$tsqm = new Tsqm\Tsqm(
  $pdo,
  (new Tsqm\Options())
    ->setContainer(new MyContainer())
);

$task = (new Tsqm\Task())
  ->setCallable(new Greeter())
  ->setArgs("John Doe")
  ->setRetryPolicy(
    (new Tsqm\RetryPolicy())
      ->setMaxRetries(3)
      ->setMinInterval(5000)
  );

...

5. 运行任务

要执行任务,需要调用run方法

$task = $tsqm->run($task);

执行结果将在$task对象中可用

echo "Task id: ".$task->getId();
if ($task->isFinished()) {
    if (!$task->hasError()) {
      $result = $task->getResult();
    } else {
      $error = $task->getError();
    }	
}

6. 重试和计划任务

如果以下任一条件成立,则任务不会完成

  • 发生错误并且任务设置了重试策略。
  • 任务通过setScheduleTime选项设置了未来的执行时间。

需要重试的任务可以通过poll方法运行

$tsqm->poll(
  100, // Number of tasks to poll
  30, // Time in seconds to "step back" from the current time (usefull for the fallback mode)
  10 // Idle time in seconds if no tasks found
);

尽管,poll 方法可以执行计划运行,但在生产中,它应仅作为基于队列的主要方法的后备使用

7. 队列

要将队列集成到TSQM中,您需要实现 Tsqm\Queue\QueueInterface 并在TSQM引擎初始化期间添加实现类

class MyQueue implements Tsqm\Queue\QueueInterface {

  public function enqueue(string $taskName, string $taskId, DateTime $scheduledFor): void {
    ... put $taskId to your favorite message broker like RabbitMQ, Apache Kafka etc.
  }

  /**
   * @param callable(string $taskId): ?Task $callback
   */
  public function listen(string $taskName, callable $callback): void {
    ... listen your favorite message broker like RabbitMQ, Apache Kafka etc
    ... recieve $taskId and call $callback with it
  }
}

$tsqm = new Tsqm\Tsqm(
  $pdo,
  (new Tsqm\Options())
    ->setQueue(new MyQueue())
);

如果任务需要稍后执行,TSQM引擎将自动调用您类的 enqueue 方法

要接收和处理任务,请在单独的脚本中调用 listen 方法

$tsqm->listen($taskName);

8. 事务

除了简单任务外,TSQM还支持事务;您可以实现一个返回任务生成器的可调用的任务。所有标准逻辑都将适用,例如错误处理、重试等。以下是一个实现事务的示例

class Greet
{
  ...
  public function __invoke(string $name): Generator
  {
    $valid = yield (new Task())
      ->setCallable($this->validateName)
      ->setArgs($name);
    if (!$valid) {
      return false;
    }

    $greeting = yield (new Task())
      ->setCallable($this->createGreeting)
      ->setArgs($name);

    try {
      $greeting = yield (new Task())
        ->setCallable($this->purchase)
        ->setArgs($greeting)
        ->setIsSecret(true)
        ->setRetryPolicy(
          (new RetryPolicy())
            ->setMaxRetries(3)
            ->setMinInterval(5000)
      );
    } catch (Exception $e) {
      yield (new Task())
        ->setCallable($this->revertGreeting)
        ->setArgs($greeting);
      return false;
    }

    return $greeting;
  }
}
...
$task = (new Task())
  ->setCallable(new Greet(...))
  ->setArgs("John Doe");

$task = $tsqm->run($task);

如果 purchase 任务失败,事务执行将停止,并根据“每5秒尝试3次”的策略重试。如果所有尝试都失败,将执行 rollback 任务。

⚠️ 注意! TSQM缓存已完成任务的输出,并在执行期间检查事务的确定性,这意味着可以安全地多次重试调用。

9. 日志记录

TSQM记录任务和事务执行的每个步骤。要访问这些日志,您需要连接一个实现 Tsqm\Logger\LoggerInterface 接口的类

class MyLogger implements Tsqm\Logger\LoggerInterface {
   ...
   public function log($level, string $message, array $context = []): void {
     // Your log implementation here
   }
}

$tsqm = new Tsqm\Tsqm(
 $pdo,
 (new Tsqm\Options())
   ->setLogger(new MyLogger())
);

限制和警告

  • TSQM通常不是一个工作流引擎,而是一个用于可靠执行带有外部调用的PHP代码的库。但是,如果所有任务代码都存储和执行在同一个代码库中,您可以尝试将该库用作工作流引擎。

  • 任务数据在执行后从数据库中删除,因为持久化存储仅用于确保事务一致性。

  • 对于错误,仅存储类、消息和代码。

  • TSQM轻量级且快速,但尚未在重负载下进行测试。

  • TSQM使用其自己的接口(如容器、队列、日志记录器等)来避免对外部库的依赖,这些库大多已迁移到PHP 8。TSQM需要支持从7.4开始的所有PHP版本。