innmind / mantle
Requires
- php: ~8.2
- innmind/filesystem: ~7.3
- innmind/immutable: ~5.2
- innmind/operating-system: ~4.1|~5.0
Requires (Dev)
- innmind/black-box: ~5.5
- innmind/coding-standard: ~2.0
- vimeo/psalm: ~5.12
README
在 Fiber
上构建的抽象,用于异步协调多个任务。
目标是轻松地将使用 innmind/operating-system
构建的任何代码的执行从同步上下文移动到异步上下文。这意味着更容易异步运行一段代码,如果实验不成功,则更容易回退。这也意味着您可以同步测试异步系统的每个部分。
安装
composer require innmind/mantle
用法
use Innmind\Mantle\{ Forerunner, Task, Source\Continuation, }; use Innmind\OperatingSystem\{ Factory, OperatingSystem, }; use Innmind\Filesystem\Name; use Innmind\HttpTransport\Success; use Innmind\Http\{ Request, Method, ProtocolVersion, }; use Innmind\Url\{ Url, Path, }; use Innmind\Immutable\Sequence; $run = Forerunner::of(Factory::build()); [$users] = $run( [0, 0, false], static function(array $carry, OperatingSystem $os, Continuation $continuation, Sequence $results): Continuation { [$users, $finished, $launched] = $carry; if (!$launched) { return $continuation ->carryWith([$users, $finished, true]) ->launch(Sequence::of( Task::of( static fn(OperatingSystem $os): int => $os ->remote() ->http()(Request::of( Url::of('http://some-service.tld/users/count'), Method::get, ProtocolVersion::v11, )) ->map(static fn(Success $success): string => $success->response()->body()->toString()) ->match( static fn(string $response): int => (int) $response, static fn() => throw new \RuntimeException('Failed to count the users'), ), ), Task::of( static fn(OperatingSystem $os): int => $os ->filesystem() ->mount(Path::of('some/directory/')) ->get(Name::of('users.csv')) ->map(static fn($file) => $file->content()->lines()) ->match( static fn(Sequence $lines) => $lines->reduce( 0, static fn(int $total): int => $total + 1, ), static fn() => throw new \RuntimeException('Users file not found'), ), ), )); } $finished += $results->size(); $users = $results->reduce( $users, static fn(int $total, int $result): int => $total + $result, ); $continuation = $continuation->carryWith([$users, $finished, $launched]); if ($finished === 2) { $continuation = $continuation->terminate(); } return $continuation; }, );
此示例计算来自 2 个来源的 $users
数量。
Forerunner
对象的行为类似于 reduce 操作,因此它有两个参数:一个携带的值和一个减少器(在本包中称为源)。
这里的携带值是一个数组,包含获取的用户数量、完成任务的次数以及是否已启动任务。
如果尚未完成,源将启动 2 个任务;第一个任务进行 HTTP 调用,第二个任务计算文件中的行数。一旦任务完成,源将再次被调用,并将结果放入第四个参数 $results
中,它将完成任务的次数和用户数添加到携带值数组中。如果两个任务都已完成,则源调用 $continuation->terminate()
以指示循环停止。
当源调用 ->terminate()
且所有任务都已完成时,$run()
返回携带的值。这里,它将两个任务结果的聚合分配给值 $users
。
注意 只要您使用作为参数传递的
$os
抽象,系统就会在需要时自动挂起您的代码。这意味着您甚至不需要考虑它。
注意 源
callable
也异步运行。这意味着您可以使用它构建一个套接字服务器,无限期地等待新连接,而不会影响已启动任务的执行。
警告 不要在任务或源外部返回
$os
变量,因为它可能会破坏您的代码。
注意 由于此包仅通过传递参数(无全局状态)设计,这意味着您可以使用
Forerunner
的使用进行组合,这意味着您可以在任务内部运行Forerunner
的新实例,并且它将透明地执行。(尽管此功能尚未经过测试!)
限制
信号
通常通过 $os->process()->signals()
处理的信号,如 SIGINT
、SIGTERM
等,尚不支持。这可能导致不期望的行为。
HTTP 调用
目前,HTTP 调用通过 curl
完成,但不能与其他流集成在同一循环中。为了在执行 HTTP 调用时协调多个任务,系统使用 10ms
的超时,并在该最大速率之间切换任务。
为了解决这个问题,需要创建一个完全基于 PHP 流的新实现。
同时,如果您的目标是进行多个并发 HTTP 调用,则不需要此包。 innmind/http-transport
已经支持并发调用(没有上面提到的限制)。
SQL 查询
通过$os->remote()->sql()
执行的SQL查询仍然是以同步方式执行的。
为了解决这个问题,需要创建一个完全基于 PHP 流的新实现。
任务数量
似乎这个包的当前实现有一个大约10K个并发任务的限制,在此之后它会急剧减慢。