waseetnet / laravel-rabitmq
在 Laravel 中使用 RabbitMQ。
dev-main
2023-09-12 10:38 UTC
Requires
- php: ^8.0
- ext-json: *
- php-amqplib/php-amqplib: v3.2
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.0
- orchestra/testbench: ^7.0
- phpunit/phpunit: ^9.4.0
- squizlabs/php_codesniffer: ^3.5
This package is not auto-updated.
Last update: 2024-09-25 14:09:00 UTC
README
RabbitMQ Laravel 包是 kunalvarma05 的原始 laravel-rabbitmq 包的分支。它提供与 Laravel 应用程序中的 RabbitMQ 消息队列系统的无缝集成。它允许您在应用程序的不同部分之间高效地交换消息,促进异步和分布式处理。
通过利用 RabbitMQ 的强大功能,您可以提高 Laravel 应用程序的性能和可扩展性。使用此包,您可以轻松配置交换、队列和绑定,并使用熟悉的 Laravel 语法和约定发布/消费消息。
特性
- 生产者
- 消费者
- 发布/订阅
- 交换
- 默认
- 直接
- 主题
- 扇出
要求
- PHP 8.1+
- Laravel 10.0+
设置
1. 安装
将存储库 URL 添加到项目的 composer.json
文件中
"repositories": [
{
"type": "vcs",
"url": "git@github.com:suiiz-app/laravel-rabitmq.git"
}
]
在相同的 composer.json
文件中,在 "require" 部分,添加包并指定所需版本
"require": {
"suiiz-app/laravel-rabitmq": "dev-main"
}
2. 默认配置
php artisan vendor:publish --tag=rabbitmq-config
打开 config/app.php 文件,并在 providers 部分添加以下行
Waseet\RabbitMQ\RabbitMQServiceProvider::class,
快速入门
初始化
有几种初始化库的方法
// A. Direct instantiation
$rabbitMQ = new RabbitMQManager(app());
// B. Binding
$rabbitMQ = app('rabbitmq');
// C. Dependency injection (Controller, Command, Job, etc.)
public function __consturct(RabbitMQManager $rabbitMQ) { ... }
// D. Facade
// All the public methods of the `RabbitMQManager` class
// are available through the `RabbitMQ` facade.
RabbitMQ::getConnections();
发布
$message = new RabbitMQMessage('message body');
// Publish to the default exchange/topic/queue
$rabbitMQ->publisher()->publish($message);
// Publish bulk messages
$messages = [new RabbitMQMessage('message 1'), new RabbitMQMessage('message 2')];
$rabbitMQ->publisher()->publish($messages);
消费
// A. Consume through a closure
$handler = new RabbitMQGenericMessageConsume(function (RabbitMQIncomingMessage $message) {
$content = $message->getStream();
});
// B. Consume through a class
class MyMessageConsumer extends RabbitMQMessageConsumer {
public function handle(RabbitMQIncomingMessage $message) {
$content = $message->getStream();
}
}
$handler = new MyMessageConsumer();
// Starts a blocking loop `while (true)`
$rabbitMQ->consumer()->consume($handler);
交互
// Resolve the default connection
// @see: AMQPSSLConnection https://github.com/php-amqplib/php-amqplib/blob/master/PhpAmqpLib/Connection/AMQPSSLConnection.php
$amqpConnection = $rabbitMQ->resolveConnection();
// Resolve the default channel
// @see: AMQPChannel https://github.com/php-amqplib/php-amqplib/blob/master/PhpAmqpLib/Channel/AMQPChannel.php
$amqpChannel = $rabbitMQ->resolveChannel();
配置
连接配置
$connectionName = 'custom_connection'; // Set to `null` for default connection
// Override the default connection config
$connectionConfig = new ConnectionConfig(['username' => 'quest', 'password' => 'quest']);
$connectionConfig->setHost('localhost');
$customConnection = $rabbitMQ->resolveConnection($connectionName, $connectionConfig);
消息配置
$config = [
'content_encoding' => 'UTF-8',
'content_type' => 'text/plain',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
];
$message = new RabbitMQMessage('message body', $config);
// Set message exchange
$exchangeConfig = ['type' => AMQPExchangeType::DIRECT];
$exchange = new RabbitMQExchange('my_exchange', $exchangeConfig);
$message->setExchange($exchange);
发布配置
$publisher = $rabbitMQ->publisher();
$message = new RabbitMQMessage('message body');
$exchangeConfig = ['type' => AMQPExchangeType::TOPIC];
$exchange = new RabbitMQExchange('my_exchange', $exchangeConfig);
$message->setExchange($exchange);
$routingKey = 'key'; // Can be an empty string, but not null
$connectionName = 'custom_connection'; // Set to null for default connection
// The publish config allows you to any override default configuration
//
// The following precendence works for the configuration:
// Message exchange config > Publish config > Connection config > Default config
//
// In this case, the exchange type used would be AMQPExchangeType::TOPIC
$publishConfig = new PublishConfig(['exchange' => ['type' => AMQPExchangeType::FANOUT]]);
$publisher->publish($message, $routingKey, $connectionName, $publishConfig);
消费者配置
$consumer = $rabbitMQ->consumer();
$routingKey = 'key';
$exchange = new RabbitMQExchange('test_exchange', ['declare' => true, 'durable' => true]);
$queue = new RabbitMQQueue('my_queue', ['declare' => true, 'durable' => true]);
$messageConsumer = new RabbitMQGenericMessageConsumer(
function (RabbitMQIncomingMessage $message) {
// Acknowledge a message
$message->getDelivery()->acknowledge();
// Reject a message
$requeue = true; // Reject and Requeue
$message->getDelivery()->reject($requeue);
},
$this,
);
// A1. Set the exchange and the queue directly
$messageConsumer
->setExchange($exchange)
->setQueue($queue);
// OR
// A2. Set the exchange and the queue through config
$consumeConfig = new ConsumeConfig(
[
'queue' => [
'name' => 'my_queue',
'declare' => true,
'durable' => true,
],
'exchange' => [
'name' => 'test_exchange',
'declare' => true,
],
],
);
$consumer->consume(
$messageConsumer,
$routingKey,
null,
$consumeConfig,
);
示例
运行消费者
- 创建自定义命令
php artisan make:command MyRabbitConsumer --command "rabbitmq:my-consumer {--queue=} {--exchange=} {--routingKey=}"
- 在
app/Console/Kernel.php
中注册命令
protected $commands = [
MyRabbitConsumer::class,
];
- 通过处理器消费
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Waseet\RabbitMQ\RabbitMQQueue;
use Waseet\RabbitMQ\RabbitMQExchange;
use Waseet\RabbitMQ\RabbitMQIncomingMessage;
use Waseet\RabbitMQ\RabbitMQGenericMessageConsumer;
class MyRabbitConsumer extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'rabbitmq:my-consumer {--queue} {--exchange} {--routingKey}';
/**
* The console command description.
*
* @var string
*/
protected $description = 'My consumer command';
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
}
/**
* Execute the console command.
*
* @return mixed
*/
public function handle()
{
$rabbitMQ = app('rabbitmq');
$messageConsumer = new RabbitMQGenericMessageConsumer(
function (RabbitMQIncomingMessage $message) {
// Handle message
$this->info($message->getStream()); // Print to console
},
$this, // Scope the closure to the command
);
$routingKey = $this->option('routingKey') ?? '';
$queue = new RabbitMQQueue($this->option('queue') ?? '', ['declare' => true]);
$exchange = new RabbitMQExchange($this->option('exchange') ?? '', ['declare' => true]);
$messageConsumer
->setExchange($exchange)
->setQueue($queue);
$rabbitMQ->consumer()->consume($messageConsumer, $routingKey);
}
}
- 从控制台调用命令
php artisan rabbitmq:my-consumer --queue='my_queue' --exchange='test_exchange' --routingKey='key'
发布消息
- 创建路由与控制器绑定
Route::get('/publish', 'MyRabbitMQController@publish');
- 创建一个发布消息的控制器
class MyRabbitMQController extends Controller {
public function publish(Request $request)
{
$rabbitMQ = app('rabbitmq');
$consumer = $rabbitMQ->consumer();
$routingKey = 'key'; // The key used by the consumer
// The exchange (name) used by the consumer
$exchange = new RabbitMQExchange('test_exchange', ['declare' => true]);
$contents = $request->get('message', 'random message');
$message = new RabbitMQMessage($contents);
$message->setExchange($exchange);
$rabbitMQ->publisher()->publish(
$message,
$routingKey
);
return ['message' => "Published {$contents}"];
}
}
请求或浏览到:
https://:8000/publish?message=Hello
检查控制台是否打印了消息
Hello
测试
composer run-script test