swarrot/swarrot-bundle

SwarrotBundle

安装数: 3,306,948

依赖项: 5

建议者: 0

安全: 0

星标: 89

关注者: 4

分支: 59

开放问题: 13

类型:symfony-bundle

v2.6.0 2024-07-04 15:19 UTC

README

Latest Stable Version Latest Unstable Version

一个用于在您的 Symfony 应用程序中使用 Swarrot 的包。

安装

安装此包的推荐方法是使用 Composer。只需运行

composer require swarrot/swarrot-bundle

将包注册到应用程序的内核中

// app/AppKernel.php
public function registerBundles()
{
    $bundles = array(
        // ...
        new Swarrot\SwarrotBundle\SwarrotBundle(),
    );

    return $bundles;
}

配置参考

swarrot:
    provider: pecl # pecl or amqp_lib (require php-amqplib/php-amqplib)
    default_connection: rabbitmq
    default_command: swarrot.command.base # Swarrot\SwarrotBundle\Command\SwarrotCommand
    logger: logger # logger or channel logger like monolog.logger.[my_channel]
    connections:
        rabbitmq:
            url: "amqp://%rabbitmq_login%:%rabbitmq_password%@%rabbitmq_host%:%rabbitmq_port%/%rabbitmq_vhost%"
    consumers:
        my_consumer:
            processor: my_consumer.processor.service # Symfony service id implementing Swarrot\Processor\ProcessorInterface
            middleware_stack: # order matters
                 - configurator: swarrot.processor.signal_handler
                   # extras:
                   #     signal_handler_signals:
                   #         - SIGTERM
                   #         - SIGINT
                   #         - SIGQUIT
                 # - configurator: swarrot.processor.insomniac
                 - configurator: swarrot.processor.max_messages
                   # extras:
                   #     max_messages: 100
                 - configurator: swarrot.processor.max_execution_time
                   # extras:
                   #     max_execution_time: 300
                 - configurator: swarrot.processor.memory_limit
                   # extras:
                   #     memory_limit: null
                 - configurator: swarrot.processor.doctrine_connection
                   # extras:
                   #     doctrine_ping: true
                   #     doctrine_close_master: true
                 - configurator: swarrot.processor.doctrine_object_manager
                 - configurator: swarrot.processor.exception_catcher

                 - configurator: swarrot.processor.ack
                   # extras:
                   #     requeue_on_error: false
                 - configurator: swarrot.processor.retry
                   # extras:
                   #     retry_exchange: retry
                   #     retry_attempts: 3
                   #     retry_routing_key_pattern: 'retry_%%attempt%%'

                 # - configurator: swarrot.processor.services_resetter

            extras:
                poll_interval: 500000
    messages_types:
        my_publisher:
            connection: rabbitmq # use the default connection by default
            exchange: my_exchange
            routing_key: my_routing_key

发布消息

第一步是从您的控制器中检索 Swarrot 发布者服务。

$messagePublisher = $this->get('swarrot.publisher');

之后,您需要使用 Message 类准备您的消息。

use Swarrot\Broker\Message;

$message = new Message('"My first message with the awesome swarrot lib :)"');

然后,您可以从 message_types 中将新消息发布到预定义的配置(connectionexchangerouting_key 等)。

$messagePublisher->publish('my_publisher', $message);

发布消息时,您可以通过传递第三个参数来覆盖 message_types 配置

$messagePublisher->publish('my_publisher', $message, array(
    'exchange'    => 'my_new_echange',
    'connection'  => 'my_second_connection',
    'routing_key' => 'my_new_routing_key'
));

消费消息

Swarrot 将自动为配置中的每个消费者创建一个命令。这些命令需要将队列名称作为第一个参数来消费。如果您不想使用默认的队列,您还可以使用命名连接作为第二个参数。

app/console swarrot:consume:my_consumer queue_name [connection_name]

您的消费者(my_consumer.processor.service)必须实现 Swarrot\Processor\ProcessorInterface

