netlogix/jobqueue-scheduled

允许安排作业并去重

2.0.0 2024-08-21 12:34 UTC

README

此包提供基于PDO的JobQueue作业调度器。

有两个主要目标

  1. 为后续执行安排Flowpack JobQueue作业。
  2. 确保单个作业只安排一次。

安装

composer require netlogix/jobqueue-scheduled

安排作业

本包的第一个目标是提供一种方法,可以将任何类型的JobQueue作业挂起,并标记为稍后执行。与例如“延迟”注解方法或JobQueue作业的默认行为相比,它具有时间方面。安排的作业不是立即执行,而是在安排时指定的某个时间执行。

常规作业队列作业需要是可序列化的。这只是Flowpack FakeQueue和t3n RabbitQueue的实现细节。

要安排现有的Flowpack作业,只需将其包装在安排作业中,并将其传递给调度器。

use Netlogix\JobQueue\Scheduled\Domain\Model\ScheduledJob;
use Netlogix\JobQueue\Scheduled\Domain\Scheduler;
use Flowpack\JobQueue\Common\Job\JobInterface;

assert($scheduler instanceof Scheduler);
assert($jobqueueJob instanceof JobInterface);

$dueDate = new \DateTimeImmutable('now + 1 minute');
$queueName = 'default';

$schedulerJob = new ScheduledJob(
    $jobqueueJob,
    $queueName,
    $dueDate
);
$scheduler->schedule($schedulerJob);

具有唯一标识符的作业

本包的第二个目标是避免重复安排。

某些作业不计算特定的计算,而是“一直运行”。Neos.EventSourcing包的catchup作业就是这样的例子。排队会触发某些事件监听器的catchup调用。

在第一个catchup作业运行时排队另一个catchup作业是好的,因为可能会有额外的更改。

当上一个catchup作业甚至尚未开始时,排队另一个catchup作业是不必要的,因为第一个作业将已经赶上到末端。

use Netlogix\JobQueue\Scheduled\Domain\Model\ScheduledJob;
use Netlogix\JobQueue\Scheduled\Domain\Scheduler;
use Flowpack\JobQueue\Common\Job\JobInterface;

assert($scheduler instanceof Scheduler);
assert($jobqueueJob instanceof JobInterface);

$dueDate = new \DateTimeImmutable('now + 1 minute');
$queueName = 'default';

$jobIdentifier = 'event-sourcing-catchup-' . $eventListenerName;

$schedulerJob = new ScheduledJob(
    $jobqueueJob,
    $queueName,
    $dueDate,
    $jobIdentifier
);
$scheduler->schedule($schedulerJob);

“安排”只会安排一个新作业,如果指定的标识符尚未安排。如果现有到期日与新作业提供的到期日存在冲突,则取最早值。

排队安排作业

安排的作业存在于数据库中,直到排队发生,否则不会进一步处理。

目前这是通过cron作业完成的。

* * * * *   ./flow scheduler:queueduejobs

内部调度机制将确保只有那些根据其各自的到期日值“到期”的作业才从调度器传递到作业队列。

自动安排作业

某些作业来自外部应用程序。一个例子是一个流程应用将作业放入RabbitMQ,另一个流程应用消费它。

之前唯一的实现是常规作业队列工作者,它既不提供延迟执行的方法,也没有去重功能。

现在每个作业都可以简单地实现ScheduledJobInterface。当触发execute()时,它现在被移动到安排作业队列。

作业本身必须提供有关如何安排其执行的所有必要详细信息。

abstract class AutoScheduledJob implements ScheduledJobInterface, JobInterface {
    public function getSchedulingInformation(): ?SchedulingInformation
    {
        return SchedulingInformation(
            '97528fab-c199-4f87-b1a5-4074f1e98749',
            'default-group',
            new DateTimeImmutable('now')
        );
    }
}