marfatech/rabbit-queue-bundle

为消费者系统提供与RabbitMQ队列交互的可能性

该软件包的官方存储库似乎已丢失,因此该软件包已被冻结

安装数: 14,806

依赖: 0

建议: 0

安全: 0

星级: 0

关注者: 5

分支: 0

开放问题: 1

类型:symfony-bundle

v3.4.0-RC1 2023-08-01 09:46 UTC

README

Latest Stable Version Total Downloads

简介

该软件包提供通过producer-consumer机制处理RabbitMQ队列的工具。

内容

  1. 要求
  2. 安装
  3. 配置
  4. 组件描述
  5. 可用命令
  6. 使用
  7. 使用 RouterPublisher
  8. 示例
  9. 许可证

要求

为了正确运行软件包,需要连接以下RabbitMQ插件

安装

步骤 1: 加载软件包

在项目目录中,运行以下命令以加载适合的稳定版本

    composer require marfatech/rabbit-queue-bundle

此命令假定Composer已安装并全局可用。

步骤 2: 连接软件包

需要通过将其添加到项目中的app/AppKernel.php文件中注册的软件包列表中来启用软件包。

<?php
// app/AppKernel.php

class AppKernel extends Kernel
{
    // ...

    public function registerBundles()
    {
        $bundles = [
            // ...

            new MarfaTech\Bundle\RabbitQueueBundle\MarfaTechRabbitQueueBundle(),
        ];

        return $bundles;
    }

    // ...
}

配置

要开始使用软件包,需要描述连接到RabbitMQ的配置。

# app/packages/marfatech_rabbit_queue.yaml
marfatech_rabbit_queue:
    connections:
        default:
            host: 'rabbitmq'              # хост для подключения к rabbitMQ
            port: 5672                    # порт для подключения к rabbitMQ
            username: 'rabbitmq_user'     # логин для подключения к rabbitMQ
            password: 'rabbitmq_password' # пароль для подключения к rabbitMQ
            vhost: 'example_vhost'        # виртуальный хост для подключения (необязательный параметр)
            connection_timeout: 3         # таймаут соединения @deprecated используйте options.connection_timeout
            read_write_timeout: 3         # таймаут на чтение/запись @deprecated используйте options.read_write_timeout
            heartbeat: 0                  # частота heartbeat @deprecated используйте options.heartbeat
    options:                              # опции для попыток подключений ко всем хостам из списка по очереди (необязательный параметр)
        connection_timeout: 3             # таймаут соединения
        read_write_timeout: 3             # таймаут на чтение/запись
        heartbeat: 0                      # частота heartbeat
        lazy_connection: false            # Lazy соединение инициализируется в момент использования
        reconnect_retries: 3              # Количество попыток реконнекта к RabbitMq при потере соединения int или null (по умолчанию 0)
    consumer:
        wait_timeout: 3                   # таймаут ожидания новых сообщений для обработки пачки в секундах (по умолчанию 3)
        idle_timeout: 0                   # таймаут ожидания сообщений в пустой очереди в секундах (по умолчанию 0 - нет таймаута)
        batch_timeout: 0                  # таймаут сборки пачки сообщений в секундах (по умолчанию 0 - нет таймаута)
        default_max_processed_tasks_count: 1000    # максимальное количество задач в обработке (по умолчанию 1000)

当指定options时,配置中的connection_timeoutread_write_timeoutheartbeat值将从其中获取。如果没有指定options,则这些配置键的值将取自第一个connections键值。

将依次尝试连接到配置键connection中指定的主机,并返回第一个成功的连接。多主机连接

如果指定lazy_connection = true,则连接将在使用时初始化,而不是在初始化所有类时初始化。

reconnect_retries参数用于在连接到RabbitMq断开时自动重新连接。它可以接受整数值(最大重连尝试次数),或null以进行无限重连。

组件描述

生产者

Producer - 用于向队列发送消息。

为了实现这些目的,在包中实现了 RabbitMqProducer,通过它可以发送带有指定参数的消息到队列。

<?php
$data = ['message' => 'example']; # Сообщение
$options = ['key' => 'unique_key', 'delay' => 1000]; # Опции, в зависимости от типа очереди
$routingKey = 'test.routing.key'; # Ключ маршрутизации сообщения, для очередей с типом `ROUTER`
$properties = ['type' => 'test']; # Дополнительные свойства сообщения AMQPMessage

/** @var \MarfaTech\Bundle\RabbitQueueBundle\Producer\RabbitMqProducer $producer */
$producer->put('queue_name', $data, $options, $routingKey, $properties);

队列池

默认情况下,所有消息都通过消息池发送到队列,这允许延迟发送累积的消息。可以通过在调用 MarfaTech\Bundle\RabbitQueueBundle\Producer\RabbitMqProducerInterface::put 函数时传递 use-queue-pool 选项来为每条消息单独更改此行为。

