kunalvarma05 / laravel-rabbitmq
在 Laravel 中使用 RabbitMQ。
1.5.0
2023-12-11 05:48 UTC
Requires
- php: ^8.1
- ext-json: *
- php-amqplib/php-amqplib: v3.6
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.0
- orchestra/testbench: ^8.0
- phpunit/phpunit: ^9.4.0
- squizlabs/php_codesniffer: ^3.5
README
一个易于使用的 Laravel 扩展包,用于处理 RabbitMQ。
功能
- 生产者
- 消费者
- 发布/订阅
- 交换机
- 默认
- 直接
- 主题
- 扇出
需求
- PHP 8.1+
- Laravel 6.0+
设置
1. 安装
composer require kunalvarma05/laravel-rabbitmq
2. 默认配置
php artisan vendor:publish --tag=config
快速开始
初始化
有几种初始化库的方式
// A. Direct instantiation
$rabbitMQ = new RabbitMQManager(app());

// B. Binding
$rabbitMQ = app('rabbitmq');

// C. Dependency injection (Controller, Command, Job, etc.)
public function __construct(RabbitMQManager $rabbitMQ) { ... }

// D. Facade
// All the public methods of the `RabbitMQManager` class
// are available through the `RabbitMQ` facade.
RabbitMQ::getConnections();
发布
$message = new RabbitMQMessage('message body');

// Publish to the default exchange/topic/queue
$rabbitMQ->publisher()->publish($message);

// Publish bulk messages
$messages = [new RabbitMQMessage('message 1'), new RabbitMQMessage('message 2')];
$rabbitMQ->publisher()->publish($messages);
消费
// A. Consume through a closure
$handler = new RabbitMQGenericMessageConsumer(function (RabbitMQIncomingMessage $message) {
    $content = $message->getStream();
});

// B. Consume through a class
class MyMessageConsumer extends RabbitMQMessageConsumer
{
    public function handle(RabbitMQIncomingMessage $message)
    {
        $content = $message->getStream();
    }
}

$handler = new MyMessageConsumer();

// Starts a blocking loop `while (true)`
$rabbitMQ->consumer()->consume($handler);
交互
// Resolve the default connection
// @see: AMQPSSLConnection https://github.com/php-amqplib/php-amqplib/blob/master/PhpAmqpLib/Connection/AMQPSSLConnection.php
$amqpConnection = $rabbitMQ->resolveConnection();

// Resolve the default channel
// @see: AMQPChannel https://github.com/php-amqplib/php-amqplib/blob/master/PhpAmqpLib/Channel/AMQPChannel.php
$amqpChannel = $rabbitMQ->resolveChannel();
配置
连接配置
$connectionName = 'custom_connection'; // Set to `null` for default connection

// Override the default connection config
// ('guest'/'guest' is the account a stock RabbitMQ broker ships with)
$connectionConfig = new ConnectionConfig(['username' => 'guest', 'password' => 'guest']);
$connectionConfig->setHost('localhost');

$customConnection = $rabbitMQ->resolveConnection($connectionName, $connectionConfig);
消息配置
$config = [
    'content_encoding' => 'UTF-8',
    'content_type' => 'text/plain',
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
];
$message = new RabbitMQMessage('message body', $config);

// Set message exchange
$exchangeConfig = ['type' => AMQPExchangeType::DIRECT];
$exchange = new RabbitMQExchange('my_exchange', $exchangeConfig);
$message->setExchange($exchange);
发布配置
$publisher = $rabbitMQ->publisher();

$message = new RabbitMQMessage('message body');

$exchangeConfig = ['type' => AMQPExchangeType::TOPIC];
$exchange = new RabbitMQExchange('my_exchange', $exchangeConfig);
$message->setExchange($exchange);

$routingKey = 'key'; // Can be an empty string, but not null
$connectionName = 'custom_connection'; // Set to null for default connection

// The publish config allows you to override any default configuration
//
// The following precedence applies to the configuration:
// Message exchange config > Publish config > Connection config > Default config
//
// In this case, the exchange type used would be AMQPExchangeType::TOPIC
$publishConfig = new PublishConfig(['exchange' => ['type' => AMQPExchangeType::FANOUT]]);

$publisher->publish($message, $routingKey, $connectionName, $publishConfig);
消费者配置
$consumer = $rabbitMQ->consumer();
$routingKey = 'key';

$exchange = new RabbitMQExchange('test_exchange', ['declare' => true, 'durable' => true]);
$queue = new RabbitMQQueue('my_queue', ['declare' => true, 'durable' => true]);

