okvpn/cron-bundle

友好的 Docker 的 Symfony cron 扩展包,用于一致地处理计划任务,可以是并行或跨集群的,如 Symfony Messenger

安装次数: 465,317

依赖者: 1

建议者: 0

安全: 0

星级: 18

关注者: 1

分支: 5

开放问题: 0

类型:symfony-bundle

1.1.3 2024-03-29 23:19 UTC

This package is auto-updated.

Last update: 2024-08-30 00:08:48 UTC


README

本扩展包为在 Symfony 应用程序内注册和处理计划任务提供了接口。

Latest Stable Version Total Downloads Latest Unstable Version License

目的

这是一个比现有 cron 扩展包更简单的替代品,无需 doctrine 依赖。还增加了支持中间件,用于在集群安装中自定义处理 cron 作业:(将作业发送到消息队列,如 Symfony Messenger;锁定等)。这允许限制并行运行进程的数量并对其进行优先排序。

功能

  • 无需 doctrine/database。
  • Docker 友好,作为后台命令运行,无需 crond
  • 使用一毫秒的精度安排任务。
  • 使用 @random 3600jitter 有更多随机化 crons 的方法。
  • 与 Symfony Messenger 集成。
  • 从不同的存储中加载 cron 作业(config.yml,标记服务,命令)。
  • 支持多种引擎运行 cron(并行进程,消息队列,一致性)。
  • 支持多种 cron 处理器/命令类型:(服务,symfony 命令,UNIX shell 命令)。
  • 可以与计时器、订阅者和 React EventLoop 中的异步 I/O 一起使用,例如 Redis 订阅 clue/redis-react
  • 中间件和自定义。

目录

安装

使用 composer 安装

composer require okvpn/cron-bundle

对于 Symfony 4+,将扩展包添加到 config/bundles.php

<?php
return [
    ... //  bundles
    Okvpn\Bundle\CronBundle\OkvpnCronBundle::class => ['all' => true],
]

快速使用

您可以使用 AsCronAsPeriodicTask 属性来自动配置。

<?php declare(strict_types=1);

namespace App\Service;

use Okvpn\Bundle\CronBundle\Attribute\AsCron;
use Okvpn\Bundle\CronBundle\Attribute\AsPeriodicTask;

#[AsCron('*/5 * * * *', messenger: true)]
class SyncAppWorker
{
    public function __invoke(array $arguments = []): void
    {
        // code
    }
}

#[AsCron('*/10 * * * *', jitter: 60)]
class Sync2AppWorker { /* ... */ } // Run each 10 minutes with 60 sec random delay 

#[AsCron('@random 3600')]
class Sync3AppWorker { /* ... */ } // Run with random 0-3600 sec 

#[AsPeriodicTask('30 seconds', jitter: 5)]
class Sync4AppWorker { /* ... */ } // Run each 30 sec with 5 sec random delay.

命令

运行当前的 cron 计划

# Add this line to system crontab and execute each minute.
php bin/console okvpn:cron 

# Run cron scheduler every minute without exiting.
php bin/console okvpn:cron --demand 

# Run cron scheduler for specific groups.
php bin/console okvpn:cron --group  

调试并手动执行 cron 作业,并显示列表

php bin/console okvpn:debug:cron 

php bin/console okvpn:debug:cron --execute-one=7

debug

模拟运行 cron 任务。

php bin/console okvpn:cron --dry-run --demand -vvv

debug

CRON 表达式

CRON 表达式语法取自库 dragonmantank/cron-expressions

此外,还扩展了 @random 以避免在午夜或每小时 XX:00 运行事物。大多数人这样做,并且同一服务每小时都有流量峰值。

示例

*/5 * * * * - every 5 min 
0 1 * * 0 - at 1 am every Sunday
@random 3600 # where 3600 - parameter lambda in the Poisson distribution, if it will run each seconds. Here, the avg probability period is 1 hour.

第一种方法。安装系统 crontab

要定期从应用程序运行一组命令,配置系统每分钟运行 oro:cron 命令。在基于 UNIX 的系统上,您可以简单地为此设置一个 crontab 条目

*/1 * * * * /path/to/php /path/to/bin/console okvpn:cron --env=prod > /dev/null

第二种方法。使用 supervisor

设置 Supervisor 在需要时运行 cron。

sudo apt -y --no-install-recommends install supervisor

创建一个新的 supervisor 配置文件。

sudo vim /etc/supervisor/conf.d/app_cron.conf

将以下行添加到文件中。

[program:app-cron]
command=/path/to/bin/console okvpn:cron --env=prod --demand
process_name=%(program_name)s_%(process_num)02d
numprocs=1
autostart=true
autorestart=true
startsecs=0
redirect_stderr=true
priority=1
user=www-data

注册新的计划任务

要添加新的计划任务,您可以使用标签 okvpn.cron 或使用带有接口 Okvpn\Bundle\CronBundle\CronSubscriberInterfaceautoconfigure

服务。

<?php

namespace App\Cron;

use Okvpn\Bundle\CronBundle\CronSubscriberInterface;

