printed/rabbitmq-queue-bundle

RabbitMQ生产者和消费者助手包装器

5.2.0 2023-08-09 12:38 UTC

README

Symfony的RabbitMQ包装器包,旨在提高您使用消费者和生产者的体验。该包依赖于php-amqplib/rabbitmq-bundle包。

显著特性

  • 可取消的队列任务
  • 队列任务完成进度跟踪
  • 一个基于supervisord的零停机时间部署程序
  • (通过 php-amqplib/rabbitmq-bundle) 队列消费者的优雅最大执行时间
  • 队列任务重试

设置与依赖

我们假设您熟悉php-amqplib/rabbitmq-bundle的配置和设置。

对memcached用户的特别说明

请打开您的项目中的vendor/doctrine/cache/lib/Doctrine/Common/Cache/MemcachedCache.php文件,查看您是否可以找到以下代码片段

protected function doContains($id)
{
    return false !== $this->memcached->get($id)
        || $this->memcached->getResultCode() !== Memcached::RES_NOTFOUND;
}

如果找到了,请考虑将doctrine/cache的版本升级到至少1.7.0,否则CacheQueueMaintenanceStrategy可能会在连接到您的memcached服务器出现任何问题时说维护模式已启动。

包配置

printedcom_rabbitmq_queue_bundle:
  options:
    # Name of the service that acts as a default producer in RabbitMQ. See below this code snippet for details.
    default_rabbitmq_producer_name: 'default_rabbitmq_producer'

    # Name of the service that implements the queue maintenance mode. Use one of the following:
    #
    # 1. 'printed.bundle.queue.service.queue_maintenance.cache_queue_maintenance_strategy' (recommended)
    # 2. 'printed.bundle.queue.service.queue_maintenance.filesystem_queue_maintenance_strategy'
    #
    # To understand the difference, see the comments in the source code.
    queue_maintenance_strategy__service_name: 'printed.bundle.queue.service.queue_maintenance.cache_queue_maintenance_strategy'
  
    # Name of a cache service that implements the requirements outlined in the CacheQueueMaintenanceStrategy
    cache_queue_maintenance_strategy__cache_service_name: 'rabbitmq_queue_bundle_cache'

    # Name of the service that implements the New Deployments Detection feature. Choose one of the following:
    #
    # 1. 'printed.bundle.queue.service.new_deployments_detector.noop_strategy' - disables this functionality
    # 2. 'printed.bundle.queue.service.new_deployments_detector.cache_strategy'
    new_deployments_detector_strategy__service_name: 'printed.bundle.queue.service.new_deployments_detector.cache_strategy'

    # Name of a cache service that implements the requirements outlined in the CacheQueueMaintenanceStrategy
    new_deployments_detector_strategy__cache_service_name: 'rabbitmq_queue_bundle_cache'

    # Exit code used to exit a worker when it's detected that it's running old code
    consumer_exit_code__running_using_old_code: 15

    # With tools like supervisord, it's important to have consumers running without exiting for a specified amount of time
    # in order to prove that the script started successfully. This is a problem if a consumer manages to start, connect to rabbitmq 
    # and fail due to exception being thrown during execution of the task faster than the specified amount of time. The following
    # option makes the consumer not fail too fast. You essentially want to put your supervisord's "startsecs" value here (in seconds).
    # Bear in mind that the underlying code will always add 1 second to whatever value you put here in order to compensate for
    # fraction of seconds that aren't taken into account during evaluating of how long the script has been running for. This means that
    # you just need to put the value from supervisord's config "as is" without worrying about race conditions.
    #
    # You can disable this feature by either setting this option to null or not mentioning this option at all. I.e. by default this option
    # is disabled.
    minimal_runtime_in_seconds_on_consumer_exception: 1

    rabbitmq_user: '%rabbitmq_user%'
    rabbitmq_password: '%rabbitmq_pass%'

    # Pass '/' or don't set this option if you don't know what rabbtimq vhost is.
    rabbitmq_vhost: '%rabbitmq_vhost%'

    # This is used only by commands that call the rabbit management api. You don't need to do 
    # anything with this key if you don't use those commands.
    rabbitmq_api_base_url: 'http://%rabbitmq_host%:15672'
  • rabbitmq-queue-bundle.default_rabbitmq_producer_name 预期您至少有一个以下配置的RabbitMQ生产者
