t3n / jobqueue-rabbitmq
一个支持RabbitMQ作为后端的Flowpack.JobQueue.Common实现
Requires
- ext-json: *
- flowpack/jobqueue-common: ^3.0.0
- php-amqplib/php-amqplib: ^2.11
Requires (Dev)
- t3n/coding-standard: ~1.1
This package is auto-updated.
Last update: 2024-09-10 10:52:22 UTC
README
t3n.JobQueue.RabbitMQ
基于Flowpack.JobQueue.Common的RabbitMQ的作业队列后端。如果您还不知道RabbitMQ,可以查看他们的教程这里。
免责声明:这是一个由ttreeagency制作的初始版本的分支/重制!
设置
要使用RabbitMQ作为JobQueue的后端,您需要一个正在运行的RabbitMQ服务。您可以使用我们的docker镜像,它还包括管理控制台t3n/rabbitmq
使用composer安装包
composer require t3n/jobqueue-rabbitmq
配置
RabbitMQ允许广泛的配置和设置。查看Settings.yaml
以了解所有可用配置选项的概述。
作业队列的最简单配置可能如下所示
Flowpack: JobQueue: Common: queues: simple-queue: className: 't3n\JobQueue\RabbitMQ\Queue\RabbitQueue' executeIsolated: true options: queueOptions: # we want the queue to persist messages so they are stored even if no consumer (aka worker) is connected durable: true # multiple worker should be able to consume a queue at the same time exclusive: false autoDelete: false client: host: localhost port: 5672 username: guest password: guest
更复杂的配置也可以配置交换机和一些路由键。在这个例子中,我们将配置一个所有生产者都连接到的交换机。如果您的RabbitMQ服务器启用了rabbitmq_delayed_message_exchange
插件(我们的docker镜像默认包含它),我们还可以使用延迟功能。
此配置将为一个交换机配置几个队列。消息根据routingKey
路由到队列。我们将使用三部分:<project>.<type>.<name>
。我们将使用类型job
来表示它是要执行的作业。我们也可以使用像log
之类的任何东西。路由键的形状由您决定!
所以最终,我们将配置几个生产者/作业队列。生产者队列打算与$jobManager->queue()
一起使用。消费者队列将在./flow job:work
命令中使用。
在这个例子中,我们将最终得到两个队列。一个队列包含与路由键vendor.jobs.*
匹配的所有消息,因此每当使用producer-import
或producer-tags
队列时,消息将最终进入此队列。另一个队列将获取所有供应商的作业。
要实际启动一个消费者,请运行./flow job:work --queue all-jobs
。
此模式允许您实现不同类型的先进pub/sup实现。
Flowpack: JobQueue: Common: presets: rabbit: className: 't3n\JobQueue\RabbitMQ\Queue\RabbitQueue' executeIsolated: true maximumNumberOfReleases: 3 releaseOptions: delay: 5 options: routingKey: '' useDLX: true # enable support for dead letter exchange, requires configuration in RabbitMQ queueOptions: # don't declare a queue by default # all messages are forwarded to the exchange "t3n" declare: false exchangeName: 't3n' # the queue should be durable and don't delete itself durable: true autoDelete: false # several worker are allowed per queue exclusive: false exchange: name: 't3n' # this exchange should support delayed messages type: 'x-delayed-message' passive: false durable: true autoDelete: false arguments: # the origin type is topic so we can use routingkeys including `*` or `#` x-delayed-type: 'topic' client: host: localhost port: 5672 username: guest password: guest heartbeat: 60 # Sets RabbitMQ connection heartbeat to 60 seconds queues: producer-import: preset: rabbit options: routingKey: 'vendor.jobs.import' producer-tags: preset: rabbit options: routingKey: 'vendor.jobs.tags' vendor-jobs: preset: rabbit options: routingKey: 'vendor.jobs.*' queueOptions: # this queue should actually be declared declare: true all-jobs: preset: rabbit options: routingKey: '*.jobs.*' queueOptions: # this queue should actually be declared declare: true