brash-creative/rabbit-queue

此包的最新版本(2.0.0)没有可用的许可证信息。

一个基本包,用于通过RabbitMQ发布和/或消费AMQP消息

2.0.0 2017-11-10 14:41 UTC

This package is auto-updated.

Last update: 2024-09-05 01:06:21 UTC


README

Build Status

基本的RabbitMQ接口

一个基本包,用于通过RabbitMQ发布和/或消费AMQP消息

首先,您需要创建您的队列类,该类扩展了Rabbit Brash\RabbitQueue\RabbitQueue类,并将队列参数设置为您的所需队列名称。

<?php
use Brash\RabbitQueue\RabbitQueue

class MyQueue extends RabbitQueue {

    protected $queue = 'EXAMPLE_QUEUE';

    public function __construct(AMQPConnection $connection)
    {
        parent::__construct($connection)
    }
    
    public function getQueue(): string 
    {
        return $this->queue;
    }
}

然后,可以使用此类将所有消息推入或拉出该队列。

使用示例 - 发布

<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use Brash\RabbitQueue\QueueException;

try {
    // AMQPConnection(host, port, username, password)
    $message    = "This is my message";
    $amqp       = new AMQPStreamConnection('http://myrabbithost', 5672, 'guest', 'guest');
    $publish    = new MyQueue($amqp);
    $publish->push($message);
} catch (QueueException $e) {
    // Catch publish errors
} catch (\Exception $e) {
    // Catch all other errors
}

消费者将从队列中检索消息并将它们传递给选择的处理器。

处理器方法包括传递可调用的方法、对象/方法对或可调用类的类路径常量,例如。

$consume->pull([$class, 'method']);
$consume->pull(function (AMQPMessage $message){
    // Some code
})
$consume->pull(ExampleConsumer::class);

在最后的例子中,ExampleConsumer类将如下所示

例如

class ExampleConsumer
{
    public function __invoke(AMPQMessage $message)
    {
        // Code
    }   
}

您也可以选择带或不带确认地拉取消息。

使用示例 - 消费(确认)

<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use Brash\RabbitQueue\QueueException;

// A class containing a method that the consumer can send the retrieved message body
try {
    $amqp           = new AMQPStreamConnection('http://myrabbithost', 5672, 'guest', 'guest');
    $consume        = new MyQueue($amqp);
    $consume->pull(ExampleConsumer::class);

    // Keep listening to the queue...
    $consume->poll();
} catch (QueueException $e) {
    // Catch consume errors
} catch (\Exception $e) {
    // Catch all other errors
}

当使用此方法时,您必须在您定义的方法中处理完消息后发送确认。

在这个例子中...

<?php
use PhpAmqpLib\Message\AMQPMessage;

class ExampleConsumer {
    public function __invoke(AMQPMessage $message)
    {
        $body = $message->body;

        // Do something with the message

        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    }
}

或者,您可以扩展提供的AcknowledgableConsumer抽象类并调用acknowledge方法

<?php
use PhpAmqpLib\Message\AMQPMessage;
use Brash\RabbitQueue\AcknowledgableConsumer;

class ExampleConsumer extends AcknowledgableConsumer
{
    public function __invoke(AMQPMessage $message)
    {
        $body = $message->body;

        // Do something with the message

        $this->acknowledge($message);
    }
}

使用示例 - 消费(不确认)

<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use Brash\RabbitQueue\QueueException;

// A class containing a method that the consumer can send the retrieved message body
try {
    $amqp           = new AMQPStreamConnection('http://myrabbithost', 5672, 'guest', 'guest');
    $consume        = new MyQueue($amqp);
    $consume->pullNoAck(ExampleConsumer::class);

    // Keep listening to the queue...
    $consume->poll();
} catch (QueueException $e) {
    // Catch consume errors
} catch (\Exception $e) {
    // Catch all other errors
}