krak/job

简单而强大的队列任务

v0.4.1 2017-06-27 16:06 UTC

This package is auto-updated.

Last update: 2024-09-18 17:59:26 UTC


README

简单而强大的队列任务实现。

特性

  • 异步消费多个队列
  • 易于设置
  • 易于与任何项目集成
  • 可扩展性极强
  • 强大的扩展

安装

使用 composer 安装 krak/job

用法

创建内核

内核是 Job 库的核心管理器。它只是一个 Cargo\Container 装饰器,具有辅助方法。

<?php

$kernel = new Krak\Job\Kernel();

如果您想使用特殊的容器,也可以传递一个可选的 Container 实例。

配置内核

您可以通过包装容器中定义的任何服务来配置内核。除了 Cargo\Container 提供的配置之外,内核还提供了辅助方法以简化自定义。

// configure the scheduling loop
$kernel->config([
    'queue' => 'jobs', // name of the queue
    'sleep' => 10, // duration in seconds the scheduler will sleep for after every iteration
    'ttl' => 50, // max duration of the scheduler before dying
    'max_jobs' => 10, // max number of processes to launch at once
    'max_retry' => 3, // max number of retries before just giving up on a failed job
    'batch_size' => 5, // max number of jobs to process in a given batch
]);

// configure the queue provider
$kernel['Predis\ClientInterface'] = function() {
    return new Predis\Client();
};
$kernel['krak.job.queue_provider'] = 'redis';

// configure the consumer stack
$kernel->wrap('krak.job.pipeline.consumer', function($consumer) {
    return $consumer->push(myConsumer());
});
// and so on...

定义任务

每个任务必须实现空接口 Krak\Job\Job 并有一个 handle 方法。当任务被消费以进行处理时,将执行 handle 方法。

<?php

namespace Acme\Jobs;

use Krak\Job\Job;
use Acme\ServiceA;

class ProcessJob implements Job
{
    private $id;

    public function __construct($id) {
        $this->id = $id;
    }

    public function handle(ServiceA $service) {
        process($this->id);
    }
}

参数将自动通过 AutoArgs 包连接到 handle 方法。任务实例将被序列化,因此请确保任务的属性是可序列化的。将数据量保持在任务中最小也是一个好主意。

如果您想自定义包装的任务,也可以实现 Krak\Job\PipeWrappedJob 接口。

<?php

use Krak\Job;

class ProcessJob implements Job\Job, Job\PipeWrappedJob
{

    public function handle() {}

    public function pipe(Job\WrappedJob $wrapped) {
        return $wrapped->withName('my_custom_job_name')
            ->withQueue('my_custom_queue')
            ->withDelay(3600)
            ->withAddedPayload([
                'custom_data' => 1,
            ]);
    }
}

当任务分发和产生时,将调用管道函数并配置包装的任务实例。

作为最终便利,我们提供了 Krak\Job\AbstractJob,您可以扩展它,它已经实现了 Krak\Job\JobKrak\Job\PipeWrappedJob,并提供了默认实现。

分发任务

使用 Krak\Job\Dispatch 可以轻松分发任务。

<?php

use Krak\Job;

// use the kernel to create a dispatch instance
$dispatch = $kernel['dispatch']; // or $kernel[Job\Dispatch::class];

$dispatch->wrap(new Acme\Jobs\ProcessJob(1))
    ->onQueue('process') // this is optional
    ->withName('process')
    ->delay(3600) // will delay the sending of this job for 1 hour (not all queues support delay)
    ->dispatch();

消费任务

为了开始消费任务,您需要做一些事情

  1. 将命令注册到您的 Symfony Console 应用程序中

    <?php
    // in bin/console
    
    $app = new Symfony\Component\Console\Application();
    Krak\Job\registerConsole($app, $kernel);

    此时,我们已经注册了所有任务命令并将 JobHelper 添加到辅助集中。

  2. 启动消费者

    ./bin/console job:consume -vvv

    您可以更改详细程度以适应您的需求