producers:
    default:
        connection:       default
        service_alias:    default_rabbitmq_producer

service_alias的值应在rabbitmq-queue-bundle.default_rabbitmq_producer_name参数中提供。此配置创建一个RabbitMQ生产者,它通过队列名称将任务派发到适当的队列。这也被称为RabbitMQ中的“默认”生产者。

重要通知:为您的消费者使用专用的EntityManager。

AbstractQueueConsumer应理想地使用专用的独立EntityManager来处理QueueTasks。这是为了防止当应用程序的默认EntityManager停止操作时(例如,由于“已关闭”错误或由于实体管理器的单元工作已被清除)对QueueTasks进行更改时出现问题。

为了实现这一点,请为DoctrineBundle使用多EntityManager配置,然后将专用的EntityManager提供给消费者构造函数。

示例配置

# DoctrineBundle configuration
doctrine:
  # (...)
  orm:
    # (...)
    default_entity_manager: default
    entity_managers:
      default:
        auto_mapping: true

      queue_consumer:
        mappings:
          Printed\Bundle\Queue:
            type: annotation
            prefix: Printed\Bundle\Queue\Entity
            dir: '%kernel.project_dir%/vendor/printed/rabbitmq-queue-bundle/src/Printed/Bundle/Queue/Entity'

# Registering your queue consumer e.g. in services.yml
acme.queue.consumer.my_queue_task1:
  class: 'Acme\Bundle\Queue\Consumer\MyQueueTask1QueueConsumer'
  arguments:
    - '@doctrine.orm.default_entity_manager'
    - '@validator'
    - '@monolog.logger'
    - '@Psr\Container\ContainerInterface'
    - '@printed.bundle.queue.service.service_container_parameters'
    - '@doctrine.orm.queue_consumer_entity_manager'
  tags:
    - container.service_subscriber

用法

假设您已为前面的包完成配置并运行基本的Symfony演示应用程序,以下步骤应该相当容易遵循。根据需要可以做出一些小的更改,但示例旨在帮助传达要点。

示例rabbitmq.yml配置

old_sound_rabbit_mq:
    connections:
        default:
            host: '%rabbitmq_host%'
            port: '%rabbitmq_port%'
            user: '%rabbitmq_user%'
            password: '%rabbitmq_pass%'
            vhost: '/'
            lazy: true
            connection_timeout: 3
            
            # this needs to be 2x of the heartbeat option
            read_write_timeout: 7200
            
            # don't forget that you should enable tcp keepalive in rabbitmq as well: https://rabbitmq.cn/networking.html#socket-gen-tcp-options
            keepalive: true
            
            # keep this value high because https://github.com/php-amqplib/RabbitMqBundle/issues/301
            heartbeat: 3600
    
    producers:
        default:
            connection:       default
            service_alias:    default_rabbitmq_producer
    
    consumers:
        upload_picture: &consumer_template
            connection:       default
            queue_options:    { name: 'upload-picture' }
            callback:         upload_picture_service
            qos_options:      { prefetch_size: 0, prefetch_count: 1, global: false }
            graceful_max_execution:
                timeout: 1800
                exit_code: 10
        
        render_treasure_map:
            <<: *consumer_template
            queue_options:    { name: 'render_treasure_map' }
            callback:         render_treasure_map_service

Monolog

消费者公开了一个属性$this->logger,它将是一个注册在Symfony中的monolog/monolog实例,具有queue通道。请随意处理通道,但至少需要在您的monolog配置中添加channels: [queue]

生产(有效载荷)

我们使用称为 payloads 的类来包含消费者的数据(也称为工作者)。当此类被构建并交给派发器时,将首先使用 symfony/validator 进行验证,然后序列化并存储到数据库中。新创建的记录的 ID 将与在抽象方法 getExchangeName 中定义的交换机一起发送到 RabbitMQ 队列。这允许在不需要担心目的地并保持代码更新时,轻松地调度 payloads。

