activecollab / jobsqueue
简单的任务队列
Requires
- php: >=8.0
- ext-json: *
- activecollab/databaseconnection: ^5.0
- doctrine/inflector: ^2.0
- psr/log: ^1.0
Requires (Dev)
- ext-mysqli: *
- friendsofphp/php-cs-fixer: ^2.0
- monolog/monolog: ~1.0
- phpunit/phpunit: ^9.0
- pimple/pimple: ~3.0
Suggests
- ext-mysqli: *
- ext-posix: *
- symfony/console: Use Symfony console to easily create CLI jobs consumer
- dev-master
- 5.0.1
- 5.0.0
- 4.1.1
- 4.1.0
- 4.0.0
- 3.0.3
- 3.0.2
- 3.0.1
- 3.0.0
- 2.1.1
- v2.0.x-dev
- 2.0.1
- 2.0.0
- 1.2.0
- 1.1.1
- 1.1.0
- 1.0.12
- 1.0.11
- 1.0.10
- 1.0.9
- 1.0.8
- 1.0.7
- 1.0.6
- 1.0.5
- 1.0.4
- 1.0.3
- 1.0.2
- 1.0.1
- 1.0.0
- 0.1.7
- 0.1.6
- 0.1.5
- 0.1.4
- 0.1.3
- 0.1.2
- 0.1.1
- 0.1.0
- dev-set-locale-for-escapeshellargs
- dev-array-in-argument
- dev-increase-delay-limit
- dev-fix-get-channels
- dev-release/0.1
This package is auto-updated.
Last update: 2024-09-11 19:57:36 UTC
README
存在理由:轻量级,依赖很少。它可以与cron + 数据库驱动的队列一起使用,适用于不允许运行适当的消息或工作管理服务器的人。或者,您可以使用它通过适当的消息或工作管理器执行工作。
安装
要安装它,请使用Composer
{ "require": { "activecollab/jobsqueue": "^1.0.0" } }
基本用法
此库使用三个元素
- 分发器用于分发任务
- 队列确保任务可以被排队
- 任务执行实际工作
作为演示,我们将创建一个简单的任务,该任务增加一个数字
<?php use ActiveCollab\JobsQueue\Jobs\Job; class Inc extends Job { /** * Increment a number */ public function execute(): mixed { return $this->getData()['number'] + 1; } }
提示:要失败一个尝试,只需在 execute()
方法内抛出异常。
现在,让我们创建一个管理一个MySQL驱动队列的分发器实例
<?php use ActiveCollab\JobsQueue\JobsDispatcher; use ActiveCollab\JobsQueue\Queue\MySqlQueue; use mysqli; use RuntimeException; $database_link = new MySQLi('localhost', 'root', '', 'activecollab_jobs_queue_test'); if ($database_link->connect_error) { throw new RuntimeException('Failed to connect to database. MySQL said: ' . $database_link->connect_error); } $queue = new MySqlQueue($database_link); // Not required but gives you flexibility with failure handling $queue->onJobFailure(function(Job $job, Exception $reason) { throw new Exception('Job ' . get_class($job) . ' failed', 0, $reason); }); $dispatcher = new JobsDispatcher($queue);
让我们将一个任务添加到队列中
$dispatcher->dispatch(new Inc([ 'number' => 123 ]));
执行队列中任务的代码将获得下一个可用的任务
$next_in_line = $dispatcher->getQueue()->nextInLine(); $dispatcher->getQueue()->execute($next_in_line);
要运行任务并等待结果,请使用 execute()
而不是 dispatch()
$result = $dispatcher->execute(new Inc([ 'number' => 123 ]));
当以这种方式调用时,任务将立即执行。execute()
默认情况下会抑制异常,因此如果您想异常冒泡出来,应将 $silent
设置为 false
$result = $dispatcher->execute(new Inc([ ‘number’ => 123 ]), false);
任务属性
在构造新的 Job
实例时,您可以设置一个任务数据数组以及以下任务属性
priority
- 介于 0 和 4294967295 之间的值,用于确定任务的重要性(值较高的任务优先级更高)。默认为 0(任务不是优先级),attempts
- 在将任务视为失败并被从队列中删除之前要尝试的次数。值可以是 1 到 256 之间的任何值。默认为 1(尝试一次,如果不好,则失败),delay
- 在第一次执行之前要等待的秒数(如果未设置first_attempt_delay
),以及如果任务失败需要重试时的重试。值可以是 1 到 7776000(90 天)之间的任何值。默认为 0(无延迟),first_attempt_delay
- 在第一次任务执行之前要等待的秒数。
$job = new Inc([ 'number' => 123, 'priority' => Job::HAS_HIGHEST_PRIORITY, 'attempts' => 5, 'delay' => 5, 'first_attempt_delay' => 1 ]);
在任务中访问属性
一旦在一个任务的 execute()
方法中,您可以使用 getData()
方法访问任务属性
public function execute(): mixed { print_r($this->getData()); // Print all job properties print $this->getData('number') . "\n"; // Print only number }
批处理
可以将任务批量添加到队列中。一旦在批处理中,任务队列将像其他任何任务一样执行它们,但您将能够跟踪批处理的进度
$batch = $dispatcher->batch('Testing batch', function(BatchInterface &$batch) { for ($i = 1; $i <= 1000; $i++) { $batch->dispatch(new Inc(['number' => $i])); } }); sleep(1); print $batch->countJobs() . " jobs in a batch\n"; print $batch->countPendingJobs() . " batch jobs still pending for execution\n"; print $batch->countFailedJobs() . " batch jobs have failed to complete\n"; print $batch->countCompletedJobs() . " batch jobs were completed successfully\n";
所有批处理都有名称,因此可以使用命令行工具轻松找到它们。
通道
在某些情况下,拥有多个通道并且有消费者监听它们是有用的。例如,您可以在仅监听 mail
通道的邮件服务器上有一个消费者,而不监听其他通道(这些通道不适合执行的任务)。
默认情况下,所有任务都发送到主通道(QueueInterface::MAIN_CHANNEL
),但在将任务添加到队列时可以指定通道
$dispatcher->registerChannels('new'); $dispatcher->execute(new Inc(['number' => 123]), 'new');
默认情况下,如果尝试将任务添加到未知通道,分发器将抛出异常。这可以被关闭
$dispatcher->exceptionOnUnregisteredChannel(false); // This job will end up in the 'main' channel, but exception will not be thrown $dispatcher->execute(new Inc(['number' => 123]), 'unknown channel');
后台进程
任务可以报告它们启动了进程
class ListAndForget extends Job { /** * Report that we launched a background process */ public function execute(): mixed { $output = []; exec("nohup ls -la > /dev/null 2>&1 & echo $!", $output); $pid = (integer) $output[1]; if ($pid > 0) { $this->reportBackgroundProcess($pid); } } }
当它们这样做时,队列清理和维护程序只要给定的PID进程正在运行,就不会将此工作视为挂起。当进程完成时(我们找不到它),则认为工作已完成。
可以使用QueueInterface::getBackgroundProcesses()
方法查找启动进程的工作信息。此方法返回一个数组,其中数组中的每个记录包含一个工作ID、工作类型和进程ID。
print_r($dispatcher->getQueue()->getBackgroundProcesses());
将输出类似以下内容:
Array
(
[0] => Array
(
[id] => 1
[type] => ActiveCollab\JobsQueue\Test\Jobs\ProcessLauncher
[process_id] => 12345
)
)
注意:目前不支持在Windows系统上报告和监视进程。
从作业执行CLI命令
如果您需要让工作简单地运行CLI命令,有一个方便的特性ExecuteCliCommand
。您可以传递命令调用签名、参数和环境变量到命令中。
示例用法
class RunCommand extends Job { use ExecuteCliCommand; public function __construct(array $data = null) { $this->validateCommand($data); parent::__construct($data); } public function execute(): mixed { $data['command'] = 'php foobar.php' $data['command_environement_variables'] = ['foo' => 'bar']; $data['command_arguments'] = ['--baz' => 1]; return $this->prepareCommandFromData($this->getData()); } }
将生成一个CLI命令
export FOO='bar' && php foobar.php --baz=1
版本5.0升级
向后兼容性说明
- 检查所有对
JobsDispatcher::batch()
方法的调用,并确保回调不期望通过引用传递$batch
。 - 如果客户端代码中使用了,请移除
ActiveCollab\JobsQueue\DispatcherInterface
和ActiveCollab\JobsQueue\Dispatcher
。
待办事项
- 将记录添加到MySQL队列中的所有相关方法。
- 实现工作隔离