lamoda/queue-bundle

此包已被废弃,不再维护。没有建议的替代包。

Symfony 队列组件


README

Build Status Scrutinizer Code Quality Code Coverage Build Status

Symfony 组件,用于方便地处理队列。目前它支持 RabbitMQ。

安装

  1. 安装组件

    composer require lamoda/queue-bundle
  2. 扩展 Lamoda\QueueBundle\Entity\QueueEntityMappedSuperclass

    use Doctrine\ORM\Mapping as ORM;
    use Lamoda\QueueBundle\Entity\QueueEntityMappedSuperclass;
    
    /**
     * @ORM\Entity(repositoryClass="Lamoda\QueueBundle\Entity\QueueRepository")
     */
    class Queue extends QueueEntityMappedSuperclass
    {
    }
  3. 配置组件参数

    lamoda_queue:
        ## required
        entity_class: App\Entity\Queue
        max_attempts: 5
        batch_size_per_requeue: 5
        batch_size_per_republish: 5
        ## optional (will use for default delay Geometric Progression Strategy)
        strategy_delay_geometric_progression_start_interval_sec: 60
        strategy_delay_geometric_progression_multiplier: 2
  4. 注册组件

    class AppKernel extends Kernel
    {
        // ...
        
        public function registerBundles()
        {
            $bundles = [
                // ...
                new Lamoda\QueueBundle\LamodaQueueBundle(),
                // ...
            ];
    
            return $bundles;
        }
        
        // ...
    }

    或将它添加到 config/bundles.php

    return [
        // ...
        Lamoda\QueueBundle\LamodaQueueBundle::class => ['all' => true],
        // ...
    ];
  5. 迁移模式

    1. doctrine:migrations:diff 生成 queue 表的迁移
    2. doctrine:migrations:migrate - 应用迁移

设置

创建新的交换机

  1. 定义新的交换机常量

    namespace App\Constant;
    
    class Exchanges
    {
        public const DEFAULT = 'default';
    }
  2. 将新的节点添加到 old_sound_rabbit_mq.producers 中,使用之前定义的常量名称,例如

    old_sound_rabbit_mq:
        producers:
            default:
                connection: default
                exchange_options:
                    name: !php/const App\Constant\Exchanges::DEFAULT
                    type: "direct"

