symiote/silverstripe-queuedjobs

一个用于定义和按队列方式运行后台作业的框架

安装次数: 644 885

依赖项: 79

建议者: 18

安全性: 1

星标: 57

关注者: 11

分支: 73

开放性问题: 51

类型:silverstripe-vendormodule

5.2.0-beta1 2024-08-14 23:46 UTC

README

CI Silverstripe supported module

概览

队列作业模块为 Silverstripe 开发者提供了一个框架,用于定义应作为后台任务运行的长时间运行的过程。这种异步处理允许用户在允许的时候继续使用系统,同时长时间运行的任务也在进行。

模块包括

  • CMS 中用于查看当前运行作业或计划作业列表的部分。
  • 用于定义您自己的作业的抽象骨架类。
  • 作为 cronjob 执行的作业,用于收集和执行作业。
  • 用于清理队列作业描述数据库表的预配置作业。

安装

composer require symbiote/silverstripe-queuedjobs

现在设置一个 cron 作业

*/1 * * * * /path/to/silverstripe/vendor/bin/sake dev/tasks/ProcessJobQueueTask
  • 要计划在未来的某个时间点执行作业,请通过调用 queueJob 传递一个日期。以下将在现在开始后 1 天运行发布作业。
use SilverStripe\ORM\FieldType\DBDatetime;
use Symbiote\QueuedJobs\Services\QueuedJobService;

$publish = new PublishItemsJob(21);
QueuedJobService::singleton()
    ->queueJob($publish, DBDatetime::create()->setValue(DBDatetime::now()->getTimestamp() + 86400)->Rfc2822());

使用 Doorman 运行作业

默认包含 Doorman,允许异步任务处理。

这需要您运行一个基于 Unix 的系统,或者在某些类型的环境模拟器(如 cygwin)中。

为了启用此功能,配置 ProcessJobQueueTask 以使用此后端。

在您的 YML 中设置以下内容

---
Name: localproject
After: '#queuedjobsettings'
---
SilverStripe\Core\Injector\Injector:
  Symbiote\QueuedJobs\Services\QueuedJobService:
    properties:
      queueRunner: '%$DoormanRunner'

使用 Gearman 运行作业

---
Name: localproject
After: '#queuedjobsettings'
---
SilverStripe\Core\Injector\Injector:
  QueueHandler:
    class: Symbiote\QueuedJobs\Services\GearmanQueueHandler
  • 使用 php gearman/gearman_runner.php 在您的 SS 根目录中运行 gearman 工作进程

这将导致所有队列作业通过 gearman 工作进程(src/workers/JobWorker.php)立即触发,除非设置了 StartAfter 日期,对于这些作业,您仍然需要上面的 cron 设置

使用 QueuedJob::IMMEDIATE 作业

队列作业可以通过使用基于文件的通知系统立即执行(而不是受 cron 的 1 分钟间隔限制)。这依赖于类似于 inotifywait 的东西来监视一个文件夹(默认情况下为 SILVERSTRIPE_CACHE_DIR/queuedjobs)并触发 ProcessJobQueueTask,如上所述,但传递 job=$filename 作为参数。在 queuedjobs/scripts 中有一个示例脚本,它将运行 inotifywait 然后当新的作业准备好运行时调用 ProcessJobQueueTask。

注意 - 如果您没有运行此功能,请确保设置 QueuedJobService::$use_shutdown_function = true;,以便立即模式作业不会停滞。通过将其设置为 true,立即作业将在请求完成后作为 php 脚本结束时执行。

默认作业

某些作业应该始终处于运行或队列运行状态,例如数据刷新或定期清理作业,我们称这些作业为默认作业。在每次作业队列处理结束时检查默认作业,使用作业类型和任何筛选器字段创建 SQL 查询,例如

ArbitraryName:
  type: 'ScheduledExternalImportJob'
  filter:
    JobTitle: 'Scheduled import from Services'

将变为

QueuedJobDescriptor::get()->filter([
  'type' => 'ScheduledExternalImportJob',
  'JobTitle' => 'Scheduled import from Services'
]);

此查询检查是否存在至少1个符合过滤条件且处于健康状态(新、运行、等待或暂停)的工作任务。如果没有且yml配置中的recreate为true,我们使用构造数组作为参数传递给新的工作任务对象,例如

ArbitraryName:
  type: 'ScheduledExternalImportJob'
  filter:
    JobTitle: 'Scheduled import from Services'
  recreate: 1
  construct:
    repeat: 300
    contentItem: 100
      target: 157

如果上述工作任务缺失,它将被重新创建为

Injector::inst()->createWithArgs(ScheduledExternalImportJob::class, $construct[])

暂停默认工作任务

