mkoprek/rabbitmq-dlq-bundle

安装: 0

依赖: 0

建议者: 0

安全: 0

星星: 1

关注者: 2

分支: 0

开放问题: 0

类型:symfony-bundle

1.0.0 2021-08-18 14:03 UTC

This package is auto-updated.

Last update: 2024-09-19 22:51:37 UTC


README

Build Status codecov

配置

此包是RabbitMqBundle的扩展,它会自动将DLQ队列添加到现有的old_sound_rabbit_mq.yaml中的multiple_consumer

    multiple_consumers:
        default:
            connection: default
            exchange_options:
                name: 'exchange'
                type: 'topic'
            graceful_max_execution:
                timeout: 60
            queues:
                legacy.investments.investment_added.event:
                    name: 'legacy.investments.investment_added.event'
                    routing_key: 'legacy.investments.investment_added.event'
                    callback: Namespace\InvestmentAddedLegacyConsumer
                legacy.investments.investment_edited.event:
                    name: 'legacy.investments.investment_edited.event'
                    routing_key: 'legacy.investments.investment_edited.event'
                    callback: Namespace\InvestmentEditedLegacyConsumer

配置之后,您将拥有2个额外的DLQ队列,具有路由键

  • legacy.investments.investment_added.retry
  • legacy.investments.investment_edited.retry

每个*.retry队列将在延迟30秒后重新路由所有消息回原始队列。

要将消息放入*.retry队列,您只需在解析消息时抛出任何异常

消费

您正在创建消费者,就像上面的例子一样 - 通过添加回调。此回调必须扩展AbstractMessageConsumer

就是这样!如果一切正常,只需让它保持原样。
如果有任何问题,那么抛出异常。

生产

只需将MessageProducerInterface注入到您需要生产消息的服务中。然后创建一个扩展AbstractMessage或实现MessageInterface的类。

消息

<?php
declare(strict_types=1);

use MKoprek\RabbitmqDlqBundle\Message\AbstractMessage;

class Message extends AbstractMessage
{
    public const ROUTING_KEY = 'legacy.investments.investment_added.event';

    public function __construct(array $array)
    {
        $this->payload = [
            'id' => '7186971d-1b63-46ba-9804-012e8477d370',
            'name' => 'Lorem Ipsum',
            'array' => $array,
        ];
    }
}

生产者

<?php
declare(strict_types=1);

use MKoprek\RabbitmqDlqBundle\Producer\MessageProducerInterface;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class ProduceMessage
{
    public function __construct(private MessageProducerInterface $producer)
    {
    }

    protected function produce(InputInterface $input, OutputInterface $output): void
    {
        $this->producer->produce(
            new Message(['some_key' => 'some_val'])
        );
    }
}