创建新的队列

  1. 定义新的队列常量

    namespace App\Constant;
    
    class Queues
    {
        public const NOTIFICATION = 'notification';
    }
  2. old_sound_rabbit_mq.consumers 中注册队列消费者,使用之前定义的常量名称,例如

    old_sound_rabbit_mq:
        consumers:
            notification:
                connection: default
                exchange_options:
                    name: !php/const App\Constant\Exchanges::DEFAULT
                    type: "direct"
                queue_options:
                    name: !php/const App\Constant\Queues::NOTIFICATION
                    routing_keys:
                      - !php/const App\Constant\Queues::NOTIFICATION
                callback: "lamoda_queue.consumer"
  3. 创建作业类,通过示例扩展 AbstractJob

    namespace App\Job;
    
    use App\Constant\Exchanges;
    use App\Constant\Queues;
    use Lamoda\QueueBundle\Job\AbstractJob;
    use JMS\Serializer\Annotation as JMS;
    
    class SendNotificationJob extends AbstractJob
    {
        /**
         * @var string
         *
         * @JMS\Type("int")
         */
        private $message;
    
        public function __construct(string $message)
        {
            $this->message = $message;
        }
    
        public function getDefaultQueue(): string
        {
            return Queues::NOTIFICATION;
        }
    
        public function getDefaultExchange(): string
        {
            return Exchanges::DEFAULT;
        }
    }
  4. 创建作业处理器,通过示例实现 HandlerInterface

    namespace App\Handler;
    
    use Lamoda\QueueBundle\Handler\HandlerInterface;
    use Lamoda\QueueBundle\QueueInterface;
    
    class SendNotificationHandler implements HandlerInterface
    {
        public function handle(QueueInterface $job): void
        {
            // implement service logic here
        }
    }
  5. 在服务容器中标记处理器

    services:
        App\Handler\SendNotificationHandler:
            public: true
            tags:
                - { name: queue.handler, handle: App\Job\SendNotificationJob }
  6. 配置延迟策略参数(可选)

    lamoda_queue:
    ...
        queues:
            queue_one: 'delay_arithmetic_progression'
            queue_two: 'delay_special_geometric_progression'
    
    # Settings of special behaviors for Delay strategies (optional)
    services:
      lamoda_queue.strategy.delay.arithmetic_progression:
        class: Lamoda\QueueBundle\Strategy\Delay\ArithmeticProgressionStrategy
        tags:
          - { name: 'lamoda_queue_strategy', key: 'delay_arithmetic_progression' }
        arguments:
          - 60 # start_interval_sec parameter
          - 1700 # multiplier parameter
      
     lamoda_queue.strategy.delay.geometric_progression:
        class: Lamoda\QueueBundle\Strategy\Delay\GeometricProgressionStrategy
        tags:
          - { name: 'lamoda_queue_strategy', key: 'delay_special_geometric_progression' }
        arguments:
          - 70 # start_interval_sec parameter
          - 4 # multiplier parameter
    

    在这个块中,您可以针对每个队列配置特殊的延迟行为。为此,您必须注册使用几种基本策略之一(ArithmeticProgressionStrategy、GeometricProgressionStrategy)或您自己的策略(为此,您必须创建一个实现 DelayStrategyInterface 的服务类)的服务。

    每个策略服务都必须有一个名为 lamoda_queue_strategy 的标签和唯一的 key。之后,您可以使用这些 keys 来匹配 lamoda_queue.queues 部分中的队列。

    默认情况下,使用 GeometricProgressionStrategy 并带有参数(您可以在 lamoda_queue 配置部分中进行自定义)

        strategy_delay_geometric_progression_start_interval_sec: 60
        strategy_delay_geometric_progression_multiplier: 2
    
  7. 在 "codeception.yml" 中的 modules.config.AMQP.queues 添加队列名称

  8. 执行 ./bin/console queue:init 命令

用法

初始化交换机和队列

./bin/console queue:init

将作业添加到队列

$job = new SendNotificationJob($id);
$container->get(Lamoda\QueueBundle\Factory\PublisherFactory::class)->publish($job);

运行队列中的作业

./bin/console queue:consume notification

重新入队失败的队列

./bin/console queue:requeue

高级用法

您可以将任何原始类排队,只需实现 QueueInterface

namespace App\Process;

use Lamoda\QueueBundle\Entity\QueueInterface;

class MyProcess implements QueueInterface
{
    // implement interface functions
}
services:
    App\Handler\MyProcessHandler:
        public: true
        tags:
            - { name: queue.handler, handle: App\Process\MyProcess }
$process = new MyProcess();
$container->get('queue.publisher')->publish($process);

如何重新运行队列

如果您想重新运行队列,则抛出 Lamoda\QueueBundle\Exception\RuntimeException

如果您想标记队列为失败,则抛出任何其他类型的异常。

namespace App\Handler;

use Lamoda\QueueBundle\Handler\HandlerInterface;
use Lamoda\QueueBundle\QueueInterface;

class SendNotificationHandler implements HandlerInterface
{
    public function handle(QueueInterface $job): void
    {
        // implement service logic here
        
        // Rerun queue
        if ($rerun === true) {
            throw new Lamoda\QueueBundle\Exception\RuntimeException('Error message');
        }
        
        // Mark queue as failed
        if ($failed === true) {
            throw new \Exception();
        }
    }
}

默认情况下,延迟时间以指数方式计算。您可以通过配置来影响它。

lamoda_queue:
  ## required
  ## ...
  max_attempts: 5
  ## optional
  strategy_delay_geometric_progression_start_interval_sec: 60
  strategy_delay_geometric_progression_multiplier: 2

事件

Lamoda\QueueBundle\Event\QueueAttemptsReachedEvent

当消费者想要执行达到最大尝试次数的队列时。

属性

  • 队列实体 QueueAttemptsReachedEvent::getQueue()

开发

PHP编码标准修复器

make php-cs-check
make php-cs-fix

测试

单元

make test-unit