pccomponentes / amqp
Amqp 简单实现
Requires
- php: ^7.2
- php-amqplib/php-amqplib: ^2.7
- psr/log: ^1.0
- symfony/console: ^4.1
- symfony/messenger: ^4.1
Requires (Dev)
- pccomponentes/ganchudo: ^1.0
- phpunit/phpunit: ^7.3
- squizlabs/php_codesniffer: ^3.3
- symfony/var-dumper: ^4.1
This package is auto-updated.
Last update: 2024-09-13 20:58:37 UTC
README
这个库提供了一系列用于处理基于AMQP 0.9.1协议的邮件系统工具,特别是用于Rabbit MQ。这些工具还包括对称名中间件的适配器,以及用于消费的命令行工具。
此存储库中的工具大多是高层类,它们大量使用底层Rabbit PHP客户端类,可以在https://github.com/php-amqplib/php-amqplib中找到。
创建交换机、队列和绑定。
为了声明交换机、队列和绑定,我们提供了三个构建器,它们是原ampqlib库中****_declare
函数的“快捷方式”,除了BindBuilder
使用queue_bind
。
namespace Pccomponentes\Amqp\Builder\ExchangeBuilder
namespace Pccomponentes\Amqp\Builder\QueueBuilder
namespace Pccomponentes\Amqp\Builder\BindBuilder
它们都提供了方法来设置不同的选项。
要了解每个构建器接受哪些选项,请参阅
使用示例
<?php include __DIR__ . '/../vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use Pccomponentes\Amqp\Builder\ExchangeBuilder; use Pccomponentes\Amqp\Builder\QueueBuilder; use Pccomponentes\Amqp\Builder\BindBuilder; $connection = new AMQPStreamConnection('ampq-rabbitmq', 5672, 'guest', 'guest', 'my_vhost'); $queueBuilder = (new QueueBuilder('queue_example')) ->durable() ->noAutoDelete(); $exchangeBuilder = (new ExchangeBuilder('exchange_example', ExchangeBuilder::TYPE_FANOUT)) ->durable() ->noAutoDelete(); $bindBuilder = new BindBuilder('queue_example', 'exchange_example', ''); $channel = $connection->channel(); $queueBuilder->build($channel); $exchangeBuilder->build($channel); $bindBuilder->build($channel);
在队列中发布消息
要发布一个消息到一个队列,以下是一些类
namespace Pccomponentes\Amqp\Publisher\Publisher
namespace Pccomponentes\Amqp\Builder\BasicPublishBuilder
namespace Pccomponentes\Amqp\Builder\MessageBuilder
主类Publisher
需要一个PhpAmqpLib\Connection\AbstractConnection
实例、一个BasicPublishBuilder
和一个MessageBuilder
实例,并提供了发送消息到相应交换机的方法。
消息发送到哪个交换机,以及发送配置,由BasicPublishBuilder
声明,它是对原库中basic_publish
函数的快捷方式。
最后,为了方便创建PhpAmqpLib\Message\AMQPMessage
消息,提供了一个相应的MessageBuilder
,您可以在其中设置content_type
、delivery_mode
以及其他许多参数,用于构建和发送的每个消息。
更多信息
使用示例
<?php include __DIR__ . '/../vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use Pccomponentes\Amqp\Builder\BasicPublishBuilder; use Pccomponentes\Amqp\Builder\MessageBuilder; use Pccomponentes\Amqp\Publisher\Publisher; $connection = new AMQPStreamConnection('ampq-rabbitmq', 5672, 'guest', 'guest', 'my_vhost'); $basicPublishBuilder = (new BasicPublishBuilder('exchange_example')) ->noImmediate() ->noMandatory(); $messageBuilder = (new MessageBuilder()) ->contentTypeJson() ->deliveryModePersistent(); $publisher = new Publisher($connection, $basicPublishBuilder, $messageBuilder); $publisher->send('{"message" : "example"}', 'example'); $publisher->close();
消费队列
要消费一个队列中的消息,以下是一些类
namespace Pccomponentes\Amqp\Subscriber\Subscriber
namespace Pccomponentes\Amqp\Builder\BasicConsumeBuilder
namespace Pccomponentes\Amqp\Subscriber\SubscriberCallback
namespace Pccomponentes\Amqp\Subscriber\SubscriberMessage
主类Subscriber
需要一个PhpAmqpLib\Connection\AbstractConnection
实例、一个BasicConsumeBuilder
和一个SubscriberCallback
实例,并会发送一个类型为SubscriberMessage
的消息。
如何消费队列的配置将委托给BasicConsumerBuilder
,它是对原库中basic_qos
和basic_consume
方法的快捷方式。
接口 SubscriberCallback
将是您的项目需要实现的,并在其中编写当您检索到消息时想要执行的任务。此消息的类型为 SubscriberMessage
,它只是 PhpAmqpLib\Message\AMQPMessage
的包装器,并提供了简单的 ACK、NACK 和 REJECT 方法来处理消息。此外,它还提供了一个 message
方法来访问原始类。
更多信息
使用示例
在以下示例中,我们将声明一个 Subscriber
,它将对消费的每个消息进行简单的 var_dump
。
<?php include __DIR__ . '/../vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use Pccomponentes\Amqp\Builder\BasicConsumeBuilder; use Pccomponentes\Amqp\Subscriber\SubscriberMessage; use Pccomponentes\Amqp\Subscriber\SubscriberCallback; use Pccomponentes\Amqp\Subscriber\Subscriber; $connection = new AMQPStreamConnection('ampq-rabbitmq', 5672, 'guest', 'guest', 'my_vhost'); $basicConsumeBuilder = new BasicConsumeBuilder('queue_example'); $basicConsumeBuilder ->wait() ->ack() ->local() ->prefetchSize(0) ->prefetchCount(1) ->noPrefetchGlobal(); $callback = new class implements SubscriberCallback { public function execute(SubscriberMessage $message): void { \var_dump($message->message()->getBody()); $message->ack(); } }; $subscriber = new Subscriber($connection, $basicConsumeBuilder, $callback); $subscriber->listen(3, 10);
订阅者 + 消息总线
如果需要将 rabbit 队列中的消息发送到消息总线,则提供了 Pccomponentes\Amqp\Messenger\MessageBusSusbcriberCallback
类,这是为此目的实现的 Subscriber
回调的具体实现。更多信息
使用示例
<?php include __DIR__ . '/../vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use Symfony\Component\Messenger\Middleware\MiddlewareInterface; use Symfony\Component\Messenger\MessageBus; use Pccomponentes\Amqp\Builder\BasicConsumeBuilder; use Pccomponentes\Amqp\Messenger\MessageBusSusbcriberCallback; use Pccomponentes\Amqp\Subscriber\Subscriber; $connection = new AMQPStreamConnection('ampq-rabbitmq', 5672, 'guest', 'guest', 'my_vhost'); $messageBusMiddleware = new class() implements MiddlewareInterface { public function handle($message, callable $next) { \var_dump($message->message()->getBody()); $message->ack(); return $next($message); } }; $basicConsumeBuilder = new BasicConsumeBuilder('queue_example'); $basicConsumeBuilder ->wait() ->ack() ->local() ->prefetchSize(0) ->prefetchCount(1) ->noPrefetchGlobal(); $messageBus = new MessageBus([$messageBusMiddleware]); $toMessageBusCallback = new MessageBusSusbcriberCallback($messageBus); $subscriber = new Subscriber($connection, $basicConsumeBuilder, $toMessageBusCallback); $subscriber->listen(1, 10);
消息总线 + 发布者
要向消息队列中添加一个发布中间件,我们有两个辅助类:Pccomponentes\Amqp\Messenger\PublisherMiddleware
,它是 symfony 消息总线的中间件,以及 Pccomponentes\Amqp\Messenger\MessageSerializer
,它是一个接口,我们的项目将实现它以指示在将消息发送到消息传递系统之前如何序列化消息,以及将其发送到哪个 主题 或 路由密钥。
更多信息
使用示例
<?php include __DIR__ . '/../vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use Pccomponentes\Amqp\Builder\BasicPublishBuilder; use Pccomponentes\Amqp\Builder\MessageBuilder; use Pccomponentes\Amqp\Publisher\Publisher; use Pccomponentes\Amqp\Messenger\PublisherMiddleware; use Pccomponentes\Amqp\Messenger\MessageSerializer; use Symfony\Component\Messenger\MessageBus; $connection = new AMQPStreamConnection('ampq-rabbitmq', 5672, 'guest', 'guest', 'my_vhost'); $basicPublishBuilder = (new BasicPublishBuilder('exchange_example')) ->noImmediate() ->noMandatory(); $messageBuilder = (new MessageBuilder()) ->contentTypeJson() ->deliveryModePersistent(); $publisher = new Publisher($connection, $basicPublishBuilder, $messageBuilder); $messageSerializer = new class implements MessageSerializer { public function serialize($message): string { return \json_encode($message); } public function routingKey($message): string { return $message->topic; } }; $publisherMiddleware = new PublisherMiddleware($publisher, $messageSerializer); $messageBus = new MessageBus([$publisherMiddleware]); $message = \json_decode(\json_encode(['body' => 'body example', 'topic' => 'topic_example'])); $messageBus->dispatch($message);
控制台命令
以下将详细介绍该库提供的控制台命令。所有这些命令都依赖于 Symfony 的 console 组件。更多信息
消费消息
使用 symfony 框架
如果我们的项目包含 symfony 框架,我们可以直接将命令放入依赖容器中,并使用相应的标签标记它。
例如
pdo_migration_command: class: Pccomponentes\Amqp\Command\SubscriberCommand arguments: - 'custom' # nombre del comando, que se concatenará a "subscriber:" - '@project.subscriber' # Servicio subscriptor tags: - { name: console.command }
创建我们自己的控制台应用程序
为了能够执行该命令,我们首先需要生成一个应用程序。为此,我们应该创建一个包含以下内容的 PHP 文件,并根据需要对其进行修改以适应我们的项目。由于它将是一个控制台可执行文件,我们将它命名为 console(没有扩展名),并将其放置在我们的项目根目录下的 bin 目录中。
#!/usr/bin/env php <?php require __DIR__ . '/../vendor/autoload.php'; use Symfony\Component\Console\Application; use Pccomponentes\Amqp\Command\SubscriberCommand; use Pccomponentes\Amqp\Subscriber\Subscriber; $subscriber = new Subscriber(/* argumentos */); $application = new Application(); $application->addCommands( [ new SubscriberCommand('custom', $subscriber) ] ); $application->run();
执行命令
要执行命令,只需在终端中写入
bin/console subscriber:custom --timeout=10 20
在这里我们指示系统消费 20 条消息,并且当队列中没有消息时,将最多等待 10 秒,或者直到执行完毕。