use Swarrot\Processor\ProcessorInterface;

class MyProcessor implements ProcessorInterface
{
    public function process(Message $message, array $options)
    {
        var_dump($message->getBody()); // "My first message with the awesome swarrot lib :)"
    }
}

您的处理器也将自动装饰为在 middleware_stack 部分中列出的所有处理器。顺序很重要。

所有这些处理器都是可配置的。您可以在您的 config.yml 文件中的每个配置定义上添加一个 extras 键。查看 配置参考 了解现有配置器的可用 extras。

您还可以使用命令行选项

  • --poll-interval [默认: 500000]: 当在代理中找不到消息时更改轮询间隔
  • --requeue-on-error (-r): 如果发生错误,将消息重新入队到同一个队列中。
  • --no-catch (-C): 禁用 ExceptionCatcher 处理器(仅当处理器在堆栈中时可用)
  • --max-execution-time (-t) [默认: 300]: 配置 MaxExecutionTime 处理器(仅当处理器在堆栈中时可用)
  • --max-messages (-m) [默认: 300]: 配置 MaxMessages 处理器(仅当处理器在堆栈中时可用)
  • --no-retry (-R): 禁用 Retry 处理器(仅当处理器在堆栈中时可用)

默认值将被您的 config.yml 覆盖,并且使用选项将覆盖默认配置值。

使用 -h 运行您的命令以获取完整的选项列表。

请注意,您可以使用 command_alias 配置定义一个或多个 别名 为此命令

swarrot:
    consumers:
        my_consumer:
            command_alias: 'my:super:commmand'

从而允许您使用更合适的措辞来消费消息

app/console my:super:command queue_name [connection_name]

实现自己的 Provider

如果您想实现自己的 Provider(如 Redis),您首先必须实现 Swarrot\SwarrotBundle\Broker\FactoryInterface。然后,您可以将其与其他服务注册并标记为 swarrot.provider_factory

services:
    app.swarrot.custom_provider_factory:
        class: AppBundle\Provider\CustomFactory
        tags:
            - {name: swarrot.provider_factory}
    app.swarrot.redis_provider_factory:
        class: AppBundle\Provider\RedisFactory
        tags:
            - {name: swarrot.provider_factory, alias: redis}

现在,您可以在 config.yml 文件中告诉 Swarrot 使用它。

swarrot:
  provider: app.swarrot.custom_provider_factory

或使用别名

swarrot:
  provider: redis

使用自定义处理器

如果您想使用自定义处理器,您需要两样东西。即 Processor 本身和 ProcessorConfigurator。对于 Processor,您可以参考 swarrot/swarrot 文档。对于 ConfigurationProcessor,您需要实现 ProcessorConfiguratorInterface 并将其注册为一个抽象服务,如下所示

services:
  my_own_processor_configurator_service_id:
    abstract: true
    class: MyProject\MyOwnProcessorConfigurator

完成后,只需将其添加到消费者中间件堆栈中

middleware_stack:
  - configurator: swarrot.processor.signal_handler
  - configurator: my_own_processor_configurator_service_id

像往常一样,注意您的 middleware_stack 的顺序。

在不发布的情况下运行测试

如果您使用 Swarrot,您可能不想在测试环境中实际发布消息,例如。您可以使用 BlackholePublisher 来实现这一点。

只需将 DIC 中的 swarrot.publisher.class 参数重写为 Swarrot\SwarrotBundle\Broker\BlackholePublisher 类,例如更新 config_test.yml 即可

parameters:
    swarrot.publisher.class: Swarrot\SwarrotBundle\Broker\BlackholePublisher

代理配置

此组件的目标是处理消息消费,而不是处理您的代理配置。我们不希望将基础设施逻辑与消费逻辑混合。

如果您正在寻找配置代理的工具,请查看 odolbeau/rabbit-mq-admin-toolkit

许可证

此组件在 MIT 许可证下发布。有关详细信息,请参阅捆绑的 LICENSE 文件。