marfatech / rabbit-queue-bundle
为消费者系统提供与RabbitMQ队列交互的可能性
该软件包的官方存储库似乎已丢失,因此该软件包已被冻结
Requires
- php: ^8.0
- ext-json: *
- ext-pcntl: *
- php-amqplib/php-amqplib: ^2.12
- symfony/config: ~4.4||~5.4||~6.0
- symfony/console: ~4.4||~5.4||~6.0
- symfony/dependency-injection: ~4.4||~5.4||~6.0
- symfony/http-kernel: ~4.4||~5.4||~6.0
Requires (Dev)
- phpunit/phpunit: ^7.0||^8.0||^9.0
- psr/log: ~1.0
- symfony/framework-bundle: ^5.3||~6.0
- symfony/monolog-bundle: ^3.7
Suggests
- symfony/monolog-bundle: Provides advanced logging mechanism
- symfony/symfony: Allows more advanced functionality with Symfony
This package is auto-updated.
Last update: 2024-02-13 11:13:43 UTC
README
简介
该软件包提供通过producer-consumer机制处理RabbitMQ队列的工具。
内容
要求
为了正确运行软件包,需要连接以下RabbitMQ插件
安装
步骤 1: 加载软件包
在项目目录中,运行以下命令以加载适合的稳定版本
composer require marfatech/rabbit-queue-bundle
此命令假定Composer已安装并全局可用。
步骤 2: 连接软件包
需要通过将其添加到项目中的app/AppKernel.php
文件中注册的软件包列表中来启用软件包。
<?php // app/AppKernel.php class AppKernel extends Kernel { // ... public function registerBundles() { $bundles = [ // ... new MarfaTech\Bundle\RabbitQueueBundle\MarfaTechRabbitQueueBundle(), ]; return $bundles; } // ... }
配置
要开始使用软件包,需要描述连接到RabbitMQ
的配置。
# app/packages/marfatech_rabbit_queue.yaml marfatech_rabbit_queue: connections: default: host: 'rabbitmq' # хост для подключения к rabbitMQ port: 5672 # порт для подключения к rabbitMQ username: 'rabbitmq_user' # логин для подключения к rabbitMQ password: 'rabbitmq_password' # пароль для подключения к rabbitMQ vhost: 'example_vhost' # виртуальный хост для подключения (необязательный параметр) connection_timeout: 3 # таймаут соединения @deprecated используйте options.connection_timeout read_write_timeout: 3 # таймаут на чтение/запись @deprecated используйте options.read_write_timeout heartbeat: 0 # частота heartbeat @deprecated используйте options.heartbeat options: # опции для попыток подключений ко всем хостам из списка по очереди (необязательный параметр) connection_timeout: 3 # таймаут соединения read_write_timeout: 3 # таймаут на чтение/запись heartbeat: 0 # частота heartbeat lazy_connection: false # Lazy соединение инициализируется в момент использования reconnect_retries: 3 # Количество попыток реконнекта к RabbitMq при потере соединения int или null (по умолчанию 0) consumer: wait_timeout: 3 # таймаут ожидания новых сообщений для обработки пачки в секундах (по умолчанию 3) idle_timeout: 0 # таймаут ожидания сообщений в пустой очереди в секундах (по умолчанию 0 - нет таймаута) batch_timeout: 0 # таймаут сборки пачки сообщений в секундах (по умолчанию 0 - нет таймаута) default_max_processed_tasks_count: 1000 # максимальное количество задач в обработке (по умолчанию 1000)
当指定options
时,配置中的connection_timeout
、read_write_timeout
和heartbeat
值将从其中获取。如果没有指定options
,则这些配置键的值将取自第一个connections
键值。
将依次尝试连接到配置键connection
中指定的主机,并返回第一个成功的连接。多主机连接
如果指定lazy_connection
= true
,则连接将在使用时初始化,而不是在初始化所有类时初始化。
reconnect_retries
参数用于在连接到RabbitMq断开时自动重新连接。它可以接受整数值(最大重连尝试次数),或null
以进行无限重连。
组件描述
生产者
Producer
- 用于向队列发送消息。
为了实现这些目的,在包中实现了 RabbitMqProducer,通过它可以发送带有指定参数的消息到队列。
<?php $data = ['message' => 'example']; # Сообщение $options = ['key' => 'unique_key', 'delay' => 1000]; # Опции, в зависимости от типа очереди $routingKey = 'test.routing.key'; # Ключ маршрутизации сообщения, для очередей с типом `ROUTER` $properties = ['type' => 'test']; # Дополнительные свойства сообщения AMQPMessage /** @var \MarfaTech\Bundle\RabbitQueueBundle\Producer\RabbitMqProducer $producer */ $producer->put('queue_name', $data, $options, $routingKey, $properties);
队列池
默认情况下,所有消息都通过消息池发送到队列,这允许延迟发送累积的消息。可以通过在调用 MarfaTech\Bundle\RabbitQueueBundle\Producer\RabbitMqProducerInterface::put
函数时传递 use-queue-pool
选项来为每条消息单独更改此行为。
<?php use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueOptionEnum; use MarfaTech\Bundle\RabbitQueueBundle\Producer\RabbitMqProducerInterface; $data = ['message' => 'example']; $options = [QueueOptionEnum::USE_QUEUE_POOL => false]; /** @var RabbitMqProducerInterface $producer */ $producer->put('queue_name', $data, $options);
发布者
通过特殊的发布者类来发布队列中的消息。Producer
定义了根据关联的队列类型使用哪个发布者。
因此,对于每种新的队列类型都需要一个具有自定义处理/验证和发布消息到通道逻辑的 Publisher
类。
包支持以下队列和交换机类型
- FIFO
- 延迟
- 去重
- 去重 + 延迟
- 路由器
如果想要添加自己的队列类型,需要创建一个继承自 AbstractPublisher 或实现 PublisherInterface 的类。
示例 DelayPublisher
<?php declare(strict_types=1); namespace MarfaTech\Bundle\RabbitQueueBundle\Publisher; use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueHeaderOptionEnum; use MarfaTech\Bundle\RabbitQueueBundle\Definition\DefinitionInterface; use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueOptionEnum; use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum; use MarfaTech\Bundle\RabbitQueueBundle\Exception\RabbitQueueException; use function is_int; use function sprintf; class DelayPublisher extends AbstractPublisher { public const QUEUE_TYPE = QueueTypeEnum::FIFO | QueueTypeEnum::DELAY; /** * Custom prepare options logic */ protected function prepareOptions(DefinitionInterface $definition, array $options): array { $delay = $options[QueueOptionEnum::DELAY] ?? null; if (!is_int($delay)) { $message = sprintf( 'Element for queue "%s" must be with option %s. See %s', $definition::getQueueName(), QueueOptionEnum::DELAY, QueueOptionEnum::class ); throw new RabbitQueueException($message); } $amqpTableOption[QueueHeaderOptionEnum::X_DELAY] = $delay * 1000; return $amqpTableOption; } /** * Queue type supported by publisher */ public static function getQueueType(): string { return (string) self::QUEUE_TYPE; } }
消费者
Consumer
- 用于从队列中获取和处理消息。
为了实现处理消息的逻辑,需要创建一个实现 ConsumerInterface 或继承自 AbstractConsumer 的类,其中包含一些预定义的方法值。
<?php declare(strict_types=1); namespace Acme\AppBundle\Consumer; use MarfaTech\Bundle\RabbitQueueBundle\Consumer\AbstractConsumer; class ExampleConsumer extends AbstractConsumer { public const DEFAULT_BATCH_SIZE = 100; # Размер пачки /** * {@inheritDoc} */ public function process(array $messageList): void { foreach ($messageList as $item) { $data = $this->decodeMessageBody($item); # Decode message by hydrator // handle some task by specific logic } } /** * {@inheritDoc} */ public function getBindQueueName(): string { return 'example'; } /** * {@inheritDoc} */ public static function getName(): string { return 'example'; } }
在 process()
方法中需要实现处理接收到的消息。消息以批次形式接收,批次大小由常量 DEFAULT_BATCH_SIZE
(默认 = 1)定义。
单个队列中所有消费者的 DEFAULT_BATCH_SIZE
之和不应超过 65535.
Hydrator
为了方便处理不同格式的消息,包提供了一些工具来执行消息的编码/解码(将消息编码/解码为所需的格式)。
默认情况下,以下氢化器可用
- JsonHydrator - 用于处理 json 格式的消息(默认)。
- PlainTextHydrator - 用于处理简单的文本消息。
也可以创建自己的氢化器。为此,需要实现 HydratorInterface 并将配置参数 hydrator_name
的值更改为新氢化器的类型。
定义
RabbitMQ 允许创建复杂的队列方案,由多个相互关联的 exchange
和 queue
组成。
为了方便处理方案,包提供了将队列方案保存到特殊类 Definition
的功能,这些类实现 DefinitionInterface。
FIFO 示例
<?php declare(strict_types=1); namespace MarfaTech\Bundle\RabbitQueueBundle\Definition; use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueEnum; use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum; use PhpAmqpLib\Connection\AMQPStreamConnection; class ExampleFifoDefinition implements DefinitionInterface { public const QUEUE_NAME = QueueEnum::EXAMPLE_FIFO; public const ENTRY_POINT = self::QUEUE_NAME; /** * {@inheritDoc} */ public function init(AMQPStreamConnection $connection): void { $channel = $connection->channel(); $channel->queue_declare( self::ENTRY_POINT, false, true, false, false ); } /** * * {@inheritDoc} */ public function getEntryPointName(): string { return self::ENTRY_POINT; } /** * {@inheritDoc} */ public function getQueueType(): int { return QueueTypeEnum::FIFO; } /** * {@inheritDoc} */ public static function getQueueName(): string { return self::QUEUE_NAME; } }
延迟 + 去重示例
<?php declare(strict_types=1); namespace MarfaTech\Bundle\RabbitQueueBundle\Definition; use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueEnum; use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Wire\AMQPTable; class ExampleDeduplicateDelayDefinition implements DefinitionInterface { public const QUEUE_NAME = QueueEnum::EXAMPLE_DEDUPLICATE_DELAY; public const ENTRY_POINT = self::QUEUE_NAME . '@exchange_deduplication'; private const SECOND_POINT = self::QUEUE_NAME . '@exchange_delay'; private const THIRD_POINT = self::QUEUE_NAME; /** * {@inheritDoc} */ public function init(AMQPStreamConnection $connection): void { $channel = $connection->channel(); $channel->exchange_declare( self::ENTRY_POINT, 'x-message-deduplication', false, true, false, false, false, new AMQPTable(['x-cache-size' => 1_000_000_000]) ); $channel->exchange_declare( self::SECOND_POINT, 'x-delayed-message', false, true, false, false, false, new AMQPTable(['x-delayed-type' => AMQPExchangeType::DIRECT]) ); $channel->queue_declare( self::THIRD_POINT, false, true, false, false ); $channel->exchange_bind(self::SECOND_POINT, self::ENTRY_POINT); $channel->queue_bind(self::THIRD_POINT, self::SECOND_POINT); } /** * {@inheritDoc} */ public function getEntryPointName(): string { return self::ENTRY_POINT; } /** * {@inheritDoc} */ public function getQueueType(): int { return QueueTypeEnum::FIFO | QueueTypeEnum::DEDUPLICATE | QueueTypeEnum::DELAY; } /** * {@inheritDoc} */ public static function getQueueName(): string { return self::QUEUE_NAME; } }
在 init()
方法中,使用标准方法 php-amqplib 声明一个队列结构,该结构包括必要的 exchanges
、queue
和 bindings
。
getEntryPointName()
方法负责确定消息的入口点。入口点可以是 exchange
或 queue
名称,具体取决于方案结构。
getQueueName()
方法返回最终接收消息的队列名称。
消息的生命周期
Сообщение -> Producer -> EntryPoint -> Структура очереди exchanges, bindings -> Queue -> Consumer
因此,producer
将消息发送到入口点,而 consumer
从队列中取出消息。
在简单情况下,当使用常规的 FIFO 队列时,入口点将是队列的名称。
可用的命令
rabbit:consumer:run
- 启动选定的消费者。
php bin/console rabbit:consumer:run <name> # <name> - название консьюмера.
rabbit:definition:update
- 根据现有的Definition
类加载所有RabbitMQ
队列方案。
注意:此命令不会更新现有方案。
php bin/console rabbit:definition:update
rabbit:consumer:list
- 输出在项目中注册的消费者列表。
php bin/console rabbit:consumer:list
命令输出示例
Total consumers count: 2
+--------------------+------------+
| Queue Name | Batch Size |
+--------------------+------------+
| example_first | 1 |
| example_second | 100 |
+--------------------+------------+
使用方法
步骤 1:创建队列方案(Definition)
为了初始化方案,需要创建一个实现 DefinitionInterface 的 Definition 类。在 init
方法中,需要使用标准方法通过 php-amqplib 声明队列结构,包括必要的 exchanges
、queue
和 bindings
。
步骤 2:创建 consumer
接下来,需要创建一个继承自 AbstractConsumer 的 consumer 类。在 process
方法中实现接收到的消息的处理。
如果项目中没有启用 autowire
机制,则需要使用 marfatech_rabbit_queue.consumer
标签注册 consumer。
services: app.acme.consumer: class: Acme\AppBundle\Consumer\ExampleConsumer tags: - { name: marfatech_rabbit_queue.consumer }
步骤 3:加载 RabbitMQ 队列方案
为了将 definition
方案加载到 RabbitMQ,需要执行 rabbit:definition:update
命令。该命令将根据实现 DefinitionInterface 的现有 Definition
类更新方案。
php bin/console rabbit:definition:update
步骤 4:启动 consumer
为了启动 consumer,需要执行 rabbit:consumer:run
命令。为了启动,需要传递特定的 consumer
名称。
启动先前描述的 consumer 的示例
php bin/console rabbit:consumer:run example
要查看所有注册的 consumer 列表,只需执行 rabbit:consumer:list
命令。
使用 RouterPublisher
RouterPublisher
应在需要多个队列且每条消息应立即进入由 routingKey
确定的子集时使用。为此,需要创建一个 Definition
,其中只定义 exchange
类型为 direct
、topic
或 fanout
。这个 Definition
将用作消息的入口点。之后,需要为每个队列创建一个 Definition
,并将它们全部绑定到第一个 Definition
。如果用 Definition
替代队列并绑定,可以创建复杂的路由。
带有 exchange
的 Definition
示例
<?php declare(strict_types=1); namespace Wakeapp\Bundle\RabbitQueueBundle\Definition; use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueEnum; use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum; use PhpAmqpLib\Connection\AMQPStreamConnection; class ExampleTopicExchangeDefinition implements DefinitionInterface { public const QUEUE_NAME = QueueEnum::EXAMPLE_TOPIC_EXCHENGE; public const ENTRY_POINT = self::QUEUE_NAME; /** * {@inheritDoc} */ public function init(AMQPStreamConnection $connection): void { $channel = $connection->channel(); $channel->exchange_declare( self::QUEUE_NAME, 'topic', false, true, ); } /** * {@inheritDoc} */ public function getEntryPointName(): string { return self::ENTRY_POINT; } /** * {@inheritDoc} */ public function getQueueType(): int { return QueueTypeEnum::ROUTER; } /** * {@inheritDoc} */ public static function getQueueName(): string { return self::QUEUE_NAME; } }
用于队列的 Definition
示例
<?php declare(strict_types=1); namespace Wakeapp\Bundle\RabbitQueueBundle\Definition; use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueEnum; use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum; use PhpAmqpLib\Connection\AMQPStreamConnection; class ExampleRoutedQueryDefinition implements DefinitionInterface { public const QUEUE_NAME = QueueEnum::EXAMPLE_ROUTED_FIFO; public const ENTRY_POINT = QueueEnum::EXAMPLE_TOPIC_EXCHENGE; // это QUEUE_NAME из примера выше public const ROUTING = [ '*.orange.*', 'big.#', '*.black.car' ]; /** * {@inheritDoc} */ public function init(AMQPStreamConnection $connection): void { $channel = $connection->channel(); $channel->queue_declare( self::QUEUE_NAME, false, true, false, false ); foreach (self::ROUTING as $route) { $channel->queue_bind(self::QUEUE_NAME, self::ENTRY_POINT, $route); // биндим на exchange из первой Definition } } /** * {@inheritDoc} */ public function getEntryPointName(): string { return self::ENTRY_POINT; } /** * {@inheritDoc} */ public function getQueueType(): int { return QueueTypeEnum::FIFO; } /** * {@inheritDoc} */ public static function getQueueName(): string { return self::QUEUE_NAME; } }
定义了交易所和队列后,发送消息的方式与以前相同,但只有当 routingKey(方法 put() 的第四个参数)合适时,消息才会进入队列。
<?php $data = ['message' => 'example']; # Сообщение $options = []; /** @var \Wakeapp\Bundle\RabbitQueueBundle\Producer\RabbitMqProducer $producer */ $producer->put('queue_name', $data, $options, 'small.orange.bicycle'); // попадет в очередь по роуту '*.orange.*' $producer->put('queue_name', $data, $options, 'big.aaa.bbb.and.more.words'); // попадет в очередь по роуту 'big.#' $producer->put('queue_name', $data, $options, 'small.black.bicycle'); // НЕ попадет в очередь из примера
重要!!!routeKey 的长度不应超过 255 个字符
示例
使用 RewindPartialException
为了将消息回滚到队列的末尾,需要抛出异常 RewindPartialException。第一个参数接受一个消息标识符(标签)数组。第二个参数是一个数组,其中键是消息标签,值是消息上下文。通过上下文可以管理消息处理逻辑。获取上下文
$headers = $message->get('application_headers'); $context = $headers->getNativeData()[QueueHeaderOptionEnum::X_CONTEXT];