robiningelbrecht / drupal-amqp-rabbitmq
Drupal AMQP 示例
Requires
- php: >=8.1
- composer/installers: ^1.9
- drupal/admin_toolbar: ^3.1
- drupal/core-composer-scaffold: ^9.3
- drupal/core-project-message: ^9.3
- drupal/core-recommended: ^9.3
- drupal/devel: ^4.1
- drupal/migrate_plus: ^6.0
- drupal/migrate_tools: ^6.0
- drush/drush: ^11.0
- php-amqplib/php-amqplib: ^3.1
Requires (Dev)
Conflicts
This package is auto-updated.
Last update: 2024-09-11 00:01:57 UTC
README
此仓库旨在说明如何在 Drupal 中设置 AMQP。它包含一个基本结构,以及一些使用 CommandHandlers 处理 AMQP 消息的工作示例。
安装
- 首先安装 Docker 和 Lando
- 克隆此仓库
git clone git@github.com:robiningelbrecht/drupal-amqp-rabbitmq.git
- 运行
lando start
来构建必要的 docker 容器 - 运行
lando composer install
来下载供应商依赖项 - 确保将以下配置添加到
settings.php
$databases['default']['default'] = [ 'database' => 'drupal9', 'username' => 'drupal9', 'password' => 'drupal9', 'prefix' => '', 'host' => 'database', 'port' => '', 'namespace' => 'Drupal\\Core\\Database\\Driver\\mysql', 'driver' => 'mysql', ]; $settings['config_sync_directory'] = '../config/sync'; $settings['amqp_credentials'] = [ 'host' => '172.21.0.3', // The AMQP host IP address is outputted in your CLI while running `lando start` 'port' => '5672', 'username' => 'guest', 'password' => 'guest', 'vhost' => '/', 'api' => 'http://rabbit.lndo.site/', ];
- 通过运行
lando drush sql-cli < init.sql
导入数据库转储
基本思想和设置
基本上有三个重要术语需要记住
- Worker:一个处理消息的特定类,也处理无法处理消息时的失败
- Queue:一个表示 RabbitMQ 队列的类,允许将消息推送到相应的队列。每个队列都链接到一个工作器
- Consumer:消费特定队列及其消息的进程。每个队列可以有零个或多个消费者
amqp
模块提供了一个基本框架,允许您
- 定义队列和工作器
- 将消息推送到队列
- 使用 drush 命令消费队列
推送消息和消费消息
amqp
模块包含一个 SimpleQueue
和一个 SimpleQueueWorker
。让我们看看推送和消费消息的示例
添加新的队列
建议为每种类型的任务添加一个队列,例如
- 发送通知:
send-notification-queue
- 迁移文章:
migrate-article-queue
- 计算产品价格:
calculate-product-price-queue
- ...
这种方法确保了同一类型的任务不会阻塞其他任务。它还具有优势,即您可以在每个队列的相应失败队列上记录失败的消息
send-notification-queue-failed
migrate-article-queue-failed
calculate-product-price-queue-failed
要声明新的队列,只需向您的 services.yml
中添加一个新的条目,并将其标记为 ampq_queue
Drupal\your_module\Queue\NewQueue: autowire: true tags: - { name: amqp_queue }
确保此类扩展 BaseQueue
,这样您就不必自己处理队列消息。
@TODO: 解释如何向队列推送消息
将消息推送到相应的失败队列
如果由于某种原因,消息无法处理,您可能希望将其记录在某个地方。一个“失败队列”可能是解决方案。
要向相应的失败队列推送消息,您可以使用 FailedQueueFactory
$this->failedQueueFactory->buildFor($queue)->queue(message);
此工厂可以在工作器的 processFailure
回调中使用
public function processFailure(Envelope $envelope, AMQPMessage $message, \Throwable $exception, Queue $queue): void { /** @var Command $command */ $command = $envelope; $command->setMetaData([ 'exceptionMessage' => $exception->getMessage(), 'traceAsString' => $exception->getTraceAsString(), ]); $failedQueue = $this->failedQueueFactory->buildFor($queue)->queue($command); }
注意:失败队列没有附加工作器,因此不能被消费。这意味着消息将留在队列上,直到手动删除。
使用延迟队列延迟消费消息
在某些更高级的使用场景中,您可能希望延迟消费消息,例如
- 一个摘要邮件,总结过去30分钟内发生的内容变更
- 在15秒后自动重新排队失败的邮件
- ...
您可以通过将消息推送到相应的延迟队列来实现这一点
$this->delayedQueueFactory->buildWithDelayForQueue(15, $queue)->queue($message);
为了让延迟队列正常工作,您需要做两件事
- 添加一个名为
dlx
的新交换机 - 确保队列被定义为绑定在
dlx
交换机上,其中绑定的路由键是要路由到的命令队列名称。
定义一个新的 CommandHandler
我喜欢使用命令和 CommandHandler 将更改持久化到数据库中。这基本上是 cqrs
模块的作用。它提供了一个简单的框架,
- 允许您定义新的命令及其对应的命令处理器
- 允许您将消息推送到命令队列
- 提供命令工人和分发器来处理来自不同队列的命令
要添加新的命令(和命令处理器),只需在您的 services.yml
中添加一个新的条目,并用 cqrs_command_handler
标记它
Drupal\your_module\DoSomething\DoSomethingCommandHandler: autowire: true tags: - { name: cqrs_command_handler }
实时迁移示例
示例模块包含...一个示例(duh),展示如何实现“实时”迁移到“突发新闻”内容类型。
导航到 admin/content/generate-migration-message
。此表单允许您将迁移消息推送到队列。它模拟第三方如何将消息推送到 Drupal 迁移队列,其中它将被消费者获取。迁移框架然后将进行重负载。
以后台进程运行消费者
通常您希望将消费者作为后台进程运行,并保持它们“活跃”,直到您的服务器运行。这可以通过 systemd
实现,但我选择使用 supervisord
Supervisor 是一个客户端/服务器系统,它允许其用户在类 UNIX 操作系统上监控和控制多个进程。
要将所有消费者注册为进程,只需运行 lando consumers-start
。这将启动 supervisord 并自动为您的所有队列创建必要的消费者。
当添加/删除队列或更新队列配置时,您需要运行 lando consumers-restart
以使新的设置生效。
重要:每次您对代码进行更改时,请确保运行重启命令,因为您不希望消费者运行旧代码。
检查消费者的状态
您只需运行 lando consumers-status
,这将输出类似以下的内容
ampq-consume-queue-one:ampq-consume-queue-one-00 RUNNING pid 1219, uptime 0:00:06
ampq-consume-queue-one:ampq-consume-queue-one-01 RUNNING pid 1215, uptime 0:00:07
ampq-consume-queue-one:ampq-consume-queue-two-01 RUNNING pid 1216, uptime 0:00:07