alveos / swarrot
一个用于消费 RabbitMQ 队列的简单库
Requires
- php: ^7.4 || ^8.0
- psr/log: ^1.0 || ^2.0 || ^3.0
- symfony/options-resolver: ^5.4 || ^6.0 || ^7.0
Requires (Dev)
- doctrine/dbal: ^3.0 || ^4.0
- doctrine/persistence: ^3.0
- friendsofphp/php-cs-fixer: ^3.27
- php-amqplib/php-amqplib: ^2.1 || ^3.0
- phpspec/prophecy: ^1.15
- phpspec/prophecy-phpunit: ^2.0
- phpstan/phpstan: ^1.10
- phpunit/phpunit: ^9.6 || ^10.0
- symfony/error-handler: ^5.4 || ^6.0 || ^7.0
- symfony/phpunit-bridge: ^5.4 || ^6.0 || ^7.0
- symfony/service-contracts: ^2.5 || ^3.0
Suggests
- pecl-amqp: *
- php-amqplib/php-amqplib: ^2.1 || ^3.0
Conflicts
- doctrine/persistence: <1.3
- dev-main / 4.x-dev
- v4.18.0
- v4.17.0
- v4.16.0
- v4.15.0
- v4.14.0
- v4.13.0
- v4.12.0
- v4.11.0
- v4.10
- v4.1.1
- v4.1
- v4.0.2
- v4.0.1
- v4.0.0
- v3.7.0
- v3.6.1
- v3.6.0
- v3.5.0
- v3.4.0
- v3.3.1
- 3.3.0
- v3.2.1
- v3.2.0
- v3.1.0
- v3.0.0
- v2.4.0
- v2.3.0
- v2.2.0
- v2.1.2
- v2.1.1
- v2.1.0
- v2.0.3
- v2.0.2
- v2.0.1
- v2.0.0
- v1.6.2
- v1.6.1
- v1.6.0
- v1.5.0
- v1.4.1
- v1.4.0
- v1.3.0
- v1.2.8
- v1.2.7
- v1.2.6
- v1.2.5
- v1.2.4
- v1.2.3
- v1.2.2
- v1.2.1
- v1.2.0
- v1.1.4
- v1.1.3
- v1.1.2
- v1.1.1
- v1.1.0
- v1.0.0
- dev-static-analysis-update
This package is auto-updated.
Last update: 2024-07-22 07:08:42 UTC
README
Swarrot 是一个 PHP 库,用于从任何代理中消费消息。
安装
建议通过 Composer 安装 Swarrot。需要 swarrot/swarrot
包
$ composer require swarrot/swarrot
用法
基本用法
首先,您需要创建一个消息提供者以从您的代理中检索消息。例如,使用 PeclPackageMessageProvider
(从使用 pecl amqp 包 的 AMQP 代理检索消息)
use Swarrot\Broker\MessageProvider\PeclPackageMessageProvider; // Create connection $connection = new \AMQPConnection(); $connection->connect(); $channel = new \AMQPChannel($connection); // Get the queue to consume $queue = new \AMQPQueue($channel); $queue->setName('global'); $messageProvider = new PeclPackageMessageProvider($queue);
完成后,您需要创建一个 Processor
来处理从代理中检索的消息。此处理器必须实现 Swarrot\Processor\ProcessorInterface
。例如
use Swarrot\Processor\ProcessorInterface; use Swarrot\Broker\Message; class Processor implements ProcessorInterface { public function process(Message $message, array $options): bool { echo sprintf("Consume message #%d\n", $message->getId()); return true; // Continue processing other messages } }
您现在有一个 Swarrot\Broker\MessageProviderInterface
用于检索消息和一个处理器来处理它们。所以,请让 Swarrot\Consumer
做它的任务
use Swarrot\Consumer; $consumer = new Consumer($messageProvider, $processor); $consumer->consume();
使用堆栈
深受 stackphp/builder 的启发,您可以使用 Swarrot\Processor\Stack\Builder
来堆叠您的处理器。使用 内置处理器 或通过 创建自己的,您可以扩展基本处理器的行为。在这个例子中,您的处理器被 2 个其他处理器装饰。ExceptionCatcherProcessor,它使用 try/catch 块装饰您的处理器,以及 MaxMessagesProcessor,它停止您的工人当某些消息已被消费。
use Swarrot\Processor\ProcessorInterface; use Swarrot\Broker\Message; class Processor implements ProcessorInterface { public function process(Message $message, array $options): bool { echo sprintf("Consume message #%d\n", $message->getId()); return true; // Continue processing other messages } } $stack = (new \Swarrot\Processor\Stack\Builder()) ->push('Swarrot\Processor\MaxMessages\MaxMessagesProcessor', new Logger()) ->push('Swarrot\Processor\ExceptionCatcher\ExceptionCatcherProcessor') ->push('Swarrot\Processor\Ack\AckProcessor', $messageProvider) ; $processor = $stack->resolve(new Processor());
以下是一个插图,展示了使用此顺序会发生什么
处理器
官方处理器
- AckProcessor
- 与 Doctrine 相关的处理器(感谢 Adrien Brault)
- ExceptionCatcherProcessor
- InsomniacProcessor(感谢 Adrien Brault)
- InstantRetryProcessor
- MaxExecutionTimeProcessor(感谢 Remy Lemeunier)
- MaxMessagesProcessor(感谢 Remy Lemeunier)
- MemoryLimitProcessor(感谢 Christophe Coevoet)
- RetryProcessor
- ServicesResetterProcessor(感谢 Pierrick Vignand)
- SignalHandlerProcessor
- XDeathMaxCountProcessor (感谢 Anthony Moutte)
- XDeathMaxLifetimeProcessor (感谢 Anthony Moutte)
创建您自己的处理器
要创建自己的处理器并能够使用它与 StackProcessor
,您只需实现 ProcessorInterface
并在构造函数中将另一个 ProcessorInterface
作为第一个参数传入。
已弃用的处理器与消息提供者/发布者
为了减少 swarrot/swarrot
依赖关系并简化维护,3.x 版本中已弃用一些处理器和消息提供者/发布者。它们将在 4.0 版本中删除。
如果您使用这些已弃用的类,可以创建自己的存储库来保留它们,或者如果您愿意帮助维护它们,我们可以在 swarrot 组织下创建一个专门的存储库。
消息提供者/发布者
- SQS 消息提供者(在 3.5.0 版本中)
- Stomp 消息提供者(在 3.6.0 版本中)
- Stomp 消息发布者(在 3.7.0 版本中)
- Interop 消息发布者与提供者(在 3.7.0 版本中)
处理器
- SentryProcessor(在 3.5.0 版本中)
- RPC 相关处理器(在 3.5.0 版本中)
- NewRelicProcessor(在 3.7.0 版本中)
灵感来源
许可协议
Swarrot 在 MIT 许可证下发布。有关详细信息,请参阅附带的 LICENSE 文件。