class MyCron implements CronSubscriberInterface // implements is not required, but helpful if yor are use autoconfigure
{
    public function __invoke(array $arguments = [])
    {
        // processing...
    }

    public static function getCronExpression(): string
    {
        return '*/10 * * * *';
    }
}

如果您使用默认配置,则相关的服务将自动注册,归功于 autoconfigure。要显式声明服务,可以使用以下代码片段

services:
    App\Cron\MyCron:
        tags:
            - { name: okvpn.cron, cron: '*/5 * * * *' }
    

    App\Cron\SmsNotificationHandler:
        tags:
            - { name: okvpn.cron, cron: '*/5 * * * *', lock: true, async: true }

使用标签配置的可能选项

  • cron - 一个 cron 表达式,如果为空,则命令将始终运行。
  • lock - 防止在先前的命令未完成时再次运行该命令。可能的值:true{name: lock1, ttl: 300}。要使用此选项,需要安装 symfony 锁组件
  • async - 在新进程中运行命令,不阻塞主线程。
  • arguments - 参数数组,用于运行 symfony 控制台命令或将参数传递给处理器。
  • priority - 排序优先级。
  • group - 组名称,见“Cron 组”部分。
  • jitter - 随机延迟 0-60 秒
  • interval - 通过间隔运行周期性任务。示例:1010 seconds1 day
  • messenger - 将作业发送到 Messenger Bus。默认 false。您还可以在此处指定传输 {routing: async},见 Symfony 将消息路由到传输

Symfony 控制台命令

services:
    App\Command\DowloadOrdersCommand:
        tags:
            - { name: console.command }
            - { name: okvpn.cron, cron: '*/5 * * * *' }

通过配置/ shell 命令

okvpn_cron:
  tasks:
    -
      command: "php %kernel_project.dir%/bin/console cache:clear --env=prod" # Shell command 
      cron: "0 0 * * *"
    -
      command: "bash /root/renew.sh > /root/renew.txt" # Shell command
      group: root # Filtering by group. You can run `bin/console okvpn:cron --group=root` under the root user 
      cron: "0 0 * * *"
    -
      command: 'App\Cron\YouServiceName' # Your service name
      cron: "0 0 * * *"
    -
      command: 'app:cron:sync-amazon-orders' # Your symfony console command name
      cron: "*/30 * * * *"
      async: true
      arguments: { '--transport': 15 } # command arguments or options
      jitter: 60 # 60 sec random delay 

    -
      command: 'app:cron:wrfda-grib2' # run the command with 20 sec interval and 10 sec random delay 
      interval: "20 seconds"
      jitter: 10

完整配置参考

# Your config file
okvpn_cron:
    lock_factory: ~ # The Service to create lock. Default lock.factory, see Symfony Lock component.
    timezone: ~ # default timezone, like Europe/Minsk. if null will use php.ini default
    messenger:
        enable: false # Enable symfony messenger
        
    # Default options allow to add define default policy for all tasks, 
    # For example to always run commands with locking and asynchronously
    default_policy:
        async: true # Default false
        lock: true # Default false
        messenger: true # Handle all jobs with symfony messenger bus.
    
    # Stamps it's markers that will add to each tasks.
    with_stamps:
        - 'Packagist\WebBundle\Cron\WorkerStamp'
    
    # service name for run cron in demand (Okvpn\Bundle\CronBundle\Runner\ScheduleLoopInterface)
    loop_engine: ~

    tasks: # Defined tasks via configuration
      - 
        command: 'app:noaa:gfs-grib-download'
        cron: '34,45 */6 * * *'
        messenger: { routing: lowpriority } # See Messenger configuration
        lock: true
        arguments: { '--transport': '0p25' }
        # Here you can also add other custom options and create your own middleware.
      -
        command: "bash /root/renew.sh > /root/renew.txt" # Shell command
        group: root # Group filter. You can run `bin/console okvpn:cron --group=root` under the root user 
        cron: "0 0 * * *"

通过 Symfony Messenger 处理 Cron 作业

要限制并行运行进程的数量,可以使用 Symfony Messenger 在队列中处理 cron 作业。

  1. 安装 Symfony Messenger
  2. 启用 cron 作业的默认路由
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async: "%env(MESSENGER_TRANSPORT_DSN)%"
            lowpriority: "%env(MESSENGER_TRANSPORT_LOW_DSN)%"

        routing:
            # async is whatever name you gave your transport above
            'Okvpn\Bundle\CronBundle\Messenger\CronMessage':  async
  1. 启用 Messenger 用于 cron。
# config/packages/cron.yaml
okvpn_cron:
    # Required. messenger middleware is disable
    messenger:
        enable: true

    # Optional
    default_options:
        messenger: true # For handle all cron jobs with messenger
    
    # Optional 
    tasks:
        - 
            command: 'app:noaa:gfs-grib-download'
            cron: '34,45 */6 * * *'
#           messenger: true # OR
            messenger: { routing: lowpriority } # Send to lowpriority transport

