medeiroz/laravel-amqp-toolkit

Laravel AMQP Toolkit 用于发布和消费 RabbitMQ 消息

1.1.2 2024-07-11 20:43 UTC

This package is auto-updated.

Last update: 2024-09-11 21:02:32 UTC


README

Latest Version on Packagist GitHub Tests Action Status GitHub Code Style Action Status Total Downloads

此包旨在简化 Laravel 应用程序与 RabbitMQ 的集成,提供消费和发布消息的功能,同时还通过架构迁移的方式(灵感来源于 Laravel 的数据库迁移)提供了一种简单的方式来管理 AMQP / RabbitMQ 基础设施。

如果您需要从 RabbitMQ 队列中消费消息、在交换中发布消息或管理 RabbitMQ 基础设施,此包适合您。

如果您在管理 RabbitMQ 架构(包括队列、交换和推土机)方面遇到问题,此包也适合您。

主要功能

  1. 架构迁移: 使用架构迁移来创建、删除和管理 RabbitMQ 中的队列、交换和推土机,类似于 Laravel 为数据库提供的服务。
  2. 消费队列: 以简单高效的方式从 RabbitMQ 队列中消费消息的能力。
  3. 发布消息: 在交换中或直接在 RabbitMQ 队列中发布消息的能力。
  4. 事件监听器: 监听从 AMQP / RabbitMQ 队列和交换接收到的消息的能力,类似于 Laravel 的事件监听器。

要求

  • PHP >= 8.1
  • Laravel >= 10

安装

按照以下步骤安装包

  1. 使用 composer 安装 medeiroz/laravel-amqp-toolkit
  2. 发布并运行迁移
  3. 发布配置文件
  4. 环境变量 .env
  5. 配置 AMQP 队列和 Laravel 监听器

1. 安装包

运行以下命令通过 composer 安装包

composer require medeiroz/laravel-amqp-toolkit

2. 发布迁移

您必须使用以下命令发布数据库迁移

php artisan vendor:publish --tag="amqp-toolkit-migrations"

然后运行以下命令在数据库中创建表。

php artisan migrate

3. 发布配置文件

您可以使用以下命令发布配置文件

php artisan vendor:publish --tag="amqp-toolkit-config"

这是已发布的配置文件的内容

<?php

// config for Medeiroz/AmqpToolkit

return [
    'schemas' => base_path('amqp-toolkit-schemas'),
    'table_name' => env('AMQP_TABLE_NAME', 'amqp_schemas'),
    'max-attempts' => env('AMQP_MAX_ATTEMPTS', 10),
    'heartbeat' => env('AMQP_HEARTBEAT', 30),
    'keepalive' => env('AMQP_KEEPALIVE', true),

    /**
     * The default connection to use when no connection is provided to the AMQP client.
     */
    'connection' => env('AMQP_CONNECTION', 'rabbitmq'),

    /**
     * The default logging channel to use when no channel is provided to the AMQP client.
     * You can use the same channels as the Laravel logging configuration
     * Like as 'stack', 'single', 'daily' etc...
     */
    'logging-channel' => env('AMQP_LOG_CHANNEL', env('LOG_CHANNEL')),

    /**
     * The queues to be consumed by the consumer command without arguments.
     */
    'consumer-queues' => [
        // 'payment-received' => \App\Listeners\PaymentReceivedListener::class,
    ],

    'connections' => [
        'rabbitmq' => [
            'host' => env('AMQP_HOST', 'localhost'),
            'port' => env('AMQP_PORT', 5672),
            'api-port' => env('AMQP_API_PORT', 15672),
            'user' => env('AMQP_USER', 'guest'),
            'password' => env('AMQP_PASSWORD', ''),
            'vhost' => env('AMQP_VHOST', '/'),
        ],
    ],
];

4. 环境变量 .env

编辑 .env 文件并添加您的 AMQP / RabbitMQ 服务器环境变量。

