krak/ symfony-messenger-auto-scale
Symfony Messenger 自动扩展
Requires
- ext-pcntl: *
- krak/schema: ^0.2.0
- psr/event-dispatcher: ^1.0
- symfony/messenger: ^4.4|^5.4|^6.2
Requires (Dev)
- ext-redis: *
- krak/symfony-messenger-redis: ^0.1.0
- nyholm/symfony-bundle-test: ^1.6
- phpunit/phpunit: ^9.2
- psr/simple-cache: ^1.0
- symfony/cache: ^5.4
- symfony/console: ^5.4
- symfony/dependency-injection: ^5.4
- symfony/http-kernel: ^5.4
- symfony/process: ^5.4
- symfony/property-access: ^5.4
- symfony/serializer: ^5.4
README
Symfony Messenger 自动扩展包提供了动态扩展一组接收器工人数量的能力,以便响应动态工作负载。
某些类型的工作负载在长时间内吞吐量波动是常见的,需要队列消费者的数量动态扩展以满足需求。使用此自动扩展包,现在可以通过 symfony 的 messenger 系统实现这一点。
安装
使用 composer 在 krak/symfony-messenger-auto-scale
上安装。
如果 symfony 的 composer 安装没有自动注册包,您可以手动完成。
<?php return [ //... Krak\SymfonyMessengerAutoScale\MessengerAutoScaleBundle::class => ['all' => true], ];
用法
加载包之后,您需要配置工作池,这些工作池将管理一组 messenger 接收器的进程。
messenger_auto_scale: console_path: '%kernel.project_dir%/tests/Feature/Fixtures/console' pools: sales: min_procs: 0 max_procs: 5 receivers: "sales*" heartbeat_interval: 5 default: min_procs: 0 max_procs: 5 backed_up_alert_threshold: 100 receivers: "*" heartbeat_interval: 10
配置完成后,您可以使用 krak:auto-scale:consume
命令启动消费者,该命令将启动并管理工作池。
匹配接收器
每个池配置必须有一个 receivers
属性,它是一个简单的 glob,可以匹配 messenger 配置中设置的当前任何传输名称。
请注意,接收器只能属于一个池。因此,如果两个池的接收器模式匹配相同的接收器,则第一个定义的池将拥有该接收器。
接收器优先级
默认情况下,如果池匹配多个接收器,接收器在框架 messenger 配置中定义的顺序将决定它们被消费的顺序。
让我们看一个例子
# auto scale config messenger_auto_scale: pools: default: receivers: '*' # this will match all receivers defined # messenger config framework: messenger: transports: transport1: '' transport2: '' transport3: ''
池中的每个工作进程将首先处理 transport1 中的消息,然后为空时,它们将查看 transport2,依此类推。本质上,我们正在调用 messenger consume 命令,例如:console messenger:consume transport1 transport2 transport3
如果您想更明确地指定接收器优先级,则可以在传输上定义优先级选项,这将确保优先级最高的接收器在优先级较低的接收器之前处理。如果两个接收器具有相同的优先级,则定义顺序将占先。
让我们看一个例子
# auto scale config messenger_auto_scale: pools: default: receivers: '*' # this will match all receivers defined # messenger config framework: messenger: transports: transport3: dsn: '' options: { priority: -1 } transport1: dsn: '' options: { priority: 1 } transport2: '' # default priority is 0
这将产生与上述配置相同的效果。尽管传输的顺序不同,但优先级选项确保它们的顺序与上述相同。
禁用必须匹配所有接收器
默认情况下,如果任何接收器没有通过池配置匹配,则包将抛出异常。这是为了帮助防止任何意外的错误,其中由于某些原因接收器名称没有通过池匹配,而您预期它会匹配。
要禁用此检查,请将 must_match_all_receivers
配置选项更新为 false。
messenger_auto_scale: must_match_all_receivers: false
自定义工作进程命令和选项
默认情况下,每个工作进程以默认 symfony messenger:consume
命令启动,并传入接收器 ID。您可以配置要运行的命令以及与它相关的任何附加选项。
messenger_auto_scale: pools: default: # ... worker_command: 'messenger:consume' worker_command_options: ['--memory-limit=64M']
您可以在 symfony 的工作进程中的 ConsumeMessagesCommand 类 中找到所有可用选项。
配置心跳
默认情况下,每个工作池将每 60 秒记录一次心跳事件。如果您想更改此频率,请使用池 heartbeat_interval
定义后续心跳之间的秒数。
监控
您可以从自己的服务中访问PoolControl,如果您想构建自定义监控,或者您可以直接使用已注册的krak:auto-scale:pool:*
命令。
自动缩放
自动缩放是通过AutoScale接口管理的,该接口负责捕获在AutoScaleRequest
中记录的工人池当前状态,并在AutoScaleResponse
中返回该工人池预期的工人数。
默认的自动缩放已配置为根据当前队列大小和配置的消息速率进行工作,然后将其裁剪到配置的最小/最大进程数。还包括一些逻辑,用于防抖自动缩放请求,以确保系统在何时创建新的进程以及波动频率上更加谨慎。
以下是一些示例配置,我们将讨论一些场景
messenger_auto_scale: pools: catalog: max_procs: 5 message_rate: 100 scale_up_threshold_seconds: 5 scale_down_threshold_seconds: 20 receivers: "catalog" sales: min_procs: 5 message_rate: 10 scale_up_threshold_seconds: 5 scale_down_threshold_seconds: 20 receivers: "sales"
定义自己的自动缩放算法
如果您想增强或执行自己的自动缩放算法,可以实现AutoScale接口,然后更新Krak\SymfonyMessengerAutoScale\AutoScale
以指向您的新自动缩放服务。默认服务定义如下
use Krak\SymfonyMessengerAutoScale\AutoScale; $autoScale = new AutoScale\MinMaxClipAutoScale(new AutoScale\DebouncingAutoScale(new AutoScale\QueueSizeMessageRateAutoScale()));
警报
警报系统设计得非常灵活,允许每个用户根据自己的需求定义警报。警报只是当达到由实现RaiseAlerts
的服务确定的特定指标时触发的事件。
要实际触发警报,需要运行krak:auto-scale:alert
命令,该命令将检查池的状态并引发警报。将此命令放在cron中,以任何您希望警报监控的间隔运行。
订阅警报
您只需创建一个基本的事件监听器/订阅者,然后应该能够在这些事件上执行任何操作。
PoolBackedUpAlert
如果给定队列的消息太多,将触发此警报。要在池中启用此功能,需要定义backed_up_alert_threshold
配置值。
# ... sales: backed_up_alert_threshold: 100
如果销售池中有超过100条消息,则PoolBackedUpAlert将在下一次检查时触发。
创建自己的警报
要创建警报,需要订阅RaiseAlerts接口,然后注册该服务。如果启用自动配置,它应该自动带有messenger_auto_scale.raise_alerts
标签。
从Symfony应用访问Supervisor Pool配置
当将此作为symfony应用中的捆绑包安装时,提供对某些内部配置结构的访问可能很有帮助。该库公开了可以注入/访问的服务,以提供对内部配置的访问。
Supervisor Pool配置数组
krak.messenger_auto_scale.supervisor_pool_configs
根据自动缩放配置存储list
。
接收器到池名称数组
krak.messenger_auto_scale.receiver_to_pool_mapping
存储array
,它将messenger接收器ID映射到自动缩放池名称。
测试
您可以使用: composer test
运行测试套件
为了使功能测试套件通过,您需要在本地启动redis docker容器。
请注意,您需要在本地php cli上安装redis-ext,并且需要通过docker-compose
启动docker中的redis实例。