ipunkt / laravel-rabbitmq-enqueue
通过 enqueue 使用主题交换和路由键的 RabbitMQ
Requires
- php: ^7.1.3|^8.0
- ext-amqp: *
- ext-json: *
- enqueue/amqp-ext: ^0.10.9
- laravel/framework: ^5.0.0|^6.0.0|^7.0.0|^8.0.0
Requires (Dev)
- mockery/mockery: ^1.2
- orchestra/testbench: ^3.5
- phpunit/phpunit: ^7.0
README
此软件包使用 php-enqueue 为 laravel 提供对 RabbitMQ 的支持。
它专门使用 enqueue amqp-ext 功能,因此需要 amqp php 扩展。
安装
此软件包通过 composer 提供。
composer require 'ipunkt/laravel-rabbitmq-enqueue:^1.0'
安装后,通过 vendor:publish
发布配置和提供者。它将创建 config/rabbitmq.php
和 app/Providers/RabbitMQProvider.php
。
将 RabbitMQProvider 添加到 config/app.php
中的提供者中,并在必要时将其命名空间从 App\
更改为您的应用程序使用的任何命名空间。
环境
默认的 config/rabbitmq.php
通过以下环境变量构建连接到 rabbitmq 所需的 dsn,默认值在名称后面
- RMQ_USERNAME - guest
- RMQ_PASSWORD - guest
- RMQ_HOST - rabbitmq
- RMQ_PORT - 5672
- RMQ_VHOST - /
使用
监听
通过运行 rabbitmq:listen
命令进行监听。它将连接到配置文件中指定的 dsn 并创建所有必需的队列、交换机和绑定。
它创建和监听哪些队列、交换机和绑定由发布的 RabbitMQProvider 定义
等待
由于 rabbitmq:listen
预期将在云环境中运行,因此它带来了开关 -w SECONDS
以等待给定秒数后再尝试连接,甚至根本不做任何事情。
可以通过在命令之前放置睡眠来实现相同的效果,但这种情况的使用频率使我包括了向需要设置 dns 和需要启动以方便使用 jaeger 代理的云环境的小提示。
处理器
通过处理器对消息做出反应。处理器是任何实现接口 Ipunkt\RabbitMQ\MessageHandler\Handler
的类。将您的处理器注册到发布的 RabbitMQProvider 中的路由键
它接收一个 AmqpMessage 对象,您很可能会想从 json 解码 $message->getBody(),因为它目前是此软件包发送者的唯一格式。
结果
您的处理器可以从命令返回以下结果
Processor::ACK
- 消息已成功处理。它将从队列中删除作为成功Processor::REJECT
- 消息与此服务无关或格式不正确。它将从队列中删除作为失败Processor::REQUEUE
- 消息有效且预期可工作,但外部环境阻止其在当前时间处理。它将被移动到等待队列,并在 10 秒后返回。- 可抛出异常或异常 - 当可抛出异常或异常传递到命令时,在重新抛出异常/异常到周围的 Laravel 代码之前,将作为
Processor::REQUEUE
处理。
这里的逻辑是,代码中的错误导致了这种情况,因此预计消息可以处理,但在新版本部署之前无法处理。
重新抛出异常应触发常规异常处理,例如通过像 Sentry 这样的系统通知开发者。 - 实现 DropsMessage 的异常 - 异常
发送
发送是通过 Ipunkt\RabbitMQ\Sender\RabbitMQ
提供的。它不是一个外观,因此建议将其作为依赖项注入。
发送到交换机
向交换机发送是预期的使用场景,因为它提供了与消费您消息的服务之间的松耦合。
$this-rabbitmq->publish([ 'some' => 'data', 'serializable' => 'as', 'json' ])->onExchange('exchange-name', 'routing-key')
发送到队列
也可以直接发送到目标队列。
$this-rabbitmq->publish([ 'some' => 'data', 'serializable' => 'as', 'json' ])->onQueue('queue-name')
为什么特别使用 RabbitMQ 而不是通用函数
通用的入队接口不支持基于路由键的路由。
可用的 SimpleClient 通过使用 fanout 交换和丢弃客户端代码中的不需要的消息来模拟基于路由键的路由。
它模拟了 RabbitMQ 本身支持的行为。因此,导致共享交换但只对某些路由键感兴趣的服务不必要的开销。