profi-tech / tsqm-php
简单可靠的任务执行器
Requires
- php: >=7.4
- ext-json: *
- ext-pcntl: *
- ext-pdo: *
Requires (Dev)
- monolog/monolog: ^2.9
- php-di/php-di: ^6.4
- phpstan/phpstan: ^1.11
- phpunit/phpunit: ^9.6
- squizlabs/php_codesniffer: ^3.10
- symfony/console: ^5.4
- vlucas/phpdotenv: ^5.6
This package is auto-updated.
Last update: 2024-09-11 05:47:33 UTC
README
什么是TSQM?
TSQM是一个低级PHP库,用于涉及外部调用、服务请求、数据库查询等的代码的事务性和可靠执行。如果发生错误,代码可以被重试,如果重试不可行,则可以进行“补偿”。
“低级”有多低?
对该库的主要要求之一是它能够从7.4版本开始集成到任何PHP代码库中。TSQM提供了一些基本类和方法,可以将它们嵌入到几乎任何项目或框架中。主要类包括
- Task:PHP代码的类包装器,允许指定重试策略、参数和其他执行选项。
- TSQM引擎:调度、执行、重试和错误处理任务。
⚠️ 注意! TSQM不是现成的;它需要一个集成和配置的过程。
基本用法
1. 安装
composer require profi-tech/tsqm-php
2. 数据库初始化
TSQM需要MySQL或SQLite数据库中的一个表。您可以使用以下SQL获取创建此表的SQL语句
vendor/bin/tsqm-db
或者通过指定供应商和表名
vendor/bin/tsqm-db mysql my_tsqm_table vendor/bin/tsqm-db sqlite my_tsqm_table
在您希望TSQM工作的数据库上运行生成的SQL。
3. 配置TSQM引擎
$dsn = "<PDO dsn of database where you created a table>"; $username = "<your username>"; $password = "<your password>"; $pdo = new PDO($dsn, $username, $password); $tsqm = new Tsqm\Tsqm($pdo); ...
您可以通过向构造函数传递一个Tsqm\Options
实例来调整TSQM引擎。
$tsqm = new Tsqm\Tsqm( $pdo, (new Tsqm\Options()) ->setTable("my_tsqm_table") // Name of the table where tasks are stored ->setLogger(new MyLogger()) // PSR-3 compatible logger ->setContainer(new MyContainer()) // DI container ->setQueue(new MyQueue()) // Queue implementation ->setForceSyncRuns(true) // Force synchronous runs for debugging and unit testing ->setMaxNestingLevel(10) // Maximum number of nested transactions ->setMaxGeneratorTasks(10) // Maximum number of tasks in a generator );
4. 创建任务
要创建任务,您需要创建一个新的Task
对象并设置必要的字段
$task = (new Tsqm\Task()) ->setCallable("greet") ->setArgs("John Doe");
setCallable
的参数可以是
- 具有
__invoke
方法的类的可调用对象(推荐)。 - 静态方法的名称及其类名,例如
MyClass::myMethod
- 全局函数的名称,例如
MyGlobalFunction
(强烈不推荐)。
⚠️ 如果您使用可调用对象,则需要为TSQM引擎设置一个实现Tsqm\Container\ContainerInterface
的DI容器。可调用对象必须在容器中通过其类名可访问。
任务支持以下选项
setScheduledFor
— DateTime对象,表示计划执行的时间。setWaitInterval
— 在开始任务之前等待的时间间隔。setIsSecret
— 如果为true,则任务参数和结果将被记录为机密。setTrace
— 跟踪对象以通过日志跟踪任务执行。
您还可以通过setRetryPolicy
和RetryPolicy
对象指定重试策略
setMaxRetries
— 最大重试次数。setMinInterval
— 重试之间的最小间隔(以毫秒为单位或可以由DateTime::modify()解析的字符串)。setBackoffFactor
— 乘以重试之间间隔的系数。setUseJitter
— 如果为true,则在重试之间的间隔中添加一个随机值。
示例
class Greeter { public function __invoke(string $name): string { return "Hello, $name!"; } } class MyContainer implements Tsqm\Container\ContainerInterface { ... } $tsqm = new Tsqm\Tsqm( $pdo, (new Tsqm\Options()) ->setContainer(new MyContainer()) ); $task = (new Tsqm\Task()) ->setCallable(new Greeter()) ->setArgs("John Doe") ->setRetryPolicy( (new Tsqm\RetryPolicy()) ->setMaxRetries(3) ->setMinInterval(5000) ); ...
5. 运行任务
要执行任务,需要调用run
方法
$task = $tsqm->run($task);
执行结果将在$task
对象中可用
echo "Task id: ".$task->getId(); if ($task->isFinished()) { if (!$task->hasError()) { $result = $task->getResult(); } else { $error = $task->getError(); } }
6. 重试和计划任务
如果以下任一条件成立,则任务不会完成
- 发生错误并且任务设置了重试策略。
- 任务通过
setScheduleTime
选项设置了未来的执行时间。
需要重试的任务可以通过poll
方法运行
$tsqm->poll( 100, // Number of tasks to poll 30, // Time in seconds to "step back" from the current time (usefull for the fallback mode) 10 // Idle time in seconds if no tasks found );
尽管,poll
方法可以执行计划运行,但在生产中,它应仅作为基于队列的主要方法的后备使用
7. 队列
要将队列集成到TSQM中,您需要实现 Tsqm\Queue\QueueInterface
并在TSQM引擎初始化期间添加实现类
class MyQueue implements Tsqm\Queue\QueueInterface { public function enqueue(string $taskName, string $taskId, DateTime $scheduledFor): void { ... put $taskId to your favorite message broker like RabbitMQ, Apache Kafka etc. } /** * @param callable(string $taskId): ?Task $callback */ public function listen(string $taskName, callable $callback): void { ... listen your favorite message broker like RabbitMQ, Apache Kafka etc ... recieve $taskId and call $callback with it } } $tsqm = new Tsqm\Tsqm( $pdo, (new Tsqm\Options()) ->setQueue(new MyQueue()) );
如果任务需要稍后执行,TSQM引擎将自动调用您类的 enqueue
方法
要接收和处理任务,请在单独的脚本中调用 listen
方法
$tsqm->listen($taskName);
8. 事务
除了简单任务外,TSQM还支持事务;您可以实现一个返回任务生成器的可调用的任务。所有标准逻辑都将适用,例如错误处理、重试等。以下是一个实现事务的示例
class Greet { ... public function __invoke(string $name): Generator { $valid = yield (new Task()) ->setCallable($this->validateName) ->setArgs($name); if (!$valid) { return false; } $greeting = yield (new Task()) ->setCallable($this->createGreeting) ->setArgs($name); try { $greeting = yield (new Task()) ->setCallable($this->purchase) ->setArgs($greeting) ->setIsSecret(true) ->setRetryPolicy( (new RetryPolicy()) ->setMaxRetries(3) ->setMinInterval(5000) ); } catch (Exception $e) { yield (new Task()) ->setCallable($this->revertGreeting) ->setArgs($greeting); return false; } return $greeting; } } ... $task = (new Task()) ->setCallable(new Greet(...)) ->setArgs("John Doe"); $task = $tsqm->run($task);
如果 purchase
任务失败,事务执行将停止,并根据“每5秒尝试3次”的策略重试。如果所有尝试都失败,将执行 rollback
任务。
⚠️ 注意! TSQM缓存已完成任务的输出,并在执行期间检查事务的确定性,这意味着可以安全地多次重试调用。
9. 日志记录
TSQM记录任务和事务执行的每个步骤。要访问这些日志,您需要连接一个实现 Tsqm\Logger\LoggerInterface
接口的类
class MyLogger implements Tsqm\Logger\LoggerInterface { ... public function log($level, string $message, array $context = []): void { // Your log implementation here } } $tsqm = new Tsqm\Tsqm( $pdo, (new Tsqm\Options()) ->setLogger(new MyLogger()) );
限制和警告
-
TSQM通常不是一个工作流引擎,而是一个用于可靠执行带有外部调用的PHP代码的库。但是,如果所有任务代码都存储和执行在同一个代码库中,您可以尝试将该库用作工作流引擎。
-
任务数据在执行后从数据库中删除,因为持久化存储仅用于确保事务一致性。
-
对于错误,仅存储类、消息和代码。
-
TSQM轻量级且快速,但尚未在重负载下进行测试。
-
TSQM使用其自己的接口(如容器、队列、日志记录器等)来避免对外部库的依赖,这些库大多已迁移到PHP 8。TSQM需要支持从7.4开始的所有PHP版本。