markup/job-queue-bundle

Markup Job Queue bundle 集成 oldsound/rabbiitmq-bundle,提供自动调度重复的控制台命令作业

安装数: 59,632

依赖项: 0

建议者: 0

安全性: 0

星标: 17

关注者: 22

分支: 7

开放问题: 3

类型:symfony-bundle

8.2.2 2022-11-14 14:36 UTC

README

Build Status

简介

此捆绑包提供了一些功能,为 symfony 控制台命令提供简单的作业队列机制和调度系统。它使用 rabbit-mq 管理作业队列(为此,多个工作者处理任务)。在继续之前,您应该阅读:https://github.com/videlalvaro/RabbitMqBundle。此捆绑包假定使用 'topic' 消费者而不是 'direct' 消费者。

这些工作者应由 supervisord 维护,以确保它们不会失败。

特性

  • 将控制台命令作业添加到 RabbitMq 以异步处理
  • 添加在未来日期运行的作业
  • 通过将 uuid 选项添加到所有控制台命令,在 redis 中记录作业状态(状态、峰值内存使用、输出等)
  • 使用 PHP 消费者或 Golang 消费者消费作业(感谢 ricbra/rabbitmq-cli-consumer)
  • 用于通过 supervisord 管理消费者的配置生成辅助命令

作业调度

而不是使用 crontab 调度控制台命令,它们应该在特定环境配置文件中管理(使用 crontab 语法)。这允许添加新的重复作业或更改时间,而无需修改多个环境中的 crontab。它还具有强制所有命令采用通用日志/异常通知策略的优势。这些任务示例包括轮询第三方服务器上的文件、发送队列电子邮件或生成报告。

配置

markup_job_queue:
    recurring: recurring_jobs.yml # name of file within app/config/
# app/config/recurring_jobs.yml
- command: your:console:command --and --any --options and arguments
  schedule: 1-59/2 * * * *
  topic: topic-of-a-configured-rabbitmq-producer # e.g 'default'
- command: another:console:command --and --any --options and arguments
  schedule: * * * * *
  topic: topic-of-a-configured-rabbitmq-producer

配置好您的重复调度后,您只需将一个控制台命令添加到您的实时 crontab。这将每分钟运行一个控制台命令,将任何 'due' 作业添加到 RabbitMQ 以进行处理

* * * * * /usr/bin/php /your/app/location/current/app/console markup:job_queue:recurring:add --no-debug -e=prod >> /var/log/recurring_jobs.log

在开发中,如果您更喜欢,可以在命令行上以间隔运行而不是安装到 crontab

while true; do app/console markup:job_queue:recurring:add; sleep 60; done

对于 'topic' 的值,需要在 oldsound/rabbitmq-bundle 配置中设置有效的消费者和生成器,如下所示,如果没有此类配置,作业处理将失败(目前这是一个约定,但最好通过允许此捆绑包直接配置 oldsound 捆绑包来强制执行 - PR 欢迎使用):由于 oldsound/rabbitmq-bundle 处理某些键的方式,请不要在生成器和消费者中使用连字符。

producers:
    a_valid_topic:
        connection:       default
        exchange_options: { name: 'a_valid_topic', type: topic }
                
consumers:
	a_valid_topic:
		connection:       default
		exchange_options: { name: 'a_valid_topic', type: topic }
		queue_options:    { name: 'a_valid_topic' }
		callback:         markup_job_queue.consumer

有几个控制台命令允许您通过 CLI 预览和验证配置的控制台作业(请参阅 /Command)

添加作业

作业也可以直接添加。有一个用于添加 'command' 作业的实用方法,它使用 Symfony 进程组件来执行控制台命令。可以使用以下方式使用 'jobby' 服务添加 'Command Job':

$container->get('jobby')
	->addConsoleCommandJob(
		'your:console:command',
		['--test']
		'a_valid_topic', # should be a valid topic name
		600, # allowed timeout for command (see symfony process component documentation)
		600, # allowed idle timeout for command (see symfony process component documentation)
	)

您可以使用此机制将大型导入任务分解成可以异步处理的更小部分。确保您适当地转义控制台命令中提供的任何用户参数。由于控制台命令是通过进程组件进行消费的,未转义的参数可能是安全攻击向量。

启用和监控工作者(通过 supervisord)

为了帮助部署此包,提供了一条控制台命令,可以在部署过程中运行。此控制台命令将生成一个supervisord文件,用于将其包含在您的main supervisord.conf文件中。这将生成一个配置,用于启动和监视php '消费者',每个主题提供一个消费者。有两种消费任务的方式。默认机制是使用由oldsound/rabbitmq-bundle提供的PHP消费者,但还有另一种机制使用基于Golang的消费者(ricbra/rabbitmq-cli-consumer)。要使用Golang变体,请提供cli_consumer节点的配置。

markup_job_queue:
	cli_consumer:
	    enabled: true

此控制台命令需要最小配置(每个要启动的消费者一个块)。按照惯例,这些必须与您已经定义的消费者相匹配(如上所示)。由于oldsound/rabbitmq-bundle处理某些键的方式,请不要在主题名称中使用短横线。

通过设置'prefetch_count',您可以选择消费者在重新启动前应处理的消息数量。

markup_job_queue:
	topics:
		test:
			prefetch_count: 10
		a_valid_topic:
			prefetch_count: 20

要写入配置文件

app/console markup:job_queue:supervisord_config:write disambiguator

默认情况下,文件将写入到/etc/supervisord/conf.d/。这可以修改

markup_job_queue:
	supervisor_config_path: /path/to/conf/file/

此路径需要包含在您的main /etc/supervisord.conf中,因此

[include]
files=/path/to/conf/file/*.conf

部署

例如,要将其用作capistrano部署的一部分,您可以为capistrano编写一些自定义任务

  • 停止消费者
  • 重写配置
  • 重新启动消费者

以下假设使用capistrano multistage在capifony 2.X下,情况可能有所不同

namespace :supervisor do
    	desc "Supervisor Tasks"
    	task :check_config, :roles => :app do
		stream "cd #{latest_release} && #{php_bin} #{symfony_console} markup:job_queue:recurring:check --env=#{symfony_env}"
	end
	task :write_config, :roles => :worker, :except => { :no_release => true } do
	        stream("cd #{latest_release} && #{php_bin} #{symfony_console} markup:job_queue:supervisord_config:write #{fetch(:stage)} --env=#{symfony_env_prod};")
	end
	task :restart_all, :roles => :app, :except => { :no_release => true } do
		stream "#{try_sudo} supervisorctl stop all #{fetch(:stage)}:*"
		stream "#{try_sudo} supervisorctl update"
		stream "#{try_sudo} supervisorctl start all #{fetch(:stage)}:*"
		capifony_puts_ok
	end
	task :stop_all, :roles => :app, :except => { :no_release => true } do
		# stops all consumers in this group
		stream "#{try_sudo} supervisorctl stop all #{fetch(:stage)}:*"
		capifony_puts_ok
	end
end