AMQP_HOST=your-amqp-host
AMQP_PORT=5672
AMQP_API_PORT=15672
AMQP_USER=user
AMQP_PASSWORD=password
AMQP_VHOST=/

请根据您的环境替换值。

5. 配置 AMQP 队列和 Laravel 监听器

编辑 config/amqp-toolkit.php 文件并添加您想要消费的队列和当收到消息时将被执行的监听器。

数组键是队列名称,值是当收到消息时将被执行的监听器类。

示例:将监听器 PaymentReceivedListener 绑定到队列 payment-received

    /**
     * The queues to be consumed by the consumer command without arguments.
     */
    'consumer-queues' => [
        'payment-received' => \App\Listeners\PaymentReceivedListener::class,
    ],

请参阅配置文件以获取更多详细信息。

使用方法

架构迁移

允许的架构类型包括

  • 队列
  • 交换
  • 推土机

创建架构

php artisan amqp:make-schema {type} {name}

示例

php artisan amqp:make-schema queue my-first-queue
php artisan amqp:make-schema exchange my-exchange
php artisan amqp:make-schema shovel my-shovel

运行迁移

php artisan amqp:migrate 

回滚迁移

php artisan amqp:migrate --rollback --step=1

回滚所有迁移

php artisan amqp:migrate --refresh

向队列或交换发布消息

use Medeiroz\AmqpToolkit\Facades\AmqpPublisher;

AmqpPublisher::onQueue(['say' => 'Hello World'], 'my-queue-name');
AmqpPublisher::onExchange(['say' => 'Hello World'], 'my-exchange-name');
AmqpPublisher::onExchange(['say' => 'Hello World with routing key'], 'my-exchange-name', 'my-routing-key');

开始消费 AMQP / RabbitMQ 队列

运行所有队列的消费者

要开始消费所有队列,必须运行以下 artisan 命令

php artisan amqp:consumer

要开始消费特定队列,必须运行以下 artisan 命令

其中 my-first-queuepayment-received 是您想要消费的队列名称。

php artisan amqp:consumer my-first-queue payment-received

监听消息

本包提供了一种监听从 AMQP / RabbitMQ 队列和交换机接收到的消息的方法,类似于 Laravel 的事件监听器。

1. 自动监听注册

当在 config/amqp-toolkit.php 文件中配置了队列和监听器时,监听器将在执行消费者时自动注册。

2. 手动监听注册

编辑 app/Providers/EventServiceProvider.php 文件并添加您想监听的事件。

事件名称应为 amqp.QUEUE_NAME,其中 QUEUE_NAME 是您想监听的队列名称。

app/Providers/EventServiceProvider.php

public function boot() {
    Event::listen(
        'amqp:payment-received',
        \App\Listeners\PaymentReceivedListener::class,
    );
    
    Event::listen(
        'amqp:my-queue',
        \App\Listeners\MyQueueListener::class,
    );
    
    Event::listen(
        'amqp:*',
        \App\Listeners\AllListener::class,
    );
}

注意:amqp:* 事件是一个特殊的事件,它监听所有队列接收到的消息。只有当消费者正在执行时才会调用队列事件。

创建事件监听器

要为要监听的事件创建监听器,请运行以下命令

php artisan make:listener PaymentReceivedListener

此包提供了一个事件对象 Medeiroz\AmqpToolkit\Events\AmqpReceivedMessageEvent,它包含队列名称和消息体。

<?php

namespace App\Listeners;

use Illuminate\Contracts\Queue\ShouldQueue;
use Medeiroz\AmqpToolkit\Events\AmqpReceivedMessageEvent;


class PaymentReceivedListener
{
    public function handle(AmqpReceivedMessageEvent $event): void
    {
        \Log::debug('Queue' . $event->queue);
        \Log::debug('Message Body', $event->messageBody);
    }
}

如果您想使用 Laravel Horizon 异步执行事件,可以使用 ShouldQueue 接口。

<?php

