vinelab/bowler

为 Laravel 定制的 Rabbitmq 包装器

v0.9.0 2022-06-16 15:20 UTC

README

A Laravel 扩展包,它通过 Rabbitmq 服务器轻松高效地实现 AMQP 协议。在 Rabbitmq 服务器 上构建,旨在提供一个简单的抽象层来工作。

Build Status

Bowler 允许您

  • 自定义消息发布。
  • 自定义消息消费。
  • 自定义消息死信队列。
  • 处理应用程序错误并相应地处理消息。
  • 提供表达式的消费者队列配置。
  • 从命令行注册队列并生成其消息处理程序。
  • 使用默认的 Pub/Sub 消息。

除此之外,Bowler 还提供有限的行政功能。

这些功能将极大地简化您使用 Rabbitmq 的方式,并扩展其功能。此包无意取代用户设计消息架构的责任。

例如,Rabbitmq 的 管理 插件等工具,无疑将帮助您监控服务器活动并可视化配置。

目录

设置
开发
用法
   生产者
   消费者
      手册
      控制台
   发布者/订阅者
   调度器
   死信队列
   错误处理
   错误报告
   健康检查
   生命周期钩子
   测试
重要说明
待办事项

支持的 Laravel 版本

从版本 v0.4.2 开始,此库需要 Laravel 5.4 或更高版本。

从版本 v0.5.0 开始,此库需要 Laravel 5.7 或更高版本。

从版本 v0.9.0 开始,此库需要 Laravel 6.0 或更高版本。

设置

通过 Composer 安装此包

composer require vinelab/bowler

Laravel 5.4 用户还需要在 config/app.php 中的 providers 数组中添加服务提供者

Vinelab\Bowler\BowlerServiceProvider::class,

安装后,您可以使用 vendor:publish 命令发布包配置。此命令将 bowler.php 配置文件发布到您的配置目录

php artisan vendor:publish --provider="Vinelab\Bowler\BowlerServiceProvider"

您可以在 .env 文件中配置 RabbitMQ 凭证

RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USERNAME=guest
RABBITMQ_PASSWORD=guest

开发

创建 Docker 容器

docker-compose up -d

进入容器

docker-compose exec app bash

安装依赖

composer install

运行测试

./vendor/bin/phpunit

用法

生产者

为了能够发送消息,需要创建生产者实例并设置交换机。

// Initialize a Bowler Connection
$connection = new Vinelab\Bowler\Connection();

// Initialize a Producer object with a connection
$bowlerProducer = new Vinelab\Bowler\Producer($connection);

// Setup the producer's exchange name and other optional parameters: exchange type, passive, durable, auto delete and delivery mode
$bowlerProducer->setup('reporting_exchange', 'direct', false, true, false, 2);

// Send a message with an optional routingKey
$bowlerProducer->send($data, 'warning');

或注入生产者,让 IOC 解析连接

use Vinelab\Bowler\Producer;

class DoSomethingJob extends Job
{
    protected $data;

    public function __construct($data)
    {
        $this->data = $data;
    }

    public function handle(Producer $producer)
    {
        $producer->setup('reporting_exchange');

        $producer->send(json_encode($this->data));
    }
}

请确保此处设置的交换机与消费者的设置匹配,否则将抛出 Vinelab\Bowler\Exceptions\DeclarationMismatchException 异常。

如果您错误地设置了一个未定义的值,例如设置交换机时 $exchangeType='noneExistingType',将抛出 Vinelab\Bowler\Exceptions\InvalidSetupException 异常。

消费者

'Registrator' => Vinelab\Bowler\Facades\Registrator::class, 添加到 config/app 中的别名数组。

为了消费消息,需要设置交换机和队列,并创建消息处理程序。

可以通过手动或命令行配置消费者。

手册

  1. queues.php 文件中注册您的队列和处理程序(可以将队列文件视为Laravel的路由文件),注意 queues.php 文件应位于 App\Messaging 目录下。

    Registrator::queue('books', 'App\Messaging\Handlers\BookHandler', []);
    
    Registrator::queue('reporting', 'App\Messaging\Handlers\ErrorReportingHandler', [
                                                            'exchangeName' => 'main_exchange',
                                                            'exchangeType'=> 'direct',
                                                            'bindingKeys' => [
                                                                'warning',
                                                                'notification'
                                                            ],
                                                            'pasive' => false,
                                                            'durable' => true,
                                                            'autoDelete' => false,
                                                            'deadLetterQueueName' => 'dlx_queue',
                                                            'deadLetterExchangeName' => 'dlx',
                                                            'deadLetterExchangeType' => 'direct',
                                                            'deadLetterRoutingKey' => 'warning',
                                                            'messageTTL' => null
                                                        ]);

    使用选项数组来设置您的队列和交换。所有这些都是可选的,未在此指定的任何参数都将应用默认值。这些参数的描述和默认值将在本文件的后续部分提供。

  2. 创建处理接收到的消息的处理程序类。

    // This is an example handler class
    
    namespace App\Messaging\Handlers;
    
    class AuthorHandler {
    
    	public function handle($msg)
    	{
    		echo "Author: ".$msg->body;
    	}
    
        public function handleError($e, $broker)
        {
            if($e instanceof InvalidInputException) {
                $broker->rejectMessage();
            } elseif($e instanceof WhatEverException) {
                $broker->ackMessage();
            } elseif($e instanceof WhatElseException) {
                $broker->nackMessage();
            } else {
                $msg = $broker->getMessage();
                if($msg->body) {
                    //
                }
            }
        }
    }

    类似于上述内容,还向消费者处理程序提供了其他功能,如 deleteExchangepurgeQueuedeleteQueue。请明智地使用这些功能,并利用 unusedempty 参数。请注意,不建议通过操作服务器的设置来处理应用程序异常。

