madesimple/task-worker

通用后台任务工作库

v2.1.0 2020-06-24 12:12 UTC

This package is auto-updated.

Last update: 2024-08-27 08:19:09 UTC


README

Build Status

task-worker 包是一个通用的任务工作库,主要用于后台任务。

工作器耐心等待从其队列中预留任务。当它们收到一个任务时,准备并执行该任务。任务对象本身存储执行任务的逻辑。如果任务成功执行,即没有抛出异常,则该任务将从队列中删除。如果任务抛出异常,则将其放回队列以便再次执行。

安装

通过 composer 安装

{
    "require": {
        "madesimple/task-worker": "~2.0"
    }
}

然后运行 composer install 或运行 composer require madesimple/task-worker

任务

注意 Task 必须可以通过空构造函数进行构造,即:new ExampleTask()

任务是由执行它们所需的数据和业务逻辑的组合。任务必须 扩展 抽象类 \MadeSimple\TaskWorker\Task 并仅需要实现 public function perform()。可以使用 \ArrayAccess(例如 $task['foo'] ='bar')或直接在类内部(例如 $this->data['foo'] = 'bar';)设置任务数据。

当任务被放入队列时,任务将序列化为 JSON 消息

{
    "identifier": "38213-5a4f5275644c2",
    "register": "\\Namespace\\ClassName",
    "queue": "task_queue",
    "attempts": 0,
    "data": {
        "foo": "bar"
    }
}

当调用 identifier() 时,任务的 identifier 将自动生成,并使用 uniqid(getmypid() . '-') 创建;克隆的任务 共享其 identifierattempts。任务的 register 默认为 static::class,并在 Task::deserialize 中使用,以确定应使用哪个任务类;deserialize 首先检查其 register(从 Worker 传递给它)是否存在匹配项,然后尝试自动加载,如果这些都不返回任务,则最终失败。您可以在任务内部将 register 设置为任何字符串,只需确保如果它不可自动加载,则将其值与 Worker 注册即可

<?php
class MyTask extends \MadeSimple\TaskWorker\Task
{
    protected $register = 'my_task';

    // ... rest of the class goes here
}

任务的 queue 简单地是它应该放置的队列,通常设置为 $task->onQueue('queue_name')。任务的 attempts 是一个简单的计数器,每次 Worker 尝试执行它时都会增加。任务的 data 应该是一组 JSON 可序列化的信息,用于执行任务。

工作器

工作器耐心等待来自队列的任务。接收到任务后执行任务,如果没有抛出异常,则从队列中删除任务。

在检查队列中的下一个消息之前,工作器会询问是否应该继续工作。如果它已经超过了它应该存活的时间长度,或者已经广播了重启信号,则工作器将再次检查。

选项

可供工作器使用的选项有

  • OPT_SLEEP:在再次检查队列之前,在没有任务时等待多长时间(以秒为单位)
  • OPT_ATTEMPTS:任务在失败之前允许尝试的次数(零为无限期)
  • OPT_ALIVE:工作器将存活多长时间(以秒为单位)(零为无限期)
  • OPT_REST:任务之间的休息时间(以毫秒为单位)
  • OPT_MAX_TASKS:工作器在停止之前应该执行的任务的最大数量(零为无限期)
  • OPT_UNTIL_EMPTY:设置工作器在队列为空时停止

处理器

处理器是您可以添加到工作进程中的闭包,以帮助准备要执行的任务。在执行每个任务之前,都会调用已添加到工作进程的每个处理器。此功能常用的场景是依赖注入。以下是一个处理器必须具备的签名(可以是匿名函数或可调用类):

function (Task $task) {
    // logic goes here
}

队列

目前支持四种类型的队列,分别是 MySQLRabbitMQRedis同步。可以创建自定义队列,并必须实现 Queue 接口。在实现队列时,惯例是提供一个队列名称列表。如果未提供队列名称,队列实现可能会抛出 MadeSimple\TaskWorker\Exception\QueueNameRequiredException 异常。

MySQL

MySQL 队列使用与 MySQL 兼容的数据库中的一个单独的表。必须存在具有以下模式的表(表名可以在 MysqlQueue 选项中更改):

CREATE TABLE `worker_task` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `queue` char(255) DEFAULT '',
  `payload` longtext NOT NULL,
  `reservedAt` int(10) unsigned DEFAULT NULL,
  `releasedAt` int(10) unsigned NOT NULL,
  `failedAt` int(10) unsigned DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `queue` (`queue`,`releasedAt`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

MysqlQueue 可用的选项包括

  • OPT_TABLE_NAME:从该表中读取任务的表名

RabbitMQ

RabbitMQ 队列使用 "php-amqplib/php-amqplib": "^2.7" 库连接到 RabbitMQ 实例(或集群)。使用 default 交换来路由任务到指定的队列。死信用于延迟任务,队列名称生成如下:delayed_<seconds>_<queue_name>

Redis

Redis 队列使用 "predis/predis": "^1.1" 库连接到 redis 实例。直接使用队列名称,并存在以下虚拟队列:

  • <queue_name>-processing 用于保存工作进程当前任务,以及
  • <queue_name>-delayed 用于保存延迟任务。

同步

同步队列是一个虚拟队列,它在任务被添加时立即执行任何任务。如果设置了延迟,它将尊重延迟通过使调用 add 的线程休眠指定的时间。

命令

symfony/console 定义了一些命令。

示例

同名目录中有一些示例。要使它们正常工作,您需要使用开发需求进行 composer install。将 examples/.env.example 复制到 examples/.env,然后更新环境变量值,然后可以使用以下命令运行示例:

export $(cat examples/.env | xargs) && php examples/

外部文档

外部依赖项文档的链接

仅用于开发的外部依赖项文档的链接