如果您需要停止默认工作任务以避免触发警报和重新创建,请在CMS中将现有工作任务副本设置为“暂停”。

YML配置

默认工作任务在yml配置中定义,以下示例涵盖了选项和预期值

SilverStripe\Core\Injector\Injector:
  Symbiote\QueuedJobs\Services\QueuedJobService:
    properties:
      defaultJobs:
        # This key is used as the title for error logs and alert emails
        ArbitraryName:
          # The job type should be the class name of a job REQUIRED
          type: 'ScheduledExternalImportJob'
          # This plus the job type is used to create the SQL query REQUIRED
          filter:
            # 1 or more Fieldname: 'value' sets that will be queried on REQUIRED
            #  These can be valid ORM filter
            JobTitle: 'Scheduled import from Services'
          # Parameters set on the recreated object OPTIONAL
          construct:
            # 1 or more Fieldname: 'value' sets be passed to the constructor REQUIRED
            # If your constructor needs none, put something arbitrary
            repeat: 300
            title: 'Scheduled import from Services'
          # A date/time format string for the job's StartAfter field REQUIRED
          # The shown example generates strings like "2020-02-27 01:00:00"
          startDateFormat: 'Y-m-d H:i:s'
          # A string acceptable to PHP's date() function for the job's StartAfter field REQUIRED
          startTimeString: 'tomorrow 01:00'
          # Sets whether the job will be recreated or not OPTIONAL
          recreate: 1
          # Set the email address to send the alert to if not set site admin email is used OPTIONAL
          email: '[email protected]'
        # Minimal implementation will send alerts but not recreate
        AnotherTitle:
          type: 'AJob'
          filter:
            JobTitle: 'A job'

配置CleanupJob

默认情况下,CleanupJob被禁用。要启用它,请在您的YML中设置以下内容

Symbiote\QueuedJobs\Jobs\CleanupJob:
  is_enabled: true

您需要手动在UI中触发首次运行。之后,CleanupJob将每天运行一次。

您可以将此工作任务配置为根据工作任务的数目或年龄进行清理。这通过cleanup_method设置进行配置 - 当前有效值是“age”(默认)和“number”。每种方法都有一个关联值 - 这是一个整数,通过cleanup_value设置。对于“age”,它将被转换为天数;对于“number”,它是按照LastEdited排序的保留的最小记录数。默认值是30,因为我们期望的是天数。

您可以确定哪些JobStatuses允许被清理。默认设置是清理“Broken”和“Complete”工作任务。所有其他状态都可以通过cleanup_statuses进行配置。您还可以定义query_limit以限制清理工作查询/删除的行数(默认为100k)。

默认配置如下所示

Symbiote\QueuedJobs\Jobs\CleanupJob:
  is_enabled: false
  query_limit: 100000
  cleanup_method: "age"
  cleanup_value: 30
  cleanup_statuses:
    - Broken
    - Complete

工作任务队列暂停设置

可以启用一个设置,允许暂停队列工作任务的处理。要启用它,将以下代码添加到您的配置YAML文件中

Symbiote\QueuedJobs\Services\QueuedJobService:
  lock_file_enabled: true
  lock_file_path: '/shared-folder-path'

队列设置标签将出现在CMS设置中,并将有一个选项来暂停队列工作任务的处理。如果启用,则不会启动新的工作任务,但已运行的工作任务将被允许完成。这在计划停机时间,如与第三方服务相关的队列工作任务维护或数据库恢复/备份操作时非常有用。

请注意,此维护锁定状态存储在文件中。故意不使用数据库作为存储,因为在某些维护操作期间可能不可用。请确保lock_file_path指向一个共享驱动器上的文件夹,如果您正在运行具有多个实例的服务器。

文件锁定的一个好处是在关键故障(例如:网站崩溃且CMS不可用)的情况下,您仍然可以访问文件系统并手动更改文件锁定。这为您在运行的工作任务导致问题的情形下提供了额外的灾难恢复选项。

健康检查

工作任务通过步骤跟踪其执行情况 - 当工作任务运行时,它会增加已运行的“步骤”数。定期检查工作任务以确保它们处于健康状态。这确保了在健康检查之间,工作任务的步骤计数始终在增加。默认情况下,当工作员开始运行队列时,会执行健康检查。

在多工作员环境中,这可能导致健康检查过于频繁时出现问题。您可以通过以下配置禁用自动健康检查

Symbiote\QueuedJobs\Services\QueuedJobService:
  disable_health_check: true

除了配置设置外,还有一个任务可以与cron一起使用,以确保检测到不健康的工作任务

*/5 * * * * /path/to/silverstripe/vendor/bin/sake dev/tasks/CheckJobHealthTask

特殊工作任务变量

