teewurst / pipeline
递归管道构建
3.0.1
2022-05-23 17:36 UTC
Requires
- php: ^7.4|^8.0
- psr/container: ^1.0|^2.0
Requires (Dev)
- jangregor/phpstan-prophecy: ^1.0
- phpspec/prophecy-phpunit: ^2.0
- phpstan/phpstan: ^1
- phpstan/phpstan-phpunit: ^1.0
- phpunit/phpunit: ^9.5
- roave/security-advisories: dev-master
This package is auto-updated.
Last update: 2024-09-23 22:25:09 UTC
README
使用这个库,我们可以将长执行时间分割成多个任务。管道可以让你更好地满足单一责任要求。这有助于提高可维护性和可测试性。
如何使用
- 管道将有效载荷及其自身传递到任务中。
public function __invoke(PayloadInterface $payload, PipelineInterface $pipeline): PayloadInterface;
- 第一个任务调用下一个任务,调用下一个任务,以此类推...
$payloadAfterAllSteps = $pipeline->handle($payload);
- 类似于PSR-7(请求对象),你现在可以通过添加和从有效载荷对象获取信息来处理所有信息
$value = $genericPayload->getValue();
$value++;
$genericPayload->setValue($value);
逐步操作
- 创建任务
- 初始化管道
- 填充有效载荷
- 执行管道
- 评估管道
使用管道服务
您可以从DI(需要PSR-11容器服务管理器)简单地创建管道。只需两行代码(+配置)
PipelineService 类允许您以服务哈希的形式传递任务。
// .. In your factory $tasksFromEnvConfig = $config->getTasks() // somewhere in your config: [Task1::class, Task2::class, Task3::class]; $pipeline = (new PipelineService)->createPsr11($serviceManager, $tasksFromEnvConfig);
使用teewurst/Pipeline,您能够迅速创建相当复杂的任务
// Configuration of BiPRO Request (= German XML Request Standard) $tasklist = [ CheckServiceAvailabilityTask::class, [ // do Request ErrorHandlerTask::class, // catch execution of submission even on error [ PrepareDataTask::class, ValidateDataTask::class, DoGetOfferRequestTask::class ], // .. some additional things like set quote, upload documents etc. ], [ // do something additionally LogResultLocalyTask::class, LogResultInDWTask::class, ] ];
最佳实践
- 使用
RecursivePipeline创建“子管道”=> 动态任务 - 由于结构,每个任务都有一个“在处理下一个步骤之前”和“在处理下一个步骤之后”
- 通过抛出异常或返回 $payload 而不处理下一个步骤来中断管道
示例
手动初始化管道
当然,最好使用DI而不是始终使用 new
// Create Tasks in order of execution $tasks = [ new ExceptionTask(), new ErrorBagTask(), new OpenStreamTask(), new ReadValuesTask(), new ValidateValuesTask(), new RequireDocumentsTask(), new HandleImportTask(), ]; // Create Pipeline $pipeline = new \teewurst\Pipeline\Pipeline($tasks); // Create Payload $payload = new ImportPayload(); // Setup Payload $payload->setEnvironment($env); // <= here you also could use $pipeline->setOptions(...) $payload->setConfig($config); // Do the thing the pipeline is constructed for $payload = $pipeline->handle($payload); // Evaluate result $payload->getMessage();
简单的执行某些操作的任务
使用管道对象传递配置和任务间处理数据
public function __invoke(PayloadInterface $payload, PipelineInterface $pipeline): PayloadInterface { // there is something, so we need to continue with our tasks if ($payload->getValue() > 0) { return $pipeline->handle($payload); } // we do not allow negative values => Completely interrupt process if ($payload->getValue() < 0) { throw new \Exception("Negative Values are not allowed here"); } // Value is 0, let's pretend there is nothing more to do. => Interrupt from here and go back up the callstack return $payload; }
异常处理任务
在管道内处理(可能只是特定的)异常
public function __invoke(PayloadInterface $payload, PipelineInterface $pipeline): PayloadInterface { try { // handle next steps BEFORE doing something $payload = $pipeline->handle($payload); } catch (\Throwable $e) { $this->logger->logException($e, ['context' => $payload]); } return $payload; }
错误包
使用错误包处理多个错误
public function __invoke(PayloadInterface $payload, PipelineInterface $pipeline): PayloadInterface { // Do something before handle next steps $payload->setErrorBag($this->errorBag); // Pipeline will add errors to error bag $payload = $pipeline->handle($payload); // "On the way back" and after handling, we check if something is inside the error bag if ($payload->getErrorBag()->hasErrors()) { $payload->setMessage($payload->getErrorBag()->toString()); } return $payload; }