typo3/jobqueue-common

此软件包已被废弃且不再维护。作者建议使用flowpack/jobqueue-common软件包。

Flow应用程序中JobQueue的通用功能。

安装: 2,956

依赖项: 1

建议者: 0

安全: 0

星标: 26

关注者: 10

分支: 25

开放问题: 5

类型:neos-package

3.4.0 2024-07-15 13:16 UTC

README

Neos Flow软件包,允许异步和分布式执行任务。

目录

快速入门(总结)

  1. 使用composer安装此软件包
composer require flowpack/jobqueue-common

(或将依赖项添加到已安装软件包的composer清单中)

  1. 通过在您的 Settings.yaml 中添加以下内容来配置基本队列
Flowpack:
  JobQueue:
    Common:
      queues:
        'some-queue':
          className: 'Flowpack\JobQueue\Common\Queue\FakeQueue'
  1. (如果需要)初始化队列

使用

./flow queue:setup some-queue

您可以使用此命令设置队列和/或验证其配置。对于 FakeQueue,此步骤不是必需的。

注意: queue:setup 命令不会删除任何现有消息,多次调用它没有害处。

  1. 注释任何您想异步执行的方法
use Flowpack\JobQueue\Common\Annotations as Job;

class SomeClass {

    /**
     * @Job\Defer(queueName="some-queue")
     */
    public function sendEmail($emailAddress)
    {
        // send some email to $emailAddress
    }
}

或使用属性而不是注释(PHP 8.0及以后版本)

use Flowpack\JobQueue\Common\Annotations as Job;

class SomeClass {

    #[Job\Defer(queueName: "some-queue")]
    public function sendEmail($emailAddress)
    {
        // send some email to $emailAddress
    }
}

注意: 方法必须是 公共的,并且它不能返回任何内容

  1. (如果需要)启动工作进程

如果有上述代码,每当调用 SomeClass::sendEmail() 方法时,该方法调用将转换为异步执行的任务[1]。

除非您像示例中那样使用 FakeQueue,否则必须启动一个所谓的 worker,以监听新作业并执行它们:

./flow flowpack.jobqueue.common:job:work some-queue --verbose

简介

首先让我们定义一些术语

消息
在程序或系统之间传递的信息片段,有时也称为“事件”。
在JobQueue软件包中,我们使用消息来传输“作业”。
消息队列
根据维基百科,“消息队列 [...] 是软件工程组件,用于进程间通信(IPC)或同一进程内的线程间通信”。
在JobQueue软件包的上下文中,我们称“消息队列”为一个 FIFO 缓冲区,它将消息分发到一个或多个消费者,以确保每条消息只被处理一次。
作业
要执行的工作单元(异步)。
在JobQueue软件包中,我们使用消息队列来存储序列化的作业,因此它充当“作业流”。
作业管理器
允许向消息队列添加和检索作业的中心权威机构。
工作进程
工作进程监视队列并触发作业执行。
此软件包包含一个 `job:work` 命令,用于执行此操作(见下文)
提交
将新消息 *提交* 到队列,以便由工作进程处理
预留
在消息可以处理之前,必须先 *预留*。
队列保证单个消息永远不会被两个工作进程预留(除非它再次被释放)
释放
预留的消息可以 *释放* 到队列,以便稍后处理。
如果作业执行失败且队列的 `maximumNumberOfReleases` 设置大于零,则 *作业管理器* 执行此操作
中止
如果消息无法成功处理,则将其 *中止*,在相应的队列中标记为 *失败*,因此不能再次预留。
如果作业执行失败且消息无法释放(再次),*JobManager* 将终止消息。
结束
如果消息成功处理,则将其标记为*完成*。
如果作业执行成功,*JobManager* 将完成消息。

消息队列

Flowpack.JobQueue.Common 包附带了一个非常基础的 消息队列实现 Flowpack\JobQueue\Common\Queue\FakeQueue,允许使用子请求执行作业。它不需要任何第三方工具或服务器循环,适用于基本场景。但它有一些限制需要注意。

  1. 它实际上不是一个队列,而是在消息入队时立即调度作业。因此,无法将工作分配给多个工作者。

  2. JobManager 不参与作业处理,因此作业需要自行处理错误。

  3. 出于同样的原因,对于 FakeQueue信号 是不发出的。

  4. 从 Flow 3.3+ 开始,FakeQueue 支持一个 async 标志。如果没有设置此标志,执行作业将 阻塞 主线程!

对于高级使用,建议使用以下实现包之一:

配置

这是队列的最简单配置。

Flowpack:
  JobQueue:
    Common:
      queues:
        'test':
          className: 'Flowpack\JobQueue\Common\Queue\FakeQueue'

这样,将会有一个名为 test 的队列可用。

注意:对于可重用的包,应考虑添加供应商特定的前缀以避免冲突。我们建议使用类名或包名加函数名(例如,Flowpack.ElasticSearch.ContentRepositoryQueueIndexer)。

队列参数

以下参数由所有队列支持

一个更复杂的例子可能如下所示