<?php

use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueOptionEnum;
use MarfaTech\Bundle\RabbitQueueBundle\Producer\RabbitMqProducerInterface;

$data = ['message' => 'example'];
$options = [QueueOptionEnum::USE_QUEUE_POOL => false];

/** @var RabbitMqProducerInterface $producer */
$producer->put('queue_name', $data, $options);

发布者

通过特殊的发布者类来发布队列中的消息。Producer 定义了根据关联的队列类型使用哪个发布者。

因此,对于每种新的队列类型都需要一个具有自定义处理/验证和发布消息到通道逻辑的 Publisher 类。

包支持以下队列和交换机类型

  • FIFO
  • 延迟
  • 去重
  • 去重 + 延迟
  • 路由器

路由器用于创建分叉拓扑,如 这里这里 所述。

如果想要添加自己的队列类型,需要创建一个继承自 AbstractPublisher 或实现 PublisherInterface 的类。

示例 DelayPublisher

<?php

declare(strict_types=1);

namespace MarfaTech\Bundle\RabbitQueueBundle\Publisher;

use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueHeaderOptionEnum;
use MarfaTech\Bundle\RabbitQueueBundle\Definition\DefinitionInterface;
use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueOptionEnum;
use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum;
use MarfaTech\Bundle\RabbitQueueBundle\Exception\RabbitQueueException;

use function is_int;
use function sprintf;

class DelayPublisher extends AbstractPublisher
{
    public const QUEUE_TYPE = QueueTypeEnum::FIFO | QueueTypeEnum::DELAY;
    
    /**
    * Custom prepare options logic
    */
    protected function prepareOptions(DefinitionInterface $definition, array $options): array
    {
        $delay = $options[QueueOptionEnum::DELAY] ?? null;

        if (!is_int($delay)) {
            $message = sprintf(
                'Element for queue "%s" must be with option %s. See %s',
                $definition::getQueueName(),
                QueueOptionEnum::DELAY,
                QueueOptionEnum::class
            );

            throw new RabbitQueueException($message);
        }

        $amqpTableOption[QueueHeaderOptionEnum::X_DELAY] = $delay * 1000;

        return $amqpTableOption;
    }

    /**
    * Queue type supported by publisher
    */
    public static function getQueueType(): string
    {
        return (string) self::QUEUE_TYPE;
    }
}

消费者

Consumer - 用于从队列中获取和处理消息。

为了实现处理消息的逻辑,需要创建一个实现 ConsumerInterface 或继承自 AbstractConsumer 的类,其中包含一些预定义的方法值。

<?php

declare(strict_types=1);

namespace Acme\AppBundle\Consumer;

use MarfaTech\Bundle\RabbitQueueBundle\Consumer\AbstractConsumer;

class ExampleConsumer extends AbstractConsumer
{
    public const DEFAULT_BATCH_SIZE = 100; # Размер пачки

    /**
     * {@inheritDoc}
     */
    public function process(array $messageList): void
    {
        foreach ($messageList as $item) {
            $data = $this->decodeMessageBody($item); # Decode message by hydrator

            // handle some task by specific logic
        }
    }

    /**
     * {@inheritDoc}
     */
    public function getBindQueueName(): string
    {
        return 'example';
    }

    /**
     * {@inheritDoc}
     */
    public static function getName(): string
    {
        return 'example';
    }
}

process() 方法中需要实现处理接收到的消息。消息以批次形式接收,批次大小由常量 DEFAULT_BATCH_SIZE (默认 = 1)定义。

单个队列中所有消费者的 DEFAULT_BATCH_SIZE 之和不应超过 65535.

Hydrator

为了方便处理不同格式的消息,包提供了一些工具来执行消息的编码/解码(将消息编码/解码为所需的格式)。

默认情况下,以下氢化器可用

  • JsonHydrator - 用于处理 json 格式的消息(默认)。
  • PlainTextHydrator - 用于处理简单的文本消息。

也可以创建自己的氢化器。为此,需要实现 HydratorInterface 并将配置参数 hydrator_name 的值更改为新氢化器的类型。

定义

RabbitMQ 允许创建复杂的队列方案,由多个相互关联的 exchangequeue 组成。

为了方便处理方案,包提供了将队列方案保存到特殊类 Definition 的功能,这些类实现 DefinitionInterface

FIFO 示例

<?php

declare(strict_types=1);

namespace MarfaTech\Bundle\RabbitQueueBundle\Definition;

