typo3 / jobqueue-common
Requires
- php: ^7.4 || ^8.0
- neos/cache: *
- neos/flow: ^6.3 || ^7.0 || ^8.0 || ^9.0
This package is auto-updated.
Last update: 2024-07-16 08:09:17 UTC
README
Neos Flow软件包,允许异步和分布式执行任务。
目录
快速入门(总结)
- 使用composer安装此软件包
composer require flowpack/jobqueue-common
(或将依赖项添加到已安装软件包的composer清单中)
- 通过在您的
Settings.yaml
中添加以下内容来配置基本队列
Flowpack: JobQueue: Common: queues: 'some-queue': className: 'Flowpack\JobQueue\Common\Queue\FakeQueue'
- (如果需要)初始化队列
使用
./flow queue:setup some-queue
您可以使用此命令设置队列和/或验证其配置。对于 FakeQueue
,此步骤不是必需的。
注意: queue:setup
命令不会删除任何现有消息,多次调用它没有害处。
- 注释任何您想异步执行的方法
use Flowpack\JobQueue\Common\Annotations as Job; class SomeClass { /** * @Job\Defer(queueName="some-queue") */ public function sendEmail($emailAddress) { // send some email to $emailAddress } }
或使用属性而不是注释(PHP 8.0及以后版本)
use Flowpack\JobQueue\Common\Annotations as Job; class SomeClass { #[Job\Defer(queueName: "some-queue")] public function sendEmail($emailAddress) { // send some email to $emailAddress } }
注意: 方法必须是 公共的,并且它不能返回任何内容
- (如果需要)启动工作进程
如果有上述代码,每当调用 SomeClass::sendEmail()
方法时,该方法调用将转换为异步执行的任务[1]。
除非您像示例中那样使用 FakeQueue
,否则必须启动一个所谓的 worker
,以监听新作业并执行它们:
./flow flowpack.jobqueue.common:job:work some-queue --verbose
简介
首先让我们定义一些术语
- 消息
- 在程序或系统之间传递的信息片段,有时也称为“事件”。
在JobQueue软件包中,我们使用消息来传输“作业”。 - 消息队列
- 根据维基百科,“消息队列 [...] 是软件工程组件,用于进程间通信(IPC)或同一进程内的线程间通信”。
在JobQueue软件包的上下文中,我们称“消息队列”为一个 FIFO 缓冲区,它将消息分发到一个或多个消费者,以确保每条消息只被处理一次。 - 作业
- 要执行的工作单元(异步)。
在JobQueue软件包中,我们使用消息队列来存储序列化的作业,因此它充当“作业流”。 - 作业管理器
- 允许向消息队列添加和检索作业的中心权威机构。
- 工作进程
- 工作进程监视队列并触发作业执行。
此软件包包含一个 `job:work` 命令,用于执行此操作(见下文) - 提交
- 将新消息 *提交* 到队列,以便由工作进程处理
- 预留
- 在消息可以处理之前,必须先 *预留*。
队列保证单个消息永远不会被两个工作进程预留(除非它再次被释放) - 释放
- 预留的消息可以 *释放* 到队列,以便稍后处理。
如果作业执行失败且队列的 `maximumNumberOfReleases` 设置大于零,则 *作业管理器* 执行此操作 - 中止
- 如果消息无法成功处理,则将其 *中止*,在相应的队列中标记为 *失败*,因此不能再次预留。
如果作业执行失败且消息无法释放(再次),*JobManager* 将终止消息。 - 结束
- 如果消息成功处理,则将其标记为*完成*。
如果作业执行成功,*JobManager* 将完成消息。
消息队列
Flowpack.JobQueue.Common
包附带了一个非常基础的 消息队列实现 Flowpack\JobQueue\Common\Queue\FakeQueue
,允许使用子请求执行作业。它不需要任何第三方工具或服务器循环,适用于基本场景。但它有一些限制需要注意。
-
它实际上不是一个队列,而是在消息入队时立即调度作业。因此,无法将工作分配给多个工作者。
-
JobManager
不参与作业处理,因此作业需要自行处理错误。 -
出于同样的原因,对于
FakeQueue
,信号 是不发出的。 -
从 Flow 3.3+ 开始,
FakeQueue
支持一个async
标志。如果没有设置此标志,执行作业将 阻塞 主线程!
对于高级使用,建议使用以下实现包之一:
配置
这是队列的最简单配置。
Flowpack: JobQueue: Common: queues: 'test': className: 'Flowpack\JobQueue\Common\Queue\FakeQueue'
这样,将会有一个名为 test
的队列可用。
注意:对于可重用的包,应考虑添加供应商特定的前缀以避免冲突。我们建议使用类名或包名加函数名(例如,Flowpack.ElasticSearch.ContentRepositoryQueueIndexer)。
队列参数
以下参数由所有队列支持
一个更复杂的例子可能如下所示
Flowpack: JobQueue: Common: queues: 'email': className: 'Flowpack\JobQueue\Beanstalkd\Queue\BeanstalkdQueue' maximumNumberOfReleases: 5 executeIsolated: true outputResults: true queueNamePrefix: 'staging-' options: client: host: 127.0.0.11 port: 11301 defaultTimeout: 50 releaseOptions: priority: 512 delay: 120 'log': className: 'Flowpack\JobQueue\Redis\Queue\RedisQueue' options: defaultTimeout: 10
如你所见,一个安装中可以有多个队列。这允许根据需求使用不同的后端/选项。
预设
如果多个查询共享相同的配置,可以使用 预设 以简化可读性和可维护性。
Flowpack: JobQueue: Common: presets: 'staging-default': className: 'Flowpack\JobQueue\Doctrine\Queue\DoctrineQueue' queueNamePrefix: 'staging-' options: pollInterval: 2 queues: 'email': preset: 'staging-default' options: tableName: 'queue_email' # default table name would be "flowpack_jobqueue_messages_email" 'log': preset: 'staging-default' options: pollInterval: 1 # overrides "pollInterval" of the preset
这将配置两个 DoctrineQueue
,名为 "email" 和 "log",并使用一些共同选项,但不同的表名和轮询间隔。
作业队列
作业是一个实现了 Flowpack\JobQueue\Common\Job\JobInterface
的任意类。此包附带一个实现 StaticMethodCallJob
,允许调用公共方法(见 快速入门),但通常有创建自定义作业的必要。
<?php use Flowpack\JobQueue\Common\Job\JobInterface; use Flowpack\JobQueue\Common\Queue\Message; use Flowpack\JobQueue\Common\Queue\QueueInterface; class SendEmailJob implements JobInterface { protected $emailAddress; public function __construct($emailAddress) { $this->emailAddress = $emailAddress; } public function execute(QueueInterface $queue, Message $message) { // TODO: send the email to $this->emailAddress return true; } public function getIdentifier() { return 'SendEmailJob'; } public function getLabel() { return sprintf('SendEmailJob (email: "%S")', $this->emailAddress); } }
注意:至关重要的是,execute()
方法在成功时返回 TRUE,否则相应的消息将被再次释放并/或标记为 失败。
在此基础上,可以将新的作业添加到队列中,如下所示:
use Flowpack\JobQueue\Common\Job\JobInterface; use Flowpack\JobQueue\Common\Job\JobManager; use Neos\Flow\Annotations as Flow; class SomeClass { /** * @Flow\Inject * @var JobManager */ protected $jobManager; /** * @return void */ public function queueJob() { $job = new SendEmailJob('some@email.com'); $this->jobManager->queue('queue-name', $job); } }
命令行界面
使用 flowpack.jobqueue.common:queue:*
和 flowpack.jobqueue.common:job:*
命令与作业队列交互。
信号 & 插槽
与作业队列一起工作时,适当的监控至关重要,因为失败可能不会立即显现。JobManager
会发出所有相关事件的信号,包括
- messageSubmitted
- messageTimeout
- messageReserved
- messageFinished
- messageReleased
- messageFailed
这些可以用于实现更复杂的日志记录,例如
<?php namespace Your\Package; use Flowpack\JobQueue\Common\Job\JobManager; use Flowpack\JobQueue\Common\Queue\Message; use Flowpack\JobQueue\Common\Queue\QueueInterface; use Neos\Flow\Core\Bootstrap; use Neos\Flow\Log\SystemLoggerInterface; use Neos\Flow\Package\Package as BasePackage; class Package extends BasePackage { /** * @param Bootstrap $bootstrap * @return void */ public function boot(Bootstrap $bootstrap) { $dispatcher = $bootstrap->getSignalSlotDispatcher(); $dispatcher->connect( JobManager::class, 'messageFailed', function(QueueInterface $queue, Message $message, \Exception $jobExecutionException = null) use ($bootstrap) { $additionalData = [ 'queue' => $queue->getName(), 'message' => $message->getIdentifier() ]; if ($jobExecutionException !== null) { $additionalData['exception'] = $jobExecutionException->getMessage(); } $bootstrap->getObjectManager()->get(SystemLoggerInterface::class)->log('Job failed', LOG_ERR, $additionalData); } ); } }
这将记录每个失败的消息到系统日志。
许可证
此包采用 MIT 许可证。
贡献
欢迎拉取请求。请确保阅读 行为准则。
[1] 除非设置 async
标志(需要 Flow 3.3+),否则 FakeQueue
实际上会同步执行作业。