wlgns5376/laravel-amqp

laravel amqp 包

0.1.0 2021-08-19 02:04 UTC

This package is auto-updated.

Last update: 2024-09-19 09:17:27 UTC


README

这是一个用于在Laravel中使用RabbitMQ进行发布/订阅的包。

支持

  • Laravel 8

安装

安装包

composer require wlgns5376/laravel-amqp

配置发布

php artisan vendor:publish --tag=amqp

Docker RabbitMQ

docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 rabbitmq:3

配置

环境

.env

AMQP_HOST=localhost
AMQP_PORT=5672
AMQP_USER=guest
AMQP_PASSWORD=guest
AMQP_VHOST=/

配置

config/amqp.php

<?php
return [
    'host'     => env('AMQP_HOST', 'localhost'),
    'port'     => env('AMQP_PORT', 5672),
    'user'     => env('AMQP_USER', 'guest'),
    'password' => env('AMQP_PASSWORD', 'guest'),
    'vhost'    => env('AMQP_VHOST', '/'),
    'options'  => [
        'queue'         => '',
        'exchange'      => '',
        'exchange_type' => 'direct',
        'consumer_tag'  => '',
        'passive'       => false,
        'durable'       => false,
        'auto_delete'   => true,
        'exclusive'     => false,
        'binding_key'   => [],
    ],
];

主题

示例参考: RabbitMQ Topic - PHP

config/amqp.php

<?php
return [
    ...
    'options'  => [
        'queue'         => '',
        'exchange'      => 'topic_logs',
        'exchange_type' => 'topic',
        ...
        'binding_key'   => [
            'kern.*',
            '*.critical'
        ],
    ],
];

使用Job传递消息

app/Jobs/SampleJob.php

<?php

namespace App\Jobs;

...
use Wlgns5376\LaravelAmqp\Publisher;

class SampleJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct()
    {
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle(Publisher $publisher)
    {
        $publisher->publish('A critical kernel error', 'kern.critical');
    }
}

选项扩展

$publisher->publish('A critical kernel error', 'kern.critical', [
    'exchange' => 'other_topic_logs',
    'durable' => true,
]);

通过命令接收消息

app/Console/Commands/ConsumeCommand.php

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Wlgns5376\LaravelAmqp\Consumer;

class ConsumeCommand extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'amqp:consume';

    /**
     * Execute the console command.
     * 
     * @param Wlgns5376\LaravelAmqp\Consumer $consumer
     *
     * @return int
     */
    public function handle(Consumer $consumer)
    {
        $consumer->consume(function($message) {
            echo ' [x] ', $message->delivery_info['routing_key'], ':', $message->body, "\n";
        });

        return 0;
    }
}

选项扩展

$consumer->consume(function($message) {
    echo ' [x] ', $message->delivery_info['routing_key'], ':', $message->body, "\n";
}, [
    'exchange' => 'other_topic_logs',
    'durable' => true,
]);