flowpack/jobqueue-common

Flow应用程序中JobQueue的通用功能。

安装: 355,252

依赖项: 23

建议者: 0

安全性: 0

星标: 27

关注者: 9

分支: 25

开放问题: 4

类型:neos-package

3.4.0 2024-07-15 13:16 UTC

README

Neos Flow包,允许异步和分布式执行任务。

目录

快速入门(TL;DR)

  1. 使用composer安装此包
composer require flowpack/jobqueue-common

(或通过将依赖项添加到已安装包的composer声明中)

  1. 在您的Settings.yaml中添加以下内容以配置基本队列
Flowpack:
  JobQueue:
    Common:
      queues:
        'some-queue':
          className: 'Flowpack\JobQueue\Common\Queue\FakeQueue'
  1. (如果需要)初始化队列

使用

./flow queue:setup some-queue

您可以为设置队列和/或验证其配置。在FakeQueue的情况下,该步骤不是必需的。

注意: queue:setup命令不会删除任何现有消息,多次调用它不会有任何坏处。

  1. 注释任何希望异步执行的方法
use Flowpack\JobQueue\Common\Annotations as Job;

class SomeClass {

    /**
     * @Job\Defer(queueName="some-queue")
     */
    public function sendEmail($emailAddress)
    {
        // send some email to $emailAddress
    }
}

或使用属性代替注解(PHP 8.0及以后版本)

use Flowpack\JobQueue\Common\Annotations as Job;

class SomeClass {

    #[Job\Defer(queueName: "some-queue")]
    public function sendEmail($emailAddress)
    {
        // send some email to $emailAddress
    }
}

注意: 该方法需要是公共的,并且它不能返回任何内容

  1. (如果需要)启动工作进程

在上面的代码就绪后,每当调用SomeClass::sendEmail()方法时,该方法调用将被转换为异步执行的作业[1]。

除非您像示例中那样使用FakeQueue,否则需要启动一个所谓的worker来监听新作业并执行它们:

./flow flowpack.jobqueue.common:job:work some-queue --verbose

简介

为了开始,我们首先定义一些术语

消息
在程序或系统之间传递的信息片段,有时也称为“事件”。
在JobQueue包中,我们使用消息来传输“作业”。
消息队列
根据维基百科“消息队列 [...] 是软件工程组件,用于进程间通信 (IPC),或用于同一进程内的线程间通信”
在JobQueue包的上下文中,我们称“消息队列”为一个FIFO缓冲区,它将消息分配给一个或多个消费者,以确保每条消息只被处理一次。
作业
要执行的工作单元(异步)。
在JobQueue包中,我们使用消息队列来存储序列化的作业,因此它充当“作业流”。
作业管理器
允许向消息队列添加和检索作业的中央权威机构。
工作进程
工作进程监视队列并触发作业执行。
此包包含一个`job:work`命令来执行此操作(见下文)
提交
将新消息*提交*到队列中,以便由工作进程处理
保留
在消息可以处理之前,必须先*保留*。
队列保证单个消息永远不会被两个工作进程保留(除非它已经被释放)
释放
保留的消息可以*释放*到队列中,以便稍后处理。
如果作业执行失败并且队列的`maximumNumberOfReleases`设置大于零,则*作业管理器*会执行此操作
中止
如果消息无法成功处理,则将其 *中止* 并在相应的队列中标记为 *失败*,这样就不能再次保留它。
当作业执行失败且消息无法释放(再次)时,*作业管理器* 会中止消息。
完成
如果消息成功处理,则将其标记为 *完成*。
如果作业执行成功,则作业管理器会完成消息。

消息队列

Flowpack.JobQueue.Common 包提供了一个 非常基础的 消息队列实现 Flowpack\JobQueue\Common\Queue\FakeQueue,允许使用子请求执行作业。它不需要任何第三方工具或服务器循环,适用于基本场景。但请注意它有一些限制。

  1. 它实际上不是一个队列,而是在消息入队后立即调度作业。因此,无法将工作分配给多个工作者。

  2. 作业管理器不参与作业的处理,因此作业需要自行处理错误。

  3. 出于同样的原因,对于 FakeQueue信号 不会被 发出。

  4. 在 Flow 3.3+ 中,FakeQueue 支持一个标志 async。如果没有设置该标志,执行作业将 阻塞 主线程!

对于高级使用,建议使用以下实现包之一

配置

这是队列最简单的配置。

Flowpack:
  JobQueue:
    Common:
      queues:
        'test':
          className: 'Flowpack\JobQueue\Common\Queue\FakeQueue'

使用此配置将有一个名为 test 的队列可用。

注意: 对于可重用的包,应考虑添加供应商特定的前缀以避免冲突。我们建议使用类名或包名加上函数名(例如,Flowpack.ElasticSearch.ContentRepositoryQueueIndexer)。

队列参数

以下参数被所有队列支持

一个更复杂的例子可能看起来像这样