了解应在您的工作任务实现中使用的特殊变量是好的。

  • totalSteps(整数)- 默认为0,映射到TotalSteps数据库列,仅信息
  • currentStep(整数)- 默认为0,映射到StepsProcessed数据库列,队列运行者使用此值来确定工作任务是否停滞
  • isComplete (布尔型) - 默认为 false,与 JobStatus 数据库列相关,队列运行程序使用此变量来判断作业是否已完成。

有关 JobJobDescriptor 之间的映射的更多详细信息,请参阅 copyJobToDescriptor

总步骤数

表示完成作业所需的步骤总数。

  • 此变量应设置为在作业执行期间预计要经历的步骤数量
  • 这需要在您的作业的 setup() 函数中完成,之后不应更改此值
  • 此变量不被队列运行程序使用,仅用于指示所需的步骤数(信息性)
  • 建议不要在您的作业的 process() 函数内部使用此变量,而是根据作业数据确定您的作业是否已完成(如果还有剩余项要处理)

当前步骤

表示已处理的步骤数。

  • 您的作业应在每次成功完成一个作业步骤时递增此变量
  • 队列运行程序将读取此变量以确定您的作业是否停滞不前
  • 建议在每次递增此变量时从 process() 函数返回
  • 这允许队列运行程序通过将您的作业进度保存到存储在数据库中的作业描述符来创建检查点

是否完成

表示作业状态(完成或未完成)。

  • 将此变量设置为 true 将向队列运行程序发出信号,将其标记为成功完成
  • 您的作业应仅将此变量设置为 true 一次

示例

此示例说明了如何在您的作业实现中使用每个特殊变量。

<?php

namespace App\Jobs;

use Symbiote\QueuedJobs\Services\AbstractQueuedJob;

/**
 * Class MyJob
 *
 * @property array $items
 * @property array $remaining
 */
class MyJob extends AbstractQueuedJob
{
    public function hydrate(array $items): void
    {
        $this->items = $items;
    }

    /**
     * @return string
     */
    public function getTitle(): string
    {
        return 'My awesome job';
    }

    public function setup(): void
    {
        $this->remaining = $this->items;

        // Set the total steps to the number of items we want to process
        $this->totalSteps = count($this->items);
    }

    public function process(): void
    {
        $remaining = $this->remaining;

        // check for trivial case
        if (count($remaining) === 0) {
            $this->isComplete = true;

            return;
        }

        $item = array_shift($remaining);

        // code that will process your item goes here
        $this->doSomethingWithTheItem($item);

        $this->remaining = $remaining;

        // Updating current step tells the Queue runner that the job is progressing
        $this->currentStep += 1;

        // check for job completion
        if (count($remaining) > 0) {
            // Note that we do not process more than one item at a time
            // this makes the Queue runner save the job progress into DB
            // in case something goes wrong the job will be resumed from the last checkpoint
            return;
        }

        // Queue runner will mark this job as finished
        $this->isComplete = true;
    }
}

故障排除

为了确保您的作业正常工作,您首先可以尝试在不使用队列框架的情况下直接执行作业 - 这可以通过手动调用 setup()process() 方法来完成。如果在这种情况下工作正常,请尝试让 getJobType() 返回 QueuedJob::IMMEDIATE 以立即执行,而无需持久化或通过 cron 执行。如果这行得通,请确保您的 cronjob 已正确配置并正在执行。

如果您正在定义自己的作业类,请注意,当作业在队列上启动时,作业类是 没有 参数传递而构建的;这意味着如果您接受构造函数参数,您 必须 在使用之前检测它们是否存在。有关更多信息,请参阅 此问题此维基页面

如果您正在定义自己的作业,请确保您遵循 PSR约定,即使用 "YourVendor" 而不是 "SilverStripe"。

确保已配置通知,以便您可以收到关于停滞或损坏的作业的更新。您可以在配置中设置以下通知电子邮件地址

SilverStripe\Control\Email\Email:
  queued_job_admin_email: [email protected]

文档

显示作业数据

如果您需要通过 CMS 以调试为目的轻松访问额外的作业数据,请通过包含以下配置启用 show_job_data 选项。

Symbiote\QueuedJobs\DataObjects\QueuedJobDescriptor:
  show_job_data: true

这将向作业描述符编辑表单添加作业数据和消息原始选项卡。显示的信息是只读的。

贡献

翻译

自然语言字符串的翻译通过第三方翻译接口 transifex.com 管理。新添加的字符串将定期上传到那里进行翻译,任何新的翻译都将合并回项目源代码。

请使用 https://www.transifex.com/projects/p/silverstripe-queuedjobs 来贡献翻译,而不是发送包含 YAML 文件的拉取请求。