use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueEnum;
use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ExampleFifoDefinition implements DefinitionInterface
{
    public const QUEUE_NAME = QueueEnum::EXAMPLE_FIFO;
    public const ENTRY_POINT = self::QUEUE_NAME;

    /**
     * {@inheritDoc}
     */
    public function init(AMQPStreamConnection $connection): void
    {
        $channel = $connection->channel();

        $channel->queue_declare(
            self::ENTRY_POINT,
            false,
            true,
            false,
            false
        );
    }

    /**
     *
     * {@inheritDoc}
     */
    public function getEntryPointName(): string
    {
        return self::ENTRY_POINT;
    }

    /**
     * {@inheritDoc}
     */
    public function getQueueType(): int
    {
        return QueueTypeEnum::FIFO;
    }

    /**
     * {@inheritDoc}
     */
    public static function getQueueName(): string
    {
        return self::QUEUE_NAME;
    }
}

延迟 + 去重示例

<?php

declare(strict_types=1);

namespace MarfaTech\Bundle\RabbitQueueBundle\Definition;

use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueEnum;
use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;

class ExampleDeduplicateDelayDefinition implements DefinitionInterface
{
    public const QUEUE_NAME = QueueEnum::EXAMPLE_DEDUPLICATE_DELAY;
    public const ENTRY_POINT = self::QUEUE_NAME . '@exchange_deduplication';

    private const SECOND_POINT = self::QUEUE_NAME . '@exchange_delay';
    private const THIRD_POINT = self::QUEUE_NAME;

    /**
     * {@inheritDoc}
     */
    public function init(AMQPStreamConnection $connection): void
    {
        $channel = $connection->channel();

        $channel->exchange_declare(
            self::ENTRY_POINT,
            'x-message-deduplication',
            false,
            true,
            false,
            false,
            false,
            new AMQPTable(['x-cache-size' => 1_000_000_000])
        );

        $channel->exchange_declare(
            self::SECOND_POINT,
            'x-delayed-message',
            false,
            true,
            false,
            false,
            false,
            new AMQPTable(['x-delayed-type' => AMQPExchangeType::DIRECT])
        );

        $channel->queue_declare(
            self::THIRD_POINT,
            false,
            true,
            false,
            false
        );

        $channel->exchange_bind(self::SECOND_POINT, self::ENTRY_POINT);
        $channel->queue_bind(self::THIRD_POINT, self::SECOND_POINT);
    }

    /**
     * {@inheritDoc}
     */
    public function getEntryPointName(): string
    {
        return self::ENTRY_POINT;
    }

    /**
     * {@inheritDoc}
     */
    public function getQueueType(): int
    {
        return QueueTypeEnum::FIFO | QueueTypeEnum::DEDUPLICATE | QueueTypeEnum::DELAY;
    }

    /**
     * {@inheritDoc}
     */
    public static function getQueueName(): string
    {
        return self::QUEUE_NAME;
    }
}

init() 方法中,使用标准方法 php-amqplib 声明一个队列结构,该结构包括必要的 exchangesqueuebindings

getEntryPointName() 方法负责确定消息的入口点。入口点可以是 exchangequeue 名称,具体取决于方案结构。

getQueueName() 方法返回最终接收消息的队列名称。

消息的生命周期

Сообщение -> Producer -> EntryPoint -> Структура очереди exchanges, bindings -> Queue -> Consumer

因此,producer 将消息发送到入口点,而 consumer 从队列中取出消息。

在简单情况下,当使用常规的 FIFO 队列时,入口点将是队列的名称。

可用的命令

  1. rabbit:consumer:run - 启动选定的消费者。
php bin/console rabbit:consumer:run <name> # <name> - название консьюмера.
  1. rabbit:definition:update - 根据现有的 Definition 类加载所有 RabbitMQ 队列方案。

注意:此命令不会更新现有方案。

php bin/console rabbit:definition:update
  1. rabbit:consumer:list - 输出在项目中注册的消费者列表。
php bin/console rabbit:consumer:list

命令输出示例

 Total consumers count: 2
+--------------------+------------+
| Queue Name         | Batch Size |
+--------------------+------------+
| example_first      | 1          |
| example_second     | 100        |
+--------------------+------------+

使用方法

步骤 1:创建队列方案(Definition)

为了初始化方案,需要创建一个实现 DefinitionInterface 的 Definition 类。在 init 方法中,需要使用标准方法通过 php-amqplib 声明队列结构,包括必要的 exchangesqueuebindings

创建 Definition 示例

步骤 2:创建 consumer

接下来,需要创建一个继承自 AbstractConsumer 的 consumer 类。在 process 方法中实现接收到的消息的处理。

创建 Consumer 示例

如果项目中没有启用 autowire 机制,则需要使用 marfatech_rabbit_queue.consumer 标签注册 consumer。

services:
    app.acme.consumer:
        class:      Acme\AppBundle\Consumer\ExampleConsumer
        tags:
            - { name: marfatech_rabbit_queue.consumer }

步骤 3:加载 RabbitMQ 队列方案

