Amqp 简单实现

v1.0.3 2019-12-13 10:45 UTC

This package is auto-updated.

Last update: 2024-09-13 20:58:37 UTC


README

这个库提供了一系列用于处理基于AMQP 0.9.1协议的邮件系统工具,特别是用于Rabbit MQ。这些工具还包括对称名中间件的适配器,以及用于消费的命令行工具。

此存储库中的工具大多是高层类,它们大量使用底层Rabbit PHP客户端类,可以在https://github.com/php-amqplib/php-amqplib中找到。

创建交换机、队列和绑定。

为了声明交换机、队列和绑定,我们提供了三个构建器,它们是原ampqlib库中****_declare函数的“快捷方式”,除了BindBuilder使用queue_bind

  • namespace Pccomponentes\Amqp\Builder\ExchangeBuilder
  • namespace Pccomponentes\Amqp\Builder\QueueBuilder
  • namespace Pccomponentes\Amqp\Builder\BindBuilder

它们都提供了方法来设置不同的选项。

要了解每个构建器接受哪些选项,请参阅

使用示例

<?php
include __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use Pccomponentes\Amqp\Builder\ExchangeBuilder;
use Pccomponentes\Amqp\Builder\QueueBuilder;
use Pccomponentes\Amqp\Builder\BindBuilder;

$connection = new AMQPStreamConnection('ampq-rabbitmq', 5672, 'guest', 'guest', 'my_vhost');

$queueBuilder = (new QueueBuilder('queue_example'))
    ->durable()
    ->noAutoDelete();

$exchangeBuilder = (new ExchangeBuilder('exchange_example', ExchangeBuilder::TYPE_FANOUT))
    ->durable()
    ->noAutoDelete();

$bindBuilder = new BindBuilder('queue_example', 'exchange_example', '');

$channel = $connection->channel();
$queueBuilder->build($channel);
$exchangeBuilder->build($channel);
$bindBuilder->build($channel);

在队列中发布消息

要发布一个消息到一个队列,以下是一些类

  • namespace Pccomponentes\Amqp\Publisher\Publisher
  • namespace Pccomponentes\Amqp\Builder\BasicPublishBuilder
  • namespace Pccomponentes\Amqp\Builder\MessageBuilder

主类Publisher需要一个PhpAmqpLib\Connection\AbstractConnection实例、一个BasicPublishBuilder和一个MessageBuilder实例,并提供了发送消息到相应交换机的方法。

消息发送到哪个交换机,以及发送配置,由BasicPublishBuilder声明,它是对原库中basic_publish函数的快捷方式。

最后,为了方便创建PhpAmqpLib\Message\AMQPMessage消息,提供了一个相应的MessageBuilder,您可以在其中设置content_typedelivery_mode以及其他许多参数,用于构建和发送的每个消息。

更多信息

使用示例

<?php
include __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use Pccomponentes\Amqp\Builder\BasicPublishBuilder;
use Pccomponentes\Amqp\Builder\MessageBuilder;
use Pccomponentes\Amqp\Publisher\Publisher;

$connection = new AMQPStreamConnection('ampq-rabbitmq', 5672, 'guest', 'guest', 'my_vhost');

$basicPublishBuilder = (new BasicPublishBuilder('exchange_example'))
    ->noImmediate()
    ->noMandatory();

$messageBuilder = (new MessageBuilder())
    ->contentTypeJson()
    ->deliveryModePersistent();

$publisher = new Publisher($connection, $basicPublishBuilder, $messageBuilder);
$publisher->send('{"message" : "example"}', 'example');
$publisher->close();

消费队列

要消费一个队列中的消息,以下是一些类

  • namespace Pccomponentes\Amqp\Subscriber\Subscriber
  • namespace Pccomponentes\Amqp\Builder\BasicConsumeBuilder
  • namespace Pccomponentes\Amqp\Subscriber\SubscriberCallback
  • namespace Pccomponentes\Amqp\Subscriber\SubscriberMessage

主类Subscriber需要一个PhpAmqpLib\Connection\AbstractConnection实例、一个BasicConsumeBuilder和一个SubscriberCallback实例,并会发送一个类型为SubscriberMessage的消息。

如何消费队列的配置将委托给BasicConsumerBuilder,它是对原库中basic_qosbasic_consume方法的快捷方式。

接口 SubscriberCallback 将是您的项目需要实现的,并在其中编写当您检索到消息时想要执行的任务。此消息的类型为 SubscriberMessage,它只是 PhpAmqpLib\Message\AMQPMessage 的包装器,并提供了简单的 ACKNACKREJECT 方法来处理消息。此外,它还提供了一个 message 方法来访问原始类。

更多信息

使用示例

在以下示例中,我们将声明一个 Subscriber,它将对消费的每个消息进行简单的 var_dump

<?php
include __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use Pccomponentes\Amqp\Builder\BasicConsumeBuilder;
use Pccomponentes\Amqp\Subscriber\SubscriberMessage;
use Pccomponentes\Amqp\Subscriber\SubscriberCallback;
use Pccomponentes\Amqp\Subscriber\Subscriber;

$connection = new AMQPStreamConnection('ampq-rabbitmq', 5672, 'guest', 'guest', 'my_vhost');

