leberknecht/amqp-rpc-transporter-bundle

此包为消息传递组件实现了AMQP协议的RPC功能

v1.0.0 2020-11-24 16:55 UTC

This package is auto-updated.

Last update: 2024-09-23 11:46:26 UTC


README

一个为将RPC功能引入到Symfony消息传递组件的AMQP传输器的解决方案包

功能

此包引入了 ampqp-rpc 传输器,它与正常的 amqp 传输器相同,但会使用 reply_tocorrelation_id 标头。接收时,将 HandledStamp 的内容发布到 reply_to 字段中指定的队列。发送时,将生成一个随机队列名称,发布原始消息后,我们将在该队列上等待响应,然后将带有结果的 ResponseStamp 添加到信封中。

安装

composer require leberknecht/amqp-rpc-transporter-bundle

使用方法

# messenger.yaml
framework:
    messenger:
        transports:
            rpc_calls:
                dsn: 'amqp-rpc://user:password@rabbit-mq-host/%2f/rpc_calls'
        routing:
            # Route your messages to the transports
            'App\Message\RpcCallMessage': rpc_calls

然后可以这样使用

    // send
    $rpcCallMessage = new RpcCallMessage();

    $envelope = $this->messageBus->dispatch($rpcCallMessage);
    /** @var ResponseStamp $response */
    $response = $envelope->last(ResponseStamp::class);
    $result = $response->getResult();

要设置处理器的结果,只需返回一些内容即可

    final public function __invoke(RpcCallMessage $message): void
    {
        [...]
        return 42;
    }

备注

这是一个正在进行中的解决方案,作为一个初步的尝试。更好的方法可能是覆盖 messenger.transport.amqp.factory 服务并添加 rpc: truerpc_queue_name 到消息传递配置中,这样我们就扩展了现有的传输器而不是引入这个新的传输器。另外请注意:在这个状态下,我们总是会为响应生成一个具有随机名称的独占队列。这对于高负载队列来说不是很理想,请参阅 https://rabbitmq.cn/tutorials/tutorial-six-python.html