namespace App\Listeners;

use Illuminate\Contracts\Queue\ShouldQueue;
use Medeiroz\AmqpToolkit\Events\AmqpReceivedMessageEvent;


class PaymentReceivedListener implements ShouldQueue
{ /* ... */ }

有关 AMQP 迁移架构的更多信息

创建交换机架构

使用以下命令创建一个新的交换机架构

php artisan amqp:make-schema exchange my-exchange

生成的文件

<?php

use Medeiroz\AmqpToolkit\SchemaMigration\SchemaMigration;

return new class extends SchemaMigration
{
    private const NAME = 'my-exchange';

    public function up(): void
    {
        $this->createExchangeIfNonExists(self::NAME);
    }

    public function down(): void
    {
        $this->deleteExchangeIfExists(self::NAME);
    }
};

创建队列架构

使用以下命令创建一个新的队列架构

php artisan amqp:make-schema queue payment-received

生成的文件

<?php


use Medeiroz\AmqpToolkit\SchemaMigration\SchemaMigration;

return new class extends SchemaMigration
{
    private const NAME = 'payment-received';

    public function up(): void
    {
        $this->createQueueIfNonExists(self::NAME)
            ->withRetry()
            ->withTtl(seconds: 5)
            ->withDlq();
    }

    public function down(): void
    {
        $this->deleteQueueIfExists(self::NAME);
        $this->deleteQueueIfExists(self::NAME . '.retry');
        $this->deleteQueueIfExists(self::NAME . '.dlq');
    }
};

此架构创建了一个名为 payment-received 的队列,具有重试、TTL 和 DLQ。RabbitMQ 将自动创建 payment-received.retrypayment-received.dlq 队列。

您可以通过调用 bind 方法将队列绑定到交换机。

// ...
    public function up(): void
    {
        $this->createQueue('my-queue')
            ->bind('my-exchange', 'my-route-key');
    }
    
    // or
    
    public function up(): void
    {
        $this->createQueue('my-queue');

        $this->bind('my-queue', 'my-exchange', 'my-route-key');
    }

创建 Shovel 架构

使用以下命令创建一个新的 Shovel 架构

php artisan amqp:make-schema shovel my-shovel

生成的文件

<?php

use Medeiroz\AmqpToolkit\SchemaMigration\SchemaMigration;
use Medeiroz\AmqpToolkit\SchemaMigration\Shovel\Resource0dot9;
use Medeiroz\AmqpToolkit\SchemaMigration\Shovel\Resource1dot0;

return new class extends SchemaMigration
{
    private const NAME = 'my-shovel';

    public function up(): void
    {
        $this->createShovelIfNonExists(
            name: self::NAME,
            source: new Resource0dot9(
                type: 'queue',
                uri: 'amqp://',
                queue: 'my-queue-on-amqp-0-9',
                autoDelete: 'never',
                addForwardingHeaders: 'No',
            ),
            destination: new Resource1dot0(
                uri: 'amqps://user:password@my-host.servicebus.windows.net:5671/?verify=verify_none',
                address: 'my-topic-on-service-bus',
            ),
        );
    }

    public function down(): void
    {
        $this->deleteShovelIfExists(self::NAME);
    }
};

您可以使用 createShovelcreateShovelIfNonExists 方法创建 Shovel,传入 Shovel 名称、源和目标资源。

sourcedestination 是表示消息 broker 资源的对象。

可用资源

  • Resource0dot9
    • AMQP 0-9-1 的消息代理资源,与 RabbitMQ、Qpid 等相同。
  • Resource1dot0
    • AMQP 1.0 的消息代理资源,与 Azure Service Bus、ActiveMQ 等相同。

例如,您创建一个 Shovel 从 Broker A 将消息移动到 Broker B。

  • 将消息从 RabbitMQ 移动到 Azure Service Bus
  • 将消息从 Azure Service Bus 移动到 RabbitMQ
  • 将 AWS Cloud 中的 RabbitMQ 消息移动到 Azure Cloud 中的 RabbitMQ
  • 将 Azure Service Bus 中的消息移动到 Azure Service Bus 中的另一个订阅