重启和停止任务

有时您想要重启正在运行消费者甚至重启系统。为了启用此功能,您需要将 Psr\SimpleCache 集成到内核中。

启用缓存

<?php

$kernel['Psr\SimpleCache\CacheInterface'] = function($c) {
    // return any CacheInterface
    return new Symfony\Component\Cache\Simple\RedisCache($c['Predis\ClientInterface']);
};

一旦启用缓存,您将可以访问以下命令: job:stopjob:restartjob:statusjob:reset

概念

Job 库分为几个部分:内核、调度循环、队列、分发、控制台、管道、任务

内核

内核实现了 Krak\Job\Kernel 接口,负责配置和管理一切。

调度器

调度器负责安排任务的运行。实际上,Krak\Job\Scheduler 类不包含太多逻辑,它只是启动一个无限循环并将控制权传递给 调度循环

调度循环是一个处理程序,它接受一组参数,如记录器和配置,并执行操作。实现可以是任何东西。我们有两种主要的调度循环类型:队列循环和调度循环。

队列循环管理任务的调度。它们从队列中向工作者分发任务,并回收完成的任务。它们还管理任务失败。

调度循环负责管理其他调度器。这允许递归调度和异步处理不同的队列。因为你可以有一个调度器,它在其自己的进程中管理两个不同的队列调度器。

工作进程

工作进程是一个简单的类,它接受一个包装作业并对其运行消费者。

消费者

消费者是一个处理程序,它接受一个WrappedJob实例并返回一个结果。我们使用Krak\Mw库将一组消费者中间件转换为一个单一的消费者,以便能够通过消费者进行完全定制。

生产者

生产者是消费者的对立面。生产者被设计为生成一个作业并将其推送到队列中。与消费者一样,生产者通过一组生产者中间件实现。

分发

分发是一个非常简单的类/接口,它设计用来将作业包装到WrappedJob中,然后将其分发到生产者。

队列

排队模块处理实际的排队实现。有两个主要组件:队列管理器和队列。

支持的队列

  • Doctrine
  • Redis
  • Sqs
  • Stub
  • Sync

Doctrine

Doctrine需要安装doctrine/dbal库。

$kernel['Doctrine\DBAL\Connection'] = function() {
    return Doctrine\DBAL\DriverManager::getConnection([
        /* connection params */
    ]);
};
$kernel['krak.job.queue_provider'] = 'doctrine';
$kernel['krak.job.queue.doctrine.table_name'] = 'krak_jobs';

设置完成后,你需要执行数据库迁移来初始化作业表。Krak\Job\Queue\Doctrine\JobMigration类是一个实用工具,将帮助运行迁移。

如果你已经在项目中使用Doctrine Migrations,你可以简单地使用以下方法

// in your migration class
public function up(Schema $schema) {
    $migration = new Krak\Job\Queue\Doctrine\JobMigration('krak_jobs');
    $migration->up($schema);
}

public function down(Schema $schema) {
    $migration = new Krak\Job\Queue\Doctrine\JobMigration('krak_jobs');
    $migration->down($schema);
}

此外,你可以简单地运行以下PHP代码来迁移你的表向上或向下

$conn = $kernel['Doctrine\DBAL\Connection'];
$migration = $kernel['Krak\Job\Queue\Doctrine\JobMigration'];

// up
$migration->migrateUp($conn);
// or down
// $migration->migrateDown($conn);

Redis

Redis需要安装predis/predis库。然后你只需通过以下方式设置队列管理器

$kernel['Predis\ClientInterface'] = function() {
    return new Predis\Client();
};
$kernel['krak.job.queue_provider'] = 'redis';

Sqs

Sqs需要安装aws sdk aws/aws-sdk-php。你可以通过以下方式设置队列管理器