控制台

使用 php artisan bowler:make:queue {queue} {handler} 命令注册队列和处理程序,例如 php artisan bowler:make:queue analytics_queue AnalyticsData

前面的命令

  1. Registrator::queue('analytics_queue', 'App\Messaging\Handlers\AnalyticsDataHandler', []); 添加到 App\Messaging\queues.php

    如果没有提供交换名称,则默认使用队列名称。

    选项数组:如果指定了任何选项,则将覆盖命令中设置的参数。这有助于强调 queues.php 文件作为设置参考。

  2. App\Messaging\Handlers 目录下创建 App\Messaging\Handlers\AnalyticsDataHandler.php

现在,要监听任何队列,请在您的控制台中运行以下命令:php artisan bowler:consume {queue},例如 php artisan bowler:consume analytics_queue。您需要指定队列名称和任何其他可选参数(如果适用)。

bowler:consume 完整参数列表描述

bowler:consume
queueName : The queue NAME
--N|exchangeName : The exchange NAME. Defaults to queueName.
--T|exchangeType : The exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout.
--K|bindingKeys : The consumer\'s BINDING KEYS array.
--p|passive : If set, the server will reply with Declare-Ok if the exchange and queue already exists with the same name, and raise an error if not. Defaults to 0.
--d|durable : Mark exchange and queue as DURABLE. Defaults to 1.
--D|autoDelete : Set exchange and queue to AUTO DELETE when all queues and consumers, respectively have finished using it. Defaults to 0.
--deadLetterQueueName : The dead letter queue NAME. Defaults to deadLetterExchangeName.
--deadLetterExchangeName : The dead letter exchange NAME. Defaults to deadLetterQueueName.
--deadLetterExchangeType : The dead letter exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout.
--deadLetterRoutingKey : The dead letter ROUTING KEY.
--messageTTL : If set, specifies how long, in milliseconds, before a message is declared dead letter.

消费未注册的队列将抛出 Vinelab\Bowler\Exceptions\UnregisteredQueueException

如果您希望根据消息发布的路由键处理消息,您可以在处理程序的 handle 方法中使用 switch case,如下所示

public function handle($msg)
{
    switch ($msg->delivery_info['routing_key']) {
        case 'key 1': //do something
            break;
        case 'key 2': //do something else
            break;
    }
}

发布者/订阅者

Bowler 提供了一个默认的 Pub/Sub 实现,用户无需关心设置。

简而言之,使用 routingKey 发布,使用匹配的 bindingKey(s) 消费。

1. 发布

在您的生产者中

// Initialize a Bowler object with the rabbitmq server ip and port
$connection = new Bowler\Connection();

// Initialize a Pubisher object with a connection and a routingKey
$bowlerPublisher = new Publisher($connection);

// Publish the message and set its required routingKey
$bowlerPublisher->publish('warning', $data);

或以类似的方式注入发布者此处

如您所注意到的,这里我们实例化了一个 Publisher 而不是一个 Producer 对象。发布者是生产者规范,它包含默认 Pub/Sub 的 交换 设置。

签名
publish($routingKey, $data = null);

2. 订阅

在您的消费者中

(i)注册队列并生成其消息处理程序

从命令行使用 bowler:make:subscriber 命令。

php artisan bowler:make:subscriber reporting ReportingMessage --expressive

使用 --expressive-E 选项将使队列名称反映其用于 Pub/Sub。结果生成 reporting-pub-sub 作为生成的队列名称;否则将使用提供的队列名称。

queues.php 中添加 bindingKeys 数组参数到注册的队列中,如下所示

Registrator::subscriber('reporting-pub-sub', 'App\Messaging\Handlers\ReportingMessageHandler', ['warning']);

请注意,由于它使用默认的 pub-sub 交换,因此不需要指定交换。

(ii)处理消息

如我们先前所见此处

(iii)运行消费者

从命令行使用 bowler:consume 命令。

php artisan bowler:consume reporting-pub-sub

Pub/Sub 实现旨在直接使用。如果要将所有发布的消息消费到给定的交换中,可以将消费者的绑定键数组设置为 ['*']

如果没有提供绑定键,则抛出 Vinelab\Bowler\Exception\InvalidSubscriberBindingException

如果您想手动配置,当然可以,通过设置生产者和消费者,如先前所述