有关 Shovel 配置的更多信息,请参阅 RabbitMQ Shovel 文档

注意:Shovel 架构仅适用于 RabbitMQ 3.8.0 或更高版本。

架构迁移中所有可用的方法

  • createQueue(string $name): Medeiroz\AmqpToolkit\SchemaMigration\Queue
    • 创建一个新的队列
  • createQueueIfNonExists(string $name): Medeiroz\AmqpToolkit\SchemaMigration\Queue
    • 如果不存在则创建一个新的队列
  • deleteQueue(string $name): void
    • 删除一个队列
  • deleteQueueIfExists(string $name): void
    • 如果存在则删除一个队列
  • createExchange(string $name): Medeiroz\AmqpToolkit\SchemaMigration\Exchange
    • 创建一个新的交换机
  • createExchangeIfNonExists(string $name): Medeiroz\AmqpToolkit\SchemaMigration\Exchange
    • 如果不存在则创建一个新的交换机
  • deleteExchange(string $name): void
    • 删除一个交换机
  • deleteExchangeIfExists(string $name): void
    • 如果存在则删除一个交换机
  • bind(string $queue, string $exchange, ?string $routingKey = null): Medeiroz\AmqpToolkit\SchemaMigration\Bind
    • 使用路由键将队列绑定到交换机
  • bindIfExist(string $queue, string $exchange, ?string $routingKey = null): Medeiroz\AmqpToolkit\SchemaMigration\Bind
    • 如果队列存在,则将其绑定到具有路由键的交换机
  • unbind(string $queue, string $exchange, ?string $routingKey = null): void
    • 解除队列与交换机及其路由键的绑定
  • unbindIfExists(string $queue, string $exchange, ?string $routingKey = null): void
    • 如果队列存在,则解除队列与交换机及其路由键的绑定
  • createShovel(string $name): Medeiroz\AmqpToolkit\SchemaMigration\Shovel
    • 创建一个新的shovel
  • createShovelIfNonExists(string $name): Medeiroz\AmqpToolkit\SchemaMigration\Shovel
    • 如果shovel不存在,则创建一个新的shovel
  • deleteShovel(string $name): void
    • 删除一个shovel
  • deleteShovelIfExists(string $name): void
    • 如果shovel存在,则删除它

关于Shovel的更多信息

shovel是一个功能,允许你在不同的代理之间移动消息,例如,从RabbitMQ到Azure Service Bus,从Azure Service Bus到RabbitMQ等。

如果你的消息源是Azure Service Bus,而你使用PHP/Laravel编写应用程序代码,你不能直接从Azure Service Bus消费消息。你可以创建一个shovel,将消息从Azure Service Bus移动到RabbitMQ,然后从RabbitMQ消费消息。

Azure Service Bus转发消息到RabbitMQ

你需要为你的shovel创建特定的资源,例如,你下面有这些资源

  • 在Azure Service Bus上
    • 订阅A (源)
  • 在RabbitMQ上
    • 交换机 (目标)
    • 队列A(供你的PHP应用程序消费消息)
    • Shovel "My First Shovel"

RabbitMQ转发消息到Azure Service Bus

你需要为你的shovel创建特定的资源,例如,你下面有这些资源

  • 在RabbitMQ上
    • 交换机
    • 队列D (源)
    • Shovel "My Second Shovel"
  • 在Azure Service Bus上
    • 主题 (目标)

测试

composer test

变更日志

有关最近更改的更多信息,请参阅CHANGELOG

贡献

有关详细信息,请参阅CONTRIBUTING

安全漏洞

有关如何报告安全漏洞的详细信息,请参阅我们的安全策略

鸣谢

许可

MIT许可(MIT)。有关更多信息,请参阅许可文件