dojiland/amqp-laravel

PHP RabbitMQ 客户端,适用于 Laravel 和 Lumen

v0.1.5 2021-04-13 07:18 UTC

This package is auto-updated.

Last update: 2024-09-13 15:23:20 UTC


README

基于 php-amqplib / php-amqplib,针对 Laravel 和 Lumen 框架封装 Rabbitmq 的调用方法,方便业务开发。

设置

首先在本地安装 composer,然后执行以下命令加载库

composer require dojiland/amqp-laravel

创建配置文件,执行生成 config/amqp.php 配置文件

php artisan amqp:init

.env 添加 AMQP 配置项

AMQP_HOST=localhost
AMQP_PORT=5672
AMQP_USER=guest
AMQP_PASSWORD=guest

Laravel

Laravel 版本 >= 5.5 将自动 package-discover,无需操作。

小于 5.5 的版本,请在 config/app.php 的 providers 数组中添加此内容

Dojiland\Amqp\AmqpServiceProvider::class

Lumen

bootstrap/app.php 中添加此内容

$app->register(Dojiland\Amqp\AmqpServiceProvider::class);

使用方法

注意:目前只支持使用 Fanout Exchange,默认配置使用消息持久化和 ACK。

注册消费异常回调

在某个初始化过程中,调用如下代码,实现自己的消费失败回调逻辑,例如在 app/Providers/AppServiceProviderboot 方法中增加 sentry 通知。

/**
* 含义说明
* $context = [
*    // amqp exchange
*    'exchange'  => '',
*    // amqp queue
*    'queue'     => '',
*    // amqp body (业务消息文本)
*    'payload'   => '',
* ]
*/
// amqp 消费异常上报
\Dojiland\Amqp\Events\AmqpEvent::registerConsumeFailedListener(function (\Throwable $e, array $context) {
    app('sentry')->captureException($e);
    // ...
});

发布

<?php
    /**
     * 单条信息发送
     * string $exchange:    RabbitMQ exchange 名称
     * array  $params:      一维参数数组
     * bool   $persistent:  消息持久化标识,默认true(持久化)
     */
    ...
    app('amqp')->publish(string $exchange, array $params, bool $persistent = true);
    ...


    /**
     * 批量消息发送
     * string $exchange:    RabbitMQ exchange 名称
     * array  $params:      二维参数数组
     * bool   $persistent:  消息持久化标识,默认true(持久化)
     */
    ...
    app('amqp')->batchPublish(string $exchange, array $params, bool $persistent = true);
    ...

订阅

创建指定订阅者,执行命令

php artisan make:amqp ExampleSubscribe

创建 app\MessageQueue\ExampleSubscribe.php 文件(如果 MessageQueue 文件夹不存在,则会默认生成)。

    /**
     * 订阅的exchange.
     * @var string
     */
    protected $exchange = '';

    /**
     * 订阅的queue.
     *
     * @var string
     */
    protected $queue = '';

    /**
     * 监听消息回调处理
     *
     * @return void
     */
    public function callback(AMQPMessage $msg)
    {
        echo $msg->body;
    }

填写指定的 exchangequeue,并在 callback 中添加消费逻辑。

amqp.php 中添加订阅对象配置项

    /*
    |--------------------------------------------------------------------------
    | 订阅消费类列表
    |--------------------------------------------------------------------------
    |
     */
    'subscribes'        =>  [
        \App\MessageQueue\ExampleSubscribe::class,
    ]

执行命令 php artisan amqp,启动消费进程。