gos/react-amqp

ReactPHP 的 AMQP 绑定

v0.3.0 2019-08-26 18:17 UTC

This package is auto-updated.

Last update: 2024-08-27 05:15:44 UTC


README

Build Status

React PHP 提供基本的 AMQP 绑定。

安装

此库需要 PHP >=7.1 和 PECL AMQP 扩展。通过 composer 安装此库是最佳方式。

请使用 1.x 版本用于 PHP-5.x。

composer require gos/react-amqp

使用方法

此库提供两个类,一个 AMQP 消费者和一个生产者。这两个类都与一个周期性定时器一起工作,您需要将定时器间隔作为参数传递给构造函数。

消费者

消费者类允许您从 AMQP 代理接收消息,并在收到消息时调用回调。您还可以提供一次要消费的消息数量,确保您的事件循环不会持续地从代理中消费消息。您提供的回调必须接受 AMQPEnvelope 作为第一个参数,以及可选的 AMQPQueue 作为第二个。

<?php
// Connect to an AMQP broker
$cnn = new AMQPConnection();
$cnn->connect();

// Create a channel
$ch = new AMQPChannel($cnn);

// Create a new queue
$queue = new AMQPQueue($ch);
$queue->setName('queue1');
$queue->declare();

// Create an event loop
$loop = React\EventLoop\Factory::create();

// Create a consumer that will check for messages every half a second and consume up to 10 at a time.
$consumer = new Gos\Component\ReactAMQP\Consumer($queue, $loop, 0.5, 10);
$consumer->on('consume', function(AMQPEnvelope $envelope, AMQPQueue $queue){
	//Process the message here
});
$loop->run();

生产者

生产者类允许您向 AMQP 交换发送消息。生产者有一个与 AMQPExchange 的 publish 方法具有完全相同签名的方法。消息存储在生产者类中,并根据传递给构造函数的定时器间隔发送。当调用生产者对象发送任何队列中的消息时,将使用 AMQPExchange 对象的 publish 方法。此方法是阻塞的,这可能会成为您应用程序的性能问题。当消息成功发送时,将发出一个 'produce' 事件,您可以将回调绑定到该事件。这将传递一个包含所有消息参数的数组。如果抛出 AMQPExchangeException,表示消息无法发送,则发出一个 'error' 事件,您可以将回调绑定到该事件。这将传递 AMQPExchangeException 对象供您处理。

<?php
// Connect to an AMQP broker
$cnn = new AMQPConnection();
$cnn->connect();

// Create a channel
$ch = new AMQPChannel($cnn);

// Declare a new exchange
$ex = new AMQPExchange($ch);
$ex->setName('exchange1');
$ex->declare();

// Create an event loop
$loop = React\EventLoop\Factory::create();

// Create a producer that will send any waiting messages every half a second.
$producer = new Gos\Component\ReactAMQP\Producer($ex, $loop, 0.5);

// Add a callback that's called every time a message is successfully sent.
$producer->on('produce', function(array $message) {
	// $message is an array containing keys 'message', 'routingKey', 'flags' and 'attributes'
});

$producer->on('error', function(AMQPExchangeException $e) {
	// Handle any exceptions here.
});

$i = 0;

$loop->addPeriodicTimer(1, function() use(&$i, $producer) {
	$i++;
	echo "Sending $i\n";
	$producer->publish($i, 'routing.key');
});

$loop->run();