mkoprek / rabbitmq-dlq-bundle
1.0.0
2021-08-18 14:03 UTC
Requires
- php: >=8.0
- php-amqplib/rabbitmq-bundle: ^2.8
- symfony/config: *
- symfony/dependency-injection: *
- symfony/http-kernel: *
Requires (Dev)
- phpstan/phpstan: ^0.12
- phpunit/phpunit: ^9.5
- slevomat/coding-standard: ^7.0
README
配置
此包是RabbitMqBundle的扩展,它会自动将DLQ队列添加到现有的old_sound_rabbit_mq.yaml
中的multiple_consumer
multiple_consumers: default: connection: default exchange_options: name: 'exchange' type: 'topic' graceful_max_execution: timeout: 60 queues: legacy.investments.investment_added.event: name: 'legacy.investments.investment_added.event' routing_key: 'legacy.investments.investment_added.event' callback: Namespace\InvestmentAddedLegacyConsumer legacy.investments.investment_edited.event: name: 'legacy.investments.investment_edited.event' routing_key: 'legacy.investments.investment_edited.event' callback: Namespace\InvestmentEditedLegacyConsumer
配置之后,您将拥有2个额外的DLQ队列,具有路由键
- legacy.investments.investment_added.retry
- legacy.investments.investment_edited.retry
每个*.retry队列将在延迟30秒后重新路由所有消息回原始队列。
要将消息放入*.retry队列,您只需在解析消息时抛出任何异常。
消费
您正在创建消费者,就像上面的例子一样 - 通过添加回调。此回调必须扩展AbstractMessageConsumer
。
就是这样!如果一切正常,只需让它保持原样。
如果有任何问题,那么抛出异常。
生产
只需将MessageProducerInterface
注入到您需要生产消息的服务中。然后创建一个扩展AbstractMessage
或实现MessageInterface
的类。
消息
<?php declare(strict_types=1); use MKoprek\RabbitmqDlqBundle\Message\AbstractMessage; class Message extends AbstractMessage { public const ROUTING_KEY = 'legacy.investments.investment_added.event'; public function __construct(array $array) { $this->payload = [ 'id' => '7186971d-1b63-46ba-9804-012e8477d370', 'name' => 'Lorem Ipsum', 'array' => $array, ]; } }
生产者
<?php declare(strict_types=1); use MKoprek\RabbitmqDlqBundle\Producer\MessageProducerInterface; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; class ProduceMessage { public function __construct(private MessageProducerInterface $producer) { } protected function produce(InputInterface $input, OutputInterface $output): void { $this->producer->produce( new Message(['some_key' => 'some_val']) ); } }