stephanschuler / fork-job-runner
1.0.1
2022-06-21 09:37 UTC
Requires
- php: ^7.2
- ext-pcntl: *
- ext-posix: *
Requires (Dev)
- phpstan/phpstan: ^0.12
- phpunit/phpunit: ^6.0
This package is auto-updated.
Last update: 2024-09-21 14:48:04 UTC
README
创建子进程成本高昂,尤其是在需要引导框架大量部分时。
此包始终在某个引导级别保持一个单独的PHP进程,并且每当有工作要执行时都会分叉此进程。
以下所有示例代码均由项目实现,此包不提供。
此包不旨在并行处理。这可能可行,但其他库或框架(例如ReactPHP)更适合此用途。
主要目标是提供功能测试中消息队列的技术基础。特别是Flowpack Jobqueue Common包提供了一个假队列,此包旨在帮助提供替代方案。
工作
应在单独进程中运行的任何代码都称为工作
。工作由父进程创建,序列化,传递给工作子进程,并在执行之前反序列化。在处理数据库连接、文件指针和其他资源时需要考虑这一点。
<?php declare(strict_types=1); namespace StephanSchuler\Demo; use StephanSchuler\ForkJobRunner\Utility\WriteBack; use StephanSchuler\ForkJobRunner\Job; class DemoJob implements Job { public function run(WriteBack $writeBack): void { // Do something } }
工作
是一个常规类定义。将其放在项目中,以便自动加载器可以找到它。
调度
将工作从父进程传递到隔离执行称为调度。没有路由机制将工作交给不同的工作,也不是此包执行工作的异步或并行意图。
<?php declare(strict_types=1); namespace StephanSchuler\Demo; use StephanSchuler\ForkJobRunner\Dispatcher; $dispatcher = Dispatcher::create( \escapeshellcmd(\PHP_BINARY) . ' ' . \escapeshellarg(\__DIR__ . '/loop.php') ); $dispatcher->run( new DemoJob() );
创建调度器应仅执行一次,因为每个调度器都保留自己的工作进程。
调度应在需要的地方进行,例如在MVC控制器的操作方法或单元测试的测试方法中。
循环
循环
是等待传入工作
的子进程。调度器初始化循环
并传递工作
。
这应该是一个CLI入口点。它是调度器
示例代码中提到的"loop.php"文件。
<?php declare(strict_types=1); use StephanSchuler\ForkJobRunner\Dispatcher; use StephanSchuler\ForkJobRunner\Loop; require(__DIR__ . '/vendor/autoload.php'); // Bootstrap framework here Loop::create() ->run();
除非调度器的命令流打开,否则Loop::run()
方法永远不会返回。
响应
工作可以响应调度过程。所有可序列化的数据都可以从工作
传递到调度器
。
<?php declare(strict_types=1); use StephanSchuler\ForkJobRunner\Utility\WriteBack; use StephanSchuler\ForkJobRunner\Dispatcher; use StephanSchuler\ForkJobRunner\Job; use StephanSchuler\ForkJobRunner\Response; class DemoJob implements Job { public function run(WriteBack $writer) : void { $writer->send( new Response\DefaultResponse('this goes to stdout') ); $writer->send( new Response\ThrowableResponse( new RuntimeException('This is an exception') ) ); throw new RuntimeException('This is another exception'); } } assert($dispatcher instanceof Dispatcher); $job = new DemoJob(); $responses = $dispatcher->run($job); foreach ($responses as $response) { switch (true) { case ($response instanceof Response\NoOpResponse): // No Op responses act as keep alive signal. // There is at least two per job, one at the beginning and one // at the end. break; case ($response instanceof Response\DefaultResponse): // Default responses contain text. // They are not used internally. \fputs(\STDOUT, $response->get()); break; case ($response instanceof Response\ThrowableResponse): // This catches both, // the explicite call of $writer->send() // as well as the thrown exception. \fputs(\STDERR, print_r($response->get(), true)); break; case ($response instanceof Response\Response): // Feel free to implement custom responses. break; } }