autoprotect-group/messenger-enqueue-transport

Symfony Messenger 组件的入队适配器

0.7.0 2022-04-13 09:52 UTC

This package is auto-updated.

Last update: 2024-09-13 15:08:03 UTC


README

此 Symfony Messenger 传输允许您使用 Enqueue 从所有支持的代理发送和接收您的消息。

用法

  1. 安装传输
composer req sroze/messenger-enqueue-transport
  1. 按照常规方式配置 Enqueue 包(查看 Enqueue 的包文档)。如果您使用的是食谱,您只需配置环境变量以配置 default Enqueue 传输
# .env
# ...

###> enqueue/enqueue-bundle ###
ENQUEUE_DSN=amqp://guest:guest@localhost:5672/%2f
###< enqueue/enqueue-bundle ###
  1. 配置 Messenger 的传输(我们将命名为 amqp)以使用 Enqueue 的 default 传输
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            amqp: enqueue://default
  1. 将消息路由到消息队列
# config/packages/messenger.yaml
framework:
    messenger:
        # ...

        routing:
            'App\Message\MyMessage': amqp
  1. 消费!
bin/console messenger:consume amqp

高级用法

配置队列和交换机

在传输 DSN 中,您可以添加额外的配置。以下是一个常见的参考 DSN(请注意,这些值仅用于示例)

enqueue://default
    ?queue[name]=queue_name
    &topic[name]=topic_name
    &deliveryDelay=1800
    &delayStrategy=Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy
    &timeToLive=3600
    &receiveTimeout=1000
    &priority=1

在您的消息上设置自定义配置

每个 Enqueue 传输(例如 amqp、redis 等)都有自己的消息对象,通常可以通过调用设置方法进行配置(例如 $message->setDeliveryDelay(5000))。但在 Messenger 中,您无法直接访问这些对象。相反,您可以通过 TransportConfiguration 标记间接设置它们

use Symfony\Component\Messenger\Envelope;
use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration;

// ...

// create your message like normal
$message = // ...

$transportConfig = (new TransportConfiguration())
    // commmon options have a convenient method
    ->setDeliveryDelay(5000)

    // other transport-specific options are set via metadata
    // example custom option for AmqpMessage
    // each "metadata" will map to a setter on your message
    // will result in setDeliveryMode(AmqpMessage::DELIVERY_MODE_PERSISTENT)
    // being called
    ->addMetadata('deliveryMode', AmqpMessage::DELIVERY_MODE_PERSISTENT)
;

$bus->dispatch((new Envelope($message))->with($transportConfig));

在特定主题上发送消息

您可以使用 TransportConfiguration 信封项与您的消息一起在特定主题上发送消息

use Symfony\Component\Messenger\Envelope;
use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration;

// ...

$transportConfig = (new TransportConfiguration())
    ->setTopic('specific-topic')
;

$bus->dispatch((new Envelope($message))->with($transportConfig));

使用 AMQP 主题交换机

请参阅 https://rabbitmq.cn/tutorials/tutorial-five-php.html

您可以使用特定的主题和队列选项来配置您的 AMQP 交换机在 topic 模式下,并将其绑定

enqueue://default
    ?queue[name]=queue_name
    &queue[bindingKey]=foo.#
    &topic[name]=topic_name
    &topic[type]=topic
    &deliveryDelay=1800
    &delayStrategy=Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy
    &timeToLive=3600
    &receiveTimeout=1000
    &priority=1

以下是如何发送与消费者匹配的路由键的消息的方法

$bus->dispatch((new Envelope($message))->with(new TransportConfiguration([
    'topic' => 'topic_name',
    'metadata' => [
        'routingKey' => 'foo.bar'
    ]
])));

配置自定义 Kafka 消息

以下是如何发送带有一些自定义选项的消息的方法

$this->bus->dispatch((new Envelope($message))->with(new TransportConfiguration([
    'topic' => 'test_topic_name',
    'metadata' => [
        'key' => 'foo.bar',
        'partition' => 0,
        'timestamp' => (new \DateTimeImmutable())->getTimestamp(),
        'messageId' => uniqid('kafka_', true),
    ]
])))