Rabbitmq 的 Laravel 封装器

dev-master 2018-03-26 01:10 UTC

This package is not auto-updated.

Last update: 2024-10-02 05:20:39 UTC


README

A Laravel 包,使用 Rabbitmq 服务器实现 AMQP 协议,简单高效。基于 php-amqplib 构建,旨在提供一个简单的抽象层。

Bowler 允许您

  • 自定义消息发布。
  • 自定义消息消费。
  • 自定义消息死信处理。
  • 处理应用程序错误并相应地处理消息。
  • 提供可表达性强的消费者队列设置。
  • 从命令行注册队列并生成其消息处理器。
  • 使用默认的 Pub/Sub 消息。

除了上述功能外,Bowler 还提供有限的行政管理功能。

这些功能将极大地改变您使用 Rabbitmq 的方式,并扩展其功能。此包不打算接管用户设计消息架构的责任。

如 Rabbitmq 的 管理 插件之类的工具,无疑将帮助您监控服务器活动并可视化设置。

安装

Composer

{
    "require": {
        "universaltechnology/bowler"
    }
}

用法

配置

为了配置 rabbitmq 主机、端口、用户名和密码,在 config/queue.php 文件中的 connections 数组内添加以下内容

'rabbitmq' => [
            'host' => 'host',
            'port' => port,
            'username' => 'username',
            'password' => 'password',
        ],

并通过将 UniversalTechnology\Bowler\BowlerServiceProvider::class 添加到 config/app 文件中的 providers 数组来注册服务提供者。

生产者

为了能够发送消息,需要创建一个生产者实例并设置一个交换机。

// Initialize a Bowler Connection
$connection = new UniversalTechnology\Bowler\Connection();

// Initialize a Producer object with a connection
$bowlerProducer = new UniversalTechnology\Bowler\Producer($connection);

// Setup the producer's exchange name and other optional parameters: exchange type, passive, durable, auto delete and delivery mode
$bowlerProducer->setup('reporting_exchange', 'direct', false, true, false, 2);

// Send a message with an optional routingKey
$bowlerProducer->send($data, 'warning');

或注入生产者并让 IOC 解决连接

use UniversalTechnology\Bowler\Producer;

class DoSomethingJob extends Job
{
    protected $data;

    public function __construct($data)
    {
        $this->data = $data;
    }

    public function handle(Producer $producer)
    {
        $producer->setup('reporting_exchange');

        $producer->send(json_encode($this->data));
    }
}

确保此处设置的交换机与消费者的设置匹配,否则会抛出 UniversalTechnology\Bowler\Exceptions\DeclarationMismatchException

如果您错误地设置了未定义的值,例如设置交换机时 $exchangeType='noneExistingType',则会抛出 UniversalTechnology\Bowler\Exceptions\InvalidSetupException

消费者

'Registrator' => UniversalTechnology\Bowler\Facades\Registrator::class, 添加到 config/app 中的 aliases 数组。

为了消费消息,需要设置一个交换机和队列,并创建一个消息处理器。

可以通过手动或从命令行配置消费者。

手动
  1. queues.php 文件中注册您的队列和处理程序(将队列文件视为 Laravel 的路由文件),注意 queues.php 文件应在 App\Messaging 目录下。

    Registrator::queue('books', 'App\Messaging\Handlers\BookHandler', []);
    
    Registrator::queue('reporting', 'App\Messaging\Handlers\ErrorReportingHandler', [
                                                            'exchangeName' => 'main_exchange',
                                                            'exchangeType'=> 'direct',
                                                            'bindingKeys' => [
                                                                'warning',
                                                                'notification'
                                                            ],
                                                            'pasive' => false,
                                                            'durable' => true,
                                                            'autoDelete' => false,
                                                            'deadLetterQueueName' => 'dlx_queue',
                                                            'deadLetterExchangeName' => 'dlx',
                                                            'deadLetterExchangeType' => 'direct',
                                                            'deadLetterRoutingKey' => 'warning',
                                                            'messageTTL' => null
                                                        ]);

    使用选项数组设置您的队列和交换机。所有这些都是可选的,默认值将应用于此处未指定的任何参数。这些参数的描述和默认值将在本文件的后面提供。

  2. 创建处理接收到的消息的处理程序类

    //This is an example handler class
    
    namespace App\Messaging\Handlers;
    
    class AuthorHandler {
    
    	public function handle($msg)
    	{
    		echo "Author: ".$msg->body;
    	}
    
        public function handleError($e, $broker)
        {
            if($e instanceof InvalidInputException) {
                $broker->rejectMessage();
            } elseif($e instanceof WhatEverException) {
                $broker->ackMessage();
            } elseif($e instanceof WhatElseException) {
                $broker->nackMessage();
            } else {
                $msg = $borker->getMessage();
                if($msg->body) {
                    //
                }
            }
        }
    }

    类似于上面,也为消费者的处理程序提供了额外的功能,如 deleteExchangepurgeQueuedeleteQueue。请明智地使用这些功能,并利用 unusedempty 参数。请记住,不建议通过操作服务器的设置来处理应用程序异常。

控制台

使用php artisan bowler:make:queue analytics_queue AnalyticsData注册队列和处理器。

上一个命令

  1. Registrator::queue('analytics_queue', 'App\Messaging\Handlers\AnalyticsDataHandler', []);添加到App\Messaging\queues.php

    如果没有提供交换机名称,则默认使用队列名称。

    如果指定了选项数组,它将覆盖通过命令行设置的任何参数。此外,它还充当设置参考。

  2. App\Messaging\Handlers目录中创建App\Messaging\Handlers\AnalyticsDataHandler.php

现在,为了监听任何队列,请在您的控制台中运行以下命令:php artisan bowler:consume analytics_queue。您需要指定队列名称和任何其他可选参数(如果适用)。

