anik/laravel-amqp

anik/amqp 为 Laravel 类框架提供的包装器

v2.0 2024-03-17 16:44 UTC

This package is auto-updated.

Last update: 2024-09-20 14:22:19 UTC


README

anik/laravel-amqp codecov PHP 版本要求 最新稳定版本 总下载量 Jetbrains

anik/amqp 为 Laravel 类框架提供的包装器。

示例

查看 仓库 以获取示例。

文档

安装

要安装此包,运行

composer require anik/laravel-amqp

Laravel

Anik\Laravel\Amqp\Providers\AmqpServiceProvider::class 服务提供者应自动注册。如果没有,则可以在您的 config/app.php 提供者数组中手动添加服务提供者

'providers' => [
    // ... 
    Anik\Laravel\Amqp\Providers\AmqpServiceProvider::class,
]
  • 使用 php artisan vendor:publish --provider "Anik\Laravel\Amqp\Providers\AmqpServiceProvider" 命令发布配置。

Lumen

  • bootstrap/app.php 文件中注册 Anik\Laravel\Amqp\Providers\AmqpServiceProvider::class 服务提供者。
$app->register(Anik\Laravel\Amqp\Providers\AmqpServiceProvider::class);
  • 将配置 amqp.phpvendor/anik/laravel-amqp/src/config/amqp.php 复制到您的配置目录。

  • bootstrap/app.php 中使用 $app->configure('amqp'); 导入您的配置。

Laravel Zero

  • config/app.php 提供者数组中注册 Anik\Laravel\Amqp\Providers\AmqpServiceProvider::class 服务提供者。
'providers' => [
    /// ... 
    Anik\Laravel\Amqp\Providers\AmqpServiceProvider::class,
]
  • 将配置 amqp.phpvendor/anik/laravel-amqp/src/config/amqp.php 复制到您的配置目录。

配置

在您的 config/amqp.php 中,您可以定义多个连接,并通过指向连接名称来在代码中使用它们。

  • amqp.default 表示默认连接。如果没有指定连接,则会在生产或消费消息时使用。
  • amqp.connections.*.connection.class 表示要使用的底层 Amqp 连接。默认情况下,它使用延迟连接。您可以将其更改为 PhpAmqpLib\Connection\AbstractConnection 的任何实现。
  • amqp.connections.*.connection.hosts 可以有多个主机配置。每个主机配置必须包含 hostportuserpassword 键。它还可以包含可选的 vhost。延迟连接不能有多个主机配置,否则将引发错误。
  • 您还可以通过 amqp.connections.*.connection.options 在内部创建 amqp.connections.*.connection.class 实例时传递可选的参数数组。
  • amqp.connections.*.message 持有发布时消息的默认属性。
  • amqp.connections.*.exchange 持有发布和消费时交换的默认属性。
  • amqp.connections.*.queue 存储了在消费时队列的默认属性。
  • amqp.connections.*.consumer 存储了在消费时消费者的默认属性。
  • amqp.connections.*.qos 存储了在消费时QoS的默认属性。

Octane 支持

此包默认支持laravel octane。为了保持AMQP连接活跃,您需要将octane配置为预热连接,通过将'amqp'添加到octane配置中的预热数组中来实现。保持AMQP连接活跃

// config/octane.php
// ... 
'warm' => [
    // ... 
    'amqp', // <-- this line
],

用法

以下内容功能相同。

use Anik\Amqp\ConsumableMessage;
use Anik\Laravel\Amqp\Facades\Amqp;

$messages = 'my message';
// $messages = ['my first message', 'my second message'];
// $messages = new Anik\Amqp\ProducibleMessage('my message');
// $messages = ['another message', new Anik\Amqp\ProducibleMessage('also another message')];

Amqp::publish($messages); // publishes to default connection
Amqp::connection('rabbitmq')->publish($messages); // publishes to rabbitmq connection

app('amqp')->publish($messages); // publishes to default connection
app('amqp')->connection('rabbitmq')->publish($messages); // publishes to rabbitmq connection

app()->make('amqp')->publish($messages); // publishes to default connection
app()->make('amqp')->connection('rabbitmq')->publish($messages); // publishes to rabbitmq connection

