krak / job
简单而强大的队列任务
Requires
- krak/auto-args: ^0.3.0
- krak/cargo: ^0.2.0
- krak/mw: ^0.5.0
- nikic/iter: ^1.4
- psr/log: ^1.0
- psr/simple-cache: ^1.0
- symfony/filesystem: ^3.2
- symfony/process: ^2.8|^3.0
Requires (Dev)
- aws/aws-sdk-php: ^3.24
- doctrine/dbal: ^2.5
- krak/php-inc: ^0.1.3
- mockery/mockery: ^1.0-alpha
- monolog/monolog: ^1.22
- peridot-php/peridot: ^1.18
- pimple/pimple: ^3.0
- predis/predis: ^1.1
- symfony/cache: ^3.3
- symfony/console: ^3.2
- symfony/var-dumper: ^3.2
Suggests
- aws/aws-sdk-php: Enables SQS queues
- doctrine/dbal: Enables Doctrine queues
- predis/predis: Enables redis queues
README
简单而强大的队列任务实现。
特性
- 异步消费多个队列
- 易于设置
- 易于与任何项目集成
- 可扩展性极强
- 强大的扩展
安装
使用 composer 安装 krak/job
。
用法
创建内核
内核是 Job 库的核心管理器。它只是一个 Cargo\Container 装饰器,具有辅助方法。
<?php $kernel = new Krak\Job\Kernel();
如果您想使用特殊的容器,也可以传递一个可选的 Container 实例。
配置内核
您可以通过包装容器中定义的任何服务来配置内核。除了 Cargo\Container 提供的配置之外,内核还提供了辅助方法以简化自定义。
// configure the scheduling loop $kernel->config([ 'queue' => 'jobs', // name of the queue 'sleep' => 10, // duration in seconds the scheduler will sleep for after every iteration 'ttl' => 50, // max duration of the scheduler before dying 'max_jobs' => 10, // max number of processes to launch at once 'max_retry' => 3, // max number of retries before just giving up on a failed job 'batch_size' => 5, // max number of jobs to process in a given batch ]); // configure the queue provider $kernel['Predis\ClientInterface'] = function() { return new Predis\Client(); }; $kernel['krak.job.queue_provider'] = 'redis'; // configure the consumer stack $kernel->wrap('krak.job.pipeline.consumer', function($consumer) { return $consumer->push(myConsumer()); }); // and so on...
定义任务
每个任务必须实现空接口 Krak\Job\Job
并有一个 handle
方法。当任务被消费以进行处理时,将执行 handle
方法。
<?php namespace Acme\Jobs; use Krak\Job\Job; use Acme\ServiceA; class ProcessJob implements Job { private $id; public function __construct($id) { $this->id = $id; } public function handle(ServiceA $service) { process($this->id); } }
参数将自动通过 AutoArgs 包连接到 handle 方法。任务实例将被序列化,因此请确保任务的属性是可序列化的。将数据量保持在任务中最小也是一个好主意。
如果您想自定义包装的任务,也可以实现 Krak\Job\PipeWrappedJob
接口。
<?php use Krak\Job; class ProcessJob implements Job\Job, Job\PipeWrappedJob { public function handle() {} public function pipe(Job\WrappedJob $wrapped) { return $wrapped->withName('my_custom_job_name') ->withQueue('my_custom_queue') ->withDelay(3600) ->withAddedPayload([ 'custom_data' => 1, ]); } }
当任务分发和产生时,将调用管道函数并配置包装的任务实例。
作为最终便利,我们提供了 Krak\Job\AbstractJob
,您可以扩展它,它已经实现了 Krak\Job\Job
和 Krak\Job\PipeWrappedJob
,并提供了默认实现。
分发任务
使用 Krak\Job\Dispatch
可以轻松分发任务。
<?php use Krak\Job; // use the kernel to create a dispatch instance $dispatch = $kernel['dispatch']; // or $kernel[Job\Dispatch::class]; $dispatch->wrap(new Acme\Jobs\ProcessJob(1)) ->onQueue('process') // this is optional ->withName('process') ->delay(3600) // will delay the sending of this job for 1 hour (not all queues support delay) ->dispatch();
消费任务
为了开始消费任务,您需要做一些事情
-
将命令注册到您的 Symfony Console 应用程序中
<?php // in bin/console $app = new Symfony\Component\Console\Application(); Krak\Job\registerConsole($app, $kernel);
此时,我们已经注册了所有任务命令并将 JobHelper 添加到辅助集中。
-
启动消费者
./bin/console job:consume -vvv
您可以更改详细程度以适应您的需求
重启和停止任务
有时您想要重启正在运行消费者甚至重启系统。为了启用此功能,您需要将 Psr\SimpleCache
集成到内核中。
启用缓存
<?php $kernel['Psr\SimpleCache\CacheInterface'] = function($c) { // return any CacheInterface return new Symfony\Component\Cache\Simple\RedisCache($c['Predis\ClientInterface']); };
一旦启用缓存,您将可以访问以下命令: job:stop
、job:restart
、job:status
和 job:reset
。
概念
Job 库分为几个部分:内核、调度循环、队列、分发、控制台、管道、任务
内核
内核实现了 Krak\Job\Kernel
接口,负责配置和管理一切。
调度器
调度器负责安排任务的运行。实际上,Krak\Job\Scheduler
类不包含太多逻辑,它只是启动一个无限循环并将控制权传递给 调度循环。
调度循环是一个处理程序,它接受一组参数,如记录器和配置,并执行操作。实现可以是任何东西。我们有两种主要的调度循环类型:队列循环和调度循环。
队列循环管理任务的调度。它们从队列中向工作者分发任务,并回收完成的任务。它们还管理任务失败。
调度循环负责管理其他调度器。这允许递归调度和异步处理不同的队列。因为你可以有一个调度器,它在其自己的进程中管理两个不同的队列调度器。
工作进程
工作进程是一个简单的类,它接受一个包装作业并对其运行消费者。
消费者
消费者是一个处理程序,它接受一个WrappedJob
实例并返回一个结果。我们使用Krak\Mw库将一组消费者中间件转换为一个单一的消费者,以便能够通过消费者进行完全定制。
生产者
生产者是消费者的对立面。生产者被设计为生成一个作业并将其推送到队列中。与消费者一样,生产者通过一组生产者中间件实现。
分发
分发是一个非常简单的类/接口,它设计用来将作业包装到WrappedJob
中,然后将其分发到生产者。
队列
排队模块处理实际的排队实现。有两个主要组件:队列管理器和队列。
支持的队列
- Doctrine
- Redis
- Sqs
- Stub
- Sync
Doctrine
Doctrine需要安装doctrine/dbal
库。
$kernel['Doctrine\DBAL\Connection'] = function() { return Doctrine\DBAL\DriverManager::getConnection([ /* connection params */ ]); }; $kernel['krak.job.queue_provider'] = 'doctrine'; $kernel['krak.job.queue.doctrine.table_name'] = 'krak_jobs';
设置完成后,你需要执行数据库迁移来初始化作业表。Krak\Job\Queue\Doctrine\JobMigration
类是一个实用工具,将帮助运行迁移。
如果你已经在项目中使用Doctrine Migrations,你可以简单地使用以下方法
// in your migration class public function up(Schema $schema) { $migration = new Krak\Job\Queue\Doctrine\JobMigration('krak_jobs'); $migration->up($schema); } public function down(Schema $schema) { $migration = new Krak\Job\Queue\Doctrine\JobMigration('krak_jobs'); $migration->down($schema); }
此外,你可以简单地运行以下PHP代码来迁移你的表向上或向下
$conn = $kernel['Doctrine\DBAL\Connection']; $migration = $kernel['Krak\Job\Queue\Doctrine\JobMigration']; // up $migration->migrateUp($conn); // or down // $migration->migrateDown($conn);
Redis
Redis需要安装predis/predis
库。然后你只需通过以下方式设置队列管理器
$kernel['Predis\ClientInterface'] = function() { return new Predis\Client(); }; $kernel['krak.job.queue_provider'] = 'redis';
Sqs
Sqs需要安装aws sdk aws/aws-sdk-php
。你可以通过以下方式设置队列管理器
$kernel['Aws\Sqs\SqsClient'] = function() { return new Aws\Sqs\SqsClient(); }; $kernel['krak.job.queue.sqs.queue_url_map'] = ['queue-name' => '{queue-url}']; $kernel['krak.job.queue.sqs.receive_options'] = ['VisibilityTimeout' => 10]; $kernel['krak.job.queue_provider'] = 'sqs';
queue_url_map
是一个缓存,它将用于从队列名称中查找sqs队列的URL。此缓存是可选的,如果未设置,则将在运行时填充。
消息配置
当你包装和分发作业时,你可以使用配置来配置如何发送消息。
$dispatch->wrap(new MyJob())->with('sqs', ['AnySendMessageParamer' => 'Value'])->dispatch();
Stub
stub队列实际上是一个空操作队列提供程序。它不会对给定的任何作业进行排队或消费。
$kernel['krak.job.queue_provider'] = 'stub';
Sync
同步(同步)队列提供程序将同步地在调用线程中消费你的作业,而不是将它们分发到外部服务以在另一个进程中异步消费。这对于调试和开发目的很有用。
这也是默认的队列提供程序,因为它可以立即使用,无需配置。
$kernel['krak.job.queue_provider'] = 'sync';
配置选项
队列
queue
表示与队列提供程序一起使用的队列名称。
队列提供程序
这定义了队列级别的队列提供程序。你不必为每个队列使用单一的队列提供程序,你可能有一些队列在一个队列提供程序上,其他队列在另一个队列提供程序上。一个很好的用例是将管理类型的作业拆分到Doctrine队列中,然后将前端注册作业拆分到更健壮/更快的队列,如sqs。
sleep
进程运行的时间越长,它将消耗的资源就越少。队列提供程序将在每sleep
秒数被拉取一次。如果你计划在给定的队列上只处理少量作业,那么你可以将其设置为更高的值。相反,如果队列将进行高吞吐量处理,你需要将其设置为较小的值,如1或2秒。
ttl
这是调度器在停止前应运行的时间(以秒为单位)。结合使用respawn
选项,这可以用来刷新应用程序状态。例如,在开发过程中,您可能想设置一个短的超时时间并重新启动,这样就不必不断重新启动调度器来测试更改。
respawn
respawn是一个布尔值,用于确定父调度器在子队列调度器被杀死后是否重新启动它。
max_jobs
这是同时运行的最多工作进程数。如果您的作业完成时间较长或队列将消耗非常高的作业数量,那么将此值设置为大于1可以大大加快作业的整体处理速度,因为它们将并行完成。
请注意,每个进程都会消耗内存并有一定开销,因此应考虑到这一点进行调节。
max_retry
在放弃失败的作业之前,最大重试次数
batch_size
这是给定批次中要处理的最多作业数。每个创建的进程都会处理一个作业批次。批次大小越高,有助于降低系统的内存占用,因为当批次大小较大时,创建的进程较少。
这对于执行时间相对较短的作业(少于5秒)非常有效;然而,如果作业执行时间较长,则最好增加max_jobs并降低此值到约1。
食谱
异步调度
要同时执行多个队列,请更新内核配置如下
$kernel->config([ 'name' => 'Master Scheduler', 'sleep' => 10, 'schedulers' => [ [ 'queue' => 'emails', 'max_jobs' => 20, 'respawn' => true, // will be respawned after exiting 'ttl' => 50, ], [ 'queue' => 'orders', 'max_retry' => 3, ] ] ]);
这将创建一个主调度器,然后管理两个调度器,分别管理不同的队列。这将启动两个单独的进程来管理每个队列,因此每个队列的处理将完全异步。
额外日志记录
您可以通过将Psr\Log\LoggerInterface
服务定义为内核容器中的服务来启用日志记录。
$kernel[Psr\Log\LoggerInterface::class] = function() { return MyPsrLogger(); };
任何调度器日志也将记录到定义的记录器中。