swarrot/swarrot

一个简单的库,用于消费 RabbitMQ 队列

v4.18.0 2024-02-20 20:50 UTC

README

Build Status Scrutinizer Quality Score Latest Stable Version Latest Unstable Version

Swarrot 是一个 PHP 库,用于从任何代理中消费消息。

安装

推荐通过 Composer 安装 Swarrot。需要 swarrot/swarrot

$ composer require swarrot/swarrot

使用

基本使用

首先,您需要创建一个消息提供者来从您的代理中检索消息。例如,使用 PeclPackageMessageProvider(从带有 pecl amqp 包 的 AMQP 代理中检索消息)

use Swarrot\Broker\MessageProvider\PeclPackageMessageProvider;

// Create connection
$connection = new \AMQPConnection();
$connection->connect();
$channel = new \AMQPChannel($connection);
// Get the queue to consume
$queue = new \AMQPQueue($channel);
$queue->setName('global');

$messageProvider = new PeclPackageMessageProvider($queue);

完成后,您需要创建一个 Processor 来处理从代理检索的消息。此处理器必须实现 Swarrot\Processor\ProcessorInterface。例如

use Swarrot\Processor\ProcessorInterface;
use Swarrot\Broker\Message;

class Processor implements ProcessorInterface
{
    public function process(Message $message, array $options): bool
    {
        echo sprintf("Consume message #%d\n", $message->getId());

        return true; // Continue processing other messages
    }
}

现在,您有一个 Swarrot\Broker\MessageProviderInterface 来检索消息,以及一个处理器来处理它们。因此,请请求 Swarrot\Consumer 执行其工作

use Swarrot\Consumer;

$consumer = new Consumer($messageProvider, $processor);
$consumer->consume();

使用堆栈

受到 stackphp/builder 的强烈启发,您可以使用 Swarrot\Processor\Stack\Builder 来堆叠您的处理器。使用 内置处理器 或通过 创建自己的处理器,您可以扩展基本处理器的行为。在这个例子中,您的处理器被两个其他处理器装饰。这是 ExceptionCatcherProcessor,它使用 try/catch 块装饰您的处理器,以及 MaxMessagesProcessor,它会在某些消息被消费后停止您的工作者。

use Swarrot\Processor\ProcessorInterface;
use Swarrot\Broker\Message;

class Processor implements ProcessorInterface
{
    public function process(Message $message, array $options): bool
    {
        echo sprintf("Consume message #%d\n", $message->getId());
        
        return true; // Continue processing other messages
    }
}

$stack = (new \Swarrot\Processor\Stack\Builder())
    ->push('Swarrot\Processor\MaxMessages\MaxMessagesProcessor', new Logger())
    ->push('Swarrot\Processor\ExceptionCatcher\ExceptionCatcherProcessor')
    ->push('Swarrot\Processor\Ack\AckProcessor', $messageProvider)
;

$processor = $stack->resolve(new Processor());

以下是一个图示,展示了当使用此顺序时会发生什么

this

处理器

官方处理器

创建自己的处理器

要创建自己的处理器并使用它与 StackProcessor,你只需实现 ProcessorInterface 并在构造函数中将另一个 ProcessorInterface 作为第一个参数传递。

已弃用的处理器与消息提供者/发布者

为了减少 swarrot/swarrot 依赖项并简化维护,3.x 版本中已弃用一些处理器和消息提供者/发布者。它们将在 4.0 版本中删除。

如果你使用这些已弃用的类,你可以创建自己的仓库来保留它们,或者如果你愿意帮助维护它们,我们可以在 swarrot 组织下创建一个专门的仓库。

消息提供者/发布者

  • SQS 消息提供者(在 3.5.0 版本中)
  • Stomp 消息提供者(在 3.6.0 版本中)
  • Stomp 消息发布者(在 3.7.0 版本中)
  • Interop 消息发布者与提供者(在 3.7.0 版本中)

处理器

  • SentryProcessor(在 3.5.0 版本中)
  • RPC 相关处理器(在 3.5.0 版本中)
  • NewRelicProcessor(在 3.7.0 版本中)

灵感

许可证

Swarrot 采用 MIT 许可证发布。有关详细信息,请参阅附带 LICENSE 文件。