/** @var \Anik\Laravel\Amqp\AmqpManager $amqpManager */
$amqpManager->publish($messages); // publishes to default connection
$amqpManager->connection('rabbitmq')->publish($messages); // publishes to rabbitmq connection


Amqp::consume(function(ConsumableMessage $message) {
    var_dump($message->getMessageBody());
    $message->ack();
}); // consumes from default connection
Amqp::connection('rabbitmq')->consume(function(ConsumableMessage $message) {
    var_dump($message->getMessageBody());
    $message->ack();
}); // consumes from rabbitmq connection

app('amqp')->consume(function(ConsumableMessage $message) {
    var_dump($message->getMessageBody());
    $message->ack();
}); // consumes from default connection
app('amqp')->connection('rabbitmq')->consume(function(ConsumableMessage $message) {
    var_dump($message->getMessageBody());
    $message->ack();
}); // consumes from rabbitmq connection

app()->make('amqp')->consume(function(ConsumableMessage $message) {
    var_dump($message->getMessageBody());
    $message->ack();
}); // consumes from default connection
app()->make('amqp')->connection('rabbitmq')->consume(function(ConsumableMessage $message) {
    var_dump($message->getMessageBody());
    $message->ack();
}); // consumes from rabbitmq connection

/** @var \Anik\Laravel\Amqp\AmqpManager $amqpManager */
$amqpManager->consume(function(ConsumableMessage $message) {
    var_dump($message->getMessageBody());
    $message->ack();
}); // consumes from default connection
$amqpManager->connection('rabbitmq')->consume(function(ConsumableMessage $message) {
    var_dump($message->getMessageBody());
    $message->ack();
}); // consumes from rabbitmq connection

注意

在此文档中,之后将使用 FACADE。如果您正在使用 Lumen,则可以使用其他方法。该包 不要求启用FACADE

发布消息

要发布消息,

use Anik\Laravel\Amqp\Facades\Amqp;

Amqp::publish($messages, $routingKey, $exchange, $options);
Amqp::connection('rabbitmq')->publish($messages, $routingKey, $exchange, $options);
  • $messages 类型:mixed必需。它可以是单个消息,也可以是任何标量类型或实现 Anik\Amqp\Producible 的消息数组。
  • $routingKey 类型:string可选。默认:''(空字符串)。
  • $exchange 类型:null | Anik\Amqp\Exchanges\Exchange可选。默认:null
  • $options 类型:array可选。默认:[]
    • message - 接受:array。有效的属性为 PhpAmqpLib\Message\AMQPMessage
    • exchange - 接受:array。参阅 amqp.connections.*.exchange
    • publish - 接受:array。参阅 Anik\Amqp\Producer::publishBatch

注意

  • 如果 $messages 中的任何消息都不是 Anik\Amqp\Producible 的实现,则该消息将使用 Anik\Amqp\ProducibleMessage 转换为 Anik\Amqp\Producible
  • 在转换为 Anik\Amqp\Producible 时,它将尝试使用 $options['message'] 作为消息属性。如果没有设置,则尝试使用可用的 amqp.connections.*.message 属性。
  • 如果 $exchange 设置为 null,则检查 $options['exchange'] 是否已设置。如果没有设置,则尝试使用可用的 amqp.connections.*.exchange 属性。
  • 如果 $options['publish'] 未设置,则尝试使用可用的 amqp.connections.*.publish 属性。

消费消息

要消费消息,

use Anik\Laravel\Amqp\Facades\Amqp;

Amqp::consume($handler, $bindingKey, $exchange, $queue, $qos , $options);
Amqp::connection('rabbitmq')->consume($handler, $bindingKey, $exchange, $queue, $qos , $options);
  • $handler 类型:callable | Anik\Amqp\Consumable必需
  • $bindingKey 类型:string可选。默认:''(空字符串)。
  • $exchange 类型:null | Anik\Amqp\Exchanges\Exchange可选。默认:null
  • $queue 类型:null | Anik\Amqp\Queues\Queue可选。默认:null
  • $qos 类型:null | Anik\Amqp\Qos\Qos可选。默认:null
  • $options 类型:array可选。默认:[]
    • exchange - 接受:array。参阅 amqp.connections.*.exchange
    • queue - 接受:array。参阅 amqp.connections.*.queue
    • qos - 接受:array。参阅 amqp.connections.*.qos
    • consumer - 接受:array。参阅 amqp.connections.*.consumer
    • bind - 接受:array。参阅 Anik\Amqp\Consumer::consume

