sroze/messenger-enqueue-transport

Symfony Messenger 组件的入队适配器

安装次数: 7,481,913

依赖: 2

建议者: 0

安全: 0

星星: 191

关注者: 17

分支: 55

开放问题: 40

类型:symfony-bundle

0.8.0 2023-12-18 11:42 UTC

This package is auto-updated.

Last update: 2024-09-18 13:24:59 UTC


README

此 Symfony Messenger 传输允许您使用 Enqueue 从所有支持的代理发送和接收消息。

使用方法

  1. 安装传输
composer req sroze/messenger-enqueue-transport
  1. 按照常规方式配置 Enqueue 包(查看 Enqueue 的包文档)。如果您使用的是配方,您只需配置环境变量以配置 default Enqueue 传输
# .env
# ...

###> enqueue/enqueue-bundle ###
ENQUEUE_DSN=amqp://guest:guest@localhost:5672/%2f
###< enqueue/enqueue-bundle ###
  1. 配置 Messenger 的传输(我们将命名为 amqp),以使用 Enqueue 的 default 传输
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            amqp: enqueue://default
  1. 路由需要通过消息队列的消息
# config/packages/messenger.yaml
framework:
    messenger:
        # ...

        routing:
            'App\Message\MyMessage': amqp
  1. 消费!
bin/console messenger:consume amqp

高级使用

配置队列和交换

在传输 DSN 中,您可以添加额外的配置。以下是常见的参考 DSN(请注意,这些值仅用于示例)

enqueue://default
    ?queue[name]=queue_name
    &topic[name]=topic_name
    &deliveryDelay=1800
    &delayStrategy=Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy
    &timeToLive=3600
    &receiveTimeout=1000
    &priority=1

在消息上设置自定义配置

每个 Enqueue 传输(例如 amqp、redis 等)都有自己的消息对象,通常可以通过调用设置方法(例如 $message->setDeliveryDelay(5000))进行配置。但在 Messenger 中,您无法直接访问这些对象。相反,您可以通过 TransportConfiguration 标记间接设置它们

use Symfony\Component\Messenger\Envelope;
use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration;

// ...

// create your message like normal
$message = // ...

$transportConfig = (new TransportConfiguration())
    // commmon options have a convenient method
    ->setDeliveryDelay(5000)

    // other transport-specific options are set via metadata
    // example custom option for AmqpMessage
    // each "metadata" will map to a setter on your message
    // will result in setDeliveryMode(AmqpMessage::DELIVERY_MODE_PERSISTENT)
    // being called
    ->addMetadata('deliveryMode', AmqpMessage::DELIVERY_MODE_PERSISTENT)
;

$bus->dispatch((new Envelope($message))->with($transportConfig));

在特定主题上发送消息

您可以使用带有您的消息的 TransportConfiguration 封装项目在特定主题上发送消息

use Symfony\Component\Messenger\Envelope;
use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration;

// ...

$transportConfig = (new TransportConfiguration())
    ->setTopic('specific-topic')
;

$bus->dispatch((new Envelope($message))->with($transportConfig));

使用 AMQP 主题交换

请参阅https://rabbitmq.cn/tutorials/tutorial-five-php.html

您可以使用特定的主题和队列选项来配置您的 AMQP 交换在 topic 模式下,并将其绑定

enqueue://default
    ?queue[name]=queue_name
    &queue[bindingKey]=foo.#
    &topic[name]=topic_name
    &topic[type]=topic
    &deliveryDelay=1800
    &delayStrategy=Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy
    &timeToLive=3600
    &receiveTimeout=1000
    &priority=1

以下是将消息发送到匹配此消费者的路由键的方法

$bus->dispatch((new Envelope($message))->with(new TransportConfiguration([
    'topic' => 'topic_name',
    'metadata' => [
        'routingKey' => 'foo.bar'
    ]
])));

配置自定义 Kafka 消息

以下是发送具有一些自定义选项的消息的方法

$this->bus->dispatch((new Envelope($message))->with(new TransportConfiguration([
    'topic' => 'test_topic_name',
    'metadata' => [
        'key' => 'foo.bar',
        'partition' => 0,
        'timestamp' => (new \DateTimeImmutable())->getTimestamp(),
        'messageId' => uniqid('kafka_', true),
    ]
])))