swarrot / swarrot-bundle
SwarrotBundle
Requires
- php: ^7.4|^8.0
- psr/log: ^1.0|^2.0|^3.0
- swarrot/swarrot: ^4.0
- symfony/config: ^5.4|^6.0|^7.0
- symfony/console: ^5.4|^6.0|^7.0
- symfony/dependency-injection: ^5.4|^6.0|^7.0
- symfony/http-kernel: ^5.4|^6.0|^7.0
- symfony/yaml: ^5.4|^6.0|^7.0
Requires (Dev)
- doctrine/common: ^2.9|^3.0
- doctrine/dbal: ^2.2|^3.0
- friendsofphp/php-cs-fixer: ^3.59
- php-amqplib/php-amqplib: ^2.9|^3.0
- phpspec/prophecy-phpunit: ^2.0
- phpstan/phpstan: ^1.2
- phpunit/phpunit: ^9.5
- symfony/event-dispatcher: ^5.4|^6.0|^7.0
- symfony/phpunit-bridge: ^5.4|^6.0|^7.0
Suggests
- php-amqplib/php-amqplib: Required if using amqp_lib
Conflicts
- doctrine/persistence: <1.3
This package is auto-updated.
Last update: 2024-09-04 15:38:40 UTC
README
一个用于在您的 Symfony 应用程序中使用 Swarrot 的包。
安装
安装此包的推荐方法是使用 Composer。只需运行
composer require swarrot/swarrot-bundle
将包注册到应用程序的内核中
// app/AppKernel.php public function registerBundles() { $bundles = array( // ... new Swarrot\SwarrotBundle\SwarrotBundle(), ); return $bundles; }
配置参考
swarrot: provider: pecl # pecl or amqp_lib (require php-amqplib/php-amqplib) default_connection: rabbitmq default_command: swarrot.command.base # Swarrot\SwarrotBundle\Command\SwarrotCommand logger: logger # logger or channel logger like monolog.logger.[my_channel] connections: rabbitmq: url: "amqp://%rabbitmq_login%:%rabbitmq_password%@%rabbitmq_host%:%rabbitmq_port%/%rabbitmq_vhost%" consumers: my_consumer: processor: my_consumer.processor.service # Symfony service id implementing Swarrot\Processor\ProcessorInterface middleware_stack: # order matters - configurator: swarrot.processor.signal_handler # extras: # signal_handler_signals: # - SIGTERM # - SIGINT # - SIGQUIT # - configurator: swarrot.processor.insomniac - configurator: swarrot.processor.max_messages # extras: # max_messages: 100 - configurator: swarrot.processor.max_execution_time # extras: # max_execution_time: 300 - configurator: swarrot.processor.memory_limit # extras: # memory_limit: null - configurator: swarrot.processor.doctrine_connection # extras: # doctrine_ping: true # doctrine_close_master: true - configurator: swarrot.processor.doctrine_object_manager - configurator: swarrot.processor.exception_catcher - configurator: swarrot.processor.ack # extras: # requeue_on_error: false - configurator: swarrot.processor.retry # extras: # retry_exchange: retry # retry_attempts: 3 # retry_routing_key_pattern: 'retry_%%attempt%%' # - configurator: swarrot.processor.services_resetter extras: poll_interval: 500000 messages_types: my_publisher: connection: rabbitmq # use the default connection by default exchange: my_exchange routing_key: my_routing_key
发布消息
第一步是从您的控制器中检索 Swarrot 发布者服务。
$messagePublisher = $this->get('swarrot.publisher');
之后,您需要使用 Message 类准备您的消息。
use Swarrot\Broker\Message; $message = new Message('"My first message with the awesome swarrot lib :)"');
然后,您可以从 message_types
中将新消息发布到预定义的配置(connection
、exchange
、routing_key
等)。
$messagePublisher->publish('my_publisher', $message);
发布消息时,您可以通过传递第三个参数来覆盖 message_types
配置
$messagePublisher->publish('my_publisher', $message, array( 'exchange' => 'my_new_echange', 'connection' => 'my_second_connection', 'routing_key' => 'my_new_routing_key' ));
消费消息
Swarrot 将自动为配置中的每个消费者创建一个命令。这些命令需要将队列名称作为第一个参数来消费。如果您不想使用默认的队列,您还可以使用命名连接作为第二个参数。
app/console swarrot:consume:my_consumer queue_name [connection_name]
您的消费者(my_consumer.processor.service
)必须实现 Swarrot\Processor\ProcessorInterface
use Swarrot\Processor\ProcessorInterface; class MyProcessor implements ProcessorInterface { public function process(Message $message, array $options) { var_dump($message->getBody()); // "My first message with the awesome swarrot lib :)" } }
您的处理器也将自动装饰为在 middleware_stack
部分中列出的所有处理器。顺序很重要。
所有这些处理器都是可配置的。您可以在您的 config.yml
文件中的每个配置定义上添加一个 extras
键。查看 配置参考 了解现有配置器的可用 extras。
您还可以使用命令行选项
- --poll-interval [默认: 500000]: 当在代理中找不到消息时更改轮询间隔
- --requeue-on-error (-r): 如果发生错误,将消息重新入队到同一个队列中。
- --no-catch (-C): 禁用 ExceptionCatcher 处理器(仅当处理器在堆栈中时可用)
- --max-execution-time (-t) [默认: 300]: 配置 MaxExecutionTime 处理器(仅当处理器在堆栈中时可用)
- --max-messages (-m) [默认: 300]: 配置 MaxMessages 处理器(仅当处理器在堆栈中时可用)
- --no-retry (-R): 禁用 Retry 处理器(仅当处理器在堆栈中时可用)
默认值将被您的 config.yml
覆盖,并且使用选项将覆盖默认配置值。
使用 -h
运行您的命令以获取完整的选项列表。
请注意,您可以使用 command_alias
配置定义一个或多个 别名 为此命令
swarrot: consumers: my_consumer: command_alias: 'my:super:commmand'
从而允许您使用更合适的措辞来消费消息
app/console my:super:command queue_name [connection_name]
实现自己的 Provider
如果您想实现自己的 Provider(如 Redis),您首先必须实现 Swarrot\SwarrotBundle\Broker\FactoryInterface
。然后,您可以将其与其他服务注册并标记为 swarrot.provider_factory
。
services: app.swarrot.custom_provider_factory: class: AppBundle\Provider\CustomFactory tags: - {name: swarrot.provider_factory} app.swarrot.redis_provider_factory: class: AppBundle\Provider\RedisFactory tags: - {name: swarrot.provider_factory, alias: redis}
现在,您可以在 config.yml
文件中告诉 Swarrot 使用它。
swarrot: provider: app.swarrot.custom_provider_factory
或使用别名
swarrot: provider: redis
使用自定义处理器
如果您想使用自定义处理器,您需要两样东西。即 Processor
本身和 ProcessorConfigurator
。对于 Processor
,您可以参考 swarrot/swarrot
文档。对于 ConfigurationProcessor
,您需要实现 ProcessorConfiguratorInterface
并将其注册为一个抽象服务,如下所示
services:
my_own_processor_configurator_service_id:
abstract: true
class: MyProject\MyOwnProcessorConfigurator
完成后,只需将其添加到消费者中间件堆栈中
middleware_stack: - configurator: swarrot.processor.signal_handler - configurator: my_own_processor_configurator_service_id
像往常一样,注意您的 middleware_stack
的顺序。
在不发布的情况下运行测试
如果您使用 Swarrot,您可能不想在测试环境中实际发布消息,例如。您可以使用 BlackholePublisher
来实现这一点。
只需将 DIC 中的 swarrot.publisher.class
参数重写为 Swarrot\SwarrotBundle\Broker\BlackholePublisher
类,例如更新 config_test.yml
即可
parameters: swarrot.publisher.class: Swarrot\SwarrotBundle\Broker\BlackholePublisher
代理配置
此组件的目标是处理消息消费,而不是处理您的代理配置。我们不希望将基础设施逻辑与消费逻辑混合。
如果您正在寻找配置代理的工具,请查看 odolbeau/rabbit-mq-admin-toolkit。
许可证
此组件在 MIT 许可证下发布。有关详细信息,请参阅捆绑的 LICENSE 文件。