stephanschuler/fork-job-runner

1.0.1 2022-06-21 09:37 UTC

README

创建子进程成本高昂,尤其是在需要引导框架大量部分时。

此包始终在某个引导级别保持一个单独的PHP进程,并且每当有工作要执行时都会分叉此进程。

以下所有示例代码均由项目实现,此包不提供。

此包不旨在并行处理。这可能可行,但其他库或框架(例如ReactPHP)更适合此用途。

主要目标是提供功能测试中消息队列的技术基础。特别是Flowpack Jobqueue Common包提供了一个假队列,此包旨在帮助提供替代方案。

工作

应在单独进程中运行的任何代码都称为工作。工作由父进程创建,序列化,传递给工作子进程,并在执行之前反序列化。在处理数据库连接、文件指针和其他资源时需要考虑这一点。

<?php
declare(strict_types=1);

namespace StephanSchuler\Demo;

use StephanSchuler\ForkJobRunner\Utility\WriteBack;
use StephanSchuler\ForkJobRunner\Job;

class DemoJob implements Job
{
    public function run(WriteBack $writeBack): void
    {
        // Do something
    }
}

工作是一个常规类定义。将其放在项目中,以便自动加载器可以找到它。

调度

将工作从父进程传递到隔离执行称为调度。没有路由机制将工作交给不同的工作,也不是此包执行工作的异步或并行意图。

<?php
declare(strict_types=1);

namespace StephanSchuler\Demo;

use StephanSchuler\ForkJobRunner\Dispatcher;

$dispatcher = Dispatcher::create(
    \escapeshellcmd(\PHP_BINARY) . ' ' . \escapeshellarg(\__DIR__ . '/loop.php')
);

$dispatcher->run(
    new DemoJob()
);

创建调度器应仅执行一次,因为每个调度器都保留自己的工作进程。

调度应在需要的地方进行,例如在MVC控制器的操作方法或单元测试的测试方法中。

循环

循环是等待传入工作的子进程。调度器初始化循环并传递工作

这应该是一个CLI入口点。它是调度器示例代码中提到的"loop.php"文件。

<?php
declare(strict_types=1);

use StephanSchuler\ForkJobRunner\Dispatcher;
use StephanSchuler\ForkJobRunner\Loop;

require(__DIR__ . '/vendor/autoload.php');

// Bootstrap framework here

Loop::create()
    ->run();

除非调度器的命令流打开,否则Loop::run()方法永远不会返回。

响应

工作可以响应调度过程。所有可序列化的数据都可以从工作传递到调度器

<?php
declare(strict_types=1);

use StephanSchuler\ForkJobRunner\Utility\WriteBack;
use StephanSchuler\ForkJobRunner\Dispatcher;
use StephanSchuler\ForkJobRunner\Job;
use StephanSchuler\ForkJobRunner\Response;

class DemoJob implements Job
{
    public function run(WriteBack $writer) : void
    {
        $writer->send(
            new Response\DefaultResponse('this goes to stdout')
        );
        $writer->send(
            new Response\ThrowableResponse(
                new RuntimeException('This is an exception')
            )
        );
        throw new RuntimeException('This is another exception');
    }
}

assert($dispatcher instanceof Dispatcher);

$job = new DemoJob();
$responses = $dispatcher->run($job);

foreach ($responses as $response) {
    switch (true) {
        case ($response instanceof Response\NoOpResponse):
            // No Op responses act as keep alive signal.
            // There is at least two per job, one at the beginning and one 
            // at the end.
            break;
        case ($response instanceof Response\DefaultResponse):
            // Default responses contain text.
            // They are not used internally.
            \fputs(\STDOUT, $response->get());
            break;
        case ($response instanceof Response\ThrowableResponse):
            // This catches both,
            // the explicite call of $writer->send()
            // as well as the thrown exception.
            \fputs(\STDERR, print_r($response->get(), true));
            break;
       case ($response instanceof Response\Response):
            // Feel free to implement custom responses.
            break;
    }
}