jcook/react-amqp

此包已被废弃,不再维护。没有推荐替代包。

ReactPHP 的 AMQP 绑定

dev-master 2015-03-23 00:37 UTC

This package is auto-updated.

Last update: 2023-05-17 05:45:10 UTC


README

此库不再维护,应视为已过时。

ReactAMQP

React PHP 提供基本的 AMQP 绑定。

安装

此库需要 PHP 5.4 和 PECL AMQP 扩展。最佳安装方式是通过 composer

{
	"require": {
		"jcook/react-amqp": "dev-master"
	}
}

使用方法

此库提供两个类,AMQP 消费者和生产者。这两个类都与一个周期性计时器一起工作,你需要在构造函数中提供一个计时器间隔作为参数。

消费者

消费者类允许您从 AMQP 代理接收消息,并在收到消息时调用回调。您还可以提供一次消耗的消息数量,确保您的事件循环不会无限期地从一个代理中消耗消息。您提供的回调必须接受 AMQPEnvelope 作为第一个参数,以及可选的 AMQPQueue 作为第二个。

<?php
// Connect to an AMQP broker
$cnn = new AMQPConnection();
$cnn->connect();

// Create a channel
$ch = new AMQPChannel($cnn);

// Create a new queue
$queue = new AMQPQueue($ch);
$queue->setName('queue1');
$queue->declare();

// Create an event loop
$loop = React\EventLoop\Factory::create();

// Create a consumer that will check for messages every half a second and consume up to 10 at a time.
$consumer = new JCook\ReactAMQP\Consumer($queue, $loop, 0.5, 10);
$consumer->on('consume', function(AMQPEnvelope $envelope, AMQPQueue $queue){
	//Process the message here
});
$loop->run();

生产者

生产者类允许您将消息发送到 AMQP 交换机。生产者有一个 publish 方法,其方法签名与 AMQPExchange 的 publish 方法完全相同。消息存储在生产者类中,并根据构造函数中传递的计时器间隔发送。当生产者对象被调用以发送任何队列中的消息时,将使用 AMQPExchange 对象的 publish 方法。此方法是阻塞的,这可能会对您的应用程序的性能产生影响。当消息成功发送时,会发出一个 'produce' 事件,您可以将回调绑定到该事件。这将传递一个包含所有消息参数的数组。如果抛出 AMQPExchangeException,意味着消息无法发送,则会发出一个 'error' 事件,您可以将回调绑定到该事件。这将传递 AMQPExchangeException 对象供您处理。

<?php
// Connect to an AMQP broker
$cnn = new AMQPConnection();
$cnn->connect();

// Create a channel
$ch = new AMQPChannel($cnn);

// Declare a new exchange
$ex = new AMQPExchange($ch);
$ex->setName('exchange1');
$ex->declare();

// Create an event loop
$loop = React\EventLoop\Factory::create();

// Create a producer that will send any waiting messages every half a second.
$producer = new JCook\ReactAMQP\Producer($ex, $loop, 0.5);

// Add a callback that's called every time a message is successfully sent.
$producer->on('produce', function(array $message) {
	// $message is an array containing keys 'message', 'routingKey', 'flags' and 'attributes'
});

$producer->on('error', function(AMQPExchangeException $e) {
	// Handle any exceptions here.
});

$i = 0;

$loop->addPeriodicTimer(1, function() use(&$i, $producer) {
	$i++;
	echo "Sending $i\n";
	$producer->publish($i, 'routing.key');
});

$loop->run();

限制

由于 PECL AMQP 扩展没有提供任何方法来“监听”底层 AMQP 套接字,此库必须使用周期性计时器。这不是非阻塞 I/O,因此性能不会很好。尽管如此,这些类应该足够好,直到有更好的替代方案可用。

待办事项