AMQP 封装库,用于发布和消费消息

v0.0.5 2021-11-30 11:54 UTC

This package is auto-updated.

Last update: 2024-09-29 05:56:22 UTC


README

AMQP 封装库,用于发布和消费消息,尤其适用于 RabbitMQ

安装

Composer

运行以下命令,将本包添加到 composer.json 文件的 require 部分

$ php composer require ryaremenko/amqp

集成

Lumen

创建一个连接类

根据您的需求调整属性。

/**
 * Application-specific AMQP connection.
 *
 * Provides the connection options (host, port, login, password) by
 * overriding setConnectionOptions() from AmqpConnection.
 */
class BaseConnection extends AmqpConnection
{
    /**
     * Build the options object used for this connection.
     *
     * The literal values below are example settings; adjust them to match
     * your own broker.
     *
     * @return AmqpConnectionOptions
     */
    protected function setConnectionOptions(): AmqpConnectionOptions
    {
        $connectionOptions = (new AmqpConnectionOptions())
            ->setHost('127.0.0.1')
            ->setPort(5672)
            ->setLogin('guest')
            ->setPassword('guest');

        return $connectionOptions;
    }
}

将连接类注册为单例

/*
|--------------------------------------------------------------------------
|  Laravel example
|--------------------------------------------------------------------------
*/

// NOTE(review): `$this->app` is presumably the application container, e.g.
// inside a service provider's register() method — confirm for your project.

//...

// Register BaseConnection with the container as a singleton.
$this->app->singleton(BaseConnection::class);
// Bind AMQPConnectionInterface to BaseConnection in the container.
$this->app->bind(AMQPConnectionInterface::class, BaseConnection::class);

//...

发布消息

    // Publish the payload ['data'] using 'queue_name' as the target queue.
    (new AmqpProducer())->publish(['data'], 'queue_name');

消费消息

/**
 * Maps a queue name to its handler class and starts consuming that queue
 * through the injected AmqpConsumer.
 */
class AMQPHandlersService
{
    /**
     * Queue name => handler class. The handler is resolved with app() and
     * must expose a handle() method that accepts the message body.
     */
    private const HANDLERS = [
        'queue_name' => TestHandler::class,
    ];

    /**
     * Queues that are consumed with the 'priority' property set to true.
     */
    private const PRIORITY_HANDLERS = [
        'queue_name',
    ];

    /** @var AmqpConsumer */
    private $amqpConsumer;

    /**
     * @param AmqpConsumer $amqpConsumer Consumer used to subscribe to queues.
     */
    public function __construct(AmqpConsumer $amqpConsumer)
    {
        $this->amqpConsumer = $amqpConsumer;
    }

    /**
     * Start consuming the given queue with its registered handler.
     *
     * @param string $queueName Queue to consume; must be a key of HANDLERS.
     *
     * @return void
     *
     * @throws \InvalidArgumentException When no handler is registered for $queueName.
     */
    public function handle(string $queueName)
    {
        // Fail fast on an unknown queue instead of reading a missing array
        // key and handing null to the container.
        if (!array_key_exists($queueName, self::HANDLERS)) {
            throw new \InvalidArgumentException(
                sprintf('No handler registered for queue "%s".', $queueName)
            );
        }

        $properties = [];
        if (in_array($queueName, self::PRIORITY_HANDLERS, true)) {
            $properties['priority'] = true;
        }

        $handler = app(self::HANDLERS[$queueName]);

        $this->amqpConsumer->consume(
            $queueName,
            function ($message) use ($handler) {
                try {
                    $handler->handle($message->body);
                } catch (\Exception $exception) {
                    // Placeholder: add your own exception handling here
                    // (e.g. logging). As written, the exception is swallowed.
                }
            },
            $properties
        );
    }
}