rockett/pipeline

即插即用型的管道实现。

3.0.0 2023-02-07 06:15 UTC

This package is auto-updated.

Last update: 2024-09-05 15:36:55 UTC


README

Rockett\Pipeline

GitHub License Packagist Version Packagist Downloads GitHub Workflow Status

建立在League的出色包之上,Rockett\Pipeline提供了一个管道模式的实现。

管道模式

管道模式允许你通过链式连接阶段轻松组合顺序操作。一个 管道 由零个、一个或多个 阶段 组成。管道可以处理一个名为 旅行者 的有效负载。当管道被处理时,旅行者将被传递到第一个阶段。从那一刻起,结果输出将从阶段传递到阶段。

在最简单的情况下,执行链可以表示为一个 foreach 循环

$output = $traveler;

foreach ($stages as $stage) {
  $output = $stage($output);
}

return $output;

实际上,这等同于

$output = $stage3($stage2($stage1($traveler)));

不可变性

管道作为不可变的阶段链实现,由 PipelineContract 接口约束。当你添加一个新的阶段时,管道将被克隆并添加新的阶段。这使得管道易于重用,并最小化了副作用。

用法

管道中的操作(阶段)可以接受满足 callable 类型提示的管道中的任何内容。因此,闭包和任何可调用的事物都将工作。

$pipeline = (new Pipeline)->pipe(static function ($traveler) {
  return $traveler * 10;
});

基于类的阶段

由于阶段接受可调用对象,因此基于类的阶段也是可能的。可以在每个阶段上实现 StageContract,确保你有正确的 __invoke 方法的签名。

use Rockett\Pipeline\Pipeline;
use Rockett\Pipeline\Contracts\StageContract;

class TimesTwoStage implements StageContract
{
  public function __invoke($traveler)
  {
    return $traveler * 2;
  }
}

class PlusOneStage implements StageContract
{
  public function __invoke($traveler)
  {
    return $traveler + 1;
  }
}

$pipeline = (new Pipeline)
  ->pipe(new TimesTwoStage)
  ->pipe(new PlusOneStage);

$pipeline->process(10); // Returns 21

如果你希望对旅行者进行类型提示并设置返回类型(当注入到阶段和从阶段返回的数据类型必须始终保持相同时,这很有用且推荐),你可以自由地创建自己的阶段合约。

可重用性

因为 PipelineContractStageContract 的扩展,所以管道可以作为阶段重用。这创建了一个高度可组合的模型来创建复杂的执行模式,同时保持认知负荷较低。

例如,如果你想组合一个处理API调用的管道,你可以创建如下内容

$processApiRequest = (new Pipeline)
  ->pipe(new ExecuteHttpRequest) // B
  ->pipe(new ParseJsonResponse); // C

$pipeline = (new Pipeline)
  ->pipe(new ConvertToPsr7Request) // A
  ->pipe($processApiRequest) // (B and C)
  ->pipe(new ConvertToDataTransferObject); // D

$pipeline->process(new DeleteArticle($postId));

在这里,我们创建了一个处理API请求(单一职责)的管道,并将其组合成一个删除新闻文章的管道。

管道构建器

由于管道本身是不可变的,因此引入了管道构建器以简化管道的分布式组合。这些构建器提前收集阶段,然后允许你在任何给定时间创建管道。

use Rockett\Pipeline\Builder\PipelineBuilder;

// Prepare the builder
$pipelineBuilder = (new PipelineBuilder)
  ->add(new LogicalStage)
  ->add(new AnotherStage)
  ->add(new FinalStage);

// Do other work …

// Build the pipeline
$pipeline = $pipelineBuilder->build();

处理器

当阶段通过管道传递时,它们将使用处理器传递,处理器负责遍历每个阶段并将它们传递到所属的管道。有三种可用的处理器

  • FingersCrossedProcessor(这是默认值)
  • InterruptibleProcessor
  • TapProcessor

不言而喻,默认处理器仅迭代和传递阶段。它不执行其他操作,且无法在不抛出异常的情况下退出管道。

提前退出管道

另一方面,InterruptibleProcessor 提供了一种机制,允许你在需要的情况下提前退出管道。这是通过在每个阶段作为继续管道的条件调用一个 callable 来实现的

use Rockett\Pipeline\Processors\InterruptibleProcessor;

$processor = new InterruptibleProcessor(
  static fn ($traveler) => $traveler->somethingIsntRight()
);

$pipeline = (new Pipeline($processor))
  ->pipe(new SafeStage)
  ->pipe(new UnsafeStage)
  ->pipe(new AnotherSafeStage);

$output = $pipeline->process($traveler);

在这个例子中,传递给处理器的可调用对象将检查是否有问题,如果有,它将返回 true,从而导致处理器退出管道并返回旅行者作为输出。

您还可以使用 continueUnless 辅助函数来实例化可中断处理器

$processor = InterruptibleProcessor::continueUnless(
  static fn ($traveler) => $traveler->somethingIsntRight()
);

如果您想反转条件,仅在可调用返回 true 时继续,则可以使用 continueWhen 助手

$processor = InterruptibleProcessor::continueWhen(
  static fn ($traveler) => $traveler->everythingIsFine()
);

在每个阶段调用操作

使用 TapProcessor,您可以在一个阶段通过管道之前和/或之后调用操作。如果您想在每个阶段之外处理常见的副作用,例如记录或广播,这将很有用。

处理器接受两个可调用对象

use Rockett\Pipeline\Processors\TapProcessor;

// Define and instantiate a $logger and a $broadcaster …

$processor = new TapProcessor(
  // $beforeEach, called before a stage is piped
  static fn ($traveler) => $logger->info('Traveller passing through pipeline:', $traveler->toArray()),

  // $afterEach, called after a stage is piped and the output captured
  static fn ($traveler) => $broadcaster->broadcast($users, 'Something happened', $traveler)
);

$pipeline = (new Pipeline($processor))
  ->pipe(new StageOne)
  ->pipe(new StageTwo)
  ->pipe(new StageThree);

$output = $pipeline->process($traveler);

这两个可调用对象都是 可选的。通过排除两者,处理器将以与默认的 FingersCrossedProcessor 完全相同的方式操作。

如果您只想传递一个回调,则可以使用辅助方法

$processor = (new TapProcessor)->beforeEach(/** callable **/); // or …
$processor = (new TapProcessor)->afterEach(/** callable **/);

您也可以将它们链接起来,作为使用构造函数的替代方法

$processor = (new TapProcessor)
  ->beforeEach(/** callable **/)
  ->afterEach(/** callable **/);

然而,建议您使用 命名参数

$processor = new TapProcessor(
  beforeEach: /** optional callable **/,
  afterEach: /** optional callable **/,
)

处理异常

在异常和其他可抛出内容方面,此包是完全透明的 - 它不会捕获异常或抑制错误。

您需要在代码中捕获这些异常,无论是在阶段内部还是在调用管道以处理有效负载时。

$pipeline = (new Pipeline)->pipe(
  static fn () => throw new LogicException
);

try {
  $pipeline->process($traveler);
} catch(LogicException $e) {
  // Handle the exception.
}

许可协议

管道采用宽松的 MIT 许可协议

贡献

欢迎贡献 - 如果您想向此包添加内容,或者发现了错误,请随意提交合并请求以供审查。