okvpn / cron-bundle
友好的 Docker 的 Symfony cron 扩展包,用于一致地处理计划任务,可以是并行或跨集群的,如 Symfony Messenger
Requires
- php: >=7.2
- dragonmantank/cron-expression: ^3.0
- symfony/framework-bundle: ^4.4 || ^5.4 || ^6.0 || ^7.0
Suggests
- symfony/lock: For locking jobs
- symfony/messenger: For handle cron jobs in the message queue.
- symfony/process: For using shell commands
README
本扩展包为在 Symfony 应用程序内注册和处理计划任务提供了接口。
目的
这是一个比现有 cron 扩展包更简单的替代品,无需 doctrine 依赖。还增加了支持中间件,用于在集群安装中自定义处理 cron 作业:(将作业发送到消息队列,如 Symfony Messenger;锁定等)。这允许限制并行运行进程的数量并对其进行优先排序。
功能
- 无需 doctrine/database。
- Docker 友好,作为后台命令运行,无需
crond
。 - 使用一毫秒的精度安排任务。
- 使用
@random 3600
和jitter
有更多随机化 crons 的方法。 - 与 Symfony Messenger 集成。
- 从不同的存储中加载 cron 作业(config.yml,标记服务,命令)。
- 支持多种引擎运行 cron(并行进程,消息队列,一致性)。
- 支持多种 cron 处理器/命令类型:(服务,symfony 命令,UNIX shell 命令)。
- 可以与计时器、订阅者和 React EventLoop 中的异步 I/O 一起使用,例如 Redis 订阅 clue/redis-react。
- 中间件和自定义。
目录
安装
使用 composer 安装
composer require okvpn/cron-bundle
对于 Symfony 4+,将扩展包添加到 config/bundles.php
<?php return [ ... // bundles Okvpn\Bundle\CronBundle\OkvpnCronBundle::class => ['all' => true], ]
快速使用
您可以使用 AsCron
或 AsPeriodicTask
属性来自动配置。
<?php declare(strict_types=1); namespace App\Service; use Okvpn\Bundle\CronBundle\Attribute\AsCron; use Okvpn\Bundle\CronBundle\Attribute\AsPeriodicTask; #[AsCron('*/5 * * * *', messenger: true)] class SyncAppWorker { public function __invoke(array $arguments = []): void { // code } } #[AsCron('*/10 * * * *', jitter: 60)] class Sync2AppWorker { /* ... */ } // Run each 10 minutes with 60 sec random delay #[AsCron('@random 3600')] class Sync3AppWorker { /* ... */ } // Run with random 0-3600 sec #[AsPeriodicTask('30 seconds', jitter: 5)] class Sync4AppWorker { /* ... */ } // Run each 30 sec with 5 sec random delay.
命令
运行当前的 cron 计划
# Add this line to system crontab and execute each minute.
php bin/console okvpn:cron
# Run cron scheduler every minute without exiting.
php bin/console okvpn:cron --demand
# Run cron scheduler for specific groups.
php bin/console okvpn:cron --group
调试并手动执行 cron 作业,并显示列表
php bin/console okvpn:debug:cron
php bin/console okvpn:debug:cron --execute-one=7
模拟运行 cron 任务。
php bin/console okvpn:cron --dry-run --demand -vvv
CRON 表达式
CRON 表达式语法取自库 dragonmantank/cron-expressions
此外,还扩展了 @random
以避免在午夜或每小时 XX:00 运行事物。大多数人这样做,并且同一服务每小时都有流量峰值。
示例
*/5 * * * * - every 5 min
0 1 * * 0 - at 1 am every Sunday
@random 3600 # where 3600 - parameter lambda in the Poisson distribution, if it will run each seconds. Here, the avg probability period is 1 hour.
第一种方法。安装系统 crontab
要定期从应用程序运行一组命令,配置系统每分钟运行 oro:cron 命令。在基于 UNIX 的系统上,您可以简单地为此设置一个 crontab 条目
*/1 * * * * /path/to/php /path/to/bin/console okvpn:cron --env=prod > /dev/null
第二种方法。使用 supervisor
设置 Supervisor 在需要时运行 cron。
sudo apt -y --no-install-recommends install supervisor
创建一个新的 supervisor 配置文件。
sudo vim /etc/supervisor/conf.d/app_cron.conf
将以下行添加到文件中。
[program:app-cron]
command=/path/to/bin/console okvpn:cron --env=prod --demand
process_name=%(program_name)s_%(process_num)02d
numprocs=1
autostart=true
autorestart=true
startsecs=0
redirect_stderr=true
priority=1
user=www-data
注册新的计划任务
要添加新的计划任务,您可以使用标签 okvpn.cron
或使用带有接口 Okvpn\Bundle\CronBundle\CronSubscriberInterface
的 autoconfigure
。
服务。
<?php namespace App\Cron; use Okvpn\Bundle\CronBundle\CronSubscriberInterface; class MyCron implements CronSubscriberInterface // implements is not required, but helpful if yor are use autoconfigure { public function __invoke(array $arguments = []) { // processing... } public static function getCronExpression(): string { return '*/10 * * * *'; } }
如果您使用默认配置,则相关的服务将自动注册,归功于 autoconfigure
。要显式声明服务,可以使用以下代码片段
services: App\Cron\MyCron: tags: - { name: okvpn.cron, cron: '*/5 * * * *' } App\Cron\SmsNotificationHandler: tags: - { name: okvpn.cron, cron: '*/5 * * * *', lock: true, async: true }
使用标签配置的可能选项
cron
- 一个 cron 表达式,如果为空,则命令将始终运行。lock
- 防止在先前的命令未完成时再次运行该命令。可能的值:true
,{name: lock1, ttl: 300}
。要使用此选项,需要安装 symfony 锁组件async
- 在新进程中运行命令,不阻塞主线程。arguments
- 参数数组,用于运行 symfony 控制台命令或将参数传递给处理器。priority
- 排序优先级。group
- 组名称,见“Cron 组”部分。jitter
- 随机延迟 0-60 秒interval
- 通过间隔运行周期性任务。示例:10
,10 seconds
,1 day
。messenger
- 将作业发送到 Messenger Bus。默认false
。您还可以在此处指定传输{routing: async}
,见 Symfony 将消息路由到传输
Symfony 控制台命令
services: App\Command\DowloadOrdersCommand: tags: - { name: console.command } - { name: okvpn.cron, cron: '*/5 * * * *' }
通过配置/ shell 命令
okvpn_cron: tasks: - command: "php %kernel_project.dir%/bin/console cache:clear --env=prod" # Shell command cron: "0 0 * * *" - command: "bash /root/renew.sh > /root/renew.txt" # Shell command group: root # Filtering by group. You can run `bin/console okvpn:cron --group=root` under the root user cron: "0 0 * * *" - command: 'App\Cron\YouServiceName' # Your service name cron: "0 0 * * *" - command: 'app:cron:sync-amazon-orders' # Your symfony console command name cron: "*/30 * * * *" async: true arguments: { '--transport': 15 } # command arguments or options jitter: 60 # 60 sec random delay - command: 'app:cron:wrfda-grib2' # run the command with 20 sec interval and 10 sec random delay interval: "20 seconds" jitter: 10
完整配置参考
# Your config file okvpn_cron: lock_factory: ~ # The Service to create lock. Default lock.factory, see Symfony Lock component. timezone: ~ # default timezone, like Europe/Minsk. if null will use php.ini default messenger: enable: false # Enable symfony messenger # Default options allow to add define default policy for all tasks, # For example to always run commands with locking and asynchronously default_policy: async: true # Default false lock: true # Default false messenger: true # Handle all jobs with symfony messenger bus. # Stamps it's markers that will add to each tasks. with_stamps: - 'Packagist\WebBundle\Cron\WorkerStamp' # service name for run cron in demand (Okvpn\Bundle\CronBundle\Runner\ScheduleLoopInterface) loop_engine: ~ tasks: # Defined tasks via configuration - command: 'app:noaa:gfs-grib-download' cron: '34,45 */6 * * *' messenger: { routing: lowpriority } # See Messenger configuration lock: true arguments: { '--transport': '0p25' } # Here you can also add other custom options and create your own middleware. - command: "bash /root/renew.sh > /root/renew.txt" # Shell command group: root # Group filter. You can run `bin/console okvpn:cron --group=root` under the root user cron: "0 0 * * *"
通过 Symfony Messenger 处理 Cron 作业
要限制并行运行进程的数量,可以使用 Symfony Messenger 在队列中处理 cron 作业。
- 安装 Symfony Messenger
- 启用 cron 作业的默认路由
# config/packages/messenger.yaml framework: messenger: transports: async: "%env(MESSENGER_TRANSPORT_DSN)%" lowpriority: "%env(MESSENGER_TRANSPORT_LOW_DSN)%" routing: # async is whatever name you gave your transport above 'Okvpn\Bundle\CronBundle\Messenger\CronMessage': async
- 启用 Messenger 用于 cron。
# config/packages/cron.yaml okvpn_cron: # Required. messenger middleware is disable messenger: enable: true # Optional default_options: messenger: true # For handle all cron jobs with messenger # Optional tasks: - command: 'app:noaa:gfs-grib-download' cron: '34,45 */6 * * *' # messenger: true # OR messenger: { routing: lowpriority } # Send to lowpriority transport
有关如何使用 messenger 的更多信息
您自己的计划任务加载器
您可以创建自定义任务加载器,见示例
<?php declare(strict_types=1); namespace Packagist\WebBundle; use Okvpn\Bundle\CronBundle\Loader\ScheduleLoaderInterface; use Okvpn\Bundle\CronBundle\Model\ScheduleEnvelope; use Okvpn\Bundle\CronBundle\Model; class DoctrineCronLoader implements ScheduleLoaderInterface { /** * @inheritDoc */ public function getSchedules(array $options = []): iterable { // ... get active cron from database/etc. yield new ScheduleEnvelope( 'yor_service_command_name', // A service name, (object must be have a __invoke method) // !!! Important. You must mark this service with tag `okvpn.cron_service` to add into our service locator. new Model\ScheduleStamp('*/5 * * * *'), // Cron expression new Model\LockStamp('yor_service_command_name'), // If you want to use locking new Model\AsyncStamp() // If you want to run asynchronously ); yield new ScheduleEnvelope( 'app:cron:sync-amazon-orders', // Symfony console new Model\ScheduleStamp('*/5 * * * *'), // Cron expression new Model\LockStamp('sync-amazon-orders_1'), // If you want to use locking new Model\ArgumentsStamp(['--integration' => 1]), // Command arguments new Model\AsyncStamp() // If you want to run asynchronously ); yield new ScheduleEnvelope( 'ls -l', // shell command new Model\ScheduleStamp('*/10 * * * *'), // Cron expression new Model\ShellStamp(['timeout'=> 300]), // Run command as shell ); // ... } }
并注册您的加载器。
services: Packagist\WebBundle\DoctrineCronLoader: tags: [okvpn_cron.loader]
跨集群或自定义消息队列处理 cron 作业
您可以使用 cron $group
在集群之间分割多个计划任务,见示例
<?php declare(strict_types=1); namespace App\Cron; class EntityCronLoader implements ScheduleLoaderInterface { public function getSchedules(array $options = []): iterable { if (!\in_array($group = $options['group'] ?? 'default', ['default', 'all_chunk']) && !\str_starts_with($group, 'chunk_')) { return; } $chunkId = str_replace('chunk_', '', $group) foreach ($this->registry->getAllRepos($chunkId) as $name => $repo) { $expr = '@random ' . $this->getSyncInterval($repo); yield new ScheduleEnvelope( 'sync:mirrors', new Model\ScheduleStamp($expr), new WorkerStamp(true), new Model\ArgumentsStamp(['mirror' => $name,]) ); } } }
[program:app-cron]
command=/path/to/bin/console okvpn:cron --env=prod --demand --group=chunk_%process_num%
process_name=%(program_name)s_%(process_num)02d
numprocs=1
使用 ReactPHP EventLoop
您可以直接将您自己的周期性任务添加到 Loop
。该捆绑包使用来自库 react/event-loop
的简单包装 Okvpn\Bundle\CronBundle\Runner\ScheduleLoopInterface
<?php use Okvpn\Bundle\CronBundle\Event\LoopEvent; use Okvpn\Bundle\CronBundle\Runner\TimerStorage; use Symfony\Component\EventDispatcher\Attribute\AsEventListener; class CronStartListener { #[AsEventListener('loopInit')] public function loopInit(LoopEvent $event): void { $dataDogS = $this->getDDog(); $event->getLoop()->addPeriodicTimer(6.0, static function () use ($dataDogS) { $dataDogS->set('crond', getmypid()); }); } }
配置 ReactPHP 适配器
如果您想使用异步 I/O,例如处理 websockets、redis,则需要安装 react/event-loop
composer req react/event-loop
# Add file to config/packages/* okvpn_cron: loop_engine: okvpn_cron.react_loop # service name
<?php use Okvpn\Bundle\CronBundle\Event\LoopEvent; use Okvpn\Bundle\CronBundle\Runner\TimerStorage; use Symfony\Component\EventDispatcher\Attribute\AsEventListener; use Okvpn\Bundle\CronBundle\Runner\TimerStorage; use React\EventLoop\Loop; class CronStartListener { public function __construct( private TimerStorage $timers, ) { } #[AsEventListener('loopInit')] public function loopInit(LoopEvent $event): void { $redis = new RedisClient('127.0.0.1:6379'); $timers = $this->timers; $loop = $event->getLoop(); $redis->on('message', static function (string $channel, string $payload) use ($timers, $loop) { [$command, $args] = unserialize($payload); if ($timers->hasTimer($envelope = $timers->find($command, $args))) { [$timer] = $timers->getTimer($envelope); $loop->futureTick($timer); } }); Loop::addPeriodicTimer(5.0, static function () use ($redis) { $redis->ping(); }); } }
许可证
MIT 许可证。