gyselroth / mongodb-php-task-scheduler
基于MongoDB的PHP异步任务调度器
Requires
- php: >=7.1
- ext-mongodb: *
- ext-pcntl: *
- ext-posix: *
- ext-sysvmsg: *
- league/event: ^2.2
- mongodb/mongodb: ^1.4.0
- psr/container: *
- psr/log: 1.*
Requires (Dev)
- friendsofphp/php-cs-fixer: *
- helmich/mongomock: dev-master#6925c67
- phpstan/phpstan: ^0.8.5
- phpunit/phpunit: ^8.5
- dev-master
- v4.x-dev
- v4.0.17
- v4.0.16
- v4.0.16-beta4
- v4.0.16-beta3
- v4.0.16-beta2
- v4.0.16-beta1
- v4.0.15
- v4.0.15-beta1
- v4.0.14
- v4.0.13
- v4.0.12
- v4.0.11
- v4.0.10
- v4.0.9
- v4.0.8
- v4.0.7
- v4.0.6
- v4.0.5
- v4.0.4
- v4.0.3
- v4.0.2
- v4.0.1
- v4.0.0
- v4.0.0-beta18
- v4.0.0-beta17
- v4.0.0-beta16
- v4.0.0-beta15
- v4.0.0-beta14
- v4.0.0-beta13
- v4.0.0-beta12
- v4.0.0-beta11
- v4.0.0-beta10
- v4.0.0-beta9
- v4.0.0-beta8
- v4.0.0-beta7
- v4.0.0-beta6
- v4.0.0-beta5
- v4.0.0-beta4
- v4.0.0-beta3
- v4.0.0-beta2
- v4.0.0-beta1
- v4.0.0-alpha7
- v4.0.0-alpha6
- v4.0.0-alpha5
- v4.0.0-alpha4
- v4.0.0-alpha3
- v4.0.0-alpha2
- v4.0.0-alpha1
- v3.x-dev
- v3.3.0
- v3.3.0-beta.1
- v3.3.0-alpha.3
- v3.3.0-alpha.2
- v3.3.0-alpha.1
- v3.2.2
- v3.2.1
- v3.2.0
- v3.1.0
- v3.0.2
- v3.0.1
- v3.0.0
- v3.0.0-beta8
- v3.0.0-beta7
- v3.0.0-beta6
- v3.0.0-beta5
- v3.0.0-beta4
- v3.0.0-beta3
- v3.0.0-beta2
- v3.0.0-beta1
- v3.0.0-alpha1
- v2.0.5
- v2.0.4
- v2.0.3
- v2.0.2
- v2.0.1
- v2.0.0
- 1.x-dev
- v1.0.3
- v1.0.2
- v1.0.1
- v1.0.0
- v0.0.4
- v0.0.3
- v0.0.2
- v0.0.1
- dev-dev
This package is not auto-updated.
Last update: 2024-09-20 09:18:47 UTC
README
使用MongoDB作为分发队列的PHP并行任务调度器。轻松执行并行任务。此库内置了对集群系统和多核CPU的支持。您可以启动多个工作节点,它们将按照“先到先得”的原则平衡可用的作业。每个节点还将启动(动态)可配置数量的子进程以使用所有可用资源。此外,您还可以在特定时间安排作业,无限间隔以及如果作业失败则重新安排作业。这为PHP带来了并行进程管理的真实世界实现。您还可以同步子任务和更多有用的功能。
功能
- 并行任务
- 集群支持
- 多核支持
- 负载均衡
- 故障转移
- 可伸缩
- 相互同步任务
- 中止正在运行的任务
- 超时作业
- 重试和间隔
- 在特定时间安排任务
- 信号管理
- 拦截事件
- 进度支持
- 自动检测孤儿作业
v4
这是当前主版本v4的文档。如果您想从v3或更早版本升级,可以查看升级指南。v3的文档可在此处找到。
目录
为什么?
PHP不是多线程语言,也不能处理(大多数)异步任务。当然有pthread和pcntl,但它们仅适用于cli模式(或应该只在那里使用)。使用此库,您可以编写可以由同一系统或任何其他系统并行执行的任务。
它如何工作(请尽量简短)?
通过任务调度器安排作业并写入中央消息队列(MongoDB)。所有队列节点将在(软)实时中接收到通知,表明有新的作业可用。队列节点将通过内部systemv消息队列将作业转发给工作管理器。工作管理器决定是否需要生成新的工作进程。最后,一个工作进程将根据“先到先得”的原则执行任务。如果无可用插槽,作业将等待在队列中,并在有空闲插槽时执行。如果作业失败,可以重新安排作业。还有很多其他功能,请继续阅读。
要求
- Posix系统(基本上是每个Linux系统)
- mongodb服务器 >= 3.6
- MongoDB复制集(也可能只是一个单独的MongoDB节点)
- PHP >= 7.1
- PHP pcntl 扩展
- PHP posix 扩展
- PHP mongodb 扩展
- PHP sysvmsg 扩展
注意:此库仅适用于*nix系统。没有Windows支持,并且可能永远不会支持。
下载
该软件包可在packagist找到。
要通过composer安装此软件包,请执行以下命令
composer require gyselroth/php-task-scheduler
变更日志
变更日志可在此处找到。
贡献
我们很高兴您想为此项目做出贡献。请遵循提供的条款。
条款
您可能在readme或其它地方遇到以下术语
安装
如果您的应用程序使用docker容器构建,您必须使用以下至少构建选项
FROM php:7.4 RUN docker-php-ext-install pcntl sysvmsg RUN pecl install mongodb && docker-php-ext-enable mongodb pcntl sysvmsg
文档
为了更好地理解此库的工作原理,我们将实现一个邮件任务。当然,您可以实现任何类型的任务。
创建作业
创建任务非常简单,您只需要实现TaskScheduler\JobInterface。在这个例子中,我们将实现一个名为MailJob的任务,它使用zend-mail发送邮件。
注意:您可以使用TaskScheduler\AbstractJob实现TaskScheduler\JobInterface所需的基本方法。您唯一需要实现的是start()方法,它执行实际的任务(发送邮件)。
class MailJob extends TaskScheduler\AbstractJob { /** * {@inheritdoc} */ public function start(): bool { $transport = new Zend\Mail\Transport\Sendmail(); $mail = Message::fromString($this->data); $this->transport->send($mail); return true; } }
初始化调度器
您需要一个MongoDB\Database的实例和一个兼容Psr\Log\LoggerInterface的日志记录器来初始化调度器。
$mongodb = new MongoDB\Client('mongodb://:27017'); $logger = new \A\Psr4\Compatible\Logger(); $scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
排队作业
现在,让我们创建一个邮件并将其部署到我们在之前初始化的任务调度器中
$mail = new Message(); $mail->setSubject('Hello...'); $mail->setBody('World'); $mail->setFrom('root@localhost', 'root'); $scheduler->addJob(MailJob::class, $mail->toString());
这就是整个魔法,我们的调度器现在有了第一个任务,太棒了!
执行作业
但现在是时候执行那些队列中的任务了。
这时候队列节点就派上用场了。这些节点在(软)实时中监听新任务,并将这些任务进行负载均衡。
创建工作工厂
您需要在您的应用程序命名空间中创建自己的工作节点工厂,该工厂在创建新子进程时被调用。这个工厂在新的分支被创建时被调用。这意味着如果它被调用,您就在一个新进程中,您需要从头开始引导您的应用程序(或者只是您需要的工作器)。
注意:工作管理器和工作者本身都是从队列节点进程中独立的分支中创建的。
Queue node (TaskScheduler\Queue)
|
|-- Worker Manager (TaskScheduler\WorkerManager)
|
|-- Worker (TaskScheduler\Worker)
|-- Worker (TaskScheduler\Worker)
|-- Worker (TaskScheduler\Worker)
|-- ...
对于工作管理器和工作者,一个新的分支意味着您需要从头开始引导类。
注意:理论上,您可以通过设置通过构造函数中的工厂来重用现有的连接、对象等,因为工厂在main()中初始化。但这可能会引发错误和奇怪的程序行为,并且不受支持。
为了更好地理解:如果您有一个配置文件,其中存储了您的配置,例如MongoDB uri,在工厂中,您需要再次解析此配置并创建一个新的mongodb实例。或者您可能正在使用PSR-11容器,容器需要在工厂中从头创建(一个新的依赖树)。您可以将dic(与Psr\Container\ContainerInterface兼容)的实例作为第五个参数传递给TaskScheduler\Worker(或作为第三个参数的高级工作者管理器选项传递给TaskScheduler\WorkerManager(《高级工作者管理器选项》)。有关更多信息,请参阅《使用DIC》》。
class WorkerFactory extends TaskScheduler\WorkerFactoryInterface { /** * {@inheritdoc} */ public function buildWorker(MongoDB\BSON\ObjectId $id): TaskScheduler\Worker { $mongodb = new MongoDB\Client('mongodb://:27017'); $logger = new \A\Psr4\Compatible\Logger(); $scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); return new TaskScheduler\Worker($id, $scheduler, $mongodb->mydb, $logger); } /** * {@inheritdoc} */ public function buildManager(): TaskScheduler\WorkerManager { $logger = new \A\Psr4\Compatible\Logger(); return new TaskScheduler\WorkerManager($this, $logger); } }
创建队列节点
让我们编写一个新的队列节点。队列节点必须作为一个独立的过程启动!您应该提供一个简单的方法来启动这样的队列节点,有几种方法可以实现这一点。最简单的方法是创建一个单一的php脚本,可以通过cli启动。
$mongodb = new MongoDB\Client('mongodb://:27017'); $logger = new \A\Psr4\Compatible\Logger(); $scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $worker_factory = My\App\WorkerFactory(); #An instance of our previously created worker factory $queue = new TaskScheduler\Queue($scheduler, $mongodb, $worker_factory, $logger);
然后开始魔法
$queue->process();
注意:TaskScheduler\Queue::process()是一个阻塞调用。
邮件一旦队列节点启动并运行一些工作进程,就会立即发送。
通常您希望这些节点一直运行!它们就像在您的应用程序背后的无形执行节点。
管理作业
获取作业
您可能想检索所有计划中的作业
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $scheduler->getJobs();
默认情况下,您将收到所有状态为
- 等待
- 正在处理
- 已推迟
您可以通过可选查询来查询特定作业。
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $jobs = $scheduler->getJobs([ 'status' => TaskScheduler\JobInterface::STATUS_DONE, '$or' => [ ['class' => 'MyApp\\MyTask1'], ['class' => 'MyApp\\MyTask2'], ] ]); foreach($jobs as $job) { echo $job->getId()." done\n"; }
取消作业
您可以取消队列中等待的作业以及正在运行的作业。
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $scheduler->cancelJob(MongoDB\BSON\ObjectId $job_id);
如果您取消状态为正在处理的作业,作业将被强制终止并可能损坏数据。请注意这一点。(这类似于作业以超时状态结束)。唯一的区别是,如果具有重试 > 0 或已配置间隔的超时作业,它将被重新安排。取消的作业不会重新安排。您需要手动创建一个新作业。
修改作业
根据设计,无法修改计划中的作业。您需要取消作业并附加一个新的。
注意:这可能在v4中发生变化,该版本将具有作业的持久性。
清空队列
虽然无法修改/删除作业,但可以清空整个队列。
注意:这不是建议定期调用的。可能存在需要由于升级而清空所有作业的情况。运行中的队列节点会检测到这一点,并监听新产生的作业。
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $scheduler->flush();
处理失败作业
如果作业抛出任何类型的异常,作业将被视为失败。如果我们再次查看我们的邮件作业,但这次它会抛出异常
class MailJob extends TaskScheduler\AbstractJob { /** * {@inheritdoc} */ public function start(): bool { $transport = new Zend\Mail\Transport\Sendmail(); $mail = Message::fromString($this->data); $this->transport->send($mail); throw new \Exception('i am an exception'); return true; } }
这将导致作业一旦执行就失败。
注意:无论您返回
true还是false,只有未捕获的异常才会导致作业失败,但是您应该始终返回true。
调度器对失败的作业有内置的处理。您可以指定在失败时自动重新安排作业。以下将最多重新安排作业5次(如果以状态失败结束)以30秒为间隔。
$scheduler->addJob(MailJob::class, $mail->toString(), [ TaskScheduler\Scheduler::OPTION_RETRY => 5, TaskScheduler\Scheduler::OPTION_RETRY_INTERVAL => 30, ]);
这将使我们的邮件在一小时后执行,并且如果它失败,它将重新安排作业最多三次,间隔为一分钟。
存活ping和作业进度
TaskScheduler为您的工作实现提供内置的支持来更新作业的进度。默认情况下,作业从 0 (%) 开始,并以进度 100 (%) 结束。请注意,进度是浮点数。您可以在作业中增加所完成的工作进度。
重要:请注意,默认情况下,调度器在30秒后将作业视为孤儿并重新安排它。您可以在调度器初始化期间全局更改30秒,或者在任务实现中保持调用 ->updateProgress()。带有或没有进度的 updateProgress 调用类似于对调度器的存活ping,应该在您的任务中调用,如果它是一个包含循环的长运行任务。如果没有循环,您仍然应该以某种形式定期调用此方法以保持任务存活。设置进度为百分比值不是必需的,如果没有设置,则任务保持在0%,如果完成则设置为100%。
让我们看看这是如何与一个将文件从a复制到b的作业一起工作的。
class CopyFileJob extends TaskScheduler\AbstractJob { /** * {@inheritdoc} */ public function start(): bool { $source = $this->data['source']; $dest = $this->data['destination']; $size = filesize($source); $f = fopen($source, 'r'); $t = fopen($dest, 'w'); $read = 0; while($chunk = fread($f, 4096)) { $read += fwrite($t, $chunk); $this->updateProgress($read/$size*100); } } }
当前进度可以通过进程接口获取
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $p = $scheduler->getJob(MongoDB\BSON\ObjectId('5b3cbc39e26cf26c6d0ede69')); $p->getProgress();
注意 进度更新的速率限制默认为500ms。您可以通过配置
TaskScheduler::OPTION_PROGRESS_RATE_LIMIT来更改速率限制,如果不需要任何速率限制,则将其设置为0。
异步编程
看看这个例子
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $scheduler->addJob(MyTask::class, 'foobar') ->wait();
这将强制main()(您的进程)等待直到 MyTask::class 任务执行完毕。(无论是以状态完成、失败、取消、超时结束)。
这是一个更复杂的例子
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $stack = []; $stack[] = $scheduler->addJob(MyTask::class, 'foobar'); $stack[] = $scheduler->addJob(MyTask::class, 'barfoo'); $stack[] = $scheduler->addJob(OtherTask::class, 'barefoot'); $scheduler->waitFor($stack); //some other important stuff here
这将等待所有三个作业完成后再继续。
重要提示:
如果您正在以http模式(传入http请求)编程,并且您的应用程序需要部署任务,那么不等待是一个好习惯!最佳实践是返回一个HTTP 202代码。如果客户端需要知道这些作业的结果,您可以返回进程ID并发送第二个请求,然后等待并返回这些作业的状态,或者客户端可以通过持久连接或WebSockets获取其结果。
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $stack = $scheduler->getJobs([ '_id' => ['$in' => $job_ids_from_http_request] ]); $scheduler->waitFor(iterator_to_array($stack)); //do stuff
如果任何进程导致异常,您可以截获等待
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $stack = []; $stack[] = $scheduler->addJob(MyTask::class, 'foobar'); $stack[] = $scheduler->addJob(MyTask::class, 'barfoo'); $stack[] = $scheduler->addJob(OtherTask::class, 'barefoot'); try { $scheduler->waitFor($stack, Scheduler::OPTION_THROW_EXCEPTION); } catch(\Exception $e) { //error handling }
监听事件
您可以将调度器绑定并监听任何更改并执行一些操作:)
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $jobs = $scheduler->listen(function(TaskScheduler\Process $process) { echo "status of ".$process->getId().' change to '.$process->getStatus(); });
还可以过滤此类事件,此示例仅会通知发生在特定作业中的事件。
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $jobs = $scheduler->listen(function(TaskScheduler\Process $process) { echo "status of ".$process->getId().' change to '.$process->getStatus(); }, [ '_id' => new MongoDB\BSON\ObjectId('5b3cbc39e26cf26c6d0ede69') ]);
注意:listen()是一个阻塞调用,您可以在监听器回调中返回布尔值
true以退出监听器并继续main()。
绑定事件
除了简单的调度器监听器方法之外,您还可以将事件监听器绑定到您的TaskScheduler\Queue和/或TaskScheduler\Scheduler。
例如
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $stack = []; $stack[] = $scheduler->addJob(MyTask::class, 'foobar'); $stack[] = $scheduler->addJob(MyTask::class, 'barfoo'); $stack[] = $scheduler->addJob(OtherTask::class, 'barefoot'); $scheduler->on('waiting', function(League\Event\Event $e, TaskScheduler\Process $p) { echo 'job '.$p->getId().' is waiting'; })->on('done', function(League\Event\Event $e, TaskScheduler\Process $p) { echo 'job '.$p->getId().' is finished'; })->on('*', function(League\Event\Event $e, TaskScheduler\Process $p) { echo 'job '.$p->getId().' is '.$p->getStats(); }); $scheduler->waitFor($stack);
注意:您需要在调用
Scheduler::waitFor()之前绑定监听器,因为这是一个同步阻塞调用。
您可以在队列节点中绑定到相同的事件
$queue = new TaskScheduler\Queue($scheduler, $mongodb, $worker_factory, $logger); $queue->on('timeout', function(League\Event\Event $e, TaskScheduler\Process $p) { echo 'job '.$p->getId().' is timed out'; })->on('*', function(League\Event\Event $e, TaskScheduler\Process $p) { echo 'job '.$p->getId().' is '.$p->getStats(); }); $queue->process();
注意:您需要在调用
Queue::process()之前绑定监听器,因为这是一个同步阻塞调用。
事件
您可以绑定以下事件
自定义事件发射器
在底层,TaskScheduler\Queue和TaskScheduler\Scheduler都使用League\Event\Emitter作为事件发射器。您可以使用自己的Leage事件发射器实例创建这两个实例。
$emitter = new League\Event\Emitter(); //Queue $queue = new TaskScheduler\Queue($scheduler, $mongodb, $worker_factory, $logger, $emitter); //Scheduler $scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger, [], $emitter);
高级作业选项
TaskScheduler\Scheduler::addJob()/TaskScheduler\Scheduler::addJobOnce()还接受第三个选项(选项),允许您为作业设置更多高级选项。
注意:请小心处理超时,因为它会强制终止您的运行作业。您已经被警告了。如果您支持,请在函数中始终使用本机超时。
让我们再次添加我们的邮件作业示例,并使用一些自定义选项
注意:我们在这里使用OPTION_常量,您也可以仅使用上面文档化的名称。
$mongodb = new MongoDB\Client('mongodb://:27017'); $logger = new \A\Psr4\Compatible\Logger(); $scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $mail = new Message(); $mail->setSubject('Hello...'); $mail->setBody('World'); $mail->setFrom('root@localhost', 'root'); $scheduler->addJob(MailJob::class, $mail->toString(), [ TaskScheduler\Scheduler::OPTION_AT => time()+3600, TaskScheduler\Scheduler::OPTION_RETRY => 3, TaskScheduler\Scheduler::OPTION_RETRY_INTERVAL => 60, ]);
这将使我们的邮件在一小时后执行,并且如果它失败,它将重新安排作业最多三次,间隔为一分钟。
如果不存在则添加作业
您还可以做的另一件事是在作业尚未排队的情况下添加作业。与其使用addJob(),您可以使用addJobOnce(),调度器会检查是否已排队相同的作业。如果没有,则添加作业。调度器比较作业类型(本例中的MailJob)和提交的数据(本例中的$mail->toString())。
注意:如果选项发生变化,作业将被重新安排。
$scheduler->addJobOnce(MailJob::class, $mail->toString(), [ TaskScheduler\Scheduler::OPTION_AT => time()+3600, TaskScheduler\Scheduler::OPTION_RETRY => 3, ]);
默认情况下,TaskScheduler\Scheduler::addJobOnce()会比较作业类、提交的数据和处理状态(无论是PROCESSING、WAITING还是POSTPONED)。如果您不希望检查数据,您可以将TaskScheduler\Scheduler::OPTION_IGNORE_DATA设置为true。这将告诉调度器仅在数据更改时重新安排给定类的作业。这对于给定类的作业必须只排队一次非常有用。
注意:在此示例中,此选项没有意义。邮件可以有不同的内容。但可能发生的情况是您有清除临时存储的作业,每24小时执行一次。
$scheduler->addJobOnce(MyApp\CleanTemp::class, ['max_age' => 3600], [ TaskScheduler\Scheduler::OPTION_IGNORE_DATA => true, TaskScheduler\Scheduler::OPTION_INTERVAL => 60*60*24, ]);
如果max_age发生变化,旧作业将被取消并排队新作业。如果没有设置TaskScheduler\Scheduler::OPTION_IGNORE_DATA,我们将有两个类型为MyApp\CleanTemp::class的作业。
$scheduler->addJobOnce(MyApp\CleanTemp::class, ['max_age' => 1800], [ TaskScheduler\Scheduler::OPTION_IGNORE_DATA => true, TaskScheduler\Scheduler::OPTION_INTERVAL => 60*60*24, ]);
当然,您也可以手动查询此类作业,取消它们并重新安排。这将与上述操作相同
$jobs = $scheduler->getJobs([ 'class' => MyApp\CleanTemp::class, 'status' => ['$lte' => TaskScheduler\JobInterface::STATUS_PROCESSING] ]); foreach($jobs as $job) { $scheduler->cancelJob($job->getId()); } $scheduler->addJob(MyApp\CleanTemp::class, ['max_age' => 1800], [ TaskScheduler\Scheduler::OPTION_INTERVAL => 60*60*24, ]);
高级调度选项
您可以设置这些作业选项作为整个调度器的全局默认值。可以在初始化时或在调用 Scheduler::setOptions() 时设置自定义选项和默认值。
$mongodb = new MongoDB\Client('mongodb://:27017'); $logger = new \A\Psr4\Compatible\Logger(); $scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger, null, [ TaskScheduler\Scheduler::OPTION_JOB_QUEUE_SIZE => 1000000000, TaskScheduler\Scheduler::OPTION_EVENT_QUEUE_SIZE => 5000000000, TaskScheduler\Scheduler::OPTION_DEFAULT_RETRY => 3 ]);
之后也可以更改这些选项
$scheduler->setOptions([ TaskScheduler\Scheduler::OPTION_DEFAULT_RETRY => 2 ]);
注意:更改默认作业选项不会影响任何现有作业。
注意:选择一个适合您设置的队列大小(job_queue_size 和 event_queue_size)非常重要。
高级工作管理器选项
虽然您已经知道需要工作工厂来生成工作管理器,但您可以为其指定更多高级选项!这里是我们的工作工厂,但这次我们指定了更多选项
class WorkerFactory extends TaskScheduler\WorkerFactoryInterface { /** * {@inheritdoc} */ public function buildWorker(MongoDB\BSON\ObjectId $id): TaskScheduler\Worker { $mongodb = new MongoDB\Client('mongodb://:27017'); $logger = new \A\Psr4\Compatible\Logger(); $scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); return new TaskScheduler\Worker($id, $scheduler, $mongodb->mydb, $logger); } /** * {@inheritdoc} */ public function buildManager(): TaskScheduler\WorkerManager { $logger = new \A\Psr4\Compatible\Logger(); return new TaskScheduler\WorkerManager($this, $logger, [ TaskScheduler\WorkerManager::OPTION_MIN_CHILDREN => 10, TaskScheduler\WorkerManager::OPTION_PM => 'static' ]); } }
通过指定选项 pm 来处理工作,而动态生成工作是其默认模式。
进程管理模式(pm)
- 动态(启动 min_children 进程,在启动时创建,如果需要则动态创建新子进程,直到达到 max_children)
- 静态(启动 min_children 节点,(max_children 被忽略))
- 按需(在启动时不启动任何子进程(min_children 被忽略),为每个作业启动一个工作进程,但不超过 max_children。作业完成后(或失败、取消、超时),工作进程将死亡。)
默认为 dynamic。通常 dynamic 是合理的。在容器提供的世界中,您可能需要 static,其中队列节点数由待处理的作业数确定。例如,您可能使用 Kubernetes 自动缩放。
注意:如果使用 Scheduler::OPTION_FORCE_SPAWN 选项进行作业调度,实际子进程数可能会更高。
使用PSR-11 DIC
可选地,可以将 Psr\Container\ContainerInterface 传递给工作节点,然后调用它们来创建作业实例。您可能已经明白了,但这里又是我们的工作工厂。这次它将 PSR-11 容器实例传递给工作节点。如果您已经使用容器,那么从它那里请求管理器是非常有意义的。(当然,如果您容器的实现支持在运行时(工作 id)传递参数,您也可以从它那里请求工作实例。注意:这将与 PSR-11 规范 不兼容。)
class WorkerFactory extends TaskScheduler\WorkerFactoryInterface { /** * {@inheritdoc} */ public function build(MongoDB\BSON\ObjectId $id): TaskScheduler\Worker { $mongodb = new MongoDB\Client('mongodb://:27017'); $logger = new \A\Psr4\Compatible\Logger(); $scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger); $dic = new \A\Psr11\Compatible\Dic(); return new TaskScheduler\Worker($id, $scheduler, $mongodb->mydb, $logger, $dic); } /** * {@inheritdoc} */ public function buildManager(): TaskScheduler\WorkerManager { $dic = new \A\Psr11\Compatible\Dic(); return $dic->get(TaskScheduler\WorkerManager::class); } }
信号处理
当然,您可以终止队列节点。它们甚至可以重新安排正在运行的工作。您只需向进程发送 SIGTERM。然后队列节点会将此信息传递给工作管理器,工作管理器会将此信息发送给所有正在运行的工作进程,它们将保存其状态并优雅地退出。如果工作进程直接收到 SIGTERM,工作进程也会保存其状态。如果使用 SIGKILL 终止队列节点(或工作进程),则无法保存状态,可能会出现僵尸作业(状态为 PROCESSING 但没有实际处理这些作业的工作进程)。没有好的系统管理员会使用 SIGKILL 终止正在运行的工作,这是不可接受的,只能在您了解自己在做什么的情况下使用。
您还应该避免在作业中使用永不结束的阻塞函数,因为这样做时,php 无法处理信号。
真实世界示例
在此处添加您的项目,非常欢迎提交 PR。