为了将 definition 方案加载到 RabbitMQ,需要执行 rabbit:definition:update 命令。该命令将根据实现 DefinitionInterface 的现有 Definition 类更新方案。

php bin/console rabbit:definition:update

步骤 4:启动 consumer

为了启动 consumer,需要执行 rabbit:consumer:run 命令。为了启动,需要传递特定的 consumer 名称。

启动先前描述的 consumer 的示例

php bin/console rabbit:consumer:run example

要查看所有注册的 consumer 列表,只需执行 rabbit:consumer:list 命令。

使用 RouterPublisher

RouterPublisher 应在需要多个队列且每条消息应立即进入由 routingKey 确定的子集时使用。为此,需要创建一个 Definition,其中只定义 exchange 类型为 directtopicfanout。这个 Definition 将用作消息的入口点。之后,需要为每个队列创建一个 Definition,并将它们全部绑定到第一个 Definition。如果用 Definition 替代队列并绑定,可以创建复杂的路由。

带有 exchangeDefinition 示例

<?php

declare(strict_types=1);

namespace Wakeapp\Bundle\RabbitQueueBundle\Definition;

use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueEnum;
use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ExampleTopicExchangeDefinition implements DefinitionInterface
{
    public const QUEUE_NAME = QueueEnum::EXAMPLE_TOPIC_EXCHENGE;
    public const ENTRY_POINT = self::QUEUE_NAME;

    /**
     * {@inheritDoc}
     */
    public function init(AMQPStreamConnection $connection): void
    {
        $channel = $connection->channel();

        $channel->exchange_declare(
            self::QUEUE_NAME,
            'topic',
            false,
            true,
        );
    }

    /**
     * {@inheritDoc}
     */
    public function getEntryPointName(): string
    {
        return self::ENTRY_POINT;
    }

    /**
     * {@inheritDoc}
     */
    public function getQueueType(): int
    {
        return QueueTypeEnum::ROUTER;
    }

    /**
     * {@inheritDoc}
     */
    public static function getQueueName(): string
    {
        return self::QUEUE_NAME;
    }
}

用于队列的 Definition 示例

<?php

declare(strict_types=1);

namespace Wakeapp\Bundle\RabbitQueueBundle\Definition;

use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueEnum;
use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ExampleRoutedQueryDefinition implements DefinitionInterface
{
    public const QUEUE_NAME = QueueEnum::EXAMPLE_ROUTED_FIFO;
    public const ENTRY_POINT = QueueEnum::EXAMPLE_TOPIC_EXCHENGE; // это QUEUE_NAME из примера выше
    public const ROUTING = [
        '*.orange.*',
        'big.#',
        '*.black.car'
    ];

    /**
     * {@inheritDoc}
     */
    public function init(AMQPStreamConnection $connection): void
    {
        $channel = $connection->channel();

        $channel->queue_declare(
            self::QUEUE_NAME,
            false,
            true,
            false,
            false
        );
        
        foreach (self::ROUTING as $route) {
            $channel->queue_bind(self::QUEUE_NAME, self::ENTRY_POINT, $route); // биндим на exchange из первой Definition
        }
    }

    /**
     * {@inheritDoc}
     */
    public function getEntryPointName(): string
    {
        return self::ENTRY_POINT;
    }

    /**
     * {@inheritDoc}
     */
    public function getQueueType(): int
    {
        return QueueTypeEnum::FIFO;
    }

    /**
     * {@inheritDoc}
     */
    public static function getQueueName(): string
    {
        return self::QUEUE_NAME;
    }
}

定义了交易所和队列后,发送消息的方式与以前相同,但只有当 routingKey(方法 put() 的第四个参数)合适时,消息才会进入队列。

<?php
$data = ['message' => 'example']; # Сообщение
$options = [];

/** @var \Wakeapp\Bundle\RabbitQueueBundle\Producer\RabbitMqProducer $producer */
$producer->put('queue_name', $data, $options, 'small.orange.bicycle'); // попадет в очередь по роуту '*.orange.*'
$producer->put('queue_name', $data, $options, 'big.aaa.bbb.and.more.words'); // попадет в очередь по роуту 'big.#'
$producer->put('queue_name', $data, $options, 'small.black.bicycle'); // НЕ попадет в очередь из примера

重要!!!routeKey 的长度不应超过 255 个字符

示例

使用 RewindPartialException

为了将消息回滚到队列的末尾,需要抛出异常 RewindPartialException。第一个参数接受一个消息标识符(标签)数组。第二个参数是一个数组,其中键是消息标签,值是消息上下文。通过上下文可以管理消息处理逻辑。获取上下文

$headers = $message->get('application_headers');
$context = $headers->getNativeData()[QueueHeaderOptionEnum::X_CONTEXT];

许可证

license