bowler:consume完整参数列表说明

bowler:consume
queueName : The queue NAME
--N|exchangeName : The exchange NAME. Defaults to queueName.
--T|exchangeType : The exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout.
--K|bindingKeys : The consumer\'s BINDING KEYS array.
--p|passive : If set, the server will reply with Declare-Ok if the exchange and queue already exists with the same name, and raise an error if not. Defaults to 0.
--d|durable : Mark exchange and queue as DURABLE. Defaults to 1.
--D|autoDelete : Set exchange and queue to AUTO DELETE when all queues and consumers, respectively have finished using it. Defaults to 0.
--deadLetterQueueName : The dead letter queue NAME. Defaults to deadLetterExchangeName.
--deadLetterExchangeName : The dead letter exchange NAME. Defaults to deadLetterQueueName.
--deadLetterExchangeType : The dead letter exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout.
--deadLetterRoutingKey : The dead letter ROUTING KEY.
--messageTTL : If set, specifies how long, in milliseconds, before a message is declared dead letter.

尝试消费未注册的队列将抛出UniversalTechnology\Bowler\Exceptions\UnregisteredQueueException

如果您想根据消息发布的路由键处理消息,可以在处理器的handle方法中使用switch case,如下所示

public function handle($msg)
{
    switch ($msg->delivery_info['routing_key']) {
        case 'key 1': //do something
            break;
        case 'key 2': //do something else
            break;
    }
}

发布/订阅

Bowler提供了一个默认的发布/订阅实现,用户无需关心设置。

简而言之,使用routingKey发布,使用匹配的bindingKeys消费。

1. 发布消息

在您的生产者中

// Initialize a Bowler object with the rabbitmq server ip and port
$connection = new Bowler\Connection();

// Initialize a Pubisher object with a connection and a routingKey
$bowlerPublisher = new Publisher($connection);

// Publish the message and set its required routingKey
$bowlerPublisher->publish('warning', $data);

或者像这里[这里](### 生产者)所看到的,以类似的方式注入发布者。

如你所见,这里我们实例化了一个Publisher而不是一个Producer对象。发布者是生产者规范,它包含默认的发布/订阅交换设置。

2. 消费消息

在您的消费者中

(i)注册队列并生成其消息处理器

在您的消费者中;从命令行使用bowler:make:subscriber命令。

php artisan bowler:make:subscriber reporting ReportingMessage --expressive

使用--expressive-E选项将使队列名称反映它是用于发布/订阅的。结果生成队列名称为reporting-pub-sub;否则将使用提供的队列名称。

(ii)将bindingKeys数组参数添加到已注册的队列中,如下所示

Registrator::subscriber('reporting-pub-sub', 'App\Messaging\Handlers\ReportingMessageHandler', ['warning']);
(iii)处理消息

像我们之前[这里](##### 手动)看到的。

(iv)运行队列

从命令行使用bowler:consume命令。

php artisan bowler:consume reporting-pub-sub

发布/订阅实现旨在直接使用。您可以将消费者的bindingKeys数组设置为['*']以消费所有发布的消息。

如果没有提供bindingKeys,则抛出UniversalTechnology\Bowler\Exception\InvalidSubscriberBindingException

如果您想手动进行配置,您可以像之前[这里](## 使用)解释的那样设置生产者和消费者。

测试

如果您想静音生产者/发布者以限制其不向交换发送/发布消息,可以在测试中将其绑定到模拟,本地或全局。

全局:在App\Tests\TestCase中使用UniversalTechnology\Bowler\Publisher

将以下内容添加到App\Tests\TestCase::createApplication()

$app->bind(Publisher::class, function () {
    return $this->createMock(Publisher::class);
});

死信队列

死信队列完全是消费者的责任,也是其队列配置的一部分。在消费者上启用死信队列是通过使用运行消费者的相同命令并使用专用可选参数或在之前提到的选项数组中设置相应的可选参数来完成的。应指定至少一个--deadLetterQueueName--deadLetterExchangeName选项。

php artisan bowler:consume my_app_queue --deadLetterQueueName=my_app_dlq --deadLetterExchangeName=dlx --deadLetterExchangeType=direct --deadLetterRoutingKey=invalid --messageTTL=10000

如果只设置了其中一个可选参数,则第二个将默认为它。导致相同的dlxdlq名称。

错误处理

Bowler中的错误处理仅限于应用程序异常。

Handler::handleError($e, $broker)允许您对队列执行操作。是否确认、否定确认或拒绝消息取决于您。

不建议在响应应用程序异常的情况下修改Rabbitmq配置,例如,对于InvalidInputException异常清除队列!在任何情况下,如果认为对于使用场景有必要,应谨慎使用,因为您将丢失所有队列中的消息,甚至更糟,您的交换机。

虽然会抛出服务器异常。Bowler未包装的服务器错误将作为UniversalTechnology\Bowler\Exceptions\BowlerGeneralException抛出。

错误报告

Bowler支持应用级别的错误报告。

为此,通常位于app\Exceptions\Handler中的默认Laravel异常处理器应实现UniversalTechnology\Bowler\Contracts\BowlerExceptionHandler。显然,实现其方法。

ExceptionHandler::reportQueue(Exception $e, AMQPMessage $msg)允许您按需报告错误。同时提供异常和队列消息本身,以提供最大灵活性。

重要提示

  1. 对于此包的用户来说,承担交换机和队列之间的映射责任是最重要的。并确保交换机的声明在生产者和消费者端匹配,否则将抛出UniversalTechnology\Bowler\Exceptions\DeclarationMismatchException

  2. 此包不支持使用无名的交换机和队列。以后可能重新考虑。

待办事项

  • 编写测试。
  • 成为无框架的。