namespace AppBundle\Queue\Payload;

use Printed\Bundle\Queue\Queue\AbstractQueuePayload;

use Symfony\Component\Validator\Constraints as Assert;

/**
 * {@inheritdoc}
 */
class ExampleQueuePayload extends AbstractQueuePayload
{

    /**
     * @Assert\NotNull()
     * @Assert\Type(type="integer")
     */
    protected $data;

    /**
     * {@inheritdoc}
     */
    public static function getExchangeName(): string
    {
        return 'example_exchange';
    }
    
    // public function getData()
    // public function setData()
    // ..

}

要调度此 payload,您可以使用通过此包提供的派发器类。您可以通过 printed.bundle.queue.service.queue_task_dispatcher 对容器进行访问。例如 ..

class MyServiceClass
{
    private $queueTaskDispatcher;

    public function __construct(QueueTaskDispatcher $queueTaskDispatcher)
    {
        $this->queueTaskDispatcher = $queueTaskDispatcher;
    }

    public function doSomething()
    {
        // do something..

        $payload = new ExampleQueuePayload;
        $payload->setData($myData);
        
        $this->queueTaskDispatcher->dispatch($payload);
    }
}
services:
  MyServiceClass:
    class: MyServiceClass
    arguments:
      - '@printed.bundle.queue.service.queue_task_dispatcher'

消费(工作者)

php-amqplib/rabbitmq-bundle 中所述,您的消费者需要注册为服务并分配到配置中的消费者定义。这里也是一样的,但我们需要一些所有消费者都通用的参数。

services:
  app_bundle.queue.consumer.example_consumer:
    class: 'AppBundle\Queue\Consumer\ExampleQueueConsumer'
    arguments:
      - '@doctrine.orm.entity_manager'
      - '@validator'
      - '@monolog.logger.queue'
      - '@Psr\Container\ContainerInterface'
      - '@printed.bundle.queue.service.service_container_parameters'
    public: false
    tags:
      - container.service_subscriber

请注意,需要使用 container.service_subscriber 标签才能使用 ServiceSubscriber Symfony 功能 链接

然后,您的工作将非常熟悉,只是进行了一些更改和增强。

namespace AppBundle\Queue\Consumer;

use AppBundle\Queue\Payload\ExampleQueuePayload;

use Printed\Bundle\Queue\Exception\Consumer\QueueFatalErrorException;
use Printed\Bundle\Queue\Queue\AbstractQueueConsumer;
use Printed\Bundle\Queue\Queue\AbstractQueuePayload;

use Symfony\Component\HttpFoundation\Request;

/**
 * {@inheritdoc}
 */
class ExampleQueueConsumer extends AbstractQueueConsumer
{
    /**
     * {@inheritDoc}
     */
    public static function getSubscribedServices(): array
    {
        return array_merge(parent::getSubscribedServices(), [
            'some_service' => MySomeService::class,
        ]);
    }

    /**
     * {@inheritdoc}
     *
     * @param ExampleQueuePayload $payload
     */
    public function run(AbstractQueuePayload $payload): bool
    {

        //  You hava access to all the standard things ..
        $someService = $this->locator->get('some_service');
        $usefulParameter = $this->containerParameters->get('my_app.useful_parameter');

        $this->em->persist($entity);
        $this->logger->info('Oh noes');

        //  Setting a task response data.
        $this->task->setResponseData(['something' => true]);

        //  Do lots of things ..
        //  And a few more things ..
        
        return self::TASK_COMPLETE;

    }

}

任务必须始终以布尔值退出,true 表示通过,false 表示失败。为了使此信息更加详细,您可以使用常量 TASK_COMPLETETASK_FAILED,它们只是那些布尔值。当将任务标记为失败时,任务对尝试的计数在将其送回队列进行另一次尝试之前会增加。每个任务都会获得由消费者中的 getAttemptLimit() 方法指定的尝试限制。您可以自由覆盖此限制,但默认限制设置为 10。在那些您知道任务将永远失败且不希望它使用剩余尝试的情况下,您可以抛出 QueueFatalErrorException 异常。消息将被记录到常规日志中,尝试限制将被设置为最大值,这将防止作业生成更多尝试。

