enqueue/messenger-adapter

该包已被弃用且不再维护。作者建议使用sroze/messenger-enqueue-transport包。

用于Symfony Messenger组件的Enqueue适配器

安装数: 237,481

依赖者: 0

建议者: 0

安全: 0

星标: 164

关注者: 17

分支: 49

类型:symfony-bundle

0.5.1 2021-08-03 17:59 UTC

This package is auto-updated.

Last update: 2022-02-01 13:13:08 UTC


README

这个Symfony Messenger传输组件允许您使用Enqueue从所有支持的消息代理发送和接收消息。

使用方法

  1. 安装传输
composer req sroze/messenger-enqueue-transport
  1. 按照常规方式配置Enqueue包(查看Enqueue的Bundle文档)。如果您使用的是食谱,您只需要配置环境变量来配置default Enqueue传输
# .env
# ...

###> enqueue/enqueue-bundle ###
ENQUEUE_DSN=amqp://guest:guest@localhost:5672/%2f
###< enqueue/enqueue-bundle ###
  1. 配置Messenger的传输(我们将其命名为amqp),使其使用Enqueue的default传输
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            amqp: enqueue://default
  1. 路由需要通过消息队列的消息
# config/packages/messenger.yaml
framework:
    messenger:
        # ...

        routing:
            'App\Message\MyMessage': amqp
  1. 消费!
bin/console messenger:consume amqp

高级使用

配置队列和交换机

在传输DSN中,您可以添加额外的配置。以下是一个常见的参考DSN(注意,值仅为示例)

enqueue://default
    ?queue[name]=queue_name
    &topic[name]=topic_name
    &deliveryDelay=1800
    &delayStrategy=Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy
    &timeToLive=3600
    &receiveTimeout=1000
    &priority=1

在您的消息上设置自定义配置

每个Enqueue传输(例如amqp、redis等)都有自己的消息对象,通常可以通过调用setter方法(例如$message->setDeliveryDelay(5000))进行配置。但在Messenger中,您不能直接访问这些对象。相反,您可以通过TransportConfiguration标记间接设置它们

use Symfony\Component\Messenger\Envelope;
use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration;

// ...

// create your message like normal
$message = // ...

$transportConfig = (new TransportConfiguration())
    // commmon options have a convenient method
    ->setDeliveryDelay(5000)

    // other transport-specific options are set via metadata
    // example custom option for AmqpMessage
    // each "metadata" will map to a setter on your message
    // will result in setDeliveryMode(AmqpMessage::DELIVERY_MODE_PERSISTENT)
    // being called
    ->addMetadata('deliveryMode', AmqpMessage::DELIVERY_MODE_PERSISTENT)
;

$bus->dispatch((new Envelope($message))->with($transportConfig));

在特定主题上发送消息

您可以使用带有消息的TransportConfiguration信封项在特定主题上发送消息

use Symfony\Component\Messenger\Envelope;
use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration;

// ...

$transportConfig = (new TransportConfiguration())
    ->setTopic('specific-topic')
;

$bus->dispatch((new Envelope($message))->with($transportConfig));

使用AMQP主题交换机

参见https://rabbitmq.cn/tutorials/tutorial-five-php.html

您可以使用特定的主题和队列选项来配置您的AMQP交换机以在topic模式下运行并将其绑定

enqueue://default
    ?queue[name]=queue_name
    &queue[bindingKey]=foo.#
    &topic[name]=topic_name
    &topic[type]=topic
    &deliveryDelay=1800
    &delayStrategy=Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy
    &timeToLive=3600
    &receiveTimeout=1000
    &priority=1

以下是发送与该消费者匹配的路由键的消息的方式

$bus->dispatch((new Envelope($message))->with(new TransportConfiguration([
    'topic' => 'topic_name',
    'metadata' => [
        'routingKey' => 'foo.bar'
    ]
])));

配置自定义Kafka消息

以下是发送带有一些自定义选项的消息的方式

$this->bus->dispatch((new Envelope($message))->with(new TransportConfiguration([
    'topic' => 'test_topic_name',
    'metadata' => [
        'key' => 'foo.bar',
        'partition' => 0,
        'timestamp' => (new \DateTimeImmutable())->getTimestamp(),
        'messageId' => uniqid('kafka_', true),
    ]
])))