kunalvarma05/laravel-rabbitmq

在 Laravel 中与 RabbitMQ 一起工作。

1.5.0 2023-12-11 05:48 UTC

README

Latest Version on Packagist Travis (.com) Coveralls github StyleCI Quality Score Total Downloads

一个易于使用的 Laravel 扩展包,用于处理 RabbitMQ。

功能

  • 生产者
  • 消费者
  • 发布/订阅
  • 交换机
    • 默认
    • 直接
    • 主题
    • 扇出

需求

  • PHP 7.4+
  • Laravel 6.0+

设置

1. 安装

composer require kunalvarma05/laravel-rabbitmq

2. 默认配置

php artisan vendor:publish --tag=config

快速开始

初始化

有几种初始化库的方式

// A. Direct instantiation
$rabbitMQ = new RabbitMQManager(app());

// B. Binding
$rabbitMQ = app('rabbitmq');

// C. Dependency injection (Controller, Command, Job, etc.)
public function __construct(RabbitMQManager $rabbitMQ) { ... }

// D. Facade
// All the public methods of the `RabbitMQManager` class
// are available through the `RabbitMQ` facade.
RabbitMQ::getConnections();

发布

$message = new RabbitMQMessage('message body');

// Publish to the default exchange/topic/queue
$rabbitMQ->publisher()->publish($message);

// Publish bulk messages
$messages = [new RabbitMQMessage('message 1'), new RabbitMQMessage('message 2')];
$rabbitMQ->publisher()->publish($messages);

消费

// A. Consume through a closure
// The closure is invoked with each incoming message; here it reads the message via getStream().
$handler = new RabbitMQGenericMessageConsumer(function (RabbitMQIncomingMessage $message) {
  $content = $message->getStream();
});

// B. Consume through a class
/**
 * Example class-based handler: extends RabbitMQMessageConsumer and
 * implements handle() to process each incoming message.
 */
class MyMessageConsumer extends RabbitMQMessageConsumer
{
  /**
   * Handle a single incoming message.
   */
  public function handle(RabbitMQIncomingMessage $message)
  {
    // Read the message through its stream accessor.
    $body = $message->getStream();
  }
}
$handler = new MyMessageConsumer();

// Starts a blocking loop `while (true)`
$rabbitMQ->consumer()->consume($handler);

交互

// Resolve the default connection
// @see: AMQPSSLConnection https://github.com/php-amqplib/php-amqplib/blob/master/PhpAmqpLib/Connection/AMQPSSLConnection.php
$amqpConnection = $rabbitMQ->resolveConnection();

// Resolve the default channel
// @see: AMQPChannel https://github.com/php-amqplib/php-amqplib/blob/master/PhpAmqpLib/Channel/AMQPChannel.php
$amqpChannel = $rabbitMQ->resolveChannel();

配置

连接配置

$connectionName = 'custom_connection'; // Set to `null` for default connection
// Override the default connection config
$connectionConfig = new ConnectionConfig(['username' => 'quest', 'password' => 'quest']);
$connectionConfig->setHost('localhost');
$customConnection = $rabbitMQ->resolveConnection($connectionName, $connectionConfig);

消息配置

$config = [
  'content_encoding' => 'UTF-8',
  'content_type'     => 'text/plain',
  'delivery_mode'    => AMQPMessage::DELIVERY_MODE_PERSISTENT,
];
$message = new RabbitMQMessage('message body', $config);

// Set message exchange
$exchangeConfig = ['type' => AMQPExchangeType::DIRECT];
$exchange = new RabbitMQExchange('my_exchange', $exchangeConfig);
$message->setExchange($exchange);

发布配置

$publisher = $rabbitMQ->publisher();

$message = new RabbitMQMessage('message body');

$exchangeConfig = ['type' => AMQPExchangeType::TOPIC];
$exchange = new RabbitMQExchange('my_exchange', $exchangeConfig);

$message->setExchange($exchange);

$routingKey = 'key'; // Can be an empty string, but not null
$connectionName = 'custom_connection'; // Set to null for default connection

// The publish config allows you to override any default configuration
//
// The configuration is resolved with the following precedence:
// Message exchange config > Publish config > Connection config > Default config
//
// In this case, the exchange type used would be AMQPExchangeType::TOPIC
$publishConfig = new PublishConfig(['exchange' => ['type' => AMQPExchangeType::FANOUT]]);

$publisher->publish($message, $routingKey, $connectionName, $publishConfig);

消费者配置

$consumer = $rabbitMQ->consumer();
$routingKey = 'key';