$messageConsumer = new RabbitMQGenericMessageConsumer(
    function (RabbitMQIncomingMessage $message) {
        // Acknowledge a message
        $message->getDelivery()->acknowledge();

        // ...or reject a message instead (the two calls are shown as alternatives)
        $requeue = true; // Reject and Requeue
        $message->getDelivery()->reject($requeue);
    },
    $this,
);

// A1. Set the exchange and the queue directly
$messageConsumer
    ->setExchange($exchange)
    ->setQueue($queue);

// OR

// A2. Set the exchange and the queue through config
$consumeConfig = new ConsumeConfig(
    [
        'queue' => [
            'name' => 'my_queue',
            'declare' => true,
            'durable' => true,
        ],
        'exchange' => [
            'name' => 'test_exchange',
            'declare' => true,
        ],
    ],
);

$consumer->consume(
    $messageConsumer,
    $routingKey,
    null,
    $consumeConfig,
);
示例
运行消费者
- 创建自定义命令
php artisan make:command MyRabbitConsumer --command "rabbitmq:my-consumer {--queue=} {--exchange=} {--routingKey=}"
- 在 app/Console/Kernel.php 中注册命令
/**
 * Artisan commands registered by the application's console kernel.
 *
 * @var array
 */
protected $commands = [
    MyRabbitConsumer::class,
];
- 通过处理器消费
<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Kunnu\RabbitMQ\RabbitMQExchange;
use Kunnu\RabbitMQ\RabbitMQGenericMessageConsumer;
use Kunnu\RabbitMQ\RabbitMQIncomingMessage;
use Kunnu\RabbitMQ\RabbitMQQueue;

/**
 * Console command that consumes messages from a queue and prints each
 * message body to the console.
 */
class MyRabbitConsumer extends Command
{
    /**
     * The name and signature of the console command.
     *
     * Each option ends with `=` so that it accepts a value
     * (e.g. `--queue=my_queue`); without it the option is a boolean switch.
     *
     * @var string
     */
    protected $signature = 'rabbitmq:my-consumer {--queue=} {--exchange=} {--routingKey=}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'My consumer command';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * Builds the queue and exchange from the command options (falling back
     * to an empty name when an option is omitted) and starts consuming.
     *
     * @return mixed
     */
    public function handle()
    {
        $rabbitMQ = app('rabbitmq');

        $messageConsumer = new RabbitMQGenericMessageConsumer(
            function (RabbitMQIncomingMessage $message) {
                // Handle message
                $this->info($message->getStream()); // Print to console
            },
            $this, // Scope the closure to the command
        );

        $routingKey = $this->option('routingKey') ?? '';
        $queue = new RabbitMQQueue($this->option('queue') ?? '', ['declare' => true]);
        $exchange = new RabbitMQExchange($this->option('exchange') ?? '', ['declare' => true]);

        $messageConsumer
            ->setExchange($exchange)
            ->setQueue($queue);

        $rabbitMQ->consumer()->consume($messageConsumer, $routingKey);
    }
}
- 从控制台调用命令
php artisan rabbitmq:my-consumer --queue='my_queue' --exchange='test_exchange' --routingKey='key'
发布消息
- 创建路由与控制器绑定
use App\Http\Controllers\MyRabbitMQController;

// Array-style action: resolves the controller by its class name, so it does
// not depend on an implicit controller namespace being configured.
Route::get('/publish', [MyRabbitMQController::class, 'publish']);
- 创建发布消息的控制器
/**
 * Example controller that publishes the `message` request parameter
 * to the `test_exchange` exchange.
 */
class MyRabbitMQController extends Controller
{
    /**
     * Publish a message taken from the request.
     *
     * @param Request $request Incoming request; reads `message`, defaulting to 'random message'.
     *
     * @return array Confirmation payload echoing the published contents.
     */
    public function publish(Request $request)
    {
        $rabbitMQ = app('rabbitmq');
        $routingKey = 'key'; // The key used by the consumer

        // The exchange (name) used by the consumer
        $exchange = new RabbitMQExchange('test_exchange', ['declare' => true]);

        $contents = $request->get('message', 'random message');

        $message = new RabbitMQMessage($contents);
        $message->setExchange($exchange);

        $rabbitMQ->publisher()->publish(
            $message,
            $routingKey
        );

        return ['message' => "Published {$contents}"];
    }
}
- 发送请求或浏览至:
http://localhost:8000/publish?message=Hello
- 在控制台查看是否打印了消息
Hello
测试
composer run-script test
致谢
许可证
MIT 许可证 (MIT)。请参阅 许可证文件 获取更多信息。