Flowpack:
  JobQueue:
    Common:
      queues:
        'email':
          className: 'Flowpack\JobQueue\Beanstalkd\Queue\BeanstalkdQueue'
          maximumNumberOfReleases: 5
          executeIsolated: true
          outputResults: true
          queueNamePrefix: 'staging-'
          options:
            client:
              host: 127.0.0.11
              port: 11301
            defaultTimeout: 50
          releaseOptions:
            priority: 512
            delay: 120
        'log':
          className: 'Flowpack\JobQueue\Redis\Queue\RedisQueue'
          options:
            defaultTimeout: 10

如你所见,一个安装中可以有多个队列。这允许根据需求使用不同的后端/选项。

预设

如果多个查询共享相同的配置,可以使用 预设 以简化可读性和可维护性。

Flowpack:
  JobQueue:
    Common:
      presets:
        'staging-default':
          className: 'Flowpack\JobQueue\Doctrine\Queue\DoctrineQueue'
          queueNamePrefix: 'staging-'
          options:
            pollInterval: 2
      queues:
        'email':
          preset: 'staging-default'
          options:
            tableName: 'queue_email' # default table name would be "flowpack_jobqueue_messages_email"
        'log':
          preset: 'staging-default'
          options:
            pollInterval: 1 # overrides "pollInterval" of the preset

这将配置两个 DoctrineQueue,名为 "email" 和 "log",并使用一些共同选项,但不同的表名和轮询间隔。

作业队列

作业是一个实现了 Flowpack\JobQueue\Common\Job\JobInterface 的任意类。此包附带一个实现 StaticMethodCallJob,允许调用公共方法(见 快速入门),但通常有创建自定义作业的必要。

<?php
use Flowpack\JobQueue\Common\Job\JobInterface;
use Flowpack\JobQueue\Common\Queue\Message;
use Flowpack\JobQueue\Common\Queue\QueueInterface;

class SendEmailJob implements JobInterface
{
    protected $emailAddress;

    public function __construct($emailAddress)
    {
        $this->emailAddress = $emailAddress;
    }


    public function execute(QueueInterface $queue, Message $message)
    {
        // TODO: send the email to $this->emailAddress
        return true;
    }

    public function getIdentifier()
    {
        return 'SendEmailJob';
    }

    public function getLabel()
    {
        return sprintf('SendEmailJob (email: "%S")', $this->emailAddress);
    }
}

注意:至关重要的是,execute() 方法在成功时返回 TRUE,否则相应的消息将被再次释放并/或标记为 失败

在此基础上,可以将新的作业添加到队列中,如下所示:

use Flowpack\JobQueue\Common\Job\JobInterface;
use Flowpack\JobQueue\Common\Job\JobManager;
use Neos\Flow\Annotations as Flow;

class SomeClass {

    /**
     * @Flow\Inject
     * @var JobManager
     */
    protected $jobManager;

    /**
     * @return void
     */
    public function queueJob()
    {
        $job = new SendEmailJob('some@email.com');
        $this->jobManager->queue('queue-name', $job);
    }
}

命令行界面

使用 flowpack.jobqueue.common:queue:*flowpack.jobqueue.common:job:* 命令与作业队列交互。

信号 & 插槽

与作业队列一起工作时,适当的监控至关重要,因为失败可能不会立即显现。JobManager 会发出所有相关事件的信号,包括

  • messageSubmitted
  • messageTimeout
  • messageReserved
  • messageFinished
  • messageReleased
  • messageFailed

这些可以用于实现更复杂的日志记录,例如

<?php
namespace Your\Package;

use Flowpack\JobQueue\Common\Job\JobManager;
use Flowpack\JobQueue\Common\Queue\Message;
use Flowpack\JobQueue\Common\Queue\QueueInterface;
use Neos\Flow\Core\Bootstrap;
use Neos\Flow\Log\SystemLoggerInterface;
use Neos\Flow\Package\Package as BasePackage;

class Package extends BasePackage
{

    /**
     * @param Bootstrap $bootstrap
     * @return void
     */
    public function boot(Bootstrap $bootstrap)
    {
        $dispatcher = $bootstrap->getSignalSlotDispatcher();

        $dispatcher->connect(
            JobManager::class, 'messageFailed',
            function(QueueInterface $queue, Message $message, \Exception $jobExecutionException = null) use ($bootstrap) {
                $additionalData = [
                    'queue' => $queue->getName(),
                    'message' => $message->getIdentifier()
                ];
                if ($jobExecutionException !== null) {
                    $additionalData['exception'] = $jobExecutionException->getMessage();
                }
                $bootstrap->getObjectManager()->get(SystemLoggerInterface::class)->log('Job failed', LOG_ERR, $additionalData);
            }
        );
    }
}

这将记录每个失败的消息到系统日志。

许可证

此包采用 MIT 许可证。

贡献

欢迎拉取请求。请确保阅读 行为准则

[1] 除非设置 async 标志(需要 Flow 3.3+),否则 FakeQueue 实际上会同步执行作业。