jcook / react-amqp
ReactPHP 的 AMQP 绑定
Requires
- php: >=5.4.0
- ext-amqp: *
- evenement/evenement: 1.0.*
- react/event-loop: 0.3.*
This package is auto-updated.
Last update: 2023-05-17 05:45:10 UTC
README
此库不再维护,应视为已过时。
ReactAMQP
为 React PHP 提供基本的 AMQP 绑定。
安装
此库需要 PHP 5.4 和 PECL AMQP 扩展。最佳安装方式是通过 composer。
{ "require": { "jcook/react-amqp": "dev-master" } }
使用方法
此库提供两个类,AMQP 消费者和生产者。这两个类都与一个周期性计时器一起工作,你需要在构造函数中提供一个计时器间隔作为参数。
消费者
消费者类允许您从 AMQP 代理接收消息,并在收到消息时调用回调。您还可以提供一次消耗的消息数量,确保您的事件循环不会无限期地从一个代理中消耗消息。您提供的回调必须接受 AMQPEnvelope 作为第一个参数,以及可选的 AMQPQueue 作为第二个。
<?php // Connect to an AMQP broker $cnn = new AMQPConnection(); $cnn->connect(); // Create a channel $ch = new AMQPChannel($cnn); // Create a new queue $queue = new AMQPQueue($ch); $queue->setName('queue1'); $queue->declare(); // Create an event loop $loop = React\EventLoop\Factory::create(); // Create a consumer that will check for messages every half a second and consume up to 10 at a time. $consumer = new JCook\ReactAMQP\Consumer($queue, $loop, 0.5, 10); $consumer->on('consume', function(AMQPEnvelope $envelope, AMQPQueue $queue){ //Process the message here }); $loop->run();
生产者
生产者类允许您将消息发送到 AMQP 交换机。生产者有一个 publish 方法,其方法签名与 AMQPExchange 的 publish 方法完全相同。消息存储在生产者类中,并根据构造函数中传递的计时器间隔发送。当生产者对象被调用以发送任何队列中的消息时,将使用 AMQPExchange 对象的 publish 方法。此方法是阻塞的,这可能会对您的应用程序的性能产生影响。当消息成功发送时,会发出一个 'produce' 事件,您可以将回调绑定到该事件。这将传递一个包含所有消息参数的数组。如果抛出 AMQPExchangeException,意味着消息无法发送,则会发出一个 'error' 事件,您可以将回调绑定到该事件。这将传递 AMQPExchangeException 对象供您处理。
<?php // Connect to an AMQP broker $cnn = new AMQPConnection(); $cnn->connect(); // Create a channel $ch = new AMQPChannel($cnn); // Declare a new exchange $ex = new AMQPExchange($ch); $ex->setName('exchange1'); $ex->declare(); // Create an event loop $loop = React\EventLoop\Factory::create(); // Create a producer that will send any waiting messages every half a second. $producer = new JCook\ReactAMQP\Producer($ex, $loop, 0.5); // Add a callback that's called every time a message is successfully sent. $producer->on('produce', function(array $message) { // $message is an array containing keys 'message', 'routingKey', 'flags' and 'attributes' }); $producer->on('error', function(AMQPExchangeException $e) { // Handle any exceptions here. }); $i = 0; $loop->addPeriodicTimer(1, function() use(&$i, $producer) { $i++; echo "Sending $i\n"; $producer->publish($i, 'routing.key'); }); $loop->run();
限制
由于 PECL AMQP 扩展没有提供任何方法来“监听”底层 AMQP 套接字,此库必须使用周期性计时器。这不是非阻塞 I/O,因此性能不会很好。尽管如此,这些类应该足够好,直到有更好的替代方案可用。
待办事项
- 添加对 php-amqplib 的支持。