sroze / messenger-enqueue-transport
Symfony Messenger 组件的入队适配器
0.8.0
2023-12-18 11:42 UTC
Requires
- php: ^8.1
- enqueue/amqp-tools: ^0.10
- enqueue/enqueue-bundle: ^0.10
- symfony/messenger: ^5.4 || ^6.3 || ^7.0
- symfony/options-resolver: ^5.4 || ^6.3 || ^7.0
Requires (Dev)
- enqueue/snsqs: ^0.10.11
- phpspec/prophecy: ^1.15
- phpspec/prophecy-phpunit: ^2.0
- phpunit/phpunit: ^10.0
- symfony/yaml: ^5.4 || ^6.3 || ^7.0
Replaces
- enqueue/messenger-adapter: >0.2.2
README
此 Symfony Messenger 传输允许您使用 Enqueue 从所有支持的代理发送和接收消息。
使用方法
- 安装传输
composer req sroze/messenger-enqueue-transport
- 按照常规方式配置 Enqueue 包(查看 Enqueue 的包文档)。如果您使用的是配方,您只需配置环境变量以配置
default
Enqueue 传输
# .env # ... ###> enqueue/enqueue-bundle ### ENQUEUE_DSN=amqp://guest:guest@localhost:5672/%2f ###< enqueue/enqueue-bundle ###
- 配置 Messenger 的传输(我们将命名为
amqp
),以使用 Enqueue 的default
传输
# config/packages/messenger.yaml framework: messenger: transports: amqp: enqueue://default
- 路由需要通过消息队列的消息
# config/packages/messenger.yaml framework: messenger: # ... routing: 'App\Message\MyMessage': amqp
- 消费!
bin/console messenger:consume amqp
高级使用
配置队列和交换
在传输 DSN 中,您可以添加额外的配置。以下是常见的参考 DSN(请注意,这些值仅用于示例)
enqueue://default
?queue[name]=queue_name
&topic[name]=topic_name
&deliveryDelay=1800
&delayStrategy=Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy
&timeToLive=3600
&receiveTimeout=1000
&priority=1
在消息上设置自定义配置
每个 Enqueue 传输(例如 amqp、redis 等)都有自己的消息对象,通常可以通过调用设置方法(例如 $message->setDeliveryDelay(5000)
)进行配置。但在 Messenger 中,您无法直接访问这些对象。相反,您可以通过 TransportConfiguration
标记间接设置它们
use Symfony\Component\Messenger\Envelope; use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration; // ... // create your message like normal $message = // ... $transportConfig = (new TransportConfiguration()) // commmon options have a convenient method ->setDeliveryDelay(5000) // other transport-specific options are set via metadata // example custom option for AmqpMessage // each "metadata" will map to a setter on your message // will result in setDeliveryMode(AmqpMessage::DELIVERY_MODE_PERSISTENT) // being called ->addMetadata('deliveryMode', AmqpMessage::DELIVERY_MODE_PERSISTENT) ; $bus->dispatch((new Envelope($message))->with($transportConfig));
在特定主题上发送消息
您可以使用带有您的消息的 TransportConfiguration
封装项目在特定主题上发送消息
use Symfony\Component\Messenger\Envelope; use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration; // ... $transportConfig = (new TransportConfiguration()) ->setTopic('specific-topic') ; $bus->dispatch((new Envelope($message))->with($transportConfig));
使用 AMQP 主题交换
请参阅https://rabbitmq.cn/tutorials/tutorial-five-php.html
您可以使用特定的主题和队列选项来配置您的 AMQP 交换在 topic
模式下,并将其绑定
enqueue://default
?queue[name]=queue_name
&queue[bindingKey]=foo.#
&topic[name]=topic_name
&topic[type]=topic
&deliveryDelay=1800
&delayStrategy=Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy
&timeToLive=3600
&receiveTimeout=1000
&priority=1
以下是将消息发送到匹配此消费者的路由键的方法
$bus->dispatch((new Envelope($message))->with(new TransportConfiguration([ 'topic' => 'topic_name', 'metadata' => [ 'routingKey' => 'foo.bar' ] ])));
配置自定义 Kafka 消息
以下是发送具有一些自定义选项的消息的方法
$this->bus->dispatch((new Envelope($message))->with(new TransportConfiguration([ 'topic' => 'test_topic_name', 'metadata' => [ 'key' => 'foo.bar', 'partition' => 0, 'timestamp' => (new \DateTimeImmutable())->getTimestamp(), 'messageId' => uniqid('kafka_', true), ] ])))