anik / laravel-amqp
anik/amqp 为 Laravel 类框架提供的包装器
v2.0
2024-03-17 16:44 UTC
Requires
- php: ^8.1
- anik/amqp: ^2.0
- illuminate/support: ^10.0|^11.0
Requires (Dev)
- orchestra/testbench: ^8.0|^9.0
- phpunit/phpunit: ^10.0
README
anik/laravel-amqp

anik/amqp 为 Laravel 类框架提供的包装器。
示例
查看 仓库 以获取示例。
文档
安装
要安装此包,运行
composer require anik/laravel-amqp
Laravel
Anik\Laravel\Amqp\Providers\AmqpServiceProvider::class 服务提供者应自动注册。如果没有,则可以在您的 config/app.php 提供者数组中手动添加服务提供者
'providers' => [ // ... Anik\Laravel\Amqp\Providers\AmqpServiceProvider::class, ]
- 使用
php artisan vendor:publish --provider "Anik\Laravel\Amqp\Providers\AmqpServiceProvider"命令发布配置。
Lumen
- 在
bootstrap/app.php文件中注册Anik\Laravel\Amqp\Providers\AmqpServiceProvider::class服务提供者。
$app->register(Anik\Laravel\Amqp\Providers\AmqpServiceProvider::class);
-
将配置
amqp.php从vendor/anik/laravel-amqp/src/config/amqp.php复制到您的配置目录。 -
在
bootstrap/app.php中使用$app->configure('amqp');导入您的配置。
Laravel Zero
- 在
config/app.php提供者数组中注册Anik\Laravel\Amqp\Providers\AmqpServiceProvider::class服务提供者。
'providers' => [ /// ... Anik\Laravel\Amqp\Providers\AmqpServiceProvider::class, ]
- 将配置
amqp.php从vendor/anik/laravel-amqp/src/config/amqp.php复制到您的配置目录。
配置
在您的 config/amqp.php 中,您可以定义多个连接,并通过指向连接名称来在代码中使用它们。
amqp.default表示默认连接。如果没有指定连接,则会在生产或消费消息时使用。amqp.connections.*.connection.class表示要使用的底层 Amqp 连接。默认情况下,它使用延迟连接。您可以将其更改为PhpAmqpLib\Connection\AbstractConnection的任何实现。amqp.connections.*.connection.hosts可以有多个主机配置。每个主机配置必须包含host、port、user、password键。它还可以包含可选的vhost。延迟连接不能有多个主机配置,否则将引发错误。- 您还可以通过
amqp.connections.*.connection.options在内部创建amqp.connections.*.connection.class实例时传递可选的参数数组。 amqp.connections.*.message持有发布时消息的默认属性。amqp.connections.*.exchange持有发布和消费时交换的默认属性。amqp.connections.*.queue存储了在消费时队列的默认属性。amqp.connections.*.consumer存储了在消费时消费者的默认属性。amqp.connections.*.qos存储了在消费时QoS的默认属性。
Octane 支持
此包默认支持laravel octane。为了保持AMQP连接活跃,您需要将octane配置为预热连接,通过将'amqp'添加到octane配置中的预热数组中来实现。保持AMQP连接活跃
// config/octane.php // ... 'warm' => [ // ... 'amqp', // <-- this line ],
用法
以下内容功能相同。
use Anik\Amqp\ConsumableMessage; use Anik\Laravel\Amqp\Facades\Amqp; $messages = 'my message'; // $messages = ['my first message', 'my second message']; // $messages = new Anik\Amqp\ProducibleMessage('my message'); // $messages = ['another message', new Anik\Amqp\ProducibleMessage('also another message')]; Amqp::publish($messages); // publishes to default connection Amqp::connection('rabbitmq')->publish($messages); // publishes to rabbitmq connection app('amqp')->publish($messages); // publishes to default connection app('amqp')->connection('rabbitmq')->publish($messages); // publishes to rabbitmq connection app()->make('amqp')->publish($messages); // publishes to default connection app()->make('amqp')->connection('rabbitmq')->publish($messages); // publishes to rabbitmq connection /** @var \Anik\Laravel\Amqp\AmqpManager $amqpManager */ $amqpManager->publish($messages); // publishes to default connection $amqpManager->connection('rabbitmq')->publish($messages); // publishes to rabbitmq connection Amqp::consume(function(ConsumableMessage $message) { var_dump($message->getMessageBody()); $message->ack(); }); // consumes from default connection Amqp::connection('rabbitmq')->consume(function(ConsumableMessage $message) { var_dump($message->getMessageBody()); $message->ack(); }); // consumes from rabbitmq connection app('amqp')->consume(function(ConsumableMessage $message) { var_dump($message->getMessageBody()); $message->ack(); }); // consumes from default connection app('amqp')->connection('rabbitmq')->consume(function(ConsumableMessage $message) { var_dump($message->getMessageBody()); $message->ack(); }); // consumes from rabbitmq connection app()->make('amqp')->consume(function(ConsumableMessage $message) { var_dump($message->getMessageBody()); $message->ack(); }); // consumes from default connection app()->make('amqp')->connection('rabbitmq')->consume(function(ConsumableMessage $message) { var_dump($message->getMessageBody()); $message->ack(); }); // consumes from rabbitmq connection /** @var \Anik\Laravel\Amqp\AmqpManager $amqpManager */ $amqpManager->consume(function(ConsumableMessage $message) { var_dump($message->getMessageBody()); $message->ack(); }); // consumes from default connection $amqpManager->connection('rabbitmq')->consume(function(ConsumableMessage $message) { var_dump($message->getMessageBody()); $message->ack(); }); // consumes from rabbitmq connection
注意
在此文档中,之后将使用 FACADE。如果您正在使用 Lumen,则可以使用其他方法。该包 不要求启用FACADE。
发布消息
要发布消息,
use Anik\Laravel\Amqp\Facades\Amqp; Amqp::publish($messages, $routingKey, $exchange, $options); Amqp::connection('rabbitmq')->publish($messages, $routingKey, $exchange, $options);
$messages类型:mixed。 必需。它可以是单个消息,也可以是任何标量类型或实现Anik\Amqp\Producible的消息数组。$routingKey类型:string。 可选。默认:''(空字符串)。$exchange类型:null | Anik\Amqp\Exchanges\Exchange。 可选。默认:null。$options类型:array。 可选。默认:[]。- 键
message- 接受:array。有效的属性为PhpAmqpLib\Message\AMQPMessage。 - 键
exchange- 接受:array。参阅amqp.connections.*.exchange。 - 键
publish- 接受:array。参阅Anik\Amqp\Producer::publishBatch
- 键
注意
- 如果
$messages中的任何消息都不是Anik\Amqp\Producible的实现,则该消息将使用Anik\Amqp\ProducibleMessage转换为Anik\Amqp\Producible。 - 在转换为
Anik\Amqp\Producible时,它将尝试使用$options['message']作为消息属性。如果没有设置,则尝试使用可用的amqp.connections.*.message属性。 - 如果
$exchange设置为null,则检查$options['exchange']是否已设置。如果没有设置,则尝试使用可用的amqp.connections.*.exchange属性。 - 如果
$options['publish']未设置,则尝试使用可用的amqp.connections.*.publish属性。
消费消息
要消费消息,
use Anik\Laravel\Amqp\Facades\Amqp; Amqp::consume($handler, $bindingKey, $exchange, $queue, $qos , $options); Amqp::connection('rabbitmq')->consume($handler, $bindingKey, $exchange, $queue, $qos , $options);
$handler类型:callable | Anik\Amqp\Consumable。 必需。$bindingKey类型:string。 可选。默认:''(空字符串)。$exchange类型:null | Anik\Amqp\Exchanges\Exchange。 可选。默认:null。$queue类型:null | Anik\Amqp\Queues\Queue。 可选。默认:null。$qos类型:null | Anik\Amqp\Qos\Qos。 可选。默认:null。$options类型:array。 可选。默认:[]。- 键
exchange- 接受:array。参阅amqp.connections.*.exchange。 - 键
queue- 接受:array。参阅amqp.connections.*.queue。 - 键
qos- 接受:array。参阅amqp.connections.*.qos。 - 键
consumer- 接受:array。参阅amqp.connections.*.consumer。 - 键
bind- 接受:array。参阅Anik\Amqp\Consumer::consume
- 键
注意
- 如果
$handler不是Anik\Amqp\Consumable的实现,则使用Anik\Amqp\ConsumableMessage将处理程序转换为Anik\Amqp\Consumable。 - 如果
$exchange设置为null,则检查$options['exchange']是否已设置。如果没有设置,则尝试使用可用的amqp.connections.*.exchange属性。 - 如果
$queue设置为null,则检查$options['queue']是否已设置。如果没有设置,则使用可用的amqp.connections.*.queue属性。 - 如果
$qos设置为null,则检查$options['qos']是否已设置。如果没有设置,则使用amqp.connections.*.qos属性,前提是amqp.connections.*.qos.enabled设置为 真 值。 - 如果
$options['bind']未设置,则使用可用的amqp.connections.*.bind属性。 - 如果
$options['consumer']未设置,则使用可用的amqp.connections.*.consumer属性。
测试
该包允许断言一些场景。在运行这些断言之前,您需要使用 Amqp::fake()。
<?php use Anik\Laravel\Amqp\Facades\Amqp; use PHPUnit\Framework\TestCase; class MyTest extends TestCase { public function testIfMessageWasProduced () { Amqp::fake(); // ... Your code Amqp::assertPublished(); // Amqp::assertPublished("my-message"); // Amqp::assertPublishedCount(5, "my-message"); // Amqp::assertPublished(Anik\Amqp\ProducibleMessage::class); // Amqp::assertPublished(Anik\Amqp\Producible::class); Amqp::assertPublishedOnConnection('rabbitmq'); } }
Anik\Laravel\Amqp\Facades\Amqp::assertPublishedOnConnection(string $name)- 检查是否至少有一条消息在连接$name上已发布。Anik\Laravel\Amqp\Facades\Amqp::assertPublishedOnExchange(string $name)- 检查是否至少有一条消息在交换机$name上已发布。Anik\Laravel\Amqp\Facades\Amqp::assertPublishedOnExchangeType(string $type)- 检查是否至少有一条消息在交换机类型$type上已发布。Anik\Laravel\Amqp\Facades\Amqp::assertPublishedWithRoutingKey(string $key)- 检查是否至少有一条消息使用路由键$key已发布。Anik\Laravel\Amqp\Facades\Amqp::assertPublished($message = null)- 如果
$message是null,则检查是否至少有一条消息已发布。 - 否则,按照以下顺序进行检查。
- 如果消息与
$message完全匹配。 - 如果消息与
get_class($message)完全匹配。 - 如果消息是
$message的实现。
- 如果消息与
- 如果
Anik\Laravel\Amqp\Facades\Amqp::assertNotPublished($message = null)- 如果
$message是null,则检查是否没有消息已发布。 - 否则,按照以下顺序进行检查。
- 没有消息与
$message完全匹配。 - 没有消息与
get_class($message)完全匹配。 - 没有消息是
$message的实现。
- 没有消息与
- 如果
Anik\Laravel\Amqp\Facades\Amqp::assertPublishedCount(int $count, $message = null)- 如果
$message是null,则检查是否恰好发布了$count条消息。 - 否则,按照以下顺序进行检查。
- 如果消息与
$message完全匹配。 - 如果消息与
get_class($message)完全匹配。 - 如果消息是
$message的实现。
- 如果消息与
- 如果
注意
在 Anik\Laravel\Amqp\Facades\Amqp::fake() 之后使用 Anik\Laravel\Amqp\Facades\Amqp::consume() 将抛出异常。