$exchange = new RabbitMQExchange('test_exchange', ['declare' => true, 'durable' => true]);
$queue = new RabbitMQQueue('my_queue', ['declare' => true, 'durable' => true]);

$messageConsumer = new RabbitMQGenericMessageConsumer(
    function (RabbitMQIncomingMessage $message) {
      // Acknowledge a message
      $message->getDelivery()->acknowledge();
      // Reject a message
      $requeue = true; // Reject and Requeue
      $message->getDelivery()->reject($requeue);
    },
    $this,
);

// A1. Set the exchange and the queue directly
$messageConsumer
    ->setExchange($exchange)
    ->setQueue($queue);

// OR

// A2. Set the exchange and the queue through config
// 'queue' and 'exchange' are sibling keys of the same config array.
$consumeConfig = new ConsumeConfig(
  [
    'queue' => [
      'name' => 'my_queue',
      'declare' => true,
      'durable' => true,
    ],
    'exchange' => [
      'name' => 'test_exchange',
      'declare' => true,
    ],
  ],
);

$consumer->consume(
    $messageConsumer,
    $routingKey,
    null,
    $consumeConfig,
);

示例

运行消费者

  • 创建自定义命令
php artisan make:command MyRabbitConsumer --command "rabbitmq:my-consumer {--queue=} {--exchange=} {--routingKey=}"
  • app/Console/Kernel.php 中注册命令
protected $commands = [
  MyRabbitConsumer::class,
];
  • 通过处理器消费
<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Kunnu\RabbitMQ\RabbitMQQueue;
use Kunnu\RabbitMQ\RabbitMQExchange;
use Kunnu\RabbitMQ\RabbitMQIncomingMessage;
use Kunnu\RabbitMQ\RabbitMQGenericMessageConsumer;

/**
 * Example Artisan command that starts a RabbitMQ consumer for the queue,
 * exchange and routing key supplied as command-line options.
 */
class MyRabbitConsumer extends Command
{
    /**
     * The name and signature of the console command.
     *
     * Each option is declared with a trailing `=` so that it accepts a value
     * (e.g. `--queue=my_queue`). Without the `=`, Artisan treats the option
     * as a boolean switch and refuses a value on the command line.
     *
     * @var string
     */
    protected $signature = 'rabbitmq:my-consumer {--queue=} {--exchange=} {--routingKey=}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'My consumer command';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * Builds a closure-based message consumer, binds it to the exchange and
     * queue named by the command options, and starts consuming.
     *
     * @return mixed
     */
    public function handle()
    {
        $rabbitMQ = app('rabbitmq');
        $messageConsumer = new RabbitMQGenericMessageConsumer(
            function (RabbitMQIncomingMessage $message) {
                // Handle message
                $this->info($message->getStream()); // Print to console
            },
            $this, // Scope the closure to the command
        );

        // Options that were not supplied fall back to an empty string.
        $routingKey = $this->option('routingKey') ?? '';
        $queue = new RabbitMQQueue($this->option('queue') ?? '', ['declare' => true]);
        $exchange = new RabbitMQExchange($this->option('exchange') ?? '', ['declare' => true]);

        $messageConsumer
            ->setExchange($exchange)
            ->setQueue($queue);

        // Starts the blocking consume loop.
        $rabbitMQ->consumer()->consume($messageConsumer, $routingKey);
    }
}
  • 从控制台调用命令
php artisan rabbitmq:my-consumer --queue='my_queue' --exchange='test_exchange' --routingKey='key'

发布消息

  • 创建路由与控制器绑定
Route::get('/publish', 'MyRabbitMQController@publish');
  • 创建发布消息的控制器
/**
 * Example controller that publishes a message to RabbitMQ.
 */
class MyRabbitMQController extends Controller {

  /**
   * Publish the `message` request parameter (or a fallback string) to
   * `test_exchange` using the routing key `key`.
   *
   * @param  Request  $request
   * @return array
   */
  public function publish(Request $request)
  {
    $rabbitMQ = app('rabbitmq');
    $routingKey = 'key'; // The key used by the consumer

    // The exchange (name) used by the consumer
    $exchange = new RabbitMQExchange('test_exchange', ['declare' => true]);

    $contents = $request->get('message', 'random message');

    $message = new RabbitMQMessage($contents);
    $message->setExchange($exchange);

    $rabbitMQ->publisher()->publish(
        $message,
        $routingKey
    );

    return ['message' => "Published {$contents}"];
  }
}
  • 发送请求或浏览至: http://localhost:8000/publish?message=Hello

  • 在控制台查看是否打印了消息 Hello

测试

composer run-script test

致谢

许可证

MIT 许可证 (MIT)。请参阅 许可证文件 获取更多信息。