alveos/swarrot

该包已被放弃,不再维护。未建议替代包。

一个用于消费 RabbitMQ 队列的简单库

v4.18.0 2024-02-20 20:50 UTC

README

Build Status Scrutinizer Quality Score Latest Stable Version Latest Unstable Version

Swarrot 是一个 PHP 库,用于从任何代理中消费消息。

安装

建议通过 Composer 安装 Swarrot。需要 swarrot/swarrot

$ composer require swarrot/swarrot

用法

基本用法

首先,您需要创建一个消息提供者以从您的代理中检索消息。例如,使用 PeclPackageMessageProvider(从使用 pecl amqp 包 的 AMQP 代理检索消息)

use Swarrot\Broker\MessageProvider\PeclPackageMessageProvider;

// Create connection
$connection = new \AMQPConnection();
$connection->connect();
$channel = new \AMQPChannel($connection);
// Get the queue to consume
$queue = new \AMQPQueue($channel);
$queue->setName('global');

$messageProvider = new PeclPackageMessageProvider($queue);

完成后,您需要创建一个 Processor 来处理从代理中检索的消息。此处理器必须实现 Swarrot\Processor\ProcessorInterface。例如

use Swarrot\Processor\ProcessorInterface;
use Swarrot\Broker\Message;

class Processor implements ProcessorInterface
{
    public function process(Message $message, array $options): bool
    {
        echo sprintf("Consume message #%d\n", $message->getId());

        return true; // Continue processing other messages
    }
}

您现在有一个 Swarrot\Broker\MessageProviderInterface 用于检索消息和一个处理器来处理它们。所以,请让 Swarrot\Consumer 做它的任务

use Swarrot\Consumer;

$consumer = new Consumer($messageProvider, $processor);
$consumer->consume();

使用堆栈

深受 stackphp/builder 的启发,您可以使用 Swarrot\Processor\Stack\Builder 来堆叠您的处理器。使用 内置处理器 或通过 创建自己的,您可以扩展基本处理器的行为。在这个例子中,您的处理器被 2 个其他处理器装饰。ExceptionCatcherProcessor,它使用 try/catch 块装饰您的处理器,以及 MaxMessagesProcessor,它停止您的工人当某些消息已被消费。

use Swarrot\Processor\ProcessorInterface;
use Swarrot\Broker\Message;

class Processor implements ProcessorInterface
{
    public function process(Message $message, array $options): bool
    {
        echo sprintf("Consume message #%d\n", $message->getId());
        
        return true; // Continue processing other messages
    }
}

$stack = (new \Swarrot\Processor\Stack\Builder())
    ->push('Swarrot\Processor\MaxMessages\MaxMessagesProcessor', new Logger())
    ->push('Swarrot\Processor\ExceptionCatcher\ExceptionCatcherProcessor')
    ->push('Swarrot\Processor\Ack\AckProcessor', $messageProvider)
;

$processor = $stack->resolve(new Processor());

以下是一个插图,展示了使用此顺序会发生什么

this

处理器

官方处理器

创建您自己的处理器

要创建自己的处理器并能够使用它与 StackProcessor,您只需实现 ProcessorInterface 并在构造函数中将另一个 ProcessorInterface 作为第一个参数传入。

已弃用的处理器与消息提供者/发布者

为了减少 swarrot/swarrot 依赖关系并简化维护,3.x 版本中已弃用一些处理器和消息提供者/发布者。它们将在 4.0 版本中删除。

如果您使用这些已弃用的类,可以创建自己的存储库来保留它们,或者如果您愿意帮助维护它们,我们可以在 swarrot 组织下创建一个专门的存储库。

消息提供者/发布者

  • SQS 消息提供者(在 3.5.0 版本中)
  • Stomp 消息提供者(在 3.6.0 版本中)
  • Stomp 消息发布者(在 3.7.0 版本中)
  • Interop 消息发布者与提供者(在 3.7.0 版本中)

处理器

  • SentryProcessor(在 3.5.0 版本中)
  • RPC 相关处理器(在 3.5.0 版本中)
  • NewRelicProcessor(在 3.7.0 版本中)

灵感来源

许可协议

Swarrot 在 MIT 许可证下发布。有关详细信息,请参阅附带的 LICENSE 文件。