sylius-labs/rabbitmq-simplebus-bundle

Integrates RabbitMQ with SimpleBus.

Type: symfony-bundle

v1.1.0 2018-09-06 12:18 UTC

This package is auto-updated.

Last update: 2024-09-06 09:29:30 UTC


README

Converts AMQP messages from RabbitMQ into events handled by SimpleBus.

Installation

  1. Require this package:
$ composer require sylius-labs/rabbitmq-simplebus-bundle
  2. Add the bundle to AppKernel.php:
public function registerBundles()
{
    $bundles = [
        new \SyliusLabs\RabbitMqSimpleBusBundle\RabbitMqSimpleBusBundle(),
    ];

    return array_merge(parent::registerBundles(), $bundles);
}

Usage

  1. Create a custom AMQP message denormalizer:
// src/Acme/CustomDenormalizer.php

namespace Acme;

use PhpAmqpLib\Message\AMQPMessage;
use SyliusLabs\RabbitMqSimpleBusBundle\Denormalizer\DenormalizationFailedException;
use SyliusLabs\RabbitMqSimpleBusBundle\Denormalizer\DenormalizerInterface;

class CustomDenormalizer implements DenormalizerInterface
{
    // Supports any message whose body decodes as JSON to a non-null value.
    public function supports(AMQPMessage $message)
    {
        return null !== json_decode($message->getBody(), true);
    }

    public function denormalize(AMQPMessage $message)
    {
        if (!$this->supports($message)) {
            throw new DenormalizationFailedException('Unsupported message!');
        }

        return new CustomEvent(json_decode($message->getBody(), true));
    }
}
  2. Tag your denormalizer service with rabbitmq_simplebus.amqp_denormalizer:
<?xml version="1.0" encoding="UTF-8"?>
<!-- app/config/services.xml -->

<container xmlns="http://symfony.com/schema/dic/services" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd">
    <services>
        <service id="acme.custom_denormalizer" class="Acme\CustomDenormalizer">
            <tag name="rabbitmq_simplebus.amqp_denormalizer" />
        </service>
    </services>
</container>
# app/config/services.yml

services:
    acme.custom_denormalizer:
        class: Acme\CustomDenormalizer
        tags:
            - { name: rabbitmq_simplebus.amqp_denormalizer }
  3. Configure the RabbitMQ consumer:
# app/config/config.yml

old_sound_rabbit_mq:
    connections:
        default:
            host: 'localhost'
            port: 5672
            user: 'guest'
            password: 'guest'
    consumers:
        rabbitmq_simplebus:
            connection: default
            exchange_options: { name: 'rabbitmq-simplebus', type: direct }
            queue_options: { name: 'rabbitmq-simplebus' }
            callback: rabbitmq_simplebus.consumer
  4. Run the RabbitMQ consumer:
$ bin/console rabbitmq:consumer rabbitmq_simplebus
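
The denormalizer in step 1 returns an Acme\CustomEvent, which this README does not define. Below is a minimal sketch of such a class, assuming the event simply wraps the decoded JSON payload. The class body and the payload() accessor are illustrative assumptions, not part of the bundle:

```php
<?php
// src/Acme/CustomEvent.php

namespace Acme;

// Hypothetical event class: a plain value object carrying the decoded
// message body. Neither the bundle nor SimpleBus prescribes this shape.
final class CustomEvent
{
    /** @var array */
    private $payload;

    public function __construct(array $payload)
    {
        $this->payload = $payload;
    }

    // Returns the decoded JSON body the event was created from.
    public function payload(): array
    {
        return $this->payload;
    }
}
```

Because the constructor requires an array, a message body that decodes to a JSON scalar (for example `1` or `"text"`) would pass supports() but fail here. Tightening supports() to check is_array() is one way to avoid that.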