tahseen9/rabbit-queue

A Laravel package for dispatching to and listening on RabbitMQ queues in a simple way!

1.0.0 2023-12-08 07:37 UTC

Last update: 2024-09-30 09:37:19 UTC


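A simple, elegant Laravel wrapper around php-amqplib for dispatching to and listening on RabbitMQ queues.

Avoid High Connection Churn

Use AMQProxy, a proxy with connection and channel pooling/reuse. When used with php-amqplib, it reduces connection and channel churn, which lowers RabbitMQ's CPU usage.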
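
As a rough illustration, routing the package through AMQProxy should only require pointing the connection settings at the proxy instead of the broker. The fragment below is a sketch of the published config/rabbit-queue.php, assuming AMQProxy runs on the same machine and listens on port 5673 (the proxy's documented default; adjust both values to your deployment). Only the "host" and "port" keys from the package's config file are shown.

```php
<?php

// config/rabbit-queue.php (excerpt; every other key stays as published)
return [
    // Assumption: AMQProxy runs locally and forwards to the real RabbitMQ broker.
    "host" => env("RABBITMQ_HOST", "127.0.0.1"),
    // Assumption: 5673 is the port AMQProxy listens on in this setup.
    "port" => env("RABBITMQ_PORT", "5673"),
];
```

Setting RABBITMQ_HOST and RABBITMQ_PORT in your .env file has the same effect without editing the config file.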

Installation

Via Composer

composer require tahseen9/rabbit-queue

Publish the assets (config file)

php artisan vendor:publish --provider="Tahseen9\RabbitQueue\RabbitQueueServiceProvider"

Usage

Dispatching to a Queue

Via Dependency Injection

use Tahseen9\RabbitQueue\Contracts\T9RMQProducerInterface;

class MyProducer {
    
    private string $exchange = 'my_exchange';

    private string $queue = 'my_queue';
    
    private T9RMQProducerInterface $producer;
    
    public function __construct(T9RMQProducerInterface $producer){
        # you can define the exchange and queue here
        $this->producer = $producer->setExchange($this->exchange)
                                   ->onQueue($this->queue); 
       # if you are running multiple queues,
       # pass the queue name in second argument of dispatch method like shown in the facade section
    }
    
    public function fooProducer() {
        
        $this->producer->dispatch([
          'lang' => 'php',
          'framework' => 'laravel',
        ]); # return type void
    
    } # fooProducer end
} # class end

Via the Facade

use Tahseen9\RabbitQueue\Facades\RabbitQueue;
...

RabbitQueue::dispatch(
    [
      'lang' => 'php',
      'framework' => 'laravel',
    ], # Message Array
    $queue_name = "my_queue" # Queue Name
 ); # return type void

Listening to a Queue

Via Dependency Injection

use Tahseen9\RabbitQueue\Contracts\T9RMQConsumerInterface;

class MyListener {
    
    private string $exchange = 'my_exchange';

    private string $queue = 'my_queue';
    
    private T9RMQConsumerInterface $consumer;
    
    public function __construct(T9RMQConsumerInterface $consumer){
        
        # you can define the exchange and queue explicitly here
        $this->consumer = $consumer->setExchange($this->exchange)
                                   ->onQueue($this->queue); 
       # if you are running multiple queues,
       # pass the queue name in second argument of listen method like shown in the facade section
    
    }
    
    public function barListener() {
        
        $this->consumer->jsonArray(true) # set true to receive the message as an associative array; default null (decoded with json_decode() internally)
        ->listen(function($message, $handler){
        
            echo $message['lang']; # php
            echo $message['framework']; # laravel
            
            $handler->ack(); # acknowledge message after using it, so it will be removed from the queue
            
    #        $handler->stopWhenProcessed(); # use this to stop execution once the whole queue has been processed
    
          }); # return type void
          
    } # barListener end
} # class end
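
The jsonArray(true) call above determines whether $message arrives as an array or as an object, which is why this example reads $message['lang'] while the facade example reads $message->lang. The sketch below illustrates the difference, assuming the package decodes the message body with PHP's json_decode(); decodeMessage() is a hypothetical helper written for this illustration and is not part of the package.

```php
<?php

// Hypothetical stand-in for the package's decoding step; not part of rabbit-queue.
function decodeMessage(string $body, ?bool $asArray = null)
{
    // json_decode() returns an associative array when $asArray is true,
    // and a stdClass object when it is null or false.
    return json_decode($body, $asArray);
}

// The same payload used in the dispatch examples, as it would travel as JSON.
$body = json_encode(['lang' => 'php', 'framework' => 'laravel']);

$asArray = decodeMessage($body, true);
echo $asArray['lang'], PHP_EOL;      // array access, as with jsonArray(true)

$asObject = decodeMessage($body);
echo $asObject->framework, PHP_EOL;  // property access, as with the default (null)
```

Pick one mode per listener and stick to it, since mixing array and property access on the same message raises an error.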

Via the Facade

use Tahseen9\RabbitQueue\Facades\RabbitQueue;
...

RabbitQueue::listen(function($message, $handler){
    
    echo $message->lang; # php
    echo $message->framework; # laravel
    
    $handler->ack(); # acknowledge message after using it, so it will be removed from the queue
    
#    $handler->stopWhenProcessed(); # use this if you want to stop execution after listening the whole queue    

}, $queue_name = "my_queue"); # return type void

Using a Class Instance

 $rabbitQueue = new \Tahseen9\RabbitQueue\RabbitQueue();
 
 #available methods:
 $rabbitQueue->producer(); # returns producer instance
 $rabbitQueue->consumer(); # returns consumer instance
 
 # the queue name is nullable by default; you can omit it if the queue was set via onQueue(), e.g.
 # $rabbitQueue->producer()->onQueue('my_queue')->dispatch($msg);
 # $rabbitQueue->consumer()->onQueue('my_queue')->listen($closure);
 
 $rabbitQueue->dispatch($message, 'my_queue'); # (array $message, string $queueName): dispatch directly on the default exchange
 $rabbitQueue->listen($closure, 'my_queue'); # (Closure $closure, string $queueName): start listening immediately on the default exchange

Configuration File (rabbit-queue.php)

#publish file with this command:
php artisan vendor:publish --provider="Tahseen9\RabbitQueue\RabbitQueueServiceProvider"

<?php

return [
    # Connection - Set these in your env
    "host" => env("RABBITMQ_HOST", "localhost"),
    "port" => env("RABBITMQ_PORT", "5672"),
    "username" => env("RABBITMQ_USERNAME", "guest"),
    "password" => env("RABBITMQ_PASSWORD", "guest"),

    # define exchange name or use method to define if dealing with multiple exchanges
    "exchange" => env("EXCHANGE_NAME", env("APP_NAME", "LARAVEL_RABBIT_EXCHANGE")),
    "exchange_type" => env("EXCHANGE_TYPE", "direct"), # this option is only available via env for now
    "exchange_passive" => false,
    "exchange_durable" => true, # persistent exchange
    "exchange_auto_delete" => false,

    "routing_key_postfix" => env("ROUTING_KEY_POSTFIX", "_key"),
    "consumer_tag_post_fix" => env("ROUTING_KEY_POSTFIX", "_tag"), # note: reads the same ROUTING_KEY_POSTFIX env variable as routing_key_postfix

    "qos" => true, # this will apply prefetch count and prefetch size

    # These will work if qos is true
    "qos_prefetch_size" => 0, # 0 = no limit on the total size of prefetched messages
    "qos_prefetch_count" => 1, # each worker processes one message at a time; a higher value preloads that many messages into the worker's memory
    "qos_a_global" => false,

    # Queue Declaration
    "queue_passive" => false,
    "queue_durable" => true, # persistent queue
    "queue_exclusive" => false,
    "queue_auto_delete" => false,

    # Consumer declaration
    "consumer_no_local" => false,
    "consumer_no_ack" => false, # false (default): messages must be acknowledged; set true to consume without acknowledgements
    "consumer_exclusive" => false,
    "consumer_no_wait" => false,

    "message_delivery_mode" => 2 // DELIVERY MODE PERSISTENT = 2 | DELIVERY MODE NON PERSISTENT = 1
];
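
The "exchange" entry above nests two env() calls, so the exchange name falls back from EXCHANGE_NAME to APP_NAME to a fixed default. The sketch below walks through that resolution order. It is an illustration only: envOr() is a hypothetical, simplified stand-in for Laravel's env() helper (it reads process environment variables and skips the value casting that env() performs), and the values "shop" and "orders" are made up.

```php
<?php

// Hypothetical helper mimicking the fallback behaviour of Laravel's env():
// return the environment variable if it is set, otherwise the default.
function envOr(string $key, string $default): string
{
    $value = getenv($key);
    return $value === false ? $default : $value;
}

// Mirrors: env("EXCHANGE_NAME", env("APP_NAME", "LARAVEL_RABBIT_EXCHANGE"))
function resolveExchangeName(): string
{
    return envOr("EXCHANGE_NAME", envOr("APP_NAME", "LARAVEL_RABBIT_EXCHANGE"));
}

// putenv() with a bare name unsets the variable.
putenv("EXCHANGE_NAME");
putenv("APP_NAME");
echo resolveExchangeName(), PHP_EOL; // prints "LARAVEL_RABBIT_EXCHANGE"

putenv("APP_NAME=shop");
echo resolveExchangeName(), PHP_EOL; // prints "shop"

putenv("EXCHANGE_NAME=orders");
echo resolveExchangeName(), PHP_EOL; // prints "orders"
```

In practice this means an application that sets neither variable shares the default exchange name with any other application that does the same, so setting EXCHANGE_NAME explicitly is the safer choice when several applications use one broker.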

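Changelog

Version 1.0.0 released

Security

If you discover any security-related issues, please email tehzu9@hotmail.com instead of using the issue tracker.

Credits

  • AMQProxy team
  • php-amqplib team

License

MIT. Please see the license file for more information.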