tahseen9 / rabbit-queue
A Laravel package for dispatching to and listening on RabbitMQ queues in a simple way!
1.0.0
2023-12-08 07:37 UTC
Requires
- php: >=7.2
- php-amqplib/php-amqplib: ^3.6
Requires (Dev)
- orchestra/testbench: ~7
- phpunit/phpunit: ~9.0
This package is not auto-updated.
Last update: 2024-09-30 09:37:19 UTC
README
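A simple, elegant Laravel wrapper around php-amqplib for dispatching to and listening on RabbitMQ queues.
Avoid high connection churn
Use AMQProxy, a proxy that pools and reuses connections and channels. With php-amqplib this reduces connection and channel churn, which in turn lowers RabbitMQ's CPU usage.
Installation
Via Composer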
composer require tahseen9/rabbit-queue
Publish assets (config file)
php artisan vendor:publish --provider="Tahseen9\RabbitQueue\RabbitQueueServiceProvider"
Usage
Dispatching to a queue
Via dependency injection
use Tahseen9\RabbitQueue\Contracts\T9RMQProducerInterface;

class MyProducer
{
    private string $exchange = 'my_exchange';
    private string $queue = 'my_queue';
    private T9RMQProducerInterface $producer;

    public function __construct(T9RMQProducerInterface $producer)
    {
        # You can define the exchange and queue here.
        # If you are running multiple queues, pass the queue name as the
        # second argument of dispatch(), as shown in the facade section.
        $this->producer = $producer->setExchange($this->exchange)
                                   ->onQueue($this->queue);
    }

    public function fooProducer()
    {
        $this->producer->dispatch([
            'lang' => 'php',
            'framework' => 'laravel',
        ]); # return type void
    } # fooProducer end
} # class end
Via the facade
use Tahseen9\RabbitQueue\Facades\RabbitQueue;

...

RabbitQueue::dispatch(
    [
        'lang' => 'php',
        'framework' => 'laravel',
    ],          # message array
    'my_queue'  # queue name
); # return type void
Listening on a queue
Via dependency injection
use Tahseen9\RabbitQueue\Contracts\T9RMQConsumerInterface;

class MyListener
{
    private string $exchange = 'my_exchange';
    private string $queue = 'my_queue';
    private T9RMQConsumerInterface $consumer;

    public function __construct(T9RMQConsumerInterface $consumer)
    {
        # You can define the exchange and queue explicitly here.
        # If you are running multiple queues, pass the queue name as the
        # second argument of listen(), as shown in the facade section.
        $this->consumer = $consumer->setExchange($this->exchange)
                                   ->onQueue($this->queue);
    }

    public function barListener()
    {
        $this->consumer
            ->jsonArray(true) # pass true to receive messages as associative arrays; default null (JSON is handled internally)
            ->listen(function ($message, $handler) {
                echo $message['lang'];      # php
                echo $message['framework']; # laravel

                $handler->ack(); # acknowledge the message after using it, so it is removed from the queue

                # $handler->stopWhenProcessed(); # use this to stop execution once the whole queue has been consumed
            }); # return type void
    } # barListener end
} # class end
Via the facade
use Tahseen9\RabbitQueue\Facades\RabbitQueue;

...

RabbitQueue::listen(function ($message, $handler) {
    echo $message->lang;      # php
    echo $message->framework; # laravel

    $handler->ack(); # acknowledge the message after using it, so it is removed from the queue

    # $handler->stopWhenProcessed(); # use this to stop execution once the whole queue has been consumed
}, 'my_queue'); # queue name; return type void
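The dependency-injection listener reads `$message['lang']` after calling `jsonArray(true)`, while the facade listener reads `$message->lang`. The difference is the same one PHP's own JSON functions show between associative and default decoding. The sketch below illustrates it with plain `json_encode()` and `json_decode()`; it assumes the message body travels as a JSON string and does not reproduce the package's internal code.

```php
<?php

// The example payload used throughout this README.
$payload = ['lang' => 'php', 'framework' => 'laravel'];

// Assumption: the message body is sent over the wire as a JSON string.
$body = json_encode($payload);

// Associative decoding: comparable to what a listener sees after jsonArray(true).
$asArray = json_decode($body, true);

// Default decoding: JSON objects become stdClass instances.
$asObject = json_decode($body);

echo $asArray['lang'], PHP_EOL;      // php
echo $asObject->framework, PHP_EOL;  // laravel
```

If a callback mixes the two access styles, PHP raises an error at runtime, so it helps to pick one form per listener and keep it consistent.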
Using a class instance
$rabbitQueue = new \Tahseen9\RabbitQueue\RabbitQueue();

# Available methods:
$rabbitQueue->producer(); # returns the producer instance
$rabbitQueue->consumer(); # returns the consumer instance

# The queue name is nullable by default; you can skip it if it was set via onQueue(), e.g.
# $rabbitQueue->producer()->onQueue('my_queue')->dispatch($msg);
# $rabbitQueue->consumer()->onQueue('my_queue')->listen(Closure $closure);

$rabbitQueue->dispatch($message, $queueName); # array $message, string $queueName: dispatch directly on the default exchange
$rabbitQueue->listen($closure, $queueName);   # Closure $closure, string $queueName: start listening immediately on the default exchange
Configuration file (rabbit-queue.php)
<?php

# Publish this file with:
# php artisan vendor:publish --provider="Tahseen9\RabbitQueue\RabbitQueueServiceProvider"

return [
    # Connection - set these in your env
    "host" => env("RABBITMQ_HOST", "localhost"),
    "port" => env("RABBITMQ_PORT", "5672"),
    "username" => env("RABBITMQ_USERNAME", "guest"),
    "password" => env("RABBITMQ_PASSWORD", "guest"),

    # Define the exchange name here, or use the setExchange() method when dealing with multiple exchanges
    "exchange" => env("EXCHANGE_NAME", env("APP_NAME", "LARAVEL_RABBIT_EXCHANGE")),
    "exchange_type" => env("EXCHANGE_TYPE", "direct"), # this option is only available via env for now
    "exchange_passive" => false,
    "exchange_durable" => true, # persistent exchange
    "exchange_auto_delete" => false,

    "routing_key_postfix" => env("ROUTING_KEY_POSTFIX", "_key"),
    "consumer_tag_post_fix" => env("ROUTING_KEY_POSTFIX", "_tag"),

    "qos" => true, # applies the prefetch count and prefetch size below

    # These take effect only if qos is true
    "qos_prefetch_size" => 0,  # 0 means no size limit on prefetched messages
    "qos_prefetch_count" => 1, # each worker processes 1 job at a time; a higher value preloads that many jobs into the worker's memory
    "qos_a_global" => false,

    # Queue declaration
    "queue_passive" => false,
    "queue_durable" => true, # persistent queue
    "queue_exclusive" => false,
    "queue_auto_delete" => false,

    # Consumer declaration
    "consumer_no_local" => false,
    "consumer_no_ack" => false, # false (default): messages must be acknowledged; true: no acknowledgement needed
    "consumer_exclusive" => false,
    "consumer_no_wait" => false,

    "message_delivery_mode" => 2 # 2 = persistent delivery mode | 1 = non-persistent delivery mode
];
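The "exchange" entry nests two `env()` calls, so the exchange name is taken from `EXCHANGE_NAME` if set, otherwise from `APP_NAME`, otherwise from the literal `LARAVEL_RABBIT_EXCHANGE`. The sketch below walks through that fallback order outside Laravel. `envOr()` and `resolveExchangeName()` are hypothetical helpers written for this illustration; `envOr()` is a simplified stand-in for Laravel's `env()` and skips its special handling of values such as "true" or "null".

```php
<?php

// Hypothetical, simplified stand-in for Laravel's env() helper:
// returns the variable's value, or $default when it is not set.
function envOr(string $key, string $default): string
{
    $value = getenv($key);

    return $value === false ? $default : $value;
}

// Mirrors the nested lookup used by the "exchange" config entry.
function resolveExchangeName(): string
{
    return envOr('EXCHANGE_NAME', envOr('APP_NAME', 'LARAVEL_RABBIT_EXCHANGE'));
}

// Start from a clean state: putenv() without "=" unsets the variable.
putenv('EXCHANGE_NAME');
putenv('APP_NAME');
$neitherSet = resolveExchangeName();

putenv('APP_NAME=shop'); // example value, not from the package
$appNameOnly = resolveExchangeName();

putenv('EXCHANGE_NAME=orders'); // example value, not from the package
$bothSet = resolveExchangeName();

echo $neitherSet, PHP_EOL;  // LARAVEL_RABBIT_EXCHANGE
echo $appNameOnly, PHP_EOL; // shop
echo $bothSet, PHP_EOL;     // orders
```

In practice this means a project that sets only `APP_NAME` gets an exchange named after the application, which is worth knowing when several services share one broker.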
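Changelog
Version 1.0.0 released
Security
If you discover any security-related issues, please email tehzu9@hotmail.com instead of using the issue tracker.
Credits
- AMQProxy team
- php-amqplib team
License
MIT. Please see the license file for more information.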