lamoda / queue-bundle
Symfony queue bundle
Requires
- php: >=7.2.0
- ext-json: *
- ext-pdo: *
- doctrine/doctrine-bundle: ^1.9 || ^2.0
- doctrine/orm: ~2.3
- jms/serializer-bundle: ^2.4 || ^3.0
- php-amqplib/php-amqplib: ~3.0
- php-amqplib/rabbitmq-bundle: ~2.9.0
- symfony/config: ^4.1 || ^5.0
- symfony/console: ^4.2 || ^5.0
- symfony/dependency-injection: ^4.2 || ^5.0
- symfony/event-dispatcher: ^4.3 || ^5.0
- symfony/framework-bundle: ^4.3
- symfony/monolog-bundle: ^3.3
Requires (Dev)
- codeception/codeception: ^4.1
- codeception/module-asserts: ^1.3
- friendsofphp/php-cs-fixer: ^2.13
- phpunit/phpunit: ^8.0
Versions
- dev-master
- 3.0.0
- 2.2.2
- 2.2.1
- 2.2.0
- 2.1.1
- 2.1.0
- 2.0.1
- 2.0.0
- 1.2.0
- 1.1.0
- 1.0.3
- 1.0.2
- 1.0.1
- 1.0.0
- dev-log-for-queue-creating
- dev-feature/SCENTRE-6712-low-log-level
- dev-feature/upgrade-php-amqplib-to-v2.9
- dev-feature/SCENTRE-6184-republish-queue-in-initial-status
- dev-feature/SCENTRE-5889_extend_retry_sending_time
- dev-php74-symfony5
- dev-feature/SCENTRE-5818
- dev-feature/SCENTRE-5744
This package is auto-updated.
Last update: 2023-06-29 01:22:25 UTC
README
A Symfony bundle for convenient work with queues. Currently it supports RabbitMQ.
Installation
- Install the bundle:
    composer require lamoda/queue-bundle
- Extend Lamoda\QueueBundle\Entity\QueueEntityMappedSuperclass:
    // Namespace chosen to match entity_class (App\Entity\Queue) in the bundle configuration
    namespace App\Entity;

    use Doctrine\ORM\Mapping as ORM;
    use Lamoda\QueueBundle\Entity\QueueEntityMappedSuperclass;

    /**
     * @ORM\Entity(repositoryClass="Lamoda\QueueBundle\Entity\QueueRepository")
     */
    class Queue extends QueueEntityMappedSuperclass
    {
    }
- Configure the bundle parameters:
    lamoda_queue:
        ## required
        entity_class: App\Entity\Queue
        max_attempts: 5
        batch_size_per_requeue: 5
        batch_size_per_republish: 5

        ## optional (used by the default geometric progression delay strategy)
        strategy_delay_geometric_progression_start_interval_sec: 60
        strategy_delay_geometric_progression_multiplier: 2
- Register the bundle:
    class AppKernel extends Kernel
    {
        // ...

        public function registerBundles()
        {
            $bundles = [
                // ...
                new Lamoda\QueueBundle\LamodaQueueBundle(),
                // ...
            ];

            return $bundles;
        }

        // ...
    }
  or add it to config/bundles.php:
    return [
        // ...
        Lamoda\QueueBundle\LamodaQueueBundle::class => ['all' => true],
        // ...
    ];
- Migrate the schema:
    - doctrine:migrations:diff to generate a migration for the queue table
    - doctrine:migrations:migrate to apply the migration
Setup
Create a new exchange
- Define a new exchange constant:
    namespace App\Constant;

    class Exchanges
    {
        public const DEFAULT = 'default';
    }
- Add a new node to old_sound_rabbit_mq.producers, named after the constant defined above, for example:
    old_sound_rabbit_mq:
        producers:
            default:
                connection: default
                exchange_options:
                    name: !php/const App\Constant\Exchanges::DEFAULT
                    type: "direct"
Create a new queue
- Define a new queue constant:
    namespace App\Constant;

    class Queues
    {
        public const NOTIFICATION = 'notification';
    }
- Register the queue consumer in old_sound_rabbit_mq.consumers, named after the constant defined above, for example:
    old_sound_rabbit_mq:
        consumers:
            notification:
                connection: default
                exchange_options:
                    name: !php/const App\Constant\Exchanges::DEFAULT
                    type: "direct"
                queue_options:
                    name: !php/const App\Constant\Queues::NOTIFICATION
                    routing_keys:
                        - !php/const App\Constant\Queues::NOTIFICATION
                callback: "lamoda_queue.consumer"
- Create a job class that extends AbstractJob, for example:
    namespace App\Job;

    use App\Constant\Exchanges;
    use App\Constant\Queues;
    use JMS\Serializer\Annotation as JMS;
    use Lamoda\QueueBundle\Job\AbstractJob;

    class SendNotificationJob extends AbstractJob
    {
        /**
         * @var string
         *
         * @JMS\Type("string")
         */
        private $message;

        public function __construct(string $message)
        {
            $this->message = $message;
        }

        public function getDefaultQueue(): string
        {
            return Queues::NOTIFICATION;
        }

        public function getDefaultExchange(): string
        {
            return Exchanges::DEFAULT;
        }
    }
- Create a job handler that implements HandlerInterface, for example:
    namespace App\Handler;

    use Lamoda\QueueBundle\Handler\HandlerInterface;
    use Lamoda\QueueBundle\QueueInterface;

    class SendNotificationHandler implements HandlerInterface
    {
        public function handle(QueueInterface $job): void
        {
            // implement service logic here
        }
    }
- Tag the handler in the service container:
    services:
        App\Handler\SendNotificationHandler:
            public: true
            tags:
                - { name: queue.handler, handle: App\Job\SendNotificationJob }
- Configure delay strategy parameters (optional):
    lamoda_queue:
        ...
        queues:
            queue_one: 'delay_arithmetic_progression'
            queue_two: 'delay_special_geometric_progression'

    # Settings of special behaviors for Delay strategies (optional)
    services:
        lamoda_queue.strategy.delay.arithmetic_progression:
            class: Lamoda\QueueBundle\Strategy\Delay\ArithmeticProgressionStrategy
            tags:
                - { name: 'lamoda_queue_strategy', key: 'delay_arithmetic_progression' }
            arguments:
                - 60 # start_interval_sec parameter
                - 1700 # multiplier parameter

        lamoda_queue.strategy.delay.geometric_progression:
            class: Lamoda\QueueBundle\Strategy\Delay\GeometricProgressionStrategy
            tags:
                - { name: 'lamoda_queue_strategy', key: 'delay_special_geometric_progression' }
            arguments:
                - 70 # start_interval_sec parameter
                - 4 # multiplier parameter
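  In this block you can configure special delay behavior for each queue. To do so, register a service that uses one of the base strategies (ArithmeticProgressionStrategy, GeometricProgressionStrategy) or your own strategy (a service class that implements DelayStrategyInterface).
  Each strategy service must carry the lamoda_queue_strategy tag with a unique key. You can then use these keys to map queues in the lamoda_queue.queues section.
  By default, GeometricProgressionStrategy is used with the following parameters, which you can customize in the lamoda_queue configuration section:
    strategy_delay_geometric_progression_start_interval_sec: 60
    strategy_delay_geometric_progression_multiplier: 2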
- Add the queue name to modules.config.AMQP.queues in "codeception.yml"
- Run the ./bin/console queue:init command
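For the optional delay strategy step above, a custom strategy could be sketched as follows. This is a minimal, self-contained illustration only: the `DelayStrategySketch` interface and its `delayForAttempt()` method are hypothetical stand-ins, not the actual signature of the bundle's `DelayStrategyInterface`, which should be checked in the bundle source before writing a real strategy. The class name `CappedGeometricStrategy` and the cap parameter are likewise assumptions.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the bundle's DelayStrategyInterface (assumed shape).
interface DelayStrategySketch
{
    /** Returns the delay in seconds to wait before the given attempt (1-based). */
    public function delayForAttempt(int $attempt): int;
}

// Illustrative custom strategy: geometric growth, capped at a maximum delay.
final class CappedGeometricStrategy implements DelayStrategySketch
{
    private $startIntervalSec;
    private $multiplier;
    private $maxDelaySec;

    public function __construct(int $startIntervalSec, int $multiplier, int $maxDelaySec)
    {
        $this->startIntervalSec = $startIntervalSec;
        $this->multiplier = $multiplier;
        $this->maxDelaySec = $maxDelaySec;
    }

    public function delayForAttempt(int $attempt): int
    {
        // Clamp the exponent at zero so attempt numbers below 1 behave like the first attempt.
        $exponent = max(0, $attempt - 1);
        $delay = $this->startIntervalSec * ($this->multiplier ** $exponent);

        return (int) min($delay, $this->maxDelaySec);
    }
}

$strategy = new CappedGeometricStrategy(70, 4, 3600);
foreach ([1, 2, 3, 4] as $attempt) {
    printf("attempt %d: wait %d s\n", $attempt, $strategy->delayForAttempt($attempt));
}
```

In a real project such a class would implement the bundle's interface and be registered as a service with the lamoda_queue_strategy tag and a unique key, as described in the step above.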
Usage
Initialize exchanges and queues
    ./bin/console queue:init
Add a job to the queue
    // $message is the string expected by the SendNotificationJob constructor
    $job = new SendNotificationJob($message);
    $container->get(Lamoda\QueueBundle\Factory\PublisherFactory::class)->publish($job);
Run jobs from the queue
    ./bin/console queue:consume notification
Requeue failed queues
    ./bin/console queue:requeue
Advanced usage
You can queue any class of your own; it only has to implement QueueInterface:
    namespace App\Process;

    use Lamoda\QueueBundle\Entity\QueueInterface;

    class MyProcess implements QueueInterface
    {
        // implement interface functions
    }

    services:
        App\Handler\MyProcessHandler:
            public: true
            tags:
                - { name: queue.handler, handle: App\Process\MyProcess }

    $process = new MyProcess();
    $container->get('queue.publisher')->publish($process);
How to rerun a queue
To rerun a queue, throw Lamoda\QueueBundle\Exception\RuntimeException.
To mark a queue as failed, throw any other kind of exception.
    namespace App\Handler;

    use Lamoda\QueueBundle\Exception\RuntimeException;
    use Lamoda\QueueBundle\Handler\HandlerInterface;
    use Lamoda\QueueBundle\QueueInterface;

    class SendNotificationHandler implements HandlerInterface
    {
        public function handle(QueueInterface $job): void
        {
            // implement service logic here; it decides the two flags below
            $rerun = false;  // placeholder: true when the job should be retried later
            $failed = false; // placeholder: true when the job cannot be completed

            // Rerun queue
            if ($rerun === true) {
                throw new RuntimeException('Error message');
            }

            // Mark queue as failed
            if ($failed === true) {
                throw new \Exception();
            }
        }
    }
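The rule above (one exception type means "rerun", any other exception means "failed") can be modeled in a few self-contained lines. Note the assumptions: `RerunException` is a hypothetical stand-in for `Lamoda\QueueBundle\Exception\RuntimeException`, and `runHandler()` is a hypothetical model of the consumer, whose real internals this README does not show.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for Lamoda\QueueBundle\Exception\RuntimeException.
final class RerunException extends \RuntimeException
{
}

/**
 * Hypothetical model of the consumer's rule: the rerun exception schedules
 * another attempt, any other exception marks the queue as failed.
 */
function runHandler(callable $handler): string
{
    try {
        $handler();
    } catch (RerunException $e) {
        // The more specific catch must come first, since it is also an \Exception.
        return 'rerun';
    } catch (\Exception $e) {
        return 'failed';
    }

    return 'done';
}

echo runHandler(function (): void {
}), "\n";
echo runHandler(function (): void {
    throw new RerunException('Temporary error');
}), "\n";
echo runHandler(function (): void {
    throw new \Exception('Fatal error');
}), "\n";
```

The three calls print done, rerun and failed in that order.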
By default, the delay time grows exponentially. You can tune it through the configuration:
    lamoda_queue:
        ## required
        ## ...
        max_attempts: 5

        ## optional
        strategy_delay_geometric_progression_start_interval_sec: 60
        strategy_delay_geometric_progression_multiplier: 2
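To get a feel for what these optional parameters imply, the sketch below prints a retry schedule for a geometric progression. It assumes the delay before attempt n is start_interval_sec × multiplier^(n−1); that exact formula is an assumption made for illustration (the README only says the delay grows exponentially), and `geometricDelay()` is a hypothetical helper, not part of the bundle.

```php
<?php

declare(strict_types=1);

// Hypothetical helper: delay in seconds before the given attempt (1-based),
// assuming delay = start interval * multiplier^(attempt - 1).
function geometricDelay(int $startIntervalSec, int $multiplier, int $attempt): int
{
    // Clamp the exponent at zero so the result always stays an integer.
    $exponent = max(0, $attempt - 1);

    return $startIntervalSec * ($multiplier ** $exponent);
}

$maxAttempts = 5; // max_attempts from the configuration
for ($attempt = 1; $attempt <= $maxAttempts; ++$attempt) {
    printf("attempt %d: %d s\n", $attempt, geometricDelay(60, 2, $attempt));
}
```

Under this assumption, a start interval of 60 and a multiplier of 2 give delays of 60, 120, 240, 480 and 960 seconds over five attempts.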
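Events
Lamoda\QueueBundle\Event\QueueAttemptsReachedEvent
Dispatched when a consumer is about to execute a queue that has reached its maximum number of attempts.
Properties
- Queue entity: QueueAttemptsReachedEvent::getQueue()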
Development
PHP Coding Standards Fixer
    make php-cs-check
    make php-cs-fix
Testing
Unit
    make test-unit