anik / laravel-amqp
anik/amqp 为 Laravel 类框架提供的包装器
v2.0
2024-03-17 16:44 UTC
Requires
- php: ^8.1
- anik/amqp: ^2.0
- illuminate/support: ^10.0|^11.0
Requires (Dev)
- orchestra/testbench: ^8.0|^9.0
- phpunit/phpunit: ^10.0
README
anik/laravel-amqp

anik/amqp 为 Laravel 类框架提供的包装器。
示例
查看 仓库 以获取示例。
文档
安装
要安装此包,运行
composer require anik/laravel-amqp
Laravel
Anik\Laravel\Amqp\Providers\AmqpServiceProvider::class
服务提供者应自动注册。如果没有,则可以在您的 config/app.php
提供者数组中手动添加服务提供者
'providers' => [ // ... Anik\Laravel\Amqp\Providers\AmqpServiceProvider::class, ]
- 使用
php artisan vendor:publish --provider "Anik\Laravel\Amqp\Providers\AmqpServiceProvider"
命令发布配置。
Lumen
- 在
bootstrap/app.php
文件中注册Anik\Laravel\Amqp\Providers\AmqpServiceProvider::class
服务提供者。
$app->register(Anik\Laravel\Amqp\Providers\AmqpServiceProvider::class);
-
将配置
amqp.php
从vendor/anik/laravel-amqp/src/config/amqp.php
复制到您的配置目录。 -
在
bootstrap/app.php
中使用$app->configure('amqp');
导入您的配置。
Laravel Zero
- 在
config/app.php
提供者数组中注册Anik\Laravel\Amqp\Providers\AmqpServiceProvider::class
服务提供者。
'providers' => [ /// ... Anik\Laravel\Amqp\Providers\AmqpServiceProvider::class, ]
- 将配置
amqp.php
从vendor/anik/laravel-amqp/src/config/amqp.php
复制到您的配置目录。
配置
在您的 config/amqp.php
中,您可以定义多个连接,并通过指向连接名称来在代码中使用它们。
amqp.default
表示默认连接。如果没有指定连接,则会在生产或消费消息时使用。amqp.connections.*.connection.class
表示要使用的底层 Amqp 连接。默认情况下,它使用延迟连接。您可以将其更改为PhpAmqpLib\Connection\AbstractConnection
的任何实现。amqp.connections.*.connection.hosts
可以有多个主机配置。每个主机配置必须包含host
、port
、user
、password
键。它还可以包含可选的vhost
。延迟连接不能有多个主机配置,否则将引发错误。- 您还可以通过
amqp.connections.*.connection.options
在内部创建amqp.connections.*.connection.class
实例时传递可选的参数数组。 amqp.connections.*.message
持有发布时消息的默认属性。amqp.connections.*.exchange
持有发布和消费时交换的默认属性。amqp.connections.*.queue
存储了在消费时队列的默认属性。amqp.connections.*.consumer
存储了在消费时消费者的默认属性。amqp.connections.*.qos
存储了在消费时QoS的默认属性。
Octane 支持
此包默认支持laravel octane。为了保持AMQP连接活跃,您需要将octane配置为预热
连接,通过将'amqp'添加到octane配置中的预热数组中来实现。保持AMQP连接活跃
// config/octane.php // ... 'warm' => [ // ... 'amqp', // <-- this line ],
用法
以下内容功能相同。
use Anik\Amqp\ConsumableMessage; use Anik\Laravel\Amqp\Facades\Amqp; $messages = 'my message'; // $messages = ['my first message', 'my second message']; // $messages = new Anik\Amqp\ProducibleMessage('my message'); // $messages = ['another message', new Anik\Amqp\ProducibleMessage('also another message')]; Amqp::publish($messages); // publishes to default connection Amqp::connection('rabbitmq')->publish($messages); // publishes to rabbitmq connection app('amqp')->publish($messages); // publishes to default connection app('amqp')->connection('rabbitmq')->publish($messages); // publishes to rabbitmq connection app()->make('amqp')->publish($messages); // publishes to default connection app()->make('amqp')->connection('rabbitmq')->publish($messages); // publishes to rabbitmq connection /** @var \Anik\Laravel\Amqp\AmqpManager $amqpManager */ $amqpManager->publish($messages); // publishes to default connection $amqpManager->connection('rabbitmq')->publish($messages); // publishes to rabbitmq connection Amqp::consume(function(ConsumableMessage $message) { var_dump($message->getMessageBody()); $message->ack(); }); // consumes from default connection Amqp::connection('rabbitmq')->consume(function(ConsumableMessage $message) { var_dump($message->getMessageBody()); $message->ack(); }); // consumes from rabbitmq connection app('amqp')->consume(function(ConsumableMessage $message) { var_dump($message->getMessageBody()); $message->ack(); }); // consumes from default connection app('amqp')->connection('rabbitmq')->consume(function(ConsumableMessage $message) { var_dump($message->getMessageBody()); $message->ack(); }); // consumes from rabbitmq connection app()->make('amqp')->consume(function(ConsumableMessage $message) { var_dump($message->getMessageBody()); $message->ack(); }); // consumes from default connection app()->make('amqp')->connection('rabbitmq')->consume(function(ConsumableMessage $message) { var_dump($message->getMessageBody()); $message->ack(); }); // consumes from rabbitmq connection /** @var \Anik\Laravel\Amqp\AmqpManager $amqpManager */ $amqpManager->consume(function(ConsumableMessage $message) { var_dump($message->getMessageBody()); $message->ack(); }); // consumes from default connection $amqpManager->connection('rabbitmq')->consume(function(ConsumableMessage $message) { var_dump($message->getMessageBody()); $message->ack(); }); // consumes from rabbitmq connection
注意
在此文档中,之后将使用 FACADE。如果您正在使用 Lumen,则可以使用其他方法。该包 不要求启用FACADE。
发布消息
要发布消息,
use Anik\Laravel\Amqp\Facades\Amqp; Amqp::publish($messages, $routingKey, $exchange, $options); Amqp::connection('rabbitmq')->publish($messages, $routingKey, $exchange, $options);
$messages
类型:mixed
。 必需。它可以是单个消息,也可以是任何标量类型或实现Anik\Amqp\Producible
的消息数组。$routingKey
类型:string
。 可选。默认:''
(空字符串)。$exchange
类型:null | Anik\Amqp\Exchanges\Exchange
。 可选。默认:null
。$options
类型:array
。 可选。默认:[]
。- 键
message
- 接受:array
。有效的属性为PhpAmqpLib\Message\AMQPMessage
。 - 键
exchange
- 接受:array
。参阅amqp.connections.*.exchange
。 - 键
publish
- 接受:array
。参阅Anik\Amqp\Producer::publishBatch
- 键
注意
- 如果
$messages
中的任何消息都不是Anik\Amqp\Producible
的实现,则该消息将使用Anik\Amqp\ProducibleMessage
转换为Anik\Amqp\Producible
。 - 在转换为
Anik\Amqp\Producible
时,它将尝试使用$options['message']
作为消息属性。如果没有设置,则尝试使用可用的amqp.connections.*.message
属性。 - 如果
$exchange
设置为null
,则检查$options['exchange']
是否已设置。如果没有设置,则尝试使用可用的amqp.connections.*.exchange
属性。 - 如果
$options['publish']
未设置,则尝试使用可用的amqp.connections.*.publish
属性。
消费消息
要消费消息,
use Anik\Laravel\Amqp\Facades\Amqp; Amqp::consume($handler, $bindingKey, $exchange, $queue, $qos , $options); Amqp::connection('rabbitmq')->consume($handler, $bindingKey, $exchange, $queue, $qos , $options);
$handler
类型:callable | Anik\Amqp\Consumable
。 必需。$bindingKey
类型:string
。 可选。默认:''
(空字符串)。$exchange
类型:null | Anik\Amqp\Exchanges\Exchange
。 可选。默认:null
。$queue
类型:null | Anik\Amqp\Queues\Queue
。 可选。默认:null
。$qos
类型:null | Anik\Amqp\Qos\Qos
。 可选。默认:null
。$options
类型:array
。 可选。默认:[]
。- 键
exchange
- 接受:array
。参阅amqp.connections.*.exchange
。 - 键
queue
- 接受:array
。参阅amqp.connections.*.queue
。 - 键
qos
- 接受:array
。参阅amqp.connections.*.qos
。 - 键
consumer
- 接受:array
。参阅amqp.connections.*.consumer
。 - 键
bind
- 接受:array
。参阅Anik\Amqp\Consumer::consume
- 键
注意
- 如果
$handler
不是Anik\Amqp\Consumable
的实现,则使用Anik\Amqp\ConsumableMessage
将处理程序转换为Anik\Amqp\Consumable
。 - 如果
$exchange
设置为null
,则检查$options['exchange']
是否已设置。如果没有设置,则尝试使用可用的amqp.connections.*.exchange
属性。 - 如果
$queue
设置为null
,则检查$options['queue']
是否已设置。如果没有设置,则使用可用的amqp.connections.*.queue
属性。 - 如果
$qos
设置为null
,则检查$options['qos']
是否已设置。如果没有设置,则使用amqp.connections.*.qos
属性,前提是amqp.connections.*.qos.enabled
设置为 真 值。 - 如果
$options['bind']
未设置,则使用可用的amqp.connections.*.bind
属性。 - 如果
$options['consumer']
未设置,则使用可用的amqp.connections.*.consumer
属性。
测试
该包允许断言一些场景。在运行这些断言之前,您需要使用 Amqp::fake()
。
<?php use Anik\Laravel\Amqp\Facades\Amqp; use PHPUnit\Framework\TestCase; class MyTest extends TestCase { public function testIfMessageWasProduced () { Amqp::fake(); // ... Your code Amqp::assertPublished(); // Amqp::assertPublished("my-message"); // Amqp::assertPublishedCount(5, "my-message"); // Amqp::assertPublished(Anik\Amqp\ProducibleMessage::class); // Amqp::assertPublished(Anik\Amqp\Producible::class); Amqp::assertPublishedOnConnection('rabbitmq'); } }
Anik\Laravel\Amqp\Facades\Amqp::assertPublishedOnConnection(string $name)
- 检查是否至少有一条消息在连接$name
上已发布。Anik\Laravel\Amqp\Facades\Amqp::assertPublishedOnExchange(string $name)
- 检查是否至少有一条消息在交换机$name
上已发布。Anik\Laravel\Amqp\Facades\Amqp::assertPublishedOnExchangeType(string $type)
- 检查是否至少有一条消息在交换机类型$type
上已发布。Anik\Laravel\Amqp\Facades\Amqp::assertPublishedWithRoutingKey(string $key)
- 检查是否至少有一条消息使用路由键$key
已发布。Anik\Laravel\Amqp\Facades\Amqp::assertPublished($message = null)
- 如果
$message
是null
,则检查是否至少有一条消息已发布。 - 否则,按照以下顺序进行检查。
- 如果消息与
$message
完全匹配。 - 如果消息与
get_class($message)
完全匹配。 - 如果消息是
$message
的实现。
- 如果消息与
- 如果
Anik\Laravel\Amqp\Facades\Amqp::assertNotPublished($message = null)
- 如果
$message
是null
,则检查是否没有消息已发布。 - 否则,按照以下顺序进行检查。
- 没有消息与
$message
完全匹配。 - 没有消息与
get_class($message)
完全匹配。 - 没有消息是
$message
的实现。
- 没有消息与
- 如果
Anik\Laravel\Amqp\Facades\Amqp::assertPublishedCount(int $count, $message = null)
- 如果
$message
是null
,则检查是否恰好发布了$count
条消息。 - 否则,按照以下顺序进行检查。
- 如果消息与
$message
完全匹配。 - 如果消息与
get_class($message)
完全匹配。 - 如果消息是
$message
的实现。
- 如果消息与
- 如果
注意
在 Anik\Laravel\Amqp\Facades\Amqp::fake()
之后使用 Anik\Laravel\Amqp\Facades\Amqp::consume()
将抛出异常。