gos / react-amqp
ReactPHP 的 AMQP 绑定
v0.3.0
2019-08-26 18:17 UTC
Requires
- php: >=7.2
- ext-amqp: *
- evenement/evenement: ^3.0
- react/event-loop: ^1.0
Requires (Dev)
- friendsofphp/php-cs-fixer: ^2.15.1
- phpunit/phpunit: ^8.1
README
为 React PHP 提供基本的 AMQP 绑定。
安装
此库需要 PHP >=7.1 和 PECL AMQP 扩展。通过 composer 安装此库是最佳方式。
请使用 1.x 版本用于 PHP-5.x。
composer require gos/react-amqp
使用方法
此库提供两个类,一个 AMQP 消费者和一个生产者。这两个类都与一个周期性定时器一起工作,您需要将定时器间隔作为参数传递给构造函数。
消费者
消费者类允许您从 AMQP 代理接收消息,并在收到消息时调用回调。您还可以提供一次要消费的消息数量,确保您的事件循环不会持续地从代理中消费消息。您提供的回调必须接受 AMQPEnvelope 作为第一个参数,以及可选的 AMQPQueue 作为第二个。
<?php // Connect to an AMQP broker $cnn = new AMQPConnection(); $cnn->connect(); // Create a channel $ch = new AMQPChannel($cnn); // Create a new queue $queue = new AMQPQueue($ch); $queue->setName('queue1'); $queue->declare(); // Create an event loop $loop = React\EventLoop\Factory::create(); // Create a consumer that will check for messages every half a second and consume up to 10 at a time. $consumer = new Gos\Component\ReactAMQP\Consumer($queue, $loop, 0.5, 10); $consumer->on('consume', function(AMQPEnvelope $envelope, AMQPQueue $queue){ //Process the message here }); $loop->run();
生产者
生产者类允许您向 AMQP 交换发送消息。生产者有一个与 AMQPExchange 的 publish 方法具有完全相同签名的方法。消息存储在生产者类中,并根据传递给构造函数的定时器间隔发送。当调用生产者对象发送任何队列中的消息时,将使用 AMQPExchange 对象的 publish 方法。此方法是阻塞的,这可能会成为您应用程序的性能问题。当消息成功发送时,将发出一个 'produce' 事件,您可以将回调绑定到该事件。这将传递一个包含所有消息参数的数组。如果抛出 AMQPExchangeException,表示消息无法发送,则发出一个 'error' 事件,您可以将回调绑定到该事件。这将传递 AMQPExchangeException 对象供您处理。
<?php // Connect to an AMQP broker $cnn = new AMQPConnection(); $cnn->connect(); // Create a channel $ch = new AMQPChannel($cnn); // Declare a new exchange $ex = new AMQPExchange($ch); $ex->setName('exchange1'); $ex->declare(); // Create an event loop $loop = React\EventLoop\Factory::create(); // Create a producer that will send any waiting messages every half a second. $producer = new Gos\Component\ReactAMQP\Producer($ex, $loop, 0.5); // Add a callback that's called every time a message is successfully sent. $producer->on('produce', function(array $message) { // $message is an array containing keys 'message', 'routingKey', 'flags' and 'attributes' }); $producer->on('error', function(AMQPExchangeException $e) { // Handle any exceptions here. }); $i = 0; $loop->addPeriodicTimer(1, function() use(&$i, $producer) { $i++; echo "Sending $i\n"; $producer->publish($i, 'routing.key'); }); $loop->run();