swarrot / swarrot
一个简单的库,用于消费 RabbitMQ 队列
Requires
- php: ^7.4 || ^8.0
- psr/log: ^1.0 || ^2.0 || ^3.0
- symfony/options-resolver: ^5.4 || ^6.0 || ^7.0
Requires (Dev)
- doctrine/dbal: ^3.0 || ^4.0
- doctrine/persistence: ^3.0
- friendsofphp/php-cs-fixer: ^3.27
- php-amqplib/php-amqplib: ^2.1 || ^3.0
- phpspec/prophecy: ^1.15
- phpspec/prophecy-phpunit: ^2.0
- phpstan/phpstan: ^1.10
- phpunit/phpunit: ^9.6 || ^10.0
- symfony/error-handler: ^5.4 || ^6.0 || ^7.0
- symfony/phpunit-bridge: ^5.4 || ^6.0 || ^7.0
- symfony/service-contracts: ^2.5 || ^3.0
Suggests
- pecl-amqp: *
- php-amqplib/php-amqplib: ^2.1 || ^3.0
Conflicts
- doctrine/persistence: <1.3
- dev-main / 4.x-dev
- v4.18.0
- v4.17.0
- v4.16.0
- v4.15.0
- v4.14.0
- v4.13.0
- v4.12.0
- v4.11.0
- v4.10
- v4.1.1
- v4.1
- v4.0.2
- v4.0.1
- v4.0.0
- v3.7.0
- v3.6.1
- v3.6.0
- v3.5.0
- v3.4.0
- v3.3.1
- 3.3.0
- v3.2.1
- v3.2.0
- v3.1.0
- v3.0.0
- v2.4.0
- v2.3.0
- v2.2.0
- v2.1.2
- v2.1.1
- v2.1.0
- v2.0.3
- v2.0.2
- v2.0.1
- v2.0.0
- v1.6.2
- v1.6.1
- v1.6.0
- v1.5.0
- v1.4.1
- v1.4.0
- v1.3.0
- v1.2.8
- v1.2.7
- v1.2.6
- v1.2.5
- v1.2.4
- v1.2.3
- v1.2.2
- v1.2.1
- v1.2.0
- v1.1.4
- v1.1.3
- v1.1.2
- v1.1.1
- v1.1.0
- v1.0.0
This package is auto-updated.
Last update: 2024-09-22 07:32:57 UTC
README
Swarrot 是一个 PHP 库,用于从任何代理中消费消息。
安装
推荐通过 Composer 安装 Swarrot。需要 swarrot/swarrot
包
$ composer require swarrot/swarrot
使用
基本使用
首先,您需要创建一个消息提供者来从您的代理中检索消息。例如,使用 PeclPackageMessageProvider
(从带有 pecl amqp 包 的 AMQP 代理中检索消息)
use Swarrot\Broker\MessageProvider\PeclPackageMessageProvider; // Create connection $connection = new \AMQPConnection(); $connection->connect(); $channel = new \AMQPChannel($connection); // Get the queue to consume $queue = new \AMQPQueue($channel); $queue->setName('global'); $messageProvider = new PeclPackageMessageProvider($queue);
完成后,您需要创建一个 Processor
来处理从代理检索的消息。此处理器必须实现 Swarrot\Processor\ProcessorInterface
。例如
use Swarrot\Processor\ProcessorInterface; use Swarrot\Broker\Message; class Processor implements ProcessorInterface { public function process(Message $message, array $options): bool { echo sprintf("Consume message #%d\n", $message->getId()); return true; // Continue processing other messages } }
现在,您有一个 Swarrot\Broker\MessageProviderInterface
来检索消息,以及一个处理器来处理它们。因此,请请求 Swarrot\Consumer
执行其工作
use Swarrot\Consumer; $consumer = new Consumer($messageProvider, $processor); $consumer->consume();
使用堆栈
受到 stackphp/builder 的强烈启发,您可以使用 Swarrot\Processor\Stack\Builder
来堆叠您的处理器。使用 内置处理器 或通过 创建自己的处理器,您可以扩展基本处理器的行为。在这个例子中,您的处理器被两个其他处理器装饰。这是 ExceptionCatcherProcessor,它使用 try/catch 块装饰您的处理器,以及 MaxMessagesProcessor,它会在某些消息被消费后停止您的工作者。
use Swarrot\Processor\ProcessorInterface; use Swarrot\Broker\Message; class Processor implements ProcessorInterface { public function process(Message $message, array $options): bool { echo sprintf("Consume message #%d\n", $message->getId()); return true; // Continue processing other messages } } $stack = (new \Swarrot\Processor\Stack\Builder()) ->push('Swarrot\Processor\MaxMessages\MaxMessagesProcessor', new Logger()) ->push('Swarrot\Processor\ExceptionCatcher\ExceptionCatcherProcessor') ->push('Swarrot\Processor\Ack\AckProcessor', $messageProvider) ; $processor = $stack->resolve(new Processor());
以下是一个图示,展示了当使用此顺序时会发生什么
处理器
官方处理器
- AckProcessor
- 与 Doctrine 相关的处理器(感谢 Adrien Brault)
- ExceptionCatcherProcessor
- InsomniacProcessor(感谢 Adrien Brault)
- InstantRetryProcessor
- MaxExecutionTimeProcessor(感谢 Remy Lemeunier)
- MaxMessagesProcessor(感谢 Remy Lemeunier)
- MemoryLimitProcessor(感谢 Christophe Coevoet)
- RetryProcessor
- ServicesResetterProcessor(感谢 Pierrick Vignand)
- SignalHandlerProcessor
- XDeathMaxCountProcessor (感谢 Anthony Moutte)
- XDeathMaxLifetimeProcessor (感谢 Anthony Moutte)
创建自己的处理器
要创建自己的处理器并使用它与 StackProcessor
,你只需实现 ProcessorInterface
并在构造函数中将另一个 ProcessorInterface
作为第一个参数传递。
已弃用的处理器与消息提供者/发布者
为了减少 swarrot/swarrot
依赖项并简化维护,3.x 版本中已弃用一些处理器和消息提供者/发布者。它们将在 4.0 版本中删除。
如果你使用这些已弃用的类,你可以创建自己的仓库来保留它们,或者如果你愿意帮助维护它们,我们可以在 swarrot 组织下创建一个专门的仓库。
消息提供者/发布者
- SQS 消息提供者(在 3.5.0 版本中)
- Stomp 消息提供者(在 3.6.0 版本中)
- Stomp 消息发布者(在 3.7.0 版本中)
- Interop 消息发布者与提供者(在 3.7.0 版本中)
处理器
- SentryProcessor(在 3.5.0 版本中)
- RPC 相关处理器(在 3.5.0 版本中)
- NewRelicProcessor(在 3.7.0 版本中)
灵感
许可证
Swarrot 采用 MIT 许可证发布。有关详细信息,请参阅附带 LICENSE 文件。