robiningelbrecht/drupal-amqp-rabbitmq

Drupal AMQP 示例

dev-master 2022-05-10 18:44 UTC

This package is auto-updated.

Last update: 2024-09-11 00:01:57 UTC


README

RabbitMQ

CI/CD codecov.io License PHP

此仓库旨在说明如何在 Drupal 中设置 AMQP。它包含一个基本结构,以及一些使用 CommandHandlers 处理 AMQP 消息的工作示例。

安装

  • 首先安装 DockerLando
  • 克隆此仓库 git clone git@github.com:robiningelbrecht/drupal-amqp-rabbitmq.git
  • 运行 lando start 来构建必要的 docker 容器
  • 运行 lando composer install 来下载供应商依赖项
  • 确保将以下配置添加到 settings.php
$databases['default']['default'] = [
  'database' => 'drupal9',
  'username' => 'drupal9',
  'password' => 'drupal9',
  'prefix' => '',
  'host' => 'database',
  'port' => '',
  'namespace' => 'Drupal\\Core\\Database\\Driver\\mysql',
  'driver' => 'mysql',
];
$settings['config_sync_directory'] = '../config/sync';

$settings['amqp_credentials'] = [
  'host' => '172.21.0.3', // The AMQP host IP address is outputted in your CLI while running `lando start`
  'port' => '5672',
  'username' => 'guest',
  'password' => 'guest',
  'vhost' => '/',
  'api' => 'http://rabbit.lndo.site/',
];
  • 通过运行 lando drush sql-cli < init.sql 导入数据库转储

基本思想和设置

基本上有三个重要术语需要记住

  • Worker:一个处理消息的特定类,也处理无法处理消息时的失败
  • Queue:一个表示 RabbitMQ 队列的类,允许将消息推送到相应的队列。每个队列都链接到一个工作器
  • Consumer:消费特定队列及其消息的进程。每个队列可以有零个或多个消费者

amqp 模块提供了一个基本框架,允许您

  • 定义队列和工作器
  • 将消息推送到队列
  • 使用 drush 命令消费队列

RabbitMQ

推送消息和消费消息

amqp 模块包含一个 SimpleQueue 和一个 SimpleQueueWorker。让我们看看推送和消费消息的示例

Consume - Push example

添加新的队列

建议为每种类型的任务添加一个队列,例如

  • 发送通知: send-notification-queue
  • 迁移文章: migrate-article-queue
  • 计算产品价格: calculate-product-price-queue
  • ...

这种方法确保了同一类型的任务不会阻塞其他任务。它还具有优势,即您可以在每个队列的相应失败队列上记录失败的消息

  • send-notification-queue-failed
  • migrate-article-queue-failed
  • calculate-product-price-queue-failed

要声明新的队列,只需向您的 services.yml 中添加一个新的条目,并将其标记为 ampq_queue

  Drupal\your_module\Queue\NewQueue:
    autowire: true
    tags:
      - { name: amqp_queue }

确保此类扩展 BaseQueue,这样您就不必自己处理队列消息。

@TODO: 解释如何向队列推送消息

将消息推送到相应的失败队列

如果由于某种原因,消息无法处理,您可能希望将其记录在某个地方。一个“失败队列”可能是解决方案。
要向相应的失败队列推送消息,您可以使用 FailedQueueFactory

  $this->failedQueueFactory->buildFor($queue)->queue(message);

此工厂可以在工作器的 processFailure 回调中使用

  public function processFailure(Envelope $envelope, AMQPMessage $message, \Throwable $exception, Queue $queue): void
  {
    /** @var Command $command */
    $command = $envelope;
    $command->setMetaData([
      'exceptionMessage' => $exception->getMessage(),
      'traceAsString' => $exception->getTraceAsString(),
    ]);

    $failedQueue = $this->failedQueueFactory->buildFor($queue)->queue($command);
  }

注意:失败队列没有附加工作器,因此不能被消费。这意味着消息将留在队列上,直到手动删除。

使用延迟队列延迟消费消息

在某些更高级的使用场景中,您可能希望延迟消费消息,例如

  • 一个摘要邮件,总结过去30分钟内发生的内容变更
  • 在15秒后自动重新排队失败的邮件
  • ...

您可以通过将消息推送到相应的延迟队列来实现这一点

  $this->delayedQueueFactory->buildWithDelayForQueue(15, $queue)->queue($message);

为了让延迟队列正常工作,您需要做两件事

  • 添加一个名为 dlx 的新交换机
  • 确保队列被定义为绑定在 dlx 交换机上,其中绑定的路由键是要路由到的命令队列名称。

Dlx binding example

定义一个新的 CommandHandler

我喜欢使用命令和 CommandHandler 将更改持久化到数据库中。这基本上是 cqrs 模块的作用。它提供了一个简单的框架,

  • 允许您定义新的命令及其对应的命令处理器
  • 允许您将消息推送到命令队列
  • 提供命令工人和分发器来处理来自不同队列的命令

要添加新的命令(和命令处理器),只需在您的 services.yml 中添加一个新的条目,并用 cqrs_command_handler 标记它

  Drupal\your_module\DoSomething\DoSomethingCommandHandler:
    autowire: true
    tags:
      - { name: cqrs_command_handler }

实时迁移示例

示例模块包含...一个示例(duh),展示如何实现“实时”迁移到“突发新闻”内容类型。

导航到 admin/content/generate-migration-message。此表单允许您将迁移消息推送到队列。它模拟第三方如何将消息推送到 Drupal 迁移队列,其中它将被消费者获取。迁移框架然后将进行重负载。

Real time migration

以后台进程运行消费者

通常您希望将消费者作为后台进程运行,并保持它们“活跃”,直到您的服务器运行。这可以通过 systemd 实现,但我选择使用 supervisord

Supervisor 是一个客户端/服务器系统,它允许其用户在类 UNIX 操作系统上监控和控制多个进程。

要将所有消费者注册为进程,只需运行 lando consumers-start。这将启动 supervisord 并自动为您的所有队列创建必要的消费者。

当添加/删除队列或更新队列配置时,您需要运行 lando consumers-restart 以使新的设置生效。

重要:每次您对代码进行更改时,请确保运行重启命令,因为您不希望消费者运行旧代码。

检查消费者的状态

您只需运行 lando consumers-status,这将输出类似以下的内容

ampq-consume-queue-one:ampq-consume-queue-one-00   RUNNING   pid 1219, uptime 0:00:06
ampq-consume-queue-one:ampq-consume-queue-one-01   RUNNING   pid 1215, uptime 0:00:07
ampq-consume-queue-one:ampq-consume-queue-two-01   RUNNING   pid 1216, uptime 0:00:07