轻量级调度和作业运行系统

0.14.0 2022-04-14 03:50 UTC

This package is auto-updated.

Last update: 2024-09-14 09:31:08 UTC


README

一个简单的基于队列的作业调度和运行器

Sched 是一个简单的作业运行器和调度器,用于调度和运行作业。它使用 beanstalkd 作为后端,可以根据进入管线的作业任意调用代码,并使用 cron 语法调度作业。

安装

Sched 被设计成与应用程序一起安装,并使用您的供应商和自动加载设置。它不打算作为一个独立的应用程序运行,尽管它有自己的守护进程。

composer require dragonmantank/sched

如果您想使用 Sched 的 cron 功能,您需要设置系统 cron,使 Sched 每分钟检查一次是否有作业到期

* * * * /path/to/php /path/to/your/app/vendor/bin/sched-manager -c /path/to/sched-manager.config.php cron:process

要求

用法

运行调度需要启动管理器以及配置文件。要运行管理器

vendor/bin/sched-manager [-c /path/to/sched-manager.config.php] [-v] manager:run 

管理器将循环遍历所有配置的队列,并以 5 为一组处理它们。例如,如果队列中有 10 条消息,Sched 将启动 2 个工作进程,每个处理 5 个作业。它将循环遍历队列,并不断检查作业数量与工作进程数量的匹配情况。

配置

Sched 需要一个配置文件来知道如何处理您的队列,并将忽略任何未配置的队列。您还可以通过相同的配置文件安排作业和自定义命令。

return [
    'cron' => [
        [
            'name' => 'Name of cron job, for logging',
            'expression' => '* * * * *',
            'worker' => // Invokable that needs to run at this time
        ]
    ],
    'logger' => MyLoggingFactory::class,
    'custom_commands' => [MyCommand::class, MyOtherCommand::class],
    'manager' => [
        'max_workers' => 10,
        'max_workers_per_tube' => 5,
    ],
    'pheanstalk' => [
        'host' => '127.0.0.1',
        'port' => 113900,
        'timeout' => 10,
    ],
    'queues' => [
        'queueName' => [
            'worker' => // Invokable that processes the queue
        ],
    ],
];

队列管理

配置文件的 queues 部分允许您定义要监视哪些队列以及将什么代码传递给有效负载(称为工作进程)。假设每个有效负载都是一个 JSON 对象。

例如,如果您想监视 download-payroll-report 队列并使其由 Me\MyApp\ReportDownloader\Payroll 处理,可以这样设置

use Me\MyApp\ReportDownloader\Payroll;

return [
    'queues' => [
        'download-payroll-report' => [
            'worker' => Payroll::class
        ],
    ],
];

Me\MyApp\ReportDownloader\Payroll 只需要是一个可调用的类,并实现 __invoke(array $payload): int 签名。

namespace Me\MyApp\ReportDownloader;

class Payroll
{
    // Assuming a payload originally of {"url": "https://payrollcorp.net/api/report/2021-01-01?apiKey=S3CR3T"}
    public function __invoke(array $payload): int
    {
        $ch = curl_init($payload['url']);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        $report = curl_exec($ch);
        curl_close($ch);

        file_put_contents(
            (\new DateTime())->format('Y-m-d') . '.csv',
            $report
        );

        return 0;
    }
}

您还可以控制每个管理器实例以及每个队列生成的作业数量。默认情况下,Sched 将自身限制为 10 个总作业和 5 个每队列总作业,但可以在 manager 配置部分的 max_workersmax_workers_per_queue 选项下进行调整。

安排作业

您还可以安排在特定时间运行的作业。与队列工作进程类似,您可以指定一个在特定时间运行的工作进程。假设我们想在每周六凌晨 4:00 运行报告下载器,以便我们的工资系统有足够的时间处理和生成报告。我们可以指定一个 cron 表达式和要执行的工作进程

use Me\MyApp\Cron\GeneratePayrollDownload;

return [
    'cron' => [
        [
            'name' => 'Generate Payroll Download',
            'expression' => '0 4 * * SAT',
            'worker' => GeneratePayrollDownload::class
        ]
    ]
    'queues' => [ ... ]
];

与队列工作进程一样,cron 工作进程只需要是一个可调用的类,并实现 public function __invoke(): int(注意它不需要 $payload

namespace Me\MyApp\Cron;

class GeneratePayrollDownload
{
    public function __construct(protected Pheanstalk $pheanstalk)
    {
    }

    public function __invoke(): int
    {
        $date = new \DateTimeImmutable();
        $url = 'https://payrollcorp.net/api/report/' . $date->format('Y-m-d') . '/?apiKey=S3C3R3T';
        $this->pheanstalk->useTube('download-payroll-report')
            ->put(json_encode([
                'url' => $url
            ]));
    }
}

虽然上面的示例添加了一个要由队列系统拾取和处理的作业,但您还可以运行在特定时间处理所需内容的作业,而不使用队列。

自定义命令

可能会出现这样的情况,你需要直接向Sched注册一个命令,而不是让它为你安排作业。例如,如果你想定期运行一个脚本以将大量文件排队处理,但又不想设置定时任务,你可以使用sched-manager注册一个可以调用的命令。你可以在配置文件的custom_commands数组中设置一个类名。这个命令必须是一个Symfony控制台命令

// sched-manager-config.php.dist
use Me\MyApp\Command\QueueFilesForProcessing;

return [
    'cron' => [ ...],
    'custom_commands' => [
        QueueFilesForProcessing::class,
    ]
    'queues' => [ ... ]
];
// QueueFieldsForProcessing.php
use Me\MyApp\Command;

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class QueueFilesForProcessing extends Command
{
    protected static $defaultName = 'app:customcommand';

    protected function configure(): void
    {
        $this
            ->setHelp('This is a sample command');
    }

    protected function execute(InputInterface $input, OutputInterface $output)
    {
        // Do some stuff
        Return Command::SUCCESS;
    }

}

日志记录

Sched支持PSR-3兼容的日志记录器,同时也支持直接写入控制台。如果你向logger配置键提供一个可调用的类,Sched将会通过依赖注入将其注入到所需的命令中。许多命令即使没有使用合适的日志记录器,仍然会将输出写入控制台。

Sched提供了一个名为Dragonmantank\Sched\LoggingTrait的特质,用于帮助你在自己的命令中写入日志。你可以将这个特质添加到你的类中,然后使用$this->log($output, LogLevel::INFO, "My Message");将日志记录到日志记录器。这个函数还会根据命令中提供的详细程度(-v-vv-vvv)将日志写入控制台。