注意

  • 如果 $handler 不是 Anik\Amqp\Consumable 的实现,则使用 Anik\Amqp\ConsumableMessage 将处理程序转换为 Anik\Amqp\Consumable
  • 如果 $exchange 设置为 null,则检查 $options['exchange'] 是否已设置。如果没有设置,则尝试使用可用的 amqp.connections.*.exchange 属性。
  • 如果 $queue 设置为 null,则检查 $options['queue'] 是否已设置。如果没有设置,则使用可用的 amqp.connections.*.queue 属性。
  • 如果 $qos 设置为 null,则检查 $options['qos'] 是否已设置。如果没有设置,则使用 amqp.connections.*.qos 属性,前提是 amqp.connections.*.qos.enabled 设置为 值。
  • 如果 $options['bind'] 未设置,则使用可用的 amqp.connections.*.bind 属性。
  • 如果 $options['consumer'] 未设置,则使用可用的 amqp.connections.*.consumer 属性。

测试

该包允许断言一些场景。在运行这些断言之前,您需要使用 Amqp::fake()

<?php

use Anik\Laravel\Amqp\Facades\Amqp;
use PHPUnit\Framework\TestCase;

class MyTest extends TestCase 
{
    public function testIfMessageWasProduced () {
        Amqp::fake();
        // ... Your code
        
        Amqp::assertPublished();
        // Amqp::assertPublished("my-message");
        // Amqp::assertPublishedCount(5, "my-message");
        // Amqp::assertPublished(Anik\Amqp\ProducibleMessage::class);
        // Amqp::assertPublished(Anik\Amqp\Producible::class);
        Amqp::assertPublishedOnConnection('rabbitmq');
    }
}
  • Anik\Laravel\Amqp\Facades\Amqp::assertPublishedOnConnection(string $name) - 检查是否至少有一条消息在连接 $name 上已发布。
  • Anik\Laravel\Amqp\Facades\Amqp::assertPublishedOnExchange(string $name) - 检查是否至少有一条消息在交换机 $name 上已发布。
  • Anik\Laravel\Amqp\Facades\Amqp::assertPublishedOnExchangeType(string $type) - 检查是否至少有一条消息在交换机类型 $type 上已发布。
  • Anik\Laravel\Amqp\Facades\Amqp::assertPublishedWithRoutingKey(string $key) - 检查是否至少有一条消息使用路由键 $key 已发布。
  • Anik\Laravel\Amqp\Facades\Amqp::assertPublished($message = null)
    • 如果 $messagenull,则检查是否至少有一条消息已发布。
    • 否则,按照以下顺序进行检查。
      • 如果消息与 $message 完全匹配。
      • 如果消息与 get_class($message) 完全匹配。
      • 如果消息是 $message 的实现。
  • Anik\Laravel\Amqp\Facades\Amqp::assertNotPublished($message = null)
    • 如果 $messagenull,则检查是否没有消息已发布。
    • 否则,按照以下顺序进行检查。
      • 没有消息与 $message 完全匹配。
      • 没有消息与 get_class($message) 完全匹配。
      • 没有消息是 $message 的实现。
  • Anik\Laravel\Amqp\Facades\Amqp::assertPublishedCount(int $count, $message = null)
    • 如果 $messagenull,则检查是否恰好发布了 $count 条消息。
    • 否则,按照以下顺序进行检查。
      • 如果消息与 $message 完全匹配。
      • 如果消息与 get_class($message) 完全匹配。
      • 如果消息是 $message 的实现。

注意

Anik\Laravel\Amqp\Facades\Amqp::fake() 之后使用 Anik\Laravel\Amqp\Facades\Amqp::consume() 将抛出异常。