Flowpack:
  JobQueue:
    Common:
      queues:
        'email':
          className: 'Flowpack\JobQueue\Beanstalkd\Queue\BeanstalkdQueue'
          maximumNumberOfReleases: 5
          executeIsolated: true
          outputResults: true
          queueNamePrefix: 'staging-'
          options:
            client:
              host: 127.0.0.11
              port: 11301
            defaultTimeout: 50
          releaseOptions:
            priority: 512
            delay: 120
        'log':
          className: 'Flowpack\JobQueue\Redis\Queue\RedisQueue'
          options:
            defaultTimeout: 10

如你所见,你可以在一个安装中拥有多个队列。这允许你根据需求使用不同的后端/选项进行队列。

预设

如果多个查询共享相同的配置,可以使用 预设 来简化可读性和可维护性。

Flowpack:
  JobQueue:
    Common:
      presets:
        'staging-default':
          className: 'Flowpack\JobQueue\Doctrine\Queue\DoctrineQueue'
          queueNamePrefix: 'staging-'
          options:
            pollInterval: 2
      queues:
        'email':
          preset: 'staging-default'
          options:
            tableName: 'queue_email' # default table name would be "flowpack_jobqueue_messages_email"
        'log':
          preset: 'staging-default'
          options:
            pollInterval: 1 # overrides "pollInterval" of the preset

这将配置两个 DoctrineQueue,"email" 和 "log",具有一些通用选项,但不同的表名和轮询间隔。

作业队列

作业是任意实现 Flowpack\JobQueue\Common\Job\JobInterface 接口的类。此包提供了一个实现 StaticMethodCallJob,允许调用公共方法(请参阅 快速入门),但通常创建一个自定义作业更有意义。

<?php
use Flowpack\JobQueue\Common\Job\JobInterface;
use Flowpack\JobQueue\Common\Queue\Message;
use Flowpack\JobQueue\Common\Queue\QueueInterface;

class SendEmailJob implements JobInterface
{
    protected $emailAddress;

    public function __construct($emailAddress)
    {
        $this->emailAddress = $emailAddress;
    }


    public function execute(QueueInterface $queue, Message $message)
    {
        // TODO: send the email to $this->emailAddress
        return true;
    }

    public function getIdentifier()
    {
        return 'SendEmailJob';
    }

    public function getLabel()
    {
        return sprintf('SendEmailJob (email: "%S")', $this->emailAddress);
    }
}

注意: 确保成功时 execute() 方法返回 TRUE,否则相应的消息将再次释放并/或标记为 失败

有了这些,新的作业可以这样添加到队列中

use Flowpack\JobQueue\Common\Job\JobInterface;
use Flowpack\JobQueue\Common\Job\JobManager;
use Neos\Flow\Annotations as Flow;

class SomeClass {

    /**
     * @Flow\Inject
     * @var JobManager
     */
    protected $jobManager;

    /**
     * @return void
     */
    public function queueJob()
    {
        $job = new SendEmailJob('some@email.com');
        $this->jobManager->queue('queue-name', $job);
    }
}

命令行界面

使用 flowpack.jobqueue.common:queue:*flowpack.jobqueue.common:job:* 命令与作业队列交互

信号 & 插槽

当与作业队列一起工作时,适当的监控至关重要,因为失败可能不会立即可见。作业管理器会发出所有相关事件的信号,包括

  • messageSubmitted
  • messageTimeout
  • messageReserved
  • messageFinished
  • messageReleased
  • messageFailed

这些可以用于实现更复杂的日志记录,例如

<?php
namespace Your\Package;

use Flowpack\JobQueue\Common\Job\JobManager;
use Flowpack\JobQueue\Common\Queue\Message;
use Flowpack\JobQueue\Common\Queue\QueueInterface;
use Neos\Flow\Core\Bootstrap;
use Neos\Flow\Log\SystemLoggerInterface;
use Neos\Flow\Package\Package as BasePackage;

class Package extends BasePackage
{

    /**
     * @param Bootstrap $bootstrap
     * @return void
     */
    public function boot(Bootstrap $bootstrap)
    {
        $dispatcher = $bootstrap->getSignalSlotDispatcher();

        $dispatcher->connect(
            JobManager::class, 'messageFailed',
            function(QueueInterface $queue, Message $message, \Exception $jobExecutionException = null) use ($bootstrap) {
                $additionalData = [
                    'queue' => $queue->getName(),
                    'message' => $message->getIdentifier()
                ];
                if ($jobExecutionException !== null) {
                    $additionalData['exception'] = $jobExecutionException->getMessage();
                }
                $bootstrap->getObjectManager()->get(SystemLoggerInterface::class)->log('Job failed', LOG_ERR, $additionalData);
            }
        );
    }
}

这将记录每个失败的作业到系统日志。

许可证

此包受 MIT 许可证的许可。

贡献

欢迎 Pull-Requests。请确保阅读 行为准则

[1] 除非设置了 async 标志(需要 Flow 3.3+),否则 FakeQueue 实际上是 同步 执行作业的。