有关如何使用 messenger 的更多信息

您自己的计划任务加载器

您可以创建自定义任务加载器,见示例

<?php declare(strict_types=1);

namespace Packagist\WebBundle;

use Okvpn\Bundle\CronBundle\Loader\ScheduleLoaderInterface;
use Okvpn\Bundle\CronBundle\Model\ScheduleEnvelope;
use Okvpn\Bundle\CronBundle\Model;

class DoctrineCronLoader implements ScheduleLoaderInterface
{
    /**
     * @inheritDoc
     */
    public function getSchedules(array $options = []): iterable
    {
        // ... get active cron from database/etc.

        yield new ScheduleEnvelope(
            'yor_service_command_name', // A service name, (object must be have a __invoke method)
            // !!! Important. You must mark this service with tag `okvpn.cron_service` to add into our service locator.
            new Model\ScheduleStamp('*/5 * * * *'), // Cron expression
            new Model\LockStamp('yor_service_command_name'), // If you want to use locking
            new Model\AsyncStamp() // If you want to run asynchronously
        );

        yield new ScheduleEnvelope(
            'app:cron:sync-amazon-orders', // Symfony console
            new Model\ScheduleStamp('*/5 * * * *'), // Cron expression
            new Model\LockStamp('sync-amazon-orders_1'), // If you want to use locking
            new Model\ArgumentsStamp(['--integration' => 1]), // Command arguments
            new Model\AsyncStamp() // If you want to run asynchronously
        );

        yield new ScheduleEnvelope(
            'ls -l', // shell command
            new Model\ScheduleStamp('*/10 * * * *'),  // Cron expression
            new Model\ShellStamp(['timeout'=> 300]), // Run command as shell
        );

        // ...
    }
}

并注册您的加载器。

services:
    Packagist\WebBundle\DoctrineCronLoader:
        tags: [okvpn_cron.loader]

跨集群或自定义消息队列处理 cron 作业

您可以使用 cron $group 在集群之间分割多个计划任务,见示例

<?php declare(strict_types=1);
namespace App\Cron;

class EntityCronLoader implements ScheduleLoaderInterface
{
    public function getSchedules(array $options = []): iterable
    {
        if (!\in_array($group = $options['group'] ?? 'default', ['default', 'all_chunk']) && !\str_starts_with($group, 'chunk_')) {
            return;
        }
        
        $chunkId = str_replace('chunk_', '', $group)
        foreach ($this->registry->getAllRepos($chunkId) as $name => $repo) {
            $expr = '@random ' . $this->getSyncInterval($repo);
            
            yield new ScheduleEnvelope(
                'sync:mirrors',
                new Model\ScheduleStamp($expr),
                new WorkerStamp(true),
                new Model\ArgumentsStamp(['mirror' => $name,])
            );
        }
    }
}
[program:app-cron]
command=/path/to/bin/console okvpn:cron --env=prod --demand --group=chunk_%process_num%
process_name=%(program_name)s_%(process_num)02d
numprocs=1

见定制示例

使用 ReactPHP EventLoop

您可以直接将您自己的周期性任务添加到 Loop。该捆绑包使用来自库 react/event-loop 的简单包装 Okvpn\Bundle\CronBundle\Runner\ScheduleLoopInterface

<?php
use Okvpn\Bundle\CronBundle\Event\LoopEvent;
use Okvpn\Bundle\CronBundle\Runner\TimerStorage;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;

class CronStartListener
{
    #[AsEventListener('loopInit')]
    public function loopInit(LoopEvent $event): void
    {
        $dataDogS = $this->getDDog();
        $event->getLoop()->addPeriodicTimer(6.0, static function () use ($dataDogS) {
            $dataDogS->set('crond', getmypid());
        });
    }
}

配置 ReactPHP 适配器

如果您想使用异步 I/O,例如处理 websockets、redis,则需要安装 react/event-loop

composer req react/event-loop 
# Add file to config/packages/*
okvpn_cron:
    loop_engine: okvpn_cron.react_loop # service name
<?php
use Okvpn\Bundle\CronBundle\Event\LoopEvent;
use Okvpn\Bundle\CronBundle\Runner\TimerStorage;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
use Okvpn\Bundle\CronBundle\Runner\TimerStorage;
use React\EventLoop\Loop;

class CronStartListener
{
    public function __construct(
        private TimerStorage $timers,
    ) {
    }

    #[AsEventListener('loopInit')]
    public function loopInit(LoopEvent $event): void
    {
        $redis = new RedisClient('127.0.0.1:6379');
        $timers = $this->timers;
        $loop = $event->getLoop();

        $redis->on('message', static function (string $channel, string $payload) use ($timers, $loop) {
            [$command, $args] = unserialize($payload);
            if ($timers->hasTimer($envelope = $timers->find($command, $args))) {
                [$timer] = $timers->getTimer($envelope);
                $loop->futureTick($timer);
            }
        });

        Loop::addPeriodicTimer(5.0, static function () use ($redis) {
            $redis->ping();
        });
    }
}

许可证

MIT 许可证。