$basicConsumeBuilder = new BasicConsumeBuilder('queue_example');
$basicConsumeBuilder
    ->wait()
    ->ack()
    ->local()
    ->prefetchSize(0)
    ->prefetchCount(1)
    ->noPrefetchGlobal();

$callback = new class implements SubscriberCallback
{
    public function execute(SubscriberMessage $message): void
    {
        \var_dump($message->message()->getBody());
        $message->ack();
    }
};
$subscriber = new Subscriber($connection, $basicConsumeBuilder, $callback);
$subscriber->listen(3, 10);

订阅者 + 消息总线

如果需要将 rabbit 队列中的消息发送到消息总线,则提供了 Pccomponentes\Amqp\Messenger\MessageBusSusbcriberCallback 类,这是为此目的实现的 Subscriber 回调的具体实现。更多信息

使用示例

<?php
include __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\MessageBus;
use Pccomponentes\Amqp\Builder\BasicConsumeBuilder;
use Pccomponentes\Amqp\Messenger\MessageBusSusbcriberCallback;
use Pccomponentes\Amqp\Subscriber\Subscriber;

$connection = new AMQPStreamConnection('ampq-rabbitmq', 5672, 'guest', 'guest', 'my_vhost');

$messageBusMiddleware = new class() implements MiddlewareInterface
{
    public function handle($message, callable $next)
    {
        \var_dump($message->message()->getBody());
        $message->ack();
        return $next($message);
    }
};

$basicConsumeBuilder = new BasicConsumeBuilder('queue_example');
$basicConsumeBuilder
    ->wait()
    ->ack()
    ->local()
    ->prefetchSize(0)
    ->prefetchCount(1)
    ->noPrefetchGlobal();

$messageBus = new MessageBus([$messageBusMiddleware]);
$toMessageBusCallback = new MessageBusSusbcriberCallback($messageBus);
$subscriber = new Subscriber($connection, $basicConsumeBuilder, $toMessageBusCallback);

$subscriber->listen(1, 10);

消息总线 + 发布者

要向消息队列中添加一个发布中间件,我们有两个辅助类:Pccomponentes\Amqp\Messenger\PublisherMiddleware,它是 symfony 消息总线的中间件,以及 Pccomponentes\Amqp\Messenger\MessageSerializer,它是一个接口,我们的项目将实现它以指示在将消息发送到消息传递系统之前如何序列化消息,以及将其发送到哪个 主题路由密钥

更多信息

使用示例

<?php
include __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use Pccomponentes\Amqp\Builder\BasicPublishBuilder;
use Pccomponentes\Amqp\Builder\MessageBuilder;
use Pccomponentes\Amqp\Publisher\Publisher;
use Pccomponentes\Amqp\Messenger\PublisherMiddleware;
use Pccomponentes\Amqp\Messenger\MessageSerializer;
use Symfony\Component\Messenger\MessageBus;

$connection = new AMQPStreamConnection('ampq-rabbitmq', 5672, 'guest', 'guest', 'my_vhost');

$basicPublishBuilder = (new BasicPublishBuilder('exchange_example'))
    ->noImmediate()
    ->noMandatory();

$messageBuilder = (new MessageBuilder())
    ->contentTypeJson()
    ->deliveryModePersistent();

$publisher = new Publisher($connection, $basicPublishBuilder, $messageBuilder);
$messageSerializer = new class implements MessageSerializer
{
    public function serialize($message): string
    {
        return \json_encode($message);
    }

    public function routingKey($message): string
    {
        return $message->topic;
    }
};
$publisherMiddleware = new PublisherMiddleware($publisher, $messageSerializer);

$messageBus = new MessageBus([$publisherMiddleware]);
$message = \json_decode(\json_encode(['body' => 'body example', 'topic' => 'topic_example']));
$messageBus->dispatch($message);

控制台命令

以下将详细介绍该库提供的控制台命令。所有这些命令都依赖于 Symfony 的 console 组件。更多信息

消费消息

使用 symfony 框架

如果我们的项目包含 symfony 框架,我们可以直接将命令放入依赖容器中,并使用相应的标签标记它。

例如

      
pdo_migration_command:
    class: Pccomponentes\Amqp\Command\SubscriberCommand
    arguments:
        - 'custom'                  # nombre del comando, que se concatenará a "subscriber:"
        - '@project.subscriber'     # Servicio subscriptor
    tags:
        - { name: console.command }

创建我们自己的控制台应用程序

为了能够执行该命令,我们首先需要生成一个应用程序。为此,我们应该创建一个包含以下内容的 PHP 文件,并根据需要对其进行修改以适应我们的项目。由于它将是一个控制台可执行文件,我们将它命名为 console(没有扩展名),并将其放置在我们的项目根目录下的 bin 目录中。

#!/usr/bin/env php
<?php
require __DIR__ . '/../vendor/autoload.php';

use Symfony\Component\Console\Application;
use Pccomponentes\Amqp\Command\SubscriberCommand;
use Pccomponentes\Amqp\Subscriber\Subscriber;

$subscriber = new Subscriber(/* argumentos */);
$application = new Application();
$application->addCommands(
    [
        new SubscriberCommand('custom', $subscriber)
    ]
);

$application->run();

执行命令

要执行命令,只需在终端中写入

bin/console subscriber:custom --timeout=10 20

在这里我们指示系统消费 20 条消息,并且当队列中没有消息时,将最多等待 10 秒,或者直到执行完毕。