rockett / pipeline
即插即用型的管道实现。
Requires
- php: >=8.1
Requires (Dev)
- nunomaduro/collision: ^6.4
- orchestra/testbench: ^7.21
README
Rockett\Pipeline
建立在League的出色包之上,Rockett\Pipeline
提供了一个管道模式的实现。
管道模式
管道模式允许你通过链式连接阶段轻松组合顺序操作。一个 管道 由零个、一个或多个 阶段 组成。管道可以处理一个名为 旅行者 的有效负载。当管道被处理时,旅行者将被传递到第一个阶段。从那一刻起,结果输出将从阶段传递到阶段。
在最简单的情况下,执行链可以表示为一个 foreach
循环
$output = $traveler; foreach ($stages as $stage) { $output = $stage($output); } return $output;
实际上,这等同于
$output = $stage3($stage2($stage1($traveler)));
不可变性
管道作为不可变的阶段链实现,由 PipelineContract
接口约束。当你添加一个新的阶段时,管道将被克隆并添加新的阶段。这使得管道易于重用,并最小化了副作用。
用法
管道中的操作(阶段)可以接受满足 callable
类型提示的管道中的任何内容。因此,闭包和任何可调用的事物都将工作。
$pipeline = (new Pipeline)->pipe(static function ($traveler) { return $traveler * 10; });
基于类的阶段
由于阶段接受可调用对象,因此基于类的阶段也是可能的。可以在每个阶段上实现 StageContract
,确保你有正确的 __invoke
方法的签名。
use Rockett\Pipeline\Pipeline; use Rockett\Pipeline\Contracts\StageContract; class TimesTwoStage implements StageContract { public function __invoke($traveler) { return $traveler * 2; } } class PlusOneStage implements StageContract { public function __invoke($traveler) { return $traveler + 1; } } $pipeline = (new Pipeline) ->pipe(new TimesTwoStage) ->pipe(new PlusOneStage); $pipeline->process(10); // Returns 21
如果你希望对旅行者进行类型提示并设置返回类型(当注入到阶段和从阶段返回的数据类型必须始终保持相同时,这很有用且推荐),你可以自由地创建自己的阶段合约。
可重用性
因为 PipelineContract
是 StageContract
的扩展,所以管道可以作为阶段重用。这创建了一个高度可组合的模型来创建复杂的执行模式,同时保持认知负荷较低。
例如,如果你想组合一个处理API调用的管道,你可以创建如下内容
$processApiRequest = (new Pipeline) ->pipe(new ExecuteHttpRequest) // B ->pipe(new ParseJsonResponse); // C $pipeline = (new Pipeline) ->pipe(new ConvertToPsr7Request) // A ->pipe($processApiRequest) // (B and C) ->pipe(new ConvertToDataTransferObject); // D $pipeline->process(new DeleteArticle($postId));
在这里,我们创建了一个处理API请求(单一职责)的管道,并将其组合成一个删除新闻文章的管道。
管道构建器
由于管道本身是不可变的,因此引入了管道构建器以简化管道的分布式组合。这些构建器提前收集阶段,然后允许你在任何给定时间创建管道。
use Rockett\Pipeline\Builder\PipelineBuilder; // Prepare the builder $pipelineBuilder = (new PipelineBuilder) ->add(new LogicalStage) ->add(new AnotherStage) ->add(new FinalStage); // Do other work … // Build the pipeline $pipeline = $pipelineBuilder->build();
处理器
当阶段通过管道传递时,它们将使用处理器传递,处理器负责遍历每个阶段并将它们传递到所属的管道。有三种可用的处理器
FingersCrossedProcessor
(这是默认值)InterruptibleProcessor
TapProcessor
不言而喻,默认处理器仅迭代和传递阶段。它不执行其他操作,且无法在不抛出异常的情况下退出管道。
提前退出管道
另一方面,InterruptibleProcessor
提供了一种机制,允许你在需要的情况下提前退出管道。这是通过在每个阶段作为继续管道的条件调用一个 callable
来实现的
use Rockett\Pipeline\Processors\InterruptibleProcessor; $processor = new InterruptibleProcessor( static fn ($traveler) => $traveler->somethingIsntRight() ); $pipeline = (new Pipeline($processor)) ->pipe(new SafeStage) ->pipe(new UnsafeStage) ->pipe(new AnotherSafeStage); $output = $pipeline->process($traveler);
在这个例子中,传递给处理器的可调用对象将检查是否有问题,如果有,它将返回 true
,从而导致处理器退出管道并返回旅行者作为输出。
您还可以使用 continueUnless
辅助函数来实例化可中断处理器
$processor = InterruptibleProcessor::continueUnless( static fn ($traveler) => $traveler->somethingIsntRight() );
如果您想反转条件,仅在可调用返回 true 时继续,则可以使用 continueWhen
助手
$processor = InterruptibleProcessor::continueWhen( static fn ($traveler) => $traveler->everythingIsFine() );
在每个阶段调用操作
使用 TapProcessor
,您可以在一个阶段通过管道之前和/或之后调用操作。如果您想在每个阶段之外处理常见的副作用,例如记录或广播,这将很有用。
处理器接受两个可调用对象
use Rockett\Pipeline\Processors\TapProcessor; // Define and instantiate a $logger and a $broadcaster … $processor = new TapProcessor( // $beforeEach, called before a stage is piped static fn ($traveler) => $logger->info('Traveller passing through pipeline:', $traveler->toArray()), // $afterEach, called after a stage is piped and the output captured static fn ($traveler) => $broadcaster->broadcast($users, 'Something happened', $traveler) ); $pipeline = (new Pipeline($processor)) ->pipe(new StageOne) ->pipe(new StageTwo) ->pipe(new StageThree); $output = $pipeline->process($traveler);
这两个可调用对象都是 可选的。通过排除两者,处理器将以与默认的 FingersCrossedProcessor
完全相同的方式操作。
如果您只想传递一个回调,则可以使用辅助方法
$processor = (new TapProcessor)->beforeEach(/** callable **/); // or … $processor = (new TapProcessor)->afterEach(/** callable **/);
您也可以将它们链接起来,作为使用构造函数的替代方法
$processor = (new TapProcessor) ->beforeEach(/** callable **/) ->afterEach(/** callable **/);
然而,建议您使用 命名参数
$processor = new TapProcessor( beforeEach: /** optional callable **/, afterEach: /** optional callable **/, )
处理异常
在异常和其他可抛出内容方面,此包是完全透明的 - 它不会捕获异常或抑制错误。
您需要在代码中捕获这些异常,无论是在阶段内部还是在调用管道以处理有效负载时。
$pipeline = (new Pipeline)->pipe( static fn () => throw new LogicException ); try { $pipeline->process($traveler); } catch(LogicException $e) { // Handle the exception. }
许可协议
管道采用宽松的 MIT 许可协议。
贡献
欢迎贡献 - 如果您想向此包添加内容,或者发现了错误,请随意提交合并请求以供审查。