$kernel['Aws\Sqs\SqsClient'] = function() {
    return new Aws\Sqs\SqsClient();
};
$kernel['krak.job.queue.sqs.queue_url_map'] = ['queue-name' => '{queue-url}'];
$kernel['krak.job.queue.sqs.receive_options'] = ['VisibilityTimeout' => 10];
$kernel['krak.job.queue_provider'] = 'sqs';

queue_url_map是一个缓存,它将用于从队列名称中查找sqs队列的URL。此缓存是可选的,如果未设置,则将在运行时填充。

消息配置

当你包装和分发作业时,你可以使用配置来配置如何发送消息。

$dispatch->wrap(new MyJob())->with('sqs', ['AnySendMessageParamer' => 'Value'])->dispatch();

Stub

stub队列实际上是一个空操作队列提供程序。它不会对给定的任何作业进行排队或消费。

$kernel['krak.job.queue_provider'] = 'stub';

Sync

同步(同步)队列提供程序将同步地在调用线程中消费你的作业,而不是将它们分发到外部服务以在另一个进程中异步消费。这对于调试和开发目的很有用。

这也是默认的队列提供程序,因为它可以立即使用,无需配置。

$kernel['krak.job.queue_provider'] = 'sync';

配置选项

队列

queue表示与队列提供程序一起使用的队列名称。

队列提供程序

这定义了队列级别的队列提供程序。你不必为每个队列使用单一的队列提供程序,你可能有一些队列在一个队列提供程序上,其他队列在另一个队列提供程序上。一个很好的用例是将管理类型的作业拆分到Doctrine队列中,然后将前端注册作业拆分到更健壮/更快的队列,如sqs。

sleep

进程运行的时间越长,它将消耗的资源就越少。队列提供程序将在每sleep秒数被拉取一次。如果你计划在给定的队列上只处理少量作业,那么你可以将其设置为更高的值。相反,如果队列将进行高吞吐量处理,你需要将其设置为较小的值,如1或2秒。

ttl

这是调度器在停止前应运行的时间(以秒为单位)。结合使用respawn选项,这可以用来刷新应用程序状态。例如,在开发过程中,您可能想设置一个短的超时时间并重新启动,这样就不必不断重新启动调度器来测试更改。

respawn

respawn是一个布尔值,用于确定父调度器在子队列调度器被杀死后是否重新启动它。

max_jobs

这是同时运行的最多工作进程数。如果您的作业完成时间较长或队列将消耗非常高的作业数量,那么将此值设置为大于1可以大大加快作业的整体处理速度,因为它们将并行完成。

请注意,每个进程都会消耗内存并有一定开销,因此应考虑到这一点进行调节。

max_retry

在放弃失败的作业之前,最大重试次数

batch_size

这是给定批次中要处理的最多作业数。每个创建的进程都会处理一个作业批次。批次大小越高,有助于降低系统的内存占用,因为当批次大小较大时,创建的进程较少。

这对于执行时间相对较短的作业(少于5秒)非常有效;然而,如果作业执行时间较长,则最好增加max_jobs并降低此值到约1。

食谱

异步调度

要同时执行多个队列,请更新内核配置如下

$kernel->config([
    'name' => 'Master Scheduler',
    'sleep' => 10,
    'schedulers' => [
        [
            'queue' => 'emails',
            'max_jobs' => 20,
            'respawn' => true, // will be respawned after exiting
            'ttl' => 50,
        ],
        [
            'queue' => 'orders',
            'max_retry' => 3,
        ]
    ]
]);

这将创建一个主调度器,然后管理两个调度器,分别管理不同的队列。这将启动两个单独的进程来管理每个队列,因此每个队列的处理将完全异步。

额外日志记录

您可以通过将Psr\Log\LoggerInterface服务定义为内核容器中的服务来启用日志记录。

$kernel[Psr\Log\LoggerInterface::class] = function() {
    return MyPsrLogger();
};

任何调度器日志也将记录到定义的记录器中。