activecollab/jobsqueue

简单的任务队列

5.0.1 2024-01-11 18:38 UTC

README

Build Status

存在理由:轻量级,依赖很少。它可以与cron + 数据库驱动的队列一起使用,适用于不允许运行适当的消息或工作管理服务器的人。或者,您可以使用它通过适当的消息或工作管理器执行工作。

安装

要安装它,请使用Composer

{
    "require": {
        "activecollab/jobsqueue": "^1.0.0"
    }
}

基本用法

此库使用三个元素

  1. 分发器用于分发任务
  2. 队列确保任务可以被排队
  3. 任务执行实际工作

作为演示,我们将创建一个简单的任务,该任务增加一个数字

<?php

use ActiveCollab\JobsQueue\Jobs\Job;

class Inc extends Job
{
    /**
     * Increment a number
     */
    public function execute(): mixed
    {
      return $this->getData()['number'] + 1;
    }
}

提示:要失败一个尝试,只需在 execute() 方法内抛出异常。

现在,让我们创建一个管理一个MySQL驱动队列的分发器实例

<?php

use ActiveCollab\JobsQueue\JobsDispatcher;
use ActiveCollab\JobsQueue\Queue\MySqlQueue;
use mysqli;
use RuntimeException;

$database_link = new MySQLi('localhost', 'root', '', 'activecollab_jobs_queue_test');

if ($database_link->connect_error) {
    throw new RuntimeException('Failed to connect to database. MySQL said: ' . $database_link->connect_error);
}

$queue = new MySqlQueue($database_link);

// Not required but gives you flexibility with failure handling
$queue->onJobFailure(function(Job $job, Exception $reason) {
    throw new Exception('Job ' . get_class($job) . ' failed', 0, $reason);
});

$dispatcher = new JobsDispatcher($queue);

让我们将一个任务添加到队列中

$dispatcher->dispatch(new Inc([ 'number' => 123 ]));

执行队列中任务的代码将获得下一个可用的任务

$next_in_line = $dispatcher->getQueue()->nextInLine();
$dispatcher->getQueue()->execute($next_in_line);

要运行任务并等待结果,请使用 execute() 而不是 dispatch()

$result = $dispatcher->execute(new Inc([ 'number' => 123 ]));

当以这种方式调用时,任务将立即执行。execute() 默认情况下会抑制异常,因此如果您想异常冒泡出来,应将 $silent 设置为 false

$result = $dispatcher->execute(new Inc([ ‘number’ => 123 ]), false);

任务属性

在构造新的 Job 实例时,您可以设置一个任务数据数组以及以下任务属性

  1. priority - 介于 0 和 4294967295 之间的值,用于确定任务的重要性(值较高的任务优先级更高)。默认为 0(任务不是优先级),
  2. attempts - 在将任务视为失败并被从队列中删除之前要尝试的次数。值可以是 1 到 256 之间的任何值。默认为 1(尝试一次,如果不好,则失败),
  3. delay - 在第一次执行之前要等待的秒数(如果未设置 first_attempt_delay),以及如果任务失败需要重试时的重试。值可以是 1 到 7776000(90 天)之间的任何值。默认为 0(无延迟),
  4. first_attempt_delay - 在第一次任务执行之前要等待的秒数。
$job = new Inc([
    'number'              => 123,
    'priority'            => Job::HAS_HIGHEST_PRIORITY,
    'attempts'            => 5,
    'delay'               => 5,
    'first_attempt_delay' => 1
]);

在任务中访问属性

一旦在一个任务的 execute() 方法中,您可以使用 getData() 方法访问任务属性

public function execute(): mixed
{
    print_r($this->getData()); // Print all job properties
    print $this->getData('number') . "\n"; // Print only number
}

批处理

可以将任务批量添加到队列中。一旦在批处理中,任务队列将像其他任何任务一样执行它们,但您将能够跟踪批处理的进度

$batch = $dispatcher->batch('Testing batch', function(BatchInterface &$batch) {
    for ($i = 1; $i <= 1000; $i++) {
        $batch->dispatch(new Inc(['number' => $i]));
    }
});

sleep(1);

print $batch->countJobs() . " jobs in a batch\n";
print $batch->countPendingJobs() . " batch jobs still pending for execution\n";
print $batch->countFailedJobs() . " batch jobs have failed to complete\n";
print $batch->countCompletedJobs() . " batch jobs were completed successfully\n";

所有批处理都有名称,因此可以使用命令行工具轻松找到它们。

通道

在某些情况下,拥有多个通道并且有消费者监听它们是有用的。例如,您可以在仅监听 mail 通道的邮件服务器上有一个消费者,而不监听其他通道(这些通道不适合执行的任务)。

默认情况下,所有任务都发送到主通道(QueueInterface::MAIN_CHANNEL),但在将任务添加到队列时可以指定通道

$dispatcher->registerChannels('new');
$dispatcher->execute(new Inc(['number' => 123]), 'new');

默认情况下,如果尝试将任务添加到未知通道,分发器将抛出异常。这可以被关闭

$dispatcher->exceptionOnUnregisteredChannel(false);

// This job will end up in the 'main' channel, but exception will not be thrown
$dispatcher->execute(new Inc(['number' => 123]), 'unknown channel');

后台进程

任务可以报告它们启动了进程

class ListAndForget extends Job
{
    /**
     * Report that we launched a background process
     */
    public function execute(): mixed
    {
        $output = [];
        exec("nohup ls -la > /dev/null 2>&1 & echo $!", $output);

        $pid = (integer) $output[1];

        if ($pid > 0) {
            $this->reportBackgroundProcess($pid);
        }
    }
}

当它们这样做时,队列清理和维护程序只要给定的PID进程正在运行,就不会将此工作视为挂起。当进程完成时(我们找不到它),则认为工作已完成。

可以使用QueueInterface::getBackgroundProcesses()方法查找启动进程的工作信息。此方法返回一个数组,其中数组中的每个记录包含一个工作ID、工作类型和进程ID。

print_r($dispatcher->getQueue()->getBackgroundProcesses());

将输出类似以下内容:

Array
(
    [0] => Array
        (
            [id] => 1
            [type] => ActiveCollab\JobsQueue\Test\Jobs\ProcessLauncher
            [process_id] => 12345
        )

)

注意:目前不支持在Windows系统上报告和监视进程。

从作业执行CLI命令

如果您需要让工作简单地运行CLI命令,有一个方便的特性ExecuteCliCommand。您可以传递命令调用签名、参数和环境变量到命令中。

示例用法

class RunCommand extends Job
{
    use ExecuteCliCommand;
    
    public function __construct(array $data = null)
    {
        $this->validateCommand($data);

        parent::__construct($data);
    }
    
    public function execute(): mixed
    {
        $data['command'] = 'php foobar.php'
        $data['command_environement_variables'] = ['foo' => 'bar'];
        $data['command_arguments'] = ['--baz' => 1];
        return $this->prepareCommandFromData($this->getData());
    }
} 

将生成一个CLI命令

export FOO='bar' && php foobar.php --baz=1 

版本5.0升级

向后兼容性说明

  1. 检查所有对JobsDispatcher::batch()方法的调用,并确保回调不期望通过引用传递$batch
  2. 如果客户端代码中使用了,请移除ActiveCollab\JobsQueue\DispatcherInterfaceActiveCollab\JobsQueue\Dispatcher

待办事项

  1. 将记录添加到MySQL队列中的所有相关方法。
  2. 实现工作隔离