flowpack / jobqueue-common
Flow应用程序中JobQueue的通用功能。
Requires
- php: ^7.4 || ^8.0
- neos/cache: *
- neos/flow: ^6.3 || ^7.0 || ^8.0 || ^9.0
This package is auto-updated.
Last update: 2024-09-16 08:26:58 UTC
README
Neos Flow包,允许异步和分布式执行任务。
目录
快速入门(TL;DR)
- 使用composer安装此包
composer require flowpack/jobqueue-common
(或通过将依赖项添加到已安装包的composer声明中)
- 在您的
Settings.yaml
中添加以下内容以配置基本队列
Flowpack: JobQueue: Common: queues: 'some-queue': className: 'Flowpack\JobQueue\Common\Queue\FakeQueue'
- (如果需要)初始化队列
使用
./flow queue:setup some-queue
您可以为设置队列和/或验证其配置。在FakeQueue
的情况下,该步骤不是必需的。
注意: queue:setup
命令不会删除任何现有消息,多次调用它不会有任何坏处。
- 注释任何希望异步执行的方法
use Flowpack\JobQueue\Common\Annotations as Job; class SomeClass { /** * @Job\Defer(queueName="some-queue") */ public function sendEmail($emailAddress) { // send some email to $emailAddress } }
或使用属性代替注解(PHP 8.0及以后版本)
use Flowpack\JobQueue\Common\Annotations as Job; class SomeClass { #[Job\Defer(queueName: "some-queue")] public function sendEmail($emailAddress) { // send some email to $emailAddress } }
注意: 该方法需要是公共的,并且它不能返回任何内容
- (如果需要)启动工作进程
在上面的代码就绪后,每当调用SomeClass::sendEmail()
方法时,该方法调用将被转换为异步执行的作业[1]。
除非您像示例中那样使用FakeQueue
,否则需要启动一个所谓的worker
来监听新作业并执行它们:
./flow flowpack.jobqueue.common:job:work some-queue --verbose
简介
为了开始,我们首先定义一些术语
- 消息
- 在程序或系统之间传递的信息片段,有时也称为“事件”。
在JobQueue包中,我们使用消息来传输“作业”。 - 消息队列
- 根据维基百科“消息队列 [...] 是软件工程组件,用于进程间通信 (IPC),或用于同一进程内的线程间通信”。
在JobQueue包的上下文中,我们称“消息队列”为一个FIFO缓冲区,它将消息分配给一个或多个消费者,以确保每条消息只被处理一次。 - 作业
- 要执行的工作单元(异步)。
在JobQueue包中,我们使用消息队列来存储序列化的作业,因此它充当“作业流”。 - 作业管理器
- 允许向消息队列添加和检索作业的中央权威机构。
- 工作进程
- 工作进程监视队列并触发作业执行。
此包包含一个`job:work`命令来执行此操作(见下文) - 提交
- 将新消息*提交*到队列中,以便由工作进程处理
- 保留
- 在消息可以处理之前,必须先*保留*。
队列保证单个消息永远不会被两个工作进程保留(除非它已经被释放) - 释放
- 保留的消息可以*释放*到队列中,以便稍后处理。
如果作业执行失败并且队列的`maximumNumberOfReleases`设置大于零,则*作业管理器*会执行此操作 - 中止
- 如果消息无法成功处理,则将其 *中止* 并在相应的队列中标记为 *失败*,这样就不能再次保留它。
当作业执行失败且消息无法释放(再次)时,*作业管理器* 会中止消息。 - 完成
- 如果消息成功处理,则将其标记为 *完成*。
如果作业执行成功,则作业管理器会完成消息。
消息队列
Flowpack.JobQueue.Common
包提供了一个 非常基础的 消息队列实现 Flowpack\JobQueue\Common\Queue\FakeQueue
,允许使用子请求执行作业。它不需要任何第三方工具或服务器循环,适用于基本场景。但请注意它有一些限制。
-
它实际上不是一个队列,而是在消息入队后立即调度作业。因此,无法将工作分配给多个工作者。
-
作业管理器不参与作业的处理,因此作业需要自行处理错误。
-
出于同样的原因,对于
FakeQueue
,信号 不会被 发出。 -
在 Flow 3.3+ 中,
FakeQueue
支持一个标志async
。如果没有设置该标志,执行作业将 阻塞 主线程!
对于高级使用,建议使用以下实现包之一
配置
这是队列最简单的配置。
Flowpack: JobQueue: Common: queues: 'test': className: 'Flowpack\JobQueue\Common\Queue\FakeQueue'
使用此配置将有一个名为 test
的队列可用。
注意: 对于可重用的包,应考虑添加供应商特定的前缀以避免冲突。我们建议使用类名或包名加上函数名(例如,Flowpack.ElasticSearch.ContentRepositoryQueueIndexer)。
队列参数
以下参数被所有队列支持
一个更复杂的例子可能看起来像这样
Flowpack: JobQueue: Common: queues: 'email': className: 'Flowpack\JobQueue\Beanstalkd\Queue\BeanstalkdQueue' maximumNumberOfReleases: 5 executeIsolated: true outputResults: true queueNamePrefix: 'staging-' options: client: host: 127.0.0.11 port: 11301 defaultTimeout: 50 releaseOptions: priority: 512 delay: 120 'log': className: 'Flowpack\JobQueue\Redis\Queue\RedisQueue' options: defaultTimeout: 10
如你所见,你可以在一个安装中拥有多个队列。这允许你根据需求使用不同的后端/选项进行队列。
预设
如果多个查询共享相同的配置,可以使用 预设 来简化可读性和可维护性。
Flowpack: JobQueue: Common: presets: 'staging-default': className: 'Flowpack\JobQueue\Doctrine\Queue\DoctrineQueue' queueNamePrefix: 'staging-' options: pollInterval: 2 queues: 'email': preset: 'staging-default' options: tableName: 'queue_email' # default table name would be "flowpack_jobqueue_messages_email" 'log': preset: 'staging-default' options: pollInterval: 1 # overrides "pollInterval" of the preset
这将配置两个 DoctrineQueue
,"email" 和 "log",具有一些通用选项,但不同的表名和轮询间隔。
作业队列
作业是任意实现 Flowpack\JobQueue\Common\Job\JobInterface
接口的类。此包提供了一个实现 StaticMethodCallJob
,允许调用公共方法(请参阅 快速入门),但通常创建一个自定义作业更有意义。
<?php use Flowpack\JobQueue\Common\Job\JobInterface; use Flowpack\JobQueue\Common\Queue\Message; use Flowpack\JobQueue\Common\Queue\QueueInterface; class SendEmailJob implements JobInterface { protected $emailAddress; public function __construct($emailAddress) { $this->emailAddress = $emailAddress; } public function execute(QueueInterface $queue, Message $message) { // TODO: send the email to $this->emailAddress return true; } public function getIdentifier() { return 'SendEmailJob'; } public function getLabel() { return sprintf('SendEmailJob (email: "%S")', $this->emailAddress); } }
注意: 确保成功时 execute()
方法返回 TRUE,否则相应的消息将再次释放并/或标记为 失败。
有了这些,新的作业可以这样添加到队列中
use Flowpack\JobQueue\Common\Job\JobInterface; use Flowpack\JobQueue\Common\Job\JobManager; use Neos\Flow\Annotations as Flow; class SomeClass { /** * @Flow\Inject * @var JobManager */ protected $jobManager; /** * @return void */ public function queueJob() { $job = new SendEmailJob('some@email.com'); $this->jobManager->queue('queue-name', $job); } }
命令行界面
使用 flowpack.jobqueue.common:queue:*
和 flowpack.jobqueue.common:job:*
命令与作业队列交互
信号 & 插槽
当与作业队列一起工作时,适当的监控至关重要,因为失败可能不会立即可见。作业管理器会发出所有相关事件的信号,包括
- messageSubmitted
- messageTimeout
- messageReserved
- messageFinished
- messageReleased
- messageFailed
这些可以用于实现更复杂的日志记录,例如
<?php namespace Your\Package; use Flowpack\JobQueue\Common\Job\JobManager; use Flowpack\JobQueue\Common\Queue\Message; use Flowpack\JobQueue\Common\Queue\QueueInterface; use Neos\Flow\Core\Bootstrap; use Neos\Flow\Log\SystemLoggerInterface; use Neos\Flow\Package\Package as BasePackage; class Package extends BasePackage { /** * @param Bootstrap $bootstrap * @return void */ public function boot(Bootstrap $bootstrap) { $dispatcher = $bootstrap->getSignalSlotDispatcher(); $dispatcher->connect( JobManager::class, 'messageFailed', function(QueueInterface $queue, Message $message, \Exception $jobExecutionException = null) use ($bootstrap) { $additionalData = [ 'queue' => $queue->getName(), 'message' => $message->getIdentifier() ]; if ($jobExecutionException !== null) { $additionalData['exception'] = $jobExecutionException->getMessage(); } $bootstrap->getObjectManager()->get(SystemLoggerInterface::class)->log('Job failed', LOG_ERR, $additionalData); } ); } }
这将记录每个失败的作业到系统日志。
许可证
此包受 MIT 许可证的许可。
贡献
欢迎 Pull-Requests。请确保阅读 行为准则。
[1] 除非设置了 async
标志(需要 Flow 3.3+),否则 FakeQueue
实际上是 同步 执行作业的。