mcfedr/queue-manager-bundle

管理作业队列的组件

安装次数: 88,975

依赖项: 7

建议者: 0

安全性: 0

星标: 9

关注者: 3

分支: 11

开放问题: 5

类型:symfony-bundle

7.2.0 2024-09-23 13:14 UTC

README

用于在 Symfony 中运行后台任务的组件。

Latest Stable Version License Build Status

此组件提供了一致的队列接口,具有可插拔的'驱动程序',可以使用多种不同的队列类型调度任务

还有许多'辅助'插件

  • Doctrine Delay Queue

    此插件可以提前调度作业,并在它们应该运行时将它们移动到实时队列。与不支持调度作业的 SQS 或 Beanstalkd 结合使用。

  • 周期性作业

    自动调度每小时/天/周或其他周期运行的作业。随机化实际时间以保持均匀的服务器负载。

概述

作业是实现 Worker 接口的 Symfony 服务。它有一个方法 execute(array $arguments)。作业的名称是服务名称。

您可以通过调用 $container->get(QueueManagerRegistry::class)->put($name, $arguments) 将作业添加到队列中。

请参阅您使用的驱动程序的文档,了解如何运行守护进程进程。

安装

Composer

composer require mcfedr/queue-manager-bundle

AppKernel

将组件包含到您的 AppKernel 中

    public function registerBundles()
    {
        $bundles = [
            ...
            new Mcfedr\QueueManagerBundle\McfedrQueueManagerBundle(),

配置

您必须配置一个(或多个)驱动程序来使用。通常您只有一个,并称为'default'。

Beanstalk

用法

Beanstalk 运行器是一个 Symfony 命令。如果您需要处理更多作业,可以运行多个实例。

./bin/console mcfedr:queue:{name}-runner

其中 {name} 是您在配置中使用的名称。添加 -v 或更多以获取详细日志。

配置

mcfedr_queue_manager:
    managers:
        default:
            driver: beanstalkd
            options:
                host: 127.0.0.1
                port: 11300
                default_queue: mcfedr_queue

QueueManager::put 支持的选项

  • queue - 将作业放入的队列名称。
  • priority - 作业优先级。
  • ttr - 运行时间,在作业重复之前完成作业的时间。
  • time - 表示调度此作业的 \DateTime 对象。
  • delay - 从现在开始到调度此作业的秒数。

AWS SQS

用法

SQS 运行器是一个 Symfony 命令。如果您需要处理更多作业,可以运行多个实例。

./bin/console mcfedr:queue:{name}-runner

其中 {name} 是您在配置中使用的名称。添加 -v 或更多以获取详细日志。

配置

mcfedr_queue_manager:
    managers:
        default:
            driver: sqs
            options:
                default_url: https://sqs.eu-west-1.amazonaws.com/...
                region: eu-west-1
                credentials:
                    key: 'my-access-key-id'
                    secret: 'my-secret-access-key'
                queues:
                    name: https://sqs.eu-west-1.amazonaws.com/...
                    name2: https://sqs.eu-west-1.amazonaws.com/...
  • default_url - 默认 SQS 队列 URL。
  • region - 队列所在的区域。如果不传递 sqs_client,则为必需。
  • credentials 可选 - 指定您的密钥和密码 这是可选的,因为 SDK 可以从 多个位置 获取您的凭据。
  • sqs_client - 要使用的 SQSClient 服务的名称。
  • queues 可选 - 允许您设置队列的简短名称映射,这使得使用多个队列并保持配置在一个地方变得更容易。

QueueManager::put 支持的选项

  • url - 一个包含队列 URL 的字符串。
  • queue - 一个包含配置中队列名称的字符串。
  • time - 表示调度此作业的 \DateTime 对象。 注意: SQS 可以延迟作业长达 15 分钟。
  • delay - 从现在开始到调度此作业的秒数。 注意: SQS 可以延迟作业长达 15 分钟。
  • ttr - 在 Amazon SQS 防止其他消费者接收和处理消息的时间段内(SQS 可见性超时)。

GCP Pub/Sub

用法

Pub/Sub 运行器是一个 Symfony 命令。如果需要处理更多任务,您可以运行多个实例。

./bin/console mcfedr:queue:{name}-runner

其中 {name} 是您在配置中使用的名称。添加 -v 或更多以获取详细日志。

配置

mcfedr_queue_manager:
    managers:
        default:
            driver: pub_sub
            options:
                default_subscription: 'test_sub'
                default_topic: 'projects/project/topics/test-topic'
                pub_sub_queues:
                    name1:
                        topic: 'projects/project/topics/test-topic'
                        subscription: 'test_sub'
  • default_subscription - 默认的 Pub/Sub 订阅以监听。
  • default_topic - 默认的 Pub/Sub 主题以推送。
  • key_file_path 可选 - 指定您的密钥文件。这是可选的,因为 SDK 可以从多个位置获取您的凭证。
  • pub_sub_client - 要使用的 PubSubClient 服务的名称。
  • pub_sub_queues 可选 - 允许您设置队列的简称映射,这使得使用多个队列并保持配置在一个地方变得更容易。每个队列都应该有一个 topicsubscription

QueueManager::put 支持的选项

  • topic - 一个包含队列 URL 的 string
  • queue - 一个包含配置中队列名称的字符串。

周期性

此驱动程序不运行任务,它需要另一个驱动程序来实际处理任务。

用法

此驱动程序没有运行守护程序,因为它只是插入其他驱动程序。使用 put 将任务放入此驱动程序,并使用 period 选项。

配置

mcfedr_queue_manager:
    managers:
        periodic:
            driver: periodic
            options:
                default_manager: delay
                default_manager_options: []

这将创建一个名为 "mcfedr_queue_manager.periodic"QueueManager 服务。

  • default_manager - 默认工作处理器,必须支持延迟任务,例如 Doctrine Delay
  • default_manager_options - 传递给工作处理器 put 的默认选项。

QueueManager::put 支持的选项

  • period - 任务运行之间的平均秒数。
  • manager - 为此任务使用不同的工作处理器。
  • manager_options - 传递给处理器 put 方法的选项。

Doctrine Delay

此驱动程序不运行任务,它需要另一个驱动程序来实际处理任务。

它目前 与 MySQL 一起工作,因为需要原生查询以并发安全地查找任务。

用法

除了您正在使用的任何其他守护程序外,您还应该运行延迟守护程序。此运行器简单地从 Doctrine 将任务移动到您的其他工作队列。因为它通常工作量不大,所以单个实例可以处理大量任务。

./bin/console mcfedr:queue:{name}-runner

其中 {name} 是您在配置中使用的名称。添加 -v 或更多以获取详细日志。

配置

mcfedr_queue_manager:
    managers:
        delay:
            driver: doctrine_delay
            options:
                entity_manager: default
                default_manager: default
                default_manager_options: []

这将创建一个名为 "mcfedr_queue_manager.delay"QueueManager 服务。

  • entity_manager - 要使用的 Doctrine 实体管理器。
  • default_manager - 默认工作处理器。
  • default_manager_options - 传递给工作处理器 put 的默认选项。

QueueManager::put 支持的选项

  • time - 表示调度此作业的 \DateTime 对象。
  • delay - 从现在开始到调度此作业的秒数。
  • force_delay - 一个布尔值,强制任务延迟指定的秒数。
  • manager - 为此任务使用不同的工作处理器。
  • manager_options - 传递给处理器 put 方法的选项。

注意

如果 delaytime 选项小于 30 秒,则除非将 force_delay 选项设置为 true,否则任务将计划立即执行。

表格

安装后,您需要更新模式,以便创建延迟任务的表。

其他选项

这些是许多其他选项的默认值。

mcfedr_queue_manager:
    retry_limit: 3
    sleep_seconds: 5
    report_memory: false
    doctrine_reset: true

Doctrine

为了避免内存泄漏,在任务执行后,实体管理器将被重置。

自 Symfony 3.2 以来,重置非惰性管理器服务已弃用,并在 4.0 版本中将抛出异常。因此,如果您使用 Symfony 3.2 或更高版本,您需要安装 symfony/proxy-manager-bridge 来支持 Lazy Services

composer require proxy-manager-bridge

用法

您可以通过注入 QueueManagerRegistry::class 并调用 put 来访问 QueueManagerRegistry 以简单访问您的队列。

此外,每个管理器都将是一个服务,您可以通过名称 "mcfedr_queue_manager.$name" 访问它。它实现了 QueueManager 接口,其中您可以调用两个简单的方法。

/**
 * Put a new job on a queue
 *
 * @param string $name The service name of the worker that implements {@link \Mcfedr\QueueManagerBundle\Queue\Worker}
 * @param array $arguments Arguments to pass to execute - must be json serializable
 * @param array $options Options for creating the job - these depend on the driver used
 */
public function put(string $name, array $arguments = [], array $options = []): Job

/**
 * Remove a job, you should call this to cancel a job
 *
 * @param $job
 * @throws WrongJobException
 * @throws NoSuchJobException
 */
public function delete(Job $job): void;

任务

要运行的任务是实现了 Mcfedr\QueueManagerBundle\Queue\Worker 的 Symfony 服务。有一个方法,该方法使用您传递给 QueueManager::put 的参数调用。

/**
 * Called to start the queued task
 *
 * @param array $arguments
 * @throws \Exception
 */
public function execute(array $arguments): void;

如果您的任务抛出异常,它将被重试(假设驱动程序支持重试),除非抛出的异常是 UnrecoverableJobExceptionInterface 的实例。

工人类应标记为mcfedr_queue_manager.worker,如果您使用自动装配,这将自动完成。

默认情况下,作业名称是类名,但您也可以添加具有特定ID的标记,例如。

Worker:
  tags:
  - { name: 'mcfedr_queue_manager.worker', id: 'test_worker' }

现在您可以同时使用以下方式安排此作业:

$queueManager->put(Worker::class, ...)
$queueManager->put('test_worker', ...)

事件

在作业执行过程中会触发多个事件。

创建自己的驱动程序

首先,驱动程序需要实现一个QueueManager。这应该将任务放入队列。

可以使用选项参数接受任何针对您实现的具体参数。例如,这可能包括支持延迟优先级

您还需要创建一个Job类,许多驱动程序可以直接扩展AbstractJob,但您可以添加任何额外需要的数据。

创建运行器

许多驱动程序可以使用RunnerCommand作为基础,实现getJob方法。

其他队列服务器有自己的运行器,在这种情况下,您需要编写代码以确保调用正确的工人。服务mcfedr_queue_manager.job_executor可以帮助您完成这项工作。