mouf / oo-amqp-client
在php-amqplib之上提供面向对象包装的对象,帮助以面向对象的方式处理RabbitMQ。
Requires
- php: >=7.0
- mouf/utils.log.psr.errorlog_logger: ^2
- php-amqplib/php-amqplib: ^2.6.3
- psr/log: ^1
Requires (Dev)
- fabpot/php-cs-fixer: ^1.9.3
- phpunit/phpunit: ^6
- satooshi/php-coveralls: ~1.0
- symfony/console: ^3.0
Suggests
- symfony/console: To use the 'amqp:publish' CLI command.
README
关于面向对象AMQP客户端
此包包含一个在php-amqplib之上提供面向对象包装的对象,帮助以面向对象的方式处理RabbitMQ。
使用此包,交换、绑定和队列都表示为对象。这对于您想要将对象注入依赖注入容器的情况非常有用。
安装
composer require mouf/oo-amqp-client
使用方法
在使用此库之前,您应该熟悉AMQP概念。如果您不熟悉,我们强烈建议您从阅读RabbitMQ文档中的"AMQP 0-9-1模型解释"文档开始。
准备好了吗?让我们开始吧。
创建客户端
您首先想要创建的是Client
对象。一个Client
表示与RabbitMQ的连接(对于熟悉php-amqplib的您来说,它既是连接也是通道)。
use Mouf\AmqpClient\Client; $client = new Client( $rabbitmq_host, $rabbitmq_port, $rabbitmq_user, $rabbitmq_password, $rabbitmq_vhost = '/', $rabbitmq_insist = false, $rabbitmq_login_method = 'AMQPLAIN', $rabbitmq_login_response = null, $rabbitmq_locale = 'en_US', $rabbitmq_connection_timeout = 3.0, $rabbitmq_read_write_timeout = 3.0, $rabbitmq_context = null, $rabbitmq_keepalive = false, $rabbitmq_heartbeat = 0 );
注意:Client
类公开了多个有用的配置方法(如果您不知道它们的作用,则不需要使用它们)
public function setPrefetchSize($prefetchSize); public function setPrefetchCount($prefetchCount); public function setAGlobal($aGlobal);
创建交换
在AMQP中,交换是接收消息并将这些消息转发到队列的对象。因此,您必须定义一个Exchange
对象来发送消息。
use Mouf\AmqpClient\Objects\Exchange; $exchange = new Exchange($client, 'exchange_name', 'fanout');
在创建交换时,您将传递给构造函数的Client
对象、交换名称和交换类型。
注意:交换将在客户端中自动注册。
您可以使用配置方法应用高级配置
public function setPassive($passive); public function setDurable($durable); public function setAutoDelete($autoDelete); public function setInternal($internal); public function setNowait($nowait); public function setArguments($arguments); public function setTicket($ticket);
创建队列和绑定
到达交换的消息将通过绑定转发到一个队列。
我们现在创建一个队列来存储我们的消息。
use Mouf\AmqpClient\Objects\Queue; $queue = new Queue($client, 'queue_name', [ new Consumer(function(AMQPMessage $msg) { // Do some stuff with the received message }) ]);
在创建客户端时,您将传递给构造函数的Client
对象、客户端名称以及一个包含Consumer
对象的数组(实际上是一个实现ConsumerInterface
的对象的数组)。
Consumer
对象是包含每次收到消息时将被调用的代码的对象。
注意:队列将在客户端中自动注册。
您可以使用这些配置方法对您的队列应用高级配置
public function setPassive($passive); public function setDurable($durable); public function setExclusive($exclusive); public function setAutoDelete($autoDelete); public function setNoWait($noWait); public function setArguments($arguments); public function setTicket($ticket); public function setDeadLetterExchange(Exchange $exchange); public function setConfirm($confirm); public function setConsumerCancelNotify(Queue $consumerCancelNotify); public function setAlternateExchange(Queue $alternateExchange); public function setTtl($ttl); public function setMaxLength($maxLength); public function setMaxPriority($maxPriority);
如果您想使队列在接收器断电时存储消息,您肯定想使用setDurable
方法。
此时,我们有一个交换和一个队列,但它们没有连接在一起。我们需要使用Binding
对象来绑定它们。
use Mouf\AmqpClient\Objects\Binding; $binding = new Binding($exchange, $queue); $client->register($binding);
Binding
将交换与队列链接。
重要:与Exchange
和Queue
不同,Binding
不会在客户端中自动注册。您必须使用Client::register
方法自己声明它。
准备好了吗?让我们发送和接收消息!
发送消息
为了发送消息,您只需使用Exchange::publish
方法
$exchange->publish(new Message('your message body'), 'message_key'); // ... and that's it!
您还可以进一步配置消息的发送。Exchange::publish
方法接受一些可选参数
public function publish(Message $message, string $routingKey, bool $mandatory = false, bool $immediate = false, $ticket = null);
此外,您还可以使用这些方法之一调整Message
类
public function setContentType(string $content_type); public function setContentEncoding(string $content_encoding); public function setApplicationHeaders(array $application_headers); public function setDeliveryMode(int $delivery_mode); public function setPriority(int $priority); public function setCorrelationId(string $correlation_id); public function setReplyTo(string $reply_to); public function setExpiration(string $expiration); public function setMessageId(string $message_id); public function setTimestamp(\DateTimeInterface $timestamp); public function setType(string $type); public function setUserId(string $user_id); public function setAppId(string $app_id); public function setClusterId(string $cluster_id);
接收消息
正如我们已经看到的,接收消息的第一步是创建一个队列并将Consumer
对象添加到该队列中。
我们还需要告诉PHP开始监听,否则,Consumer
中的回调永远不会被调用。
这可以通过使用ConsumerService
类来实现。
$consumerService = new ConsumerService($client, [ $queue ]); $consumerService->run();
ConsumerService
构造函数接受客户端作为参数,以及必须监听的队列数组。
ConsumerService::run
方法将开始监听到达的消息,在一个无限循环中。
注意,如果你想只监听一条消息并在之后返回,可以使用$consumerService->run(true);
。
确认和错误处理
当你收到一条消息时,在Consumer
完成消息消费之前不会发送确认。
如果在Consumer
中触发异常,则会向RabbitMQ发送一个nack
。
注意:如果你的消费者回调抛出实现了RetryableExceptionInterface
接口的异常,则带有“requeue”标记的nack
消息将被发送。该消息将被重新入队。
注意:如果你的消费者回调抛出实现了FatalExceptionInterface
接口的异常,异常将被消费者传播(从而导致消费者脚本的崩溃)。否则,消费者将继续处理消息。
默认情况下,异常使用error_log函数进行记录。您可以通过将PSR-3兼容的记录器传递给AbstractConsumer
构造函数来覆盖此行为。
将您的消费者编写为类
到目前为止,为了创建一个消费者,我们使用了接受回调作为第一个构造参数的Consumer
类。
作为替代方案,您可以扩展AbstractConsumer
类并实现onMessageReceived
方法
class MyConsumer extends AbstractConsumer { public function onMessageReceived($msg) { // Do some stuff. } }
向特定队列发送消息
如果您想针对特定队列并直接向其发送消息,您有2种选择。
选项1:创建一个DefaultExchange
对象并将队列名称作为消息的键。
use Mouf\AmqpClient\Objects\DefaultExchange; $exchange = new DefaultExchange($client); // Simply pass the queue name as the second parameter of "publish". // Note: you do not need to bind the queue to the exchange. RabbitMQ does this automatically. $exchange->publish(new Message('your message body'), 'name_of_the_target_queue'); // ... and that's it!
选项2:使用Queue
对象的publish
方法
use Mouf\AmqpClient\Objects\Queue; $queue = new Queue($client, 'queue_name', [ new Consumer(function(AMQPMessage $msg) { // Do some stuff with the received message }) ]); // Shazam! We are directly sending a message to the queue. No exchange needed! $queue->publish(new Message('your message body'));
注意:这些是RabbitMQ特有的功能,可能不适用于其他AMQP总线。
Symfony控制台集成
此软件包附带2个Symfony命令,您可以使用它们发送和接收消息。
Mouf\AmqpClient\Commands\PublishCommand
(amqp:publish
) 允许您向交换发送任意消息(从文件或从STDIN读取)Mouf\AmqpClient\Commands\ConsumeCommand
(amqp:consume
) 监听所有配置的队列
运行单元测试
此软件包使用PHPUnit进行单元测试。
要运行测试
vendor/bin/phpunit
显然,您需要一个正在运行的RabbitMQ服务器来测试此软件包。如果您使用Docker,可以使用以下命令启动一个:
docker run -p 5672:5672 -p 15672:15672 rabbitmq:management