leberknecht / amqp-rpc-transporter-bundle
此包为消息传递组件实现了AMQP协议的RPC功能
v1.0.0
2020-11-24 16:55 UTC
Requires
- php: >=7.3
- ext-amqp: *
- symfony/framework-bundle: >=4.0
- symfony/messenger: >=4.3-dev
Requires (Dev)
This package is auto-updated.
Last update: 2024-09-23 11:46:26 UTC
README
一个为将RPC功能引入到Symfony消息传递组件的AMQP传输器的解决方案包
功能
此包引入了 ampqp-rpc
传输器,它与正常的 amqp
传输器相同,但会使用 reply_to
和 correlation_id
标头。接收时,将 HandledStamp
的内容发布到 reply_to
字段中指定的队列。发送时,将生成一个随机队列名称,发布原始消息后,我们将在该队列上等待响应,然后将带有结果的 ResponseStamp
添加到信封中。
安装
composer require leberknecht/amqp-rpc-transporter-bundle
使用方法
# messenger.yaml framework: messenger: transports: rpc_calls: dsn: 'amqp-rpc://user:password@rabbit-mq-host/%2f/rpc_calls' routing: # Route your messages to the transports 'App\Message\RpcCallMessage': rpc_calls
然后可以这样使用
// send $rpcCallMessage = new RpcCallMessage(); $envelope = $this->messageBus->dispatch($rpcCallMessage); /** @var ResponseStamp $response */ $response = $envelope->last(ResponseStamp::class); $result = $response->getResult();
要设置处理器的结果,只需返回一些内容即可
final public function __invoke(RpcCallMessage $message): void { [...] return 42; }
备注
这是一个正在进行中的解决方案,作为一个初步的尝试。更好的方法可能是覆盖 messenger.transport.amqp.factory
服务并添加 rpc: true
和 rpc_queue_name
到消息传递配置中,这样我们就扩展了现有的传输器而不是引入这个新的传输器。另外请注意:在这个状态下,我们总是会为响应生成一个具有随机名称的独占队列。这对于高负载队列来说不是很理想,请参阅 https://rabbitmq.cn/tutorials/tutorial-six-python.html