mcfedr / queue-manager-bundle
管理作业队列的组件
Requires
- php: >=8.0
- ext-json: *
- nesbot/carbon: ^1|^2
- ramsey/uuid: ^3.7|^4.1
- symfony/framework-bundle: ^5.0|^6.0|^7.0
Requires (Dev)
- aws/aws-sdk-php: ^3.15
- doctrine/doctrine-bundle: ^1.6|^2.0
- doctrine/orm: ^2.5
- friendsofphp/php-cs-fixer: ^3.0
- google/cloud-pubsub: ^1.13
- pda/pheanstalk: ^3.1|^v4
- phpstan/phpstan: ^0.12|^1.0
- phpstan/phpstan-doctrine: ^0.12|^1.0
- phpstan/phpstan-phpunit: ^0.12|^1.0
- phpstan/phpstan-symfony: ^0.12|^1.0
- phpunit/phpunit: ^9
- symfony/browser-kit: ^5.0|^6.0|^7.0
- symfony/dotenv: ^5.0|^6.0|^7.0
- symfony/monolog-bundle: ^3.0|^4.0
- symfony/phpunit-bridge: *
- symfony/proxy-manager-bridge: ^5.0|^6.0|^7.0
- symfony/yaml: ^5.0|^6.0|^7.0
Suggests
- ext-pcntl: For cleaner job handling in runner
- aws/aws-sdk-php: Required to use SQS driver
- doctrine/doctrine-bundle: Required to use doctrine driver
- google/cloud-pubsub: Required to use Pub/Sub driver
- pda/pheanstalk: Required to use beanstalkd driver
- symfony/proxy-manager-bridge: Required to use doctrine driver
- dev-master
- 7.2.0
- 7.1.1
- 7.1.0
- 7.0.0
- 6.8.1
- 6.8.0
- 6.7.0
- 6.6.1
- 6.6.0
- 6.5.0
- 6.4.3
- 6.4.2
- 6.4.1
- 6.4.0
- 6.3.0
- 6.2.0
- 6.1.2
- 6.1.1
- 6.1.0
- 6.0.3
- 6.0.2
- 6.0.1
- 6.0.0
- 5.10.0
- 5.9.0
- 5.8.3
- 5.8.2
- 5.8.1
- 5.8.0
- 5.7.2
- 5.7.1
- 5.7.0
- 5.6.1
- 5.6.0
- 5.5.0
- 5.4.5
- 5.4.4
- 5.4.3
- 5.4.2
- 5.4.1
- 5.4.0
- 5.3.1
- 5.3.0
- 5.2.1
- 5.2.0
- 5.1.0
- 5.0.4
- 5.0.3
- 5.0.2
- 5.0.1
- 5.0.0
- 4.1.0
- 4.0.3
- 4.0.2
- 4.0.1
- 4.0.0
- 3.3.3
- 3.3.2
- 3.3.1
- 3.3.0
- 3.2.0
- 3.1.1
- 3.1.0
- 3.0.2
- 3.0.1
- 3.0.0
- 2.3.0
- 2.2.0
- 2.1.0
- 2.0.1
- 2.0.0
- 1.2.1
- 1.2.0
- 1.1.1
- 1.1.0
- 1.0.2
- 1.0.1
- 1.0.0
- dev-dependabot/composer/guzzlehttp/psr7-2.4.5
- dev-dependabot/composer/guzzlehttp/guzzle-7.5.0
- dev-dependabot/composer/symfony/http-kernel-6.0.20
- dev-Zizitto-master
- dev-delay-no-orm
This package is auto-updated.
Last update: 2024-09-23 13:15:24 UTC
README
用于在 Symfony 中运行后台任务的组件。
此组件提供了一致的队列接口,具有可插拔的'驱动程序',可以使用多种不同的队列类型调度任务
还有许多'辅助'插件
-
此插件可以提前调度作业,并在它们应该运行时将它们移动到实时队列。与不支持调度作业的 SQS 或 Beanstalkd 结合使用。
-
周期性作业
自动调度每小时/天/周或其他周期运行的作业。随机化实际时间以保持均匀的服务器负载。
概述
作业是实现 Worker
接口的 Symfony 服务。它有一个方法 execute(array $arguments)
。作业的名称是服务名称。
您可以通过调用 $container->get(QueueManagerRegistry::class)->put($name, $arguments)
将作业添加到队列中。
请参阅您使用的驱动程序的文档,了解如何运行守护进程进程。
安装
Composer
composer require mcfedr/queue-manager-bundle
AppKernel
将组件包含到您的 AppKernel 中
public function registerBundles() { $bundles = [ ... new Mcfedr\QueueManagerBundle\McfedrQueueManagerBundle(),
配置
您必须配置一个(或多个)驱动程序来使用。通常您只有一个,并称为'default'。
Beanstalk
用法
Beanstalk 运行器是一个 Symfony 命令。如果您需要处理更多作业,可以运行多个实例。
./bin/console mcfedr:queue:{name}-runner
其中 {name}
是您在配置中使用的名称。添加 -v
或更多以获取详细日志。
配置
mcfedr_queue_manager: managers: default: driver: beanstalkd options: host: 127.0.0.1 port: 11300 default_queue: mcfedr_queue
QueueManager::put
支持的选项
queue
- 将作业放入的队列名称。priority
- 作业优先级。ttr
- 运行时间,在作业重复之前完成作业的时间。time
- 表示调度此作业的\DateTime
对象。delay
- 从现在开始到调度此作业的秒数。
AWS SQS
用法
SQS 运行器是一个 Symfony 命令。如果您需要处理更多作业,可以运行多个实例。
./bin/console mcfedr:queue:{name}-runner
其中 {name}
是您在配置中使用的名称。添加 -v
或更多以获取详细日志。
配置
mcfedr_queue_manager: managers: default: driver: sqs options: default_url: https://sqs.eu-west-1.amazonaws.com/... region: eu-west-1 credentials: key: 'my-access-key-id' secret: 'my-secret-access-key' queues: name: https://sqs.eu-west-1.amazonaws.com/... name2: https://sqs.eu-west-1.amazonaws.com/...
default_url
- 默认 SQS 队列 URL。region
- 队列所在的区域。如果不传递sqs_client
,则为必需。credentials
可选 - 指定您的密钥和密码 这是可选的,因为 SDK 可以从 多个位置 获取您的凭据。sqs_client
- 要使用的SQSClient
服务的名称。queues
可选 - 允许您设置队列的简短名称映射,这使得使用多个队列并保持配置在一个地方变得更容易。
QueueManager::put
支持的选项
url
- 一个包含队列 URL 的字符串。queue
- 一个包含配置中队列名称的字符串。time
- 表示调度此作业的\DateTime
对象。 注意: SQS 可以延迟作业长达 15 分钟。delay
- 从现在开始到调度此作业的秒数。 注意: SQS 可以延迟作业长达 15 分钟。ttr
- 在 Amazon SQS 防止其他消费者接收和处理消息的时间段内(SQS 可见性超时)。
GCP Pub/Sub
用法
Pub/Sub 运行器是一个 Symfony 命令。如果需要处理更多任务,您可以运行多个实例。
./bin/console mcfedr:queue:{name}-runner
其中 {name}
是您在配置中使用的名称。添加 -v
或更多以获取详细日志。
配置
mcfedr_queue_manager: managers: default: driver: pub_sub options: default_subscription: 'test_sub' default_topic: 'projects/project/topics/test-topic' pub_sub_queues: name1: topic: 'projects/project/topics/test-topic' subscription: 'test_sub'
default_subscription
- 默认的 Pub/Sub 订阅以监听。default_topic
- 默认的 Pub/Sub 主题以推送。key_file_path
可选 - 指定您的密钥文件。这是可选的,因为 SDK 可以从多个位置获取您的凭证。pub_sub_client
- 要使用的PubSubClient
服务的名称。pub_sub_queues
可选 - 允许您设置队列的简称映射,这使得使用多个队列并保持配置在一个地方变得更容易。每个队列都应该有一个topic
和subscription
。
QueueManager::put
支持的选项
topic
- 一个包含队列 URL 的string
。queue
- 一个包含配置中队列名称的字符串。
周期性
此驱动程序不运行任务,它需要另一个驱动程序来实际处理任务。
用法
此驱动程序没有运行守护程序,因为它只是插入其他驱动程序。使用 put
将任务放入此驱动程序,并使用 period
选项。
配置
mcfedr_queue_manager: managers: periodic: driver: periodic options: default_manager: delay default_manager_options: []
这将创建一个名为 "mcfedr_queue_manager.periodic"
的 QueueManager
服务。
default_manager
- 默认工作处理器,必须支持延迟任务,例如 Doctrine Delay。default_manager_options
- 传递给工作处理器put
的默认选项。
QueueManager::put
支持的选项
period
- 任务运行之间的平均秒数。manager
- 为此任务使用不同的工作处理器。manager_options
- 传递给处理器put
方法的选项。
Doctrine Delay
此驱动程序不运行任务,它需要另一个驱动程序来实际处理任务。
它目前 仅 与 MySQL 一起工作,因为需要原生查询以并发安全地查找任务。
用法
除了您正在使用的任何其他守护程序外,您还应该运行延迟守护程序。此运行器简单地从 Doctrine 将任务移动到您的其他工作队列。因为它通常工作量不大,所以单个实例可以处理大量任务。
./bin/console mcfedr:queue:{name}-runner
其中 {name}
是您在配置中使用的名称。添加 -v
或更多以获取详细日志。
配置
mcfedr_queue_manager: managers: delay: driver: doctrine_delay options: entity_manager: default default_manager: default default_manager_options: []
这将创建一个名为 "mcfedr_queue_manager.delay"
的 QueueManager
服务。
entity_manager
- 要使用的 Doctrine 实体管理器。default_manager
- 默认工作处理器。default_manager_options
- 传递给工作处理器put
的默认选项。
QueueManager::put
支持的选项
time
- 表示调度此作业的\DateTime
对象。delay
- 从现在开始到调度此作业的秒数。force_delay
- 一个布尔值,强制任务延迟指定的秒数。manager
- 为此任务使用不同的工作处理器。manager_options
- 传递给处理器put
方法的选项。
注意
如果 delay
或 time
选项小于 30 秒,则除非将 force_delay
选项设置为 true
,否则任务将计划立即执行。
表格
安装后,您需要更新模式,以便创建延迟任务的表。
其他选项
这些是许多其他选项的默认值。
mcfedr_queue_manager: retry_limit: 3 sleep_seconds: 5 report_memory: false doctrine_reset: true
Doctrine
为了避免内存泄漏,在任务执行后,实体管理器将被重置。
自 Symfony 3.2 以来,重置非惰性管理器服务已弃用,并在 4.0 版本中将抛出异常。因此,如果您使用 Symfony 3.2 或更高版本,您需要安装 symfony/proxy-manager-bridge 来支持 Lazy Services。
composer require proxy-manager-bridge
用法
您可以通过注入 QueueManagerRegistry::class
并调用 put
来访问 QueueManagerRegistry
以简单访问您的队列。
此外,每个管理器都将是一个服务,您可以通过名称 "mcfedr_queue_manager.$name"
访问它。它实现了 QueueManager
接口,其中您可以调用两个简单的方法。
/**
* Put a new job on a queue
*
* @param string $name The service name of the worker that implements {@link \Mcfedr\QueueManagerBundle\Queue\Worker}
* @param array $arguments Arguments to pass to execute - must be json serializable
* @param array $options Options for creating the job - these depend on the driver used
*/
public function put(string $name, array $arguments = [], array $options = []): Job
/**
* Remove a job, you should call this to cancel a job
*
* @param $job
* @throws WrongJobException
* @throws NoSuchJobException
*/
public function delete(Job $job): void;
任务
要运行的任务是实现了 Mcfedr\QueueManagerBundle\Queue\Worker
的 Symfony 服务。有一个方法,该方法使用您传递给 QueueManager::put
的参数调用。
/**
* Called to start the queued task
*
* @param array $arguments
* @throws \Exception
*/
public function execute(array $arguments): void;
如果您的任务抛出异常,它将被重试(假设驱动程序支持重试),除非抛出的异常是 UnrecoverableJobExceptionInterface
的实例。
工人类应标记为mcfedr_queue_manager.worker
,如果您使用自动装配,这将自动完成。
默认情况下,作业名称是类名,但您也可以添加具有特定ID的标记,例如。
Worker: tags: - { name: 'mcfedr_queue_manager.worker', id: 'test_worker' }
现在您可以同时使用以下方式安排此作业:
$queueManager->put(Worker::class, ...) $queueManager->put('test_worker', ...)
事件
在作业执行过程中会触发多个事件。
创建自己的驱动程序
首先,驱动程序需要实现一个QueueManager
。这应该将任务放入队列。
可以使用选项参数接受任何针对您实现的具体参数。例如,这可能包括支持延迟
或优先级
。
您还需要创建一个Job
类,许多驱动程序可以直接扩展AbstractJob
,但您可以添加任何额外需要的数据。
创建运行器
许多驱动程序可以使用RunnerCommand
作为基础,实现getJob
方法。
其他队列服务器有自己的运行器,在这种情况下,您需要编写代码以确保调用正确的工人。服务mcfedr_queue_manager.job_executor
可以帮助您完成这项工作。