madesimple / task-worker
通用后台任务工作库
Requires
- php: >=7.2
- ext-json: *
- psr/log: ~1.0
- psr/simple-cache: ~1.0.0
Requires (Dev)
- cache/cache: ~1.0
- monolog/monolog: ^1.23
- php-amqplib/php-amqplib: ^2.7
- phpunit/phpunit: ^8
- predis/predis: ^1.1
- symfony/console: ~3.2
Suggests
- cache/cache: Library of all the php-cache adapters
- php-amqplib/php-amqplib: Allow RabbitMQ Queue
- predis/predis: Allow Redis Queue
- symfony/console: Symfony console commands more robust CLI
This package is auto-updated.
Last update: 2024-08-27 08:19:09 UTC
README
task-worker 包是一个通用的任务工作库,主要用于后台任务。
工作器耐心等待从其队列中预留任务。当它们收到一个任务时,准备并执行该任务。任务对象本身存储执行任务的逻辑。如果任务成功执行,即没有抛出异常,则该任务将从队列中删除。如果任务抛出异常,则将其放回队列以便再次执行。
安装
通过 composer 安装
{
"require": {
"madesimple/task-worker": "~2.0"
}
}
然后运行 composer install
或运行 composer require madesimple/task-worker
。
任务
注意 Task
必须可以通过空构造函数进行构造,即:new ExampleTask()
。
任务是由执行它们所需的数据和业务逻辑的组合。任务必须 扩展
抽象类 \MadeSimple\TaskWorker\Task
并仅需要实现 public function perform()
。可以使用 \ArrayAccess
(例如 $task['foo'] ='bar'
)或直接在类内部(例如 $this->data['foo'] = 'bar';
)设置任务数据。
当任务被放入队列时,任务将序列化为 JSON 消息
{ "identifier": "38213-5a4f5275644c2", "register": "\\Namespace\\ClassName", "queue": "task_queue", "attempts": 0, "data": { "foo": "bar" } }
当调用 identifier()
时,任务的 identifier
将自动生成,并使用 uniqid(getmypid() . '-')
创建;克隆的任务 不 共享其 identifier
或 attempts
。任务的 register
默认为 static::class
,并在 Task::deserialize
中使用,以确定应使用哪个任务类;deserialize
首先检查其 register
(从 Worker
传递给它)是否存在匹配项,然后尝试自动加载,如果这些都不返回任务,则最终失败。您可以在任务内部将 register
设置为任何字符串,只需确保如果它不可自动加载,则将其值与 Worker
注册即可
<?php class MyTask extends \MadeSimple\TaskWorker\Task { protected $register = 'my_task'; // ... rest of the class goes here }
任务的 queue
简单地是它应该放置的队列,通常设置为 $task->onQueue('queue_name')
。任务的 attempts
是一个简单的计数器,每次 Worker
尝试执行它时都会增加。任务的 data
应该是一组 JSON 可序列化的信息,用于执行任务。
工作器
工作器耐心等待来自队列的任务。接收到任务后执行任务,如果没有抛出异常,则从队列中删除任务。
在检查队列中的下一个消息之前,工作器会询问是否应该继续工作。如果它已经超过了它应该存活的时间长度,或者已经广播了重启信号,则工作器将再次检查。
选项
可供工作器使用的选项有
OPT_SLEEP
:在再次检查队列之前,在没有任务时等待多长时间(以秒为单位)OPT_ATTEMPTS
:任务在失败之前允许尝试的次数(零为无限期)OPT_ALIVE
:工作器将存活多长时间(以秒为单位)(零为无限期)OPT_REST
:任务之间的休息时间(以毫秒为单位)OPT_MAX_TASKS
:工作器在停止之前应该执行的任务的最大数量(零为无限期)OPT_UNTIL_EMPTY
:设置工作器在队列为空时停止
处理器
处理器是您可以添加到工作进程中的闭包,以帮助准备要执行的任务。在执行每个任务之前,都会调用已添加到工作进程的每个处理器。此功能常用的场景是依赖注入。以下是一个处理器必须具备的签名(可以是匿名函数或可调用类):
function (Task $task) { // logic goes here }
队列
目前支持四种类型的队列,分别是 MySQL
、RabbitMQ
、Redis
和 同步
。可以创建自定义队列,并必须实现 Queue
接口。在实现队列时,惯例是提供一个队列名称列表。如果未提供队列名称,队列实现可能会抛出 MadeSimple\TaskWorker\Exception\QueueNameRequiredException
异常。
MySQL
MySQL 队列使用与 MySQL 兼容的数据库中的一个单独的表。必须存在具有以下模式的表(表名可以在 MysqlQueue
选项中更改):
CREATE TABLE `worker_task` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `queue` char(255) DEFAULT '', `payload` longtext NOT NULL, `reservedAt` int(10) unsigned DEFAULT NULL, `releasedAt` int(10) unsigned NOT NULL, `failedAt` int(10) unsigned DEFAULT NULL, PRIMARY KEY (`id`), KEY `queue` (`queue`,`releasedAt`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
MysqlQueue 可用的选项包括
OPT_TABLE_NAME
:从该表中读取任务的表名
RabbitMQ
RabbitMQ 队列使用 "php-amqplib/php-amqplib": "^2.7"
库连接到 RabbitMQ 实例(或集群)。使用 default
交换来路由任务到指定的队列。死信用于延迟任务,队列名称生成如下:delayed_<seconds>_<queue_name>
。
Redis
Redis 队列使用 "predis/predis": "^1.1"
库连接到 redis 实例。直接使用队列名称,并存在以下虚拟队列:
<queue_name>-processing
用于保存工作进程当前任务,以及<queue_name>-delayed
用于保存延迟任务。
同步
同步队列是一个虚拟队列,它在任务被添加时立即执行任何任务。如果设置了延迟,它将尊重延迟通过使调用 add
的线程休眠指定的时间。
命令
为 symfony/console
定义了一些命令。
示例
同名目录中有一些示例。要使它们正常工作,您需要使用开发需求进行 composer install
。将 examples/.env.example
复制到 examples/.env
,然后更新环境变量值,然后可以使用以下命令运行示例:
export $(cat examples/.env | xargs) && php examples/
外部文档
外部依赖项文档的链接
仅用于开发的外部依赖项文档的链接