签名
Registrator::subscriber($queue, $className, array $bindingKeys, $exchangeName = 'pub-sub', $exchangeType = 'topic');

调度器(工作队列)

类似于发布/订阅,但是您可以定义交换,消息将根据最不忙的消费者进行分发(见工作队列 - 公平分发)。

1. 分发

使用 routingKey 将消息发送到特定的交换,并通过匹配的 bindingKey(s) 进行消费。

// Initialize a Bowler object with the rabbitmq server ip and port
$connection = new Bowler\Connection();

// Initialize a Dispatcher object with a connection
$dispatcher = new Dispatcher($connection);

// Publish the message and set its required exchange name and routingKey
$dispatcher->dispatch('my-custom-exchange', 'warning', $data);
签名
dispatch($exchangeName, $routingKey, $data = null, $exchangeType = 'topic')

2. 消费

注册队列消费者与发布/订阅类似,但注册中交换的名称需要匹配。

// catch all the cows in the "farm" exchange
Registrator::subscriber('monitoring', 'App\Messaging\Handlers\MonitoringMessageHandler', [
    '*.cow.*',
], 'farm');

上述操作将捕获所有在 farm 交换中与路由键 *.cow.* 匹配的消息。

死信队列

死信队列完全是消费者的责任,也是其队列配置的一部分。在消费者上启用死信队列通过命令行使用与运行消费者相同的命令以及特定的可选参数,或者在上述选项数组中设置相应的可选参数完成。至少应指定 --deadLetterQueueName--deadLetterExchangeName 选项之一。

php artisan bowler:consume my_app_queue --deadLetterQueueName=my_app_dlq --deadLetterExchangeName=dlx --deadLetterExchangeType=direct --deadLetterRoutingKey=invalid --messageTTL=10000

如果仅设置了上述提到的某个可选参数,则第二个参数将默认为它,导致相同的 dlxdlq 名称。

如果您想避免使用死信队列,可以通过在队列的 MessageHandler::handleError() 中使用 $broker->rejectMessage(true) 来重新排队死消息,从而利用简化的行为。

错误处理

Bowler 中的错误处理仅限于应用程序异常。

Handler::handleError($e, $broker) 允许您对队列执行操作。是否确认、否定确认或拒绝消息取决于您。

不建议根据应用程序异常来更改 Rabbitmq 设置,例如,对于 InvalidInputException 来清空队列!在任何情况下,如果认为对于特定用例是必要的,则应谨慎使用,因为您将丢失所有队列中的消息,甚至更糟,您的交换。

而服务器异常将被抛出。未被 Bowler 包装的服务器错误将作为 Vinelab\Bowler\Exceptions\BowlerGeneralException 抛出。

错误报告

Bowler 支持应用程序级别的错误报告。

为此,通常位于 app\Exceptions\Handler 的默认 Laravel 异常处理器应实现 Vinelab\Bowler\Contracts\BowlerExceptionHandler

ExceptionHandler::reportQueue(Exception $e, AMQPMessage $msg) 允许您以您希望的方式报告错误。同时提供异常和队列消息本身,以提供最大灵活性。

健康检查

重要:需要安装管理插件才能执行健康检查。

根据此可靠性指南,Bowler 认为提供检查连接消费者健康状况的工具将是有益的,这通过 bowler:healthcheck:consumer 命令提供,该命令具有以下签名

bowler:healthcheck:consumer {queueName : The queue name}

示例: php artisan bowler:healthcheck:consumer the-queue

对于成功将返回退出代码 0,对于失败将返回 1 以及原因信息。

生命周期钩子

Bowler公开以下生命周期钩子

use Vinelab\Bowler\Facades\Message;

Message::beforePublish(function (AMQPMessage $msg, string $exchangeName, $routingKey = null) {
  return $msg;
});

Message::published(function (AMQPMessage $msg, string $exchangeName, $routingKey = null) {
  //
});

Message::beforeConsume(function (AMQPMessage $msg, string $queueName, string $handlerClass) {
  return $msg;
});

Message::consumed(function (AMQPMessage $msg, string $queueName, string $handlerClass, Ack $ack) {
  //
})

默认情况下,Bowler会记录并抑制回调内部发生的错误。您可以通过 bowler.lifecycle_hooks.fail_on_error 配置选项来配置此行为。

测试

如果您想禁用生产者/发布者,以限制它实际上向交换发送/发布消息,请将其绑定到模拟,本地在您的测试中或全局。

全局

App\Tests\TestCase 中使用 Vinelab\Bowler\Producer

将以下内容添加到 App\Tests\TestCase::createApplication()

$app->bind(Producer::class, function () {
    return $this->createMock(Producer::class);
});

重要说明

  1. 对于此包的用户来说,承担交换和队列之间的映射责任非常重要,并确保生产者和消费者侧的交换声明匹配,否则将抛出 Vinelab\Bowler\Exceptions\DeclarationMismatchException

  2. 此包不支持使用无名称的交换和队列。以后可能会重新考虑。

待办事项

  • 编写测试。
  • 成为框架无关。