vrok/messenger-reply

Symfony 消息中间件与邮票,用于回复消息

v1.0.0 2020-06-18 15:32 UTC

This package is auto-updated.

Last update: 2024-08-29 11:22:53 UTC


README

这是一个库,允许 symfony/messenger 使用结果回复消息。
它旨在用于具有 AMQP 传输和两个通过同一代理相互通信的 symfony 实例的设置:例如,一个网页前端和一个微服务,其中前端向服务发送任务并请求回复,例如生成的 PDF 文件。

CI Status Coverage Status

设置

在双方安装(您需要在请求方有 ReplyToStamp,在接收方有中间件和邮票):composer require vrok/messenger-reply

您需要请求和回复消息类在双方相等,例如使用共享的 composer 包

namespace MyNamespace\Message;

class GeneratePdfMessage
{
    /**
     * @var string
     */
    private string $latex;

    public function __construct(string $latex)
    {
        $this->latex = $latex;
    }

    /**
     * @return string
     */
    public function getLatex(): string
    {
        return $this->latex;
    }
}

...

namespace MyNamespace\Message;

class PdfResultMessage
{
    private string $pdfContent;

    public function __construct(string $pdfContent)
    {
        $this->pdfContent = $pdfContent;
    }

    /**
     * @return string
     */
    public function getPdfContent(): string
    {
        return $this->pdfContent;
    }
}

请求方

添加一个用于共享 AMQP 代理的传输,路由到接收者的 input(具有与接收者的 input 队列相同的交换和队列名称)。

framework:
    messenger:
        transports:
            pdf-requests:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange:
                        name: pdf-service
                        type: direct
                        default_publish_routing_key: input
                    queues:
                        input:
                            binding_keys: [input]
                retry_strategy:
                    max_retries: 3
                    # milliseconds delay
                    delay: 1000
                    # causes the delay to be higher before each retry
                    # e.g. 1 second delay, 2 seconds, 4 seconds
                    multiplier: 2
                    max_delay: 0

添加一个用于共享 AMQP 代理的传输,用于接收回复(匹配接收者的 output 队列的交换和队列名称):我们需要单独的传输,因为 messenger:consume [transportname] 会消耗该传输的所有队列中的所有消息。

framework:
    messenger:
        transports:
            pdf-results:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange:
                        name: pdf-service
                        type: direct
                        default_publish_routing_key: output
                    queues:
                        input:
                            binding_keys: [output]
                retry_strategy:
                    max_retries: 3
                    # milliseconds delay
                    delay: 1000
                    # causes the delay to be higher before each retry
                    # e.g. 1 second delay, 2 seconds, 4 seconds
                    multiplier: 2
                    max_delay: 0

将请求路由到共享传输/队列

framework:
    messenger:
        routing:
            # e.g. 
            'MyNamespace\GeneratePdfMessage': pdf-requests

回复方

配置中间件服务

services:
    Vrok\MessengerReply\ReplyMiddleware:
        tags:
            - { name: monolog.logger, channel: messenger }
        calls:
            - [setLogger, ['@logger']]

在您的消息总线中启用中间件
(我们必须禁用默认中间件并明确定义顺序,因为没有优先级选项,只是将我们的服务添加到中间件选项中会将它添加到 send_middleware 之前。请参阅 symfony/symfony#28568

framework:
    messenger:
        buses:
            messenger.bus.default:
                default_middleware: false
                middleware:
                    - {id: 'add_bus_name_stamp_middleware', arguments: ['messenger.bus.default']}
                    - reject_redelivered_message_middleware
                    - dispatch_after_current_bus
                    - failed_message_processing_middleware
                    - send_message
                    - handle_message
                    - Vrok\MessengerReply\ReplyMiddleware

配置一个 input 传输,其中您从外部应用程序发送消息,这些消息由您的工人消费。还有一个 output 传输,用于发送回复,最好为每个发送请求的应用程序使用一个队列,以便它们只消费针对它们的回复。我们需要单独的传输,因为 messenger:consume [transportname] 会消耗该传输的所有队列中的所有消息。

framework:
    messenger:
        transports:
            input:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange:
                        name: pdf-service
                        type: direct
                        default_publish_routing_key: input
                    queues:
                        input:
                            binding_keys: [input]
                retry_strategy:
                    max_retries: 3
                    # milliseconds delay
                    delay: 1000
                    # causes the delay to be higher before each retry
                    # e.g. 1 second delay, 2 seconds, 4 seconds
                    multiplier: 2
                    max_delay: 0

            output:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange:
                        name: pdf-service
                        type: direct
                        default_publish_routing_key: output
                    queues:
                        output:
                            binding_keys: [output]
                        output_1:
                            binding_keys: [output_1]
                retry_strategy:
                    max_retries: 3
                    # milliseconds delay
                    delay: 1000
                    # causes the delay to be higher before each retry
                    # e.g. 1 second delay, 2 seconds, 4 seconds
                    multiplier: 2
                    max_delay: 0

所有回复都应该路由到输出传输

framework:
    messenger:
        routing:
            # Route your messages to the transports
            '*': output

用法

使用附加的 ReplyStamp 分发请求消息,以便接收者知道在哪里发送回复

    use MyNamespace\GeneratePdfMessage;
    use Vrok\MessengerReply\ReplyToStamp;

    $e = new Envelope(new GeneratePdfMessage('LaTeX content'));
    $this->bus->dispatch($e
        ->with(new ReplyToStamp('output'))
    );

实现一个 MessageHandler,用于处理请求并返回回复消息对象

use MyNamespace\GeneratePdfMessage
use MyNamespace\PdfResultMessage

class GeneratePdfMessageHandler implements
    MessageHandlerInterface
{
    public function __invoke(GeneratePdfMessage $message): PdfResultMessage
    {
        $LaTeX = $message->getLatex();
        $pdfContent = "<fakepdf>$LaTeX</fakePdf>";
        $reply = new PdfResultMessage($pdfContent);
        return $reply;
    }
}

在接收方消费请求(只在 input 队列上!)

./bin/console messenger:consume input

在请求方消费回复(只在 output 队列上!)

./bin/console messenger:consume pdf-results

如果您需要在请求方知道要恢复哪个任务等,可以使用接收到的回复,您可以在请求和回复消息类上实现 Vrok\MessengerReply\TaskIdentifierMessageInterface(并使用 Vrok\MessengerReply\TaskIdentifierMessageTrait),以自动将请求上给出的 task 和/或 identifier 属性传输到回复。我们不能使用邮票,因为邮票不能由 MessageHandlers 访问。