avto-dev/amqp-rabbit-manager

RabbitMQ 管理器

v2.10.0 2024-04-25 11:14 UTC

This package is auto-updated.

Last update: 2024-08-25 12:03:52 UTC


README

Laravel

Laravel 应用程序的 RabbitMQ 管理器

Version PHP Version Build Status Coverage Downloads count License

此包可用于轻松访问 RabbitMQ 实体,如连接或队列。

需要安装 php 扩展 ext-amqp。安装步骤可以在 Dockerfile 中找到。

安装

使用以下命令通过 composer 安装此包

$ composer require avto-dev/amqp-rabbit-manager "^2.0"

需要安装 composer (如何安装 composer)。

您需要修复包的主要版本。

之后,您应使用以下命令“发布”包配置文件

$ php ./artisan vendor:publish --provider='AvtoDev\AmqpRabbitManager\ServiceProvider'

并在文件 ./config/rabbitmq.php 中进行配置。

用法

首先,您应执行命令 rabbit:setup 以在 RabbitMQ 服务器上创建所有队列和交换。

然后,在应用程序的任何部分,您都可以解析连接或队列/交换工厂。例如,在 artisan 命令中

<?php

namespace App\Console\Commands;

use AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface;

class SomeCommand extends \Illuminate\Console\Command
{
    /**
     * The console command name.
     *
     * @var string
     */
    protected $name = 'some:command';

    /**
     * Execute the console command.
     *
     * @param ConnectionsFactoryInterface $connections
     *
     * @return void
     */
    public function handle(ConnectionsFactoryInterface $connections): void
    {
        $connections->default(); // Get the default RabbitMQ connection instance
    }
}

手动创建队列

声明队列操作在代理端创建队列(使用命令 rabbit:setup 代替此操作)

<?php

/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */

$exchange = $connections
    ->default()
    ->declareQueue($queues->make('some-queue-id'));

手动创建交换

声明交换操作在代理端创建主题(使用命令 rabbit:setup 代替此操作)

<?php

/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\ExchangesFactoryInterface $exchanges */

$exchange = $connections
    ->default()
    ->declareTopic($exchanges->make('some-exchange-id'));

将队列绑定到交换

将队列连接到交换。因此,来自该主题的消息将进入队列并可以被处理(使用命令 rabbit:setup 事件 \AvtoDev\AmqpRabbitManager\Commands\Events\* 代替此操作)

<?php

/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */
/** @var \AvtoDev\AmqpRabbitManager\ExchangesFactoryInterface $exchanges */

$connections
    ->default()
    ->bind(new \Interop\Amqp\Impl\AmqpBind(
        $exchanges->make('some-exchange-id'),
        $queues->make('some-queue-id')
    ));

向交换发送消息

创建消息并将其发送到交换

<?php

/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\ExchangesFactoryInterface $exchanges */

$connection = $connections->default();
$message    = $connection->createMessage('Hello world!');

$connection
    ->createProducer()
    ->send($exchanges->make('some-exchange-id'), $message);

向队列发送消息

创建消息并将其发送到队列

<?php

/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */

$connection = $connections->default();
$message    = $connection->createMessage('Hello world!');

$connection
    ->createProducer()
    ->send($queues->make('some-queue-id'), $message);

发送优先消息

消息优先级用于消息排序

<?php

/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */

$connection = $connections->default();
$message    = $connection->createMessage('Hello world!');

$connection
    ->createProducer()
    ->setPriority(10)
    // ...
    ->send($queues->make('some-queue-id'), $message);

发送过期消息

也称为消息 TTL

<?php

/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */

$connection = $connections->default();
$message    = $connection->createMessage('Hello world!');

$connection
    ->createProducer()
    ->setTimeToLive(60000) // 60 sec
    // ...
    ->send($queues->make('some-queue-id'), $message);

发送延迟消息

如果可能,请避免使用 enqueue/amqp-tools 延迟策略。如果您手动操作,您可以完全控制它。

获取(消费)单个消息

获取一条消息并继续脚本执行

<?php

/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */

$consumer = $connections->default()->createConsumer($queues->make('some-queue-id'));

$message = $consumer->receive();

try {
    // .. process a message ..

    $consumer->acknowledge($message);
} catch (\Exception $e) {
    // .. process exception ..

    $consumer->reject($message);
}

订阅消费者

启动(几乎是)无限循环以处理消息(您可以在同一时间启动多个消费者,只需调用即可)

<?php

/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */

$connection = $connections->default();
$queue      = $queues->make('some-queue-id');
$consumer   = $connection->createConsumer($queue);
$subscriber = $connection->createSubscriptionConsumer();

$subscriber->subscribe(
    $consumer,
    function(\Interop\Amqp\AmqpMessage $message, \Enqueue\AmqpExt\AmqpConsumer $consumer): bool {
        try {
            // .. process a message ..

            $consumer->acknowledge($message);
        } catch (\Exception $e) {
            // .. process exception ..

            $consumer->reject($message);

            return false; // Subscription will be cancelled
        }

        return true; // Subscription will be continued
    }
);

$subscriber->consume(); // You can pass timeout in milliseconds

清除队列消息

移除队列中的所有消息

<?php

/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */

$connection = $connections->default();

$connection->purgeQueue($queues->make('some-queue-id'));

测试

对于包测试,我们使用 phpunit 框架和 docker-ce + docker-compose 作为开发环境。因此,在克隆存储库后,只需在终端中写入

$ make build
$ make latest # or 'make lowest'
$ make test

变更日志

Release date Commits since latest release

变更日志可以在 此处找到

支持

Issues Issues

如果您在此包中发现任何错误,请在此存储库中 创建一个问题

许可

这是一个开源软件,受 MIT 许可证 许可。