sylius-labs / rabbitmq-simplebus-bundle
Integrates RabbitMQ with SimpleBus.
v1.1.0
2018-09-06 12:18 UTC
Requires
- php: ^7.1
- php-amqplib/rabbitmq-bundle: ^1.12
- simple-bus/message-bus: ^3.0
- simple-bus/symfony-bridge: ^5.1
- symfony/monolog-bundle: ^3.1
Requires (Dev)
This package is auto-updated.
Last update: 2024-09-06 09:29:30 UTC
README
Converts AMQP messages from RabbitMQ into events handled by SimpleBus.
Installation
- Require this package:
$ composer require sylius-labs/rabbitmq-simplebus-bundle
- Add the bundle to AppKernel.php:
    public function registerBundles()
    {
        $bundles = [
            new \SyliusLabs\RabbitMqSimpleBusBundle\RabbitMqSimpleBusBundle(),
        ];

        return array_merge(parent::registerBundles(), $bundles);
    }
Usage
- Create a custom AMQP message denormalizer:
    // src/Acme/CustomDenormalizer.php
    namespace Acme;

    use PhpAmqpLib\Message\AMQPMessage;
    use SyliusLabs\RabbitMqSimpleBusBundle\Denormalizer\DenormalizationFailedException;
    use SyliusLabs\RabbitMqSimpleBusBundle\Denormalizer\DenormalizerInterface;

    class CustomDenormalizer implements DenormalizerInterface
    {
        // A message is supported when its body decodes to a non-null JSON value.
        public function supports(AMQPMessage $message)
        {
            return null !== json_decode($message->getBody(), true);
        }

        // Turns the decoded JSON body into an event for SimpleBus.
        public function denormalize(AMQPMessage $message)
        {
            if (!$this->supports($message)) {
                throw new DenormalizationFailedException('Unsupported message!');
            }

            return new CustomEvent(json_decode($message->getBody(), true));
        }
    }
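The denormalizer above returns a `CustomEvent`, which the README does not define. A minimal sketch of such a class might look like the following; the `payload()` accessor and the `array` type on the constructor are assumptions made for illustration, not part of the bundle.

```php
<?php
// src/Acme/CustomEvent.php (hypothetical; not shipped with the bundle)
namespace Acme;

final class CustomEvent
{
    /** @var array Decoded JSON body of the AMQP message */
    private $payload;

    public function __construct(array $payload)
    {
        $this->payload = $payload;
    }

    // Returns the decoded message body as it was passed to the constructor.
    public function payload(): array
    {
        return $this->payload;
    }
}
```

With this sketch the constructor only accepts arrays, while `supports()` above accepts any body that decodes to a non-null JSON value. A scalar body such as `42` would therefore pass `supports()` and then fail in the constructor; checking `is_array(...)` in `supports()` is one way to close that gap.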
- Tag your denormalizer service with rabbitmq_simplebus.amqp_denormalizer:
    <?xml version="1.0" encoding="UTF-8"?>
    <!-- app/config/services.xml -->
    <container xmlns="http://symfony.com/schema/dic/services"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd">
        <services>
            <service id="acme.custom_denormalizer" class="Acme\CustomDenormalizer">
                <tag name="rabbitmq_simplebus.amqp_denormalizer" />
            </service>
        </services>
    </container>
    # app/config/services.yml
    services:
        acme.custom_denormalizer:
            class: Acme\CustomDenormalizer
            tags:
                - { name: rabbitmq_simplebus.amqp_denormalizer }
- Configure the RabbitMQ consumer:
    # app/config/config.yml
    old_sound_rabbit_mq:
        connections:
            default:
                host: 'localhost'
                port: 5672
                user: 'guest'
                password: 'guest'
        consumers:
            rabbitmq_simplebus:
                connection: default
                exchange_options: { name: 'rabbitmq-simplebus', type: direct }
                queue_options: { name: 'rabbitmq-simplebus' }
                callback: rabbitmq_simplebus.consumer
- Run the RabbitMQ consumer:
$ bin/console rabbitmq:consumer rabbitmq_simplebus
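Once the consumer is running, each denormalized message is handed to SimpleBus as an event, so something still has to subscribe to it. The README does not show that part. A rough sketch of a subscriber could look like this, where the class name, the `notify` method name, and the `seen()` helper are all hypothetical. Registering it with SimpleBus is assumed to work through the symfony-bridge `event_subscriber` tag with a `subscribes_to` attribute naming the event class; check the SimpleBus documentation for the exact tag options in your version.

```php
<?php
// src/Acme/CustomEventSubscriber.php (hypothetical example subscriber)
namespace Acme;

final class CustomEventSubscriber
{
    /** @var string[] Class names of the events received so far */
    private $seen = [];

    // Called by the event bus for each event this subscriber is registered for.
    // The parameter is left untyped so the sketch does not depend on a
    // particular event class.
    public function notify($event): void
    {
        $this->seen[] = get_class($event);
    }

    // Exposes what was received, mainly useful in tests.
    public function seen(): array
    {
        return $this->seen;
    }
}
```

In a real application the body of `notify` would carry the actual reaction to the event (updating a read model, sending a notification, and so on) rather than recording class names.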