brash-creative / rabbit-queue
此包的最新版本(2.0.0)没有可用的许可证信息。
一个基本包,用于通过RabbitMQ发布和/或消费AMQP消息
2.0.0
2017-11-10 14:41 UTC
Requires
- php: >=7.0
- php-amqplib/php-amqplib: ~2.7
Requires (Dev)
- mockery/mockery: ~1.0
- phpunit/phpunit: ~6.4
This package is auto-updated.
Last update: 2024-09-05 01:06:21 UTC
README
基本的RabbitMQ接口
一个基本包,用于通过RabbitMQ发布和/或消费AMQP消息
首先,您需要创建您的队列类,该类扩展了Rabbit Brash\RabbitQueue\RabbitQueue类,并将队列参数设置为您的所需队列名称。
<?php use Brash\RabbitQueue\RabbitQueue class MyQueue extends RabbitQueue { protected $queue = 'EXAMPLE_QUEUE'; public function __construct(AMQPConnection $connection) { parent::__construct($connection) } public function getQueue(): string { return $this->queue; } }
然后,可以使用此类将所有消息推入或拉出该队列。
使用示例 - 发布
<?php use PhpAmqpLib\Connection\AMQPStreamConnection; use Brash\RabbitQueue\QueueException; try { // AMQPConnection(host, port, username, password) $message = "This is my message"; $amqp = new AMQPStreamConnection('http://myrabbithost', 5672, 'guest', 'guest'); $publish = new MyQueue($amqp); $publish->push($message); } catch (QueueException $e) { // Catch publish errors } catch (\Exception $e) { // Catch all other errors }
消费者将从队列中检索消息并将它们传递给选择的处理器。
处理器方法包括传递可调用的方法、对象/方法对或可调用类的类路径常量,例如。
$consume->pull([$class, 'method']); $consume->pull(function (AMQPMessage $message){ // Some code }) $consume->pull(ExampleConsumer::class);
在最后的例子中,ExampleConsumer类将如下所示
例如
class ExampleConsumer { public function __invoke(AMPQMessage $message) { // Code } }
您也可以选择带或不带确认地拉取消息。
使用示例 - 消费(确认)
<?php use PhpAmqpLib\Connection\AMQPStreamConnection; use Brash\RabbitQueue\QueueException; // A class containing a method that the consumer can send the retrieved message body try { $amqp = new AMQPStreamConnection('http://myrabbithost', 5672, 'guest', 'guest'); $consume = new MyQueue($amqp); $consume->pull(ExampleConsumer::class); // Keep listening to the queue... $consume->poll(); } catch (QueueException $e) { // Catch consume errors } catch (\Exception $e) { // Catch all other errors }
当使用此方法时,您必须在您定义的方法中处理完消息后发送确认。
在这个例子中...
<?php use PhpAmqpLib\Message\AMQPMessage; class ExampleConsumer { public function __invoke(AMQPMessage $message) { $body = $message->body; // Do something with the message $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); } }
或者,您可以扩展提供的AcknowledgableConsumer抽象类并调用acknowledge方法
<?php use PhpAmqpLib\Message\AMQPMessage; use Brash\RabbitQueue\AcknowledgableConsumer; class ExampleConsumer extends AcknowledgableConsumer { public function __invoke(AMQPMessage $message) { $body = $message->body; // Do something with the message $this->acknowledge($message); } }
使用示例 - 消费(不确认)
<?php use PhpAmqpLib\Connection\AMQPStreamConnection; use Brash\RabbitQueue\QueueException; // A class containing a method that the consumer can send the retrieved message body try { $amqp = new AMQPStreamConnection('http://myrabbithost', 5672, 'guest', 'guest'); $consume = new MyQueue($amqp); $consume->pullNoAck(ExampleConsumer::class); // Keep listening to the queue... $consume->poll(); } catch (QueueException $e) { // Catch consume errors } catch (\Exception $e) { // Catch all other errors }