markup / job-queue-bundle
Markup Job Queue bundle 集成 oldsound/rabbiitmq-bundle,提供自动调度重复的控制台命令作业
Requires
- php: >=7.1
- doctrine/collections: ^1.6
- doctrine/orm: ~2.7
- doctrine/persistence: ^1.3
- markup/rabbitmq-management-api: >=2.1.1
- mtdowling/cron-expression: 1.0.*
- pagerfanta/pagerfanta: ~1.0.2|^2
- php-amqplib/rabbitmq-bundle: ^1.14
- ramsey/uuid: ^3.8
- snc/redis-bundle: ~1.1.2|^2
- symfony/console: ^3.4|^4
- symfony/finder: ^3.4|^4
- symfony/framework-bundle: ^3.4|^4
- symfony/process: ^3.4|^4
- symfony/yaml: ^3.4|^4
- twig/twig: ^2
Requires (Dev)
- mockery/mockery: ^1.2
- phpstan/phpstan-shim: ^0.11.8
- phpunit/phpunit: ^7.2
- predis/predis: ^1.0
- symfony/form: ^3.4|^4
Suggests
- ricbra/rabbitmq-cli-consumer: To consume jobs with Go instead of PHP
README
简介
此捆绑包提供了一些功能,为 symfony 控制台命令提供简单的作业队列机制和调度系统。它使用 rabbit-mq 管理作业队列(为此,多个工作者处理任务)。在继续之前,您应该阅读:https://github.com/videlalvaro/RabbitMqBundle。此捆绑包假定使用 'topic' 消费者而不是 'direct' 消费者。
这些工作者应由 supervisord 维护,以确保它们不会失败。
特性
- 将控制台命令作业添加到 RabbitMq 以异步处理
- 添加在未来日期运行的作业
- 通过将 uuid 选项添加到所有控制台命令,在 redis 中记录作业状态(状态、峰值内存使用、输出等)
- 使用 PHP 消费者或 Golang 消费者消费作业(感谢 ricbra/rabbitmq-cli-consumer)
- 用于通过 supervisord 管理消费者的配置生成辅助命令
作业调度
而不是使用 crontab 调度控制台命令,它们应该在特定环境配置文件中管理(使用 crontab 语法)。这允许添加新的重复作业或更改时间,而无需修改多个环境中的 crontab。它还具有强制所有命令采用通用日志/异常通知策略的优势。这些任务示例包括轮询第三方服务器上的文件、发送队列电子邮件或生成报告。
配置
markup_job_queue: recurring: recurring_jobs.yml # name of file within app/config/
# app/config/recurring_jobs.yml - command: your:console:command --and --any --options and arguments schedule: 1-59/2 * * * * topic: topic-of-a-configured-rabbitmq-producer # e.g 'default' - command: another:console:command --and --any --options and arguments schedule: * * * * * topic: topic-of-a-configured-rabbitmq-producer
配置好您的重复调度后,您只需将一个控制台命令添加到您的实时 crontab。这将每分钟运行一个控制台命令,将任何 'due' 作业添加到 RabbitMQ 以进行处理
* * * * * /usr/bin/php /your/app/location/current/app/console markup:job_queue:recurring:add --no-debug -e=prod >> /var/log/recurring_jobs.log
在开发中,如果您更喜欢,可以在命令行上以间隔运行而不是安装到 crontab
while true; do app/console markup:job_queue:recurring:add; sleep 60; done
对于 'topic' 的值,需要在 oldsound/rabbitmq-bundle 配置中设置有效的消费者和生成器,如下所示,如果没有此类配置,作业处理将失败(目前这是一个约定,但最好通过允许此捆绑包直接配置 oldsound 捆绑包来强制执行 - PR 欢迎使用):由于 oldsound/rabbitmq-bundle 处理某些键的方式,请不要在生成器和消费者中使用连字符。
producers: a_valid_topic: connection: default exchange_options: { name: 'a_valid_topic', type: topic } consumers: a_valid_topic: connection: default exchange_options: { name: 'a_valid_topic', type: topic } queue_options: { name: 'a_valid_topic' } callback: markup_job_queue.consumer
有几个控制台命令允许您通过 CLI 预览和验证配置的控制台作业(请参阅 /Command)
添加作业
作业也可以直接添加。有一个用于添加 'command' 作业的实用方法,它使用 Symfony 进程组件来执行控制台命令。可以使用以下方式使用 'jobby' 服务添加 'Command Job':
$container->get('jobby') ->addConsoleCommandJob( 'your:console:command', ['--test'] 'a_valid_topic', # should be a valid topic name 600, # allowed timeout for command (see symfony process component documentation) 600, # allowed idle timeout for command (see symfony process component documentation) )
您可以使用此机制将大型导入任务分解成可以异步处理的更小部分。确保您适当地转义控制台命令中提供的任何用户参数。由于控制台命令是通过进程组件进行消费的,未转义的参数可能是安全攻击向量。
启用和监控工作者(通过 supervisord)
为了帮助部署此包,提供了一条控制台命令,可以在部署过程中运行。此控制台命令将生成一个supervisord文件,用于将其包含在您的main supervisord.conf文件中。这将生成一个配置,用于启动和监视php '消费者',每个主题提供一个消费者。有两种消费任务的方式。默认机制是使用由oldsound/rabbitmq-bundle提供的PHP消费者,但还有另一种机制使用基于Golang的消费者(ricbra/rabbitmq-cli-consumer)。要使用Golang变体,请提供cli_consumer节点的配置。
markup_job_queue: cli_consumer: enabled: true
此控制台命令需要最小配置(每个要启动的消费者一个块)。按照惯例,这些必须与您已经定义的消费者相匹配(如上所示)。由于oldsound/rabbitmq-bundle处理某些键的方式,请不要在主题名称中使用短横线。
通过设置'prefetch_count',您可以选择消费者在重新启动前应处理的消息数量。
markup_job_queue: topics: test: prefetch_count: 10 a_valid_topic: prefetch_count: 20
要写入配置文件
app/console markup:job_queue:supervisord_config:write disambiguator
默认情况下,文件将写入到/etc/supervisord/conf.d/。这可以修改
markup_job_queue: supervisor_config_path: /path/to/conf/file/
此路径需要包含在您的main /etc/supervisord.conf中,因此
[include]
files=/path/to/conf/file/*.conf
部署
例如,要将其用作capistrano部署的一部分,您可以为capistrano编写一些自定义任务
- 停止消费者
- 重写配置
- 重新启动消费者
以下假设使用capistrano multistage在capifony 2.X下,情况可能有所不同
namespace :supervisor do desc "Supervisor Tasks" task :check_config, :roles => :app do stream "cd #{latest_release} && #{php_bin} #{symfony_console} markup:job_queue:recurring:check --env=#{symfony_env}" end task :write_config, :roles => :worker, :except => { :no_release => true } do stream("cd #{latest_release} && #{php_bin} #{symfony_console} markup:job_queue:supervisord_config:write #{fetch(:stage)} --env=#{symfony_env_prod};") end task :restart_all, :roles => :app, :except => { :no_release => true } do stream "#{try_sudo} supervisorctl stop all #{fetch(:stage)}:*" stream "#{try_sudo} supervisorctl update" stream "#{try_sudo} supervisorctl start all #{fetch(:stage)}:*" capifony_puts_ok end task :stop_all, :roles => :app, :except => { :no_release => true } do # stops all consumers in this group stream "#{try_sudo} supervisorctl stop all #{fetch(:stage)}:*" capifony_puts_ok end end