运行这些消费者与提到的包相同:./bin/console rabbitmq:consumer example_consumer -vv

维护和部署

为了优雅地关闭您的队列或停止其进度,我们定义了以下控制台命令。这将允许轻松维护队列消费者/工作者。

queue:maintenance:up
queue:maintenance:down
queue:maintenance:wait

本质上,queue:maintenance:up 将阻止新作业被工作者处理。当维护模式处于开启状态时,将作业发送到工作者,工作者将立即以代码 0 退出。这对于使用类似 supervisord 的东西非常有用,因为 supervisord 默认不会重启退出状态为该状态的程序。

重要的是要理解 queue:maintenance:up 的主要目的是阻止新作业被处理。该命令不是用于停止/重启工作者(尽管大多数情况下确实会发生)。请使用以下描述的“新部署检测”功能来重启工作者。

queue:maintenance:wait 将轮询数据库中的运行任务,此命令仅在没有任何任务被标记为运行时退出。使用 -r 参数可以配置它轮询数据库的秒数。此外,作为 symfony 命令,您可以在部署工具中抑制输出 -q

然后,当然,queue:maintenance:down 将允许队列再次运行。尽管您需要手动重启它们,因为它们已经退出了。

在您禁用维护模式之前,您需要确保所有旧空闲工作者都已重新启动。有几种方法可以做到这一点。此捆绑包为您提供了一个所有工作者在检测到新部署时退出的方法。这个特性叫做“新部署检测”,需要您调用queue:store-new-deployment-stamp-command并传递一个可以用来比较部署的字符串(通常时间戳就足够了)。确保您配置此捆绑包以实际使用此功能。

本质上,您的构建脚本可能看起来像这样

./bin/console queue:maintenance:up
./bin/console queue:maintenance:wait -q -r 10

# deploy code
# database migrations
# cache cleaning/warming

./bin/console queue:store-new-deployment-stamp-command `date +%s` 

./bin/console queue:maintenance:down
# "supervisorctl reread"
# "supervisorctl update"

由于任务及其全部负载数据都存储在数据库中,因此可以重新启动所有任务。如果您需要升级或迁移RabbitMQ实例,这将很有用。此时我们创建了此命令(抱歉),但是通过复制您最初启动任务的流程,只是循环遍历数据库中标记为pendingstatus = 1的所有条目,就足够简单了。有关更多信息,请参阅QueueTaskInterface

提示

RabbitMQ vhosts

官方文档

如果您使用一个RabbitMQ服务器来托管应用程序的多个部署的队列(例如,在一个RabbitMQ服务器上托管测试、预发布、回归环境),此功能可以让您这样做而不会遇到队列名称冲突。

要使用它,设置以下选项

  1. old_sound_rabbit_mq.connections.default.vhost
  2. printedcom_rabbitmq_queue_bundle.rabbitmq_vhost

进程的Supervisord组。

官方文档

这允许您将所有队列消费者收集在一个Supervisord进程组中,这反过来又允许您通过该组名称启动和停止它们,并允许您在单个机器上运行来自多个应用程序环境的队列消费者,而不会遇到名称冲突。

这也是停止您的消费者、部署您的代码、重新读取Supervisord配置以及重新启动消费者的更合适的方式。这取代了此捆绑包的“新部署检测”功能。

队列任务优先级

官方文档

RabbitMQ提供了一种按不同优先级分配和处理队列任务的方法。此捆绑包了解此功能,并提供了一种利用它的方法。请参阅:AbstractQueuePayload::__construct()$queueMessageProperties参数。

此功能有用之处之一是当您得出结论,您有n个消费者大部分都在做无用功,但仍然占用服务器的资源(RAM)时。您可以重新组织您的代码,使用单个队列消费者监听单个、共享的队列,并处理具有不同优先级的队列任务。此捆绑包不提供任何关于此类实现的意见,但是具有5个优先级的共享队列(最低、低、正常、高、最高)是一个合理的设置。

测试和贡献

遗憾的是目前没有测试,实际上没有什么可以测试的。但如果有任何东西要贡献,请现在打开一个pull-request。只需保持代码干净!