edipoelwes/laravel-rabbitmq-worker

该项目是一个Laravel包,用于简化RabbitMQ工作进程的执行。它简化了工作进程的配置、管理和监控,使您能够轻松地将RabbitMQ集成到Laravel应用程序中,以高效且可扩展地处理异步任务。

v1.0.0 2024-04-29 13:50 UTC

This package is auto-updated.

Last update: 2024-09-29 14:47:21 UTC


README

基于php-amqplib库,用于在PHP中简化使用RabbitMQ的库。

安装

composer require edipoelwes/laravel-rabbitmq-worker

如何在Laravel中配置

运行发布者以生成配置文件

运行发布者命令时,它将自动在您的Laravel应用程序的config目录中创建一个名为laravel-rabbitmq-worker.php的配置文件。

php artisan vendor:publish --provider="Edipoelwes\LaravelRabbitmqWorker\CommandServiceProvider"

清除设置缓存

在根据您的环境配置RabbitMQ连接设置之前,清除Laravel配置缓存对于确保任何更改正确生效至关重要。

php artisan config:cache

然后只需根据您的环境进行配置即可。

<?php

return [
    'connections' => [
        'host' => env('RABBITMQ_HOST', 'localhost'),
        'port' => env('RABBITMQ_PORT', 5672),
        'user' => env('RABBITMQ_LOGIN', 'guest'),
        'password' => env('RABBITMQ_PASSWORD', 'guest'),
        'vhost' => env('RABBITMQ_VHOST', '/'),
    ]
];

使用示例

创建简单的发布者

<?php

use Edipoelwes\LaravelRabbitmqWorker\Services\RabbitMQ\RabbitMQService;

$rabbitMQService = new RabbitMQService(
    'queue',       // Queue
    'route-key',   // Routing key
    '',            // Exchange
    '',            // Exchange Type
    '',            // Consumer Tag
    false,         // Passive
    true,          // Durable
    false,         // Exclusive
    false          // Auto delete
);

// Prepare response payload
$payload = "your message";

$rabbitMQService->publish($payload);
$rabbitMQService->destruct(); // Clean up resources

创建消费者

<?php

use Edipoelwes\LaravelRabbitmqWorker\Services\RabbitMQ\RabbitMQService;

$rabbitMQService = new RabbitMQService(
    'queue',       // Queue
    'route-key',   // Routing key
    '',            // Exchange
    '',            // Exchange Type
    '',            // Consumer Tag
    false,         // Passive
    true,          // Durable
    false,         // Exclusive
    false          // Auto delete
);

$callback = function ($msg) {
    //  $msg->body
    // your code here
    
    $msg->ack();
}

$rabbitMQService->consume($callback);
$rabbitMQService->destruct(); // Clean up resources

将消费者创建为Laravel命令

<?php

namespace App\Console\Commands;

use Edipoelwes\LaravelRabbitmqWorker\Services\RabbitMQ\RabbitMQService;
use Illuminate\Console\Command;

class Consume extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'your-command';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Command description';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return int
     */
    public function handle()
    {
        $rabbitMQService = new RabbitMQService(
            'queue',     // Queue
            'route-key', // Routing key
            '',          // Exchange
            '',          // Exchange Type
            '',          // Consumer Tag
            false,       // Passive
            true,        // Durable
            false,       // Exclusive
            false        // Auto delete
        );

        $callback = function ($msg) {
            //  $msg->body
            // your code here
            
            $msg->ack();
        }
        
        $rabbitMQService->consume($callback);
        $rabbitMQService->destruct(); // Clean up resources
    }
}

远程过程调用(RPC)

RPC配置

对于使用RPC进行发布和消费,请确保RabbitMQ类配置了'durable = false'和'auto_delete = true'设置。

在设置远程过程调用(RPC)功能时,正确配置RabbitMQ至关重要,以确保通信的无缝性。为了获得最佳性能和资源管理,建议将RabbitMQ设置调整为'durable = false'和'auto_delete = true'。

创建RPC消费者

<?php

use Edipoelwes\LaravelRabbitmqWorker\Services\RabbitMQ\RabbitMQService;

$rabbitMQService = new RabbitMQService(
    'rpc_queue',       // Queue
    'rpc_queue',       // Routing key
    '',                // Exchange
    '',                // Exchange Type
    'rpc_queue',       // Consumer Tag
    false,             // Passive
    false,             // Durable
    false,             // Exclusive
    true               // Auto delete
);

// Define RPC Consumer Callback
$rabbitMQService->consume(function ($req) {
    
    // Your code here
    
    // Prepare response payload
    $payload = "your message";

    // Publish response to the specified channel and correlation ID
    $msg = new AMQPMessage(
        $payload,
        array('correlation_id' => $req->get('correlation_id'))
    );

    $req->getChannel()->basic_publish(
        $msg,
        '',
        $req->get('reply_to')
    );

    $req->ack(); // Acknowledge message processing
});

$rabbitMQService->destruct(); // Clean up resources

创建RPC发布者

<?php

// Configure RabbitMQ for RPC Publisher
$rabbitMQService = new RabbitMQService(
    'rpc_queue',       // Queue
    'rpc_queue',       // Routing key
    '',                // Exchange
    '',                // Exchange Type
    'rpc_queue',       // Consumer Tag
    false,             // Passive
    false,             // Durable
    false,             // Exclusive
    true               // Auto delete
);

// Prepare response payload
$payload = "your message";

$response = $rabbitMQService->publishRpc($payload);

// your code here

$rabbitMQService->destruct(); // Clean up resources