apinstein/jqjobs

PHP 的异步作业管理器。


README

JQJobs 是 PHP 的作业队列基础设施。

特性

  • 非常轻量级且易于使用。
  • 支持多种作业类型。
  • 支持多个队列并将工作绑定到特定队列。
  • 支持代理工作到第三方服务的作业。这允许其他作业进行处理,但通过 JQJobs 跟踪第三方作业的状态。
  • 跟踪作业入队时间、开始时间和完成时间。
  • 优先级调度。
  • 合并作业支持(如果有相同 coalesceId 的作业入队,则不会创建重复作业;返回原始作业)。这基本上是一个轻量级内置互斥锁,有助于防止为相同的“事物”创建重复作业。
  • 在超过 3M 作业处理了 2 年以上的高度并发生产环境中进行了测试。
  • 队列存储在架构上独立于 JQJobs;使用我们的 JQStore_Array 或 JQStore_Propel(数据库)或编写自己的。
  • 工作进程自动预检查内存可用性,并在内存不足的情况下优雅地重启,以避免作业处理期间的 OOM。
  • 工作进程自动检查所有正在使用的源代码文件,如果任何底层代码已修改,则优雅地重启。
  • 可以调整工作进程的优先级(例如,对于网络服务器上的后台任务)。
  • 记录失败的作业消息。
  • 工作进程设计为在 runit 或类似进程管理器下运行,以实现无需维护的操作。
  • 自动重试失败的作业。
  • 良好的测试覆盖率。
  • 实用类 JQDelayedJob 可以在脚本退出后轻松运行 PHP 代码。这是一种将日志等操作延迟到请求处理之后的方法,从而提高应用程序的性能。
  • 在高并发情况下可选抖动
  • 健壮的信号处理,包括 SIGTERM 上的优雅关闭
  • 健壮的自动缩放器,具有 Heroku 和 AWS(自动缩放组)驱动程序。
  • 检测挂起的作业(将重新入队)。
  • 内置对 ASYNC 作业的支持(将工作发送“离站”的作业,将通过 JQManagedJob::STATUS_WAIT_ASYNC 和 JQManagedJob::resolveWaitAsyncJob() 异步关闭)。

路线图

  • 队列管理工具(命令行和图形界面)

作业系统只有几个核心部分

  • JQJob 是实际执行工作的类的接口。
  • JQManagedJob 是 JQJob 的包装器,其中包含用于管理作业的元数据(状态、优先级等)。
  • JQStore 是 JQManagedJob 的持久化位置。应用程序将作业排队到 JQStore 以供后续处理。
  • JQWorker 从队列中运行作业。通常在后台进程中运行。

附加实用工具

  • JQAutoscaler 是一个可以管理自动缩放工作池的实用工具。
  • JQDelayedJob 是一个用于注册在脚本退出后运行的函数或作业的实用类。

JQStore 管理队列和 JQManagedJob 的持久化。

JQStore 是一个接口,但作业系统附带了一些具体的实现。系统以这种方式构建,以便将作业存储迁移到不同的后端存储(memcache、数据库、Amazon SQS 等)。JQStore 实现非常简单。

成功完成的作业会立即从队列中移除。失败的作业会重试直到达到最大尝试次数,然后被标记为“失败”并留在队列中。清理失败的条目由应用程序负责。

如果应用程序需要审计日志或作业历史存档,应在每个作业的run()/cleanup()中实现,或使用自定义的JQStore子类。

使用JQJobs所需的最小工作量包括:1) 创建至少一个作业;2) 创建一个队列存储;3) 添加作业;4) 启动一个工作线程。

  1. 创建作业

    class SampleJob implements JQJob { function __construct($info) { $this->info = $info; } function run() { print $this->description() . "\n"; } // no-op function cleanup() { print "cleanup() {$this->description()}\n"; } function statusDidChange(JQManagedJob $mJob, $oldStatus, $message) { print "SampleJob [Job {$mJob->getJobId()}] {$oldStatus} ==> {$mJob->getStatus()} {$message}\n"; } function description() { return "Sample job {$this->info}"; } }

  2. 创建队列存储

    $q = new JQStore_Array();

    // 或者,使用Propel创建基于数据库的队列: $con = Propel::getConnection(JQStoreManagedJobPeer::DATABASE_NAME); $q = new JQStore_Propel('JQStoreManagedJob', $con);

  3. 添加作业

    foreach (range(1,10) as $i) { $q->enqueue(new SampleJob($i)); }

  4. 启动一个工作线程来运行作业。

    declare(ticks = 1); // 使JQJobs能够优雅地处理SIGKILL(或其他立即终止信号) // declare(ticks=1)必须在全局范围内。 $w = new JQWorker($q); $w->start();

  5. 如果您想检测挂起作业,且没有使用JQAutoscaler,则需要安排一个任务来运行JQStore::detectHungJobs()。

=======================

JQDelayedJob 示例

JQDelayedJob::doLater(new MyJob('data'));
JQDelayedJob::doLater(function() { print "Hello, World. I am running from a delayed job after the script exits!"; });

安装

pear install apinstein.pearfarm.org/jqjobs

http://apinstein.pearfarm.org/apinstein/jqjobs

源代码

https://github.com/apinstein/jqjobs

JQStore 后端

Propel

目前,唯一支持数据库的JQStore实现是针对Propel ORM。JQJobs/Propel所需的所有迁移都在migrations/目录中,需要使用mp (github.com/apinstein/mp) 运行。这可以轻松适配为Propel插件,但尚未实现。

无论如何,只需确保在安装/升级JQJobs时,根据需要复制并重新排序迁移即可。