vrok / messenger-reply
Symfony 消息中间件与邮票,用于回复消息
Requires
- php: ^7.4
- symfony/messenger: ^5.1.2
Requires (Dev)
- friendsofphp/php-cs-fixer: ^2.16.3
- phpunit/phpunit: ^9.2.3
This package is auto-updated.
Last update: 2024-08-29 11:22:53 UTC
README
这是一个库,允许 symfony/messenger 使用结果回复消息。
它旨在用于具有 AMQP 传输和两个通过同一代理相互通信的 symfony 实例的设置:例如,一个网页前端和一个微服务,其中前端向服务发送任务并请求回复,例如生成的 PDF 文件。
设置
在双方安装(您需要在请求方有 ReplyToStamp,在接收方有中间件和邮票):composer require vrok/messenger-reply
您需要请求和回复消息类在双方相等,例如使用共享的 composer 包
namespace MyNamespace\Message; class GeneratePdfMessage { /** * @var string */ private string $latex; public function __construct(string $latex) { $this->latex = $latex; } /** * @return string */ public function getLatex(): string { return $this->latex; } } ... namespace MyNamespace\Message; class PdfResultMessage { private string $pdfContent; public function __construct(string $pdfContent) { $this->pdfContent = $pdfContent; } /** * @return string */ public function getPdfContent(): string { return $this->pdfContent; } }
请求方
添加一个用于共享 AMQP 代理的传输,路由到接收者的 input(具有与接收者的 input 队列相同的交换和队列名称)。
framework: messenger: transports: pdf-requests: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' options: exchange: name: pdf-service type: direct default_publish_routing_key: input queues: input: binding_keys: [input] retry_strategy: max_retries: 3 # milliseconds delay delay: 1000 # causes the delay to be higher before each retry # e.g. 1 second delay, 2 seconds, 4 seconds multiplier: 2 max_delay: 0
添加一个用于共享 AMQP 代理的传输,用于接收回复(匹配接收者的 output 队列的交换和队列名称):我们需要单独的传输,因为 messenger:consume [transportname]
会消耗该传输的所有队列中的所有消息。
framework: messenger: transports: pdf-results: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' options: exchange: name: pdf-service type: direct default_publish_routing_key: output queues: input: binding_keys: [output] retry_strategy: max_retries: 3 # milliseconds delay delay: 1000 # causes the delay to be higher before each retry # e.g. 1 second delay, 2 seconds, 4 seconds multiplier: 2 max_delay: 0
将请求路由到共享传输/队列
framework: messenger: routing: # e.g. 'MyNamespace\GeneratePdfMessage': pdf-requests
回复方
配置中间件服务
services: Vrok\MessengerReply\ReplyMiddleware: tags: - { name: monolog.logger, channel: messenger } calls: - [setLogger, ['@logger']]
在您的消息总线中启用中间件
(我们必须禁用默认中间件并明确定义顺序,因为没有优先级选项,只是将我们的服务添加到中间件选项中会将它添加到 send_middleware 之前。请参阅 symfony/symfony#28568)
framework: messenger: buses: messenger.bus.default: default_middleware: false middleware: - {id: 'add_bus_name_stamp_middleware', arguments: ['messenger.bus.default']} - reject_redelivered_message_middleware - dispatch_after_current_bus - failed_message_processing_middleware - send_message - handle_message - Vrok\MessengerReply\ReplyMiddleware
配置一个 input 传输,其中您从外部应用程序发送消息,这些消息由您的工人消费。还有一个 output 传输,用于发送回复,最好为每个发送请求的应用程序使用一个队列,以便它们只消费针对它们的回复。我们需要单独的传输,因为 messenger:consume [transportname]
会消耗该传输的所有队列中的所有消息。
framework: messenger: transports: input: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' options: exchange: name: pdf-service type: direct default_publish_routing_key: input queues: input: binding_keys: [input] retry_strategy: max_retries: 3 # milliseconds delay delay: 1000 # causes the delay to be higher before each retry # e.g. 1 second delay, 2 seconds, 4 seconds multiplier: 2 max_delay: 0 output: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' options: exchange: name: pdf-service type: direct default_publish_routing_key: output queues: output: binding_keys: [output] output_1: binding_keys: [output_1] retry_strategy: max_retries: 3 # milliseconds delay delay: 1000 # causes the delay to be higher before each retry # e.g. 1 second delay, 2 seconds, 4 seconds multiplier: 2 max_delay: 0
所有回复都应该路由到输出传输
framework: messenger: routing: # Route your messages to the transports '*': output
用法
使用附加的 ReplyStamp 分发请求消息,以便接收者知道在哪里发送回复
use MyNamespace\GeneratePdfMessage; use Vrok\MessengerReply\ReplyToStamp; $e = new Envelope(new GeneratePdfMessage('LaTeX content')); $this->bus->dispatch($e ->with(new ReplyToStamp('output')) );
实现一个 MessageHandler,用于处理请求并返回回复消息对象
use MyNamespace\GeneratePdfMessage use MyNamespace\PdfResultMessage class GeneratePdfMessageHandler implements MessageHandlerInterface { public function __invoke(GeneratePdfMessage $message): PdfResultMessage { $LaTeX = $message->getLatex(); $pdfContent = "<fakepdf>$LaTeX</fakePdf>"; $reply = new PdfResultMessage($pdfContent); return $reply; } }
在接收方消费请求(只在 input 队列上!)
./bin/console messenger:consume input
在请求方消费回复(只在 output 队列上!)
./bin/console messenger:consume pdf-results
如果您需要在请求方知道要恢复哪个任务等,可以使用接收到的回复,您可以在请求和回复消息类上实现 Vrok\MessengerReply\TaskIdentifierMessageInterface
(并使用 Vrok\MessengerReply\TaskIdentifierMessageTrait
),以自动将请求上给出的 task 和/或 identifier 属性传输到回复。我们不能使用邮票,因为邮票不能由 MessageHandlers 访问。