makaronnik/amphp-rabbitmq-manager

PHP (8.1) 异步 RabbitMQ 连接和通道管理器。PHPinnacle Ridge 的包装器

v1.0.0 2022-07-26 13:11 UTC

This package is auto-updated.

Last update: 2024-09-29 06:14:51 UTC


README

StandWithUkraine Unit Testing & Code Lint Latest Release License

Stand With Ukraine

Amphp RabbitMQ 管理器

PHP (8.1) 异步 RabbitMQ 连接和通道管理器,它是基于 PHPinnacle Ridge 库的包装器,基于 Amp

安装

此包可以作为 Composer 依赖项安装。

composer require makaronnik/amphp-rabbitmq-manager

要求

  • PHP 8.1+

此管理器用于什么?

  1. 获取已连接客户端(PHPinnacle\Ridge\Client)。如果连接失败(发生异常),它将尝试重新连接(尝试次数在管理器构造函数中配置)。当配置了 AsyncEvent 时,将发出事件表示已建立连接。
  2. 通过其名称获取活动通道(PHPinnacle\Ridge\Channel)。在此之前,管理器将验证连接,并在必要时执行连接过程。
  3. 处理异常断开。配置了记录器和异常后,将记录一条日志条目。当配置了 AsyncEvent 时,将发出事件表示连接已丢失。

基本用法

<?php

use Amp\Loop;
use PHPinnacle\Ridge\Client;
use PHPinnacle\Ridge\Channel;
use PHPinnacle\Ridge\Message;
use Makaronnik\RabbitManager\Manager;
use Cspray\Labrador\AsyncEvent\AmpEventEmitter;
use Cspray\Labrador\AsyncEvent\StandardEventFactory;

require __DIR__ . '/../vendor/autoload.php';

Loop::run(static function () {
    $dsn = getenv('RABBIT_EXAMPLE_DSN');

    if (false === \is_string($dsn) || empty($dsn)) {
        echo 'No example dsn! Please set RABBIT_EXAMPLE_DSN environment variable.', \PHP_EOL;

        Loop::stop();
    }

    $manager = new Manager(
        client: Client::create($dsn),
        pendingConnectionQueue: new SplQueue(),
        maxAttempts: 30,
        eventEmitter: new AmpEventEmitter(),
        eventFactory: new StandardEventFactory()
    );

    $channel = yield $manager->getChanel('testChannel');

    try {
        yield $channel->queueDeclare('basic_queue', false, false, false, true);

        for ($i = 0; $i < 10; $i++) {
            yield $channel->publish("test_$i", '', 'basic_queue');
        }

        yield $channel->consume(function (Message $message, Channel $channel) {
            echo $message->content . \PHP_EOL;
            yield $channel->ack($message);
        }, 'basic_queue');

    } catch (Throwable $exception) {
        yield $manager->handleConnectionBreak($exception);
    }

    yield $manager->disconnect('Bye!');
});

用例

如果您在容器化环境中运行应用程序(例如 Docker Compose),则大多数情况下,您的应用程序将比 RabbitMQ 容器准备好要快得多。当尝试创建已连接客户端时,这将在您的应用程序中引发异常。您需要自己捕获此异常并在一定间隔和尝试次数内尝试重新连接。

您可以使用 dockerize 工具在 RabbitMQ 服务器准备好响应请求后运行您的应用程序

CMD dockerize -wait tcp://rabbitmq:5672 -timeout 30s php app.php

但是,一些 rebbit 镜像,如 bitnami/rabbitmq,首先执行预配置运行,然后在生产模式下重新启动服务器。这种行为将使 dockerize 在启动的第一阶段给出响应,然后您的应用程序将启动并在尝试连接 RabbitMQ 时引发异常,因为服务器将进入重新启动状态。

在这种情况下,明智的做法是使用 dockerize 获取服务器的第一个响应,然后使用 amphp-rabbitmq-manager 在您的应用程序启动后成功获取连接。

版本控制

makaronnik/amphp-rabbitmq-manager 遵循 semver 语义版本控制规范。

许可证

MIT 许可证(MIT)。有关更多信息,请参阅 LICENSE