innmind/mantle

异步编排

2.1.0 2024-03-10 14:57 UTC

This package is auto-updated.

Last update: 2024-09-10 16:07:09 UTC


README

Build Status codecov Type Coverage

Fiber 上构建的抽象,用于异步协调多个任务。

目标是轻松地将使用 innmind/operating-system 构建的任何代码的执行从同步上下文移动到异步上下文。这意味着更容易异步运行一段代码,如果实验不成功,则更容易回退。这也意味着您可以同步测试异步系统的每个部分。

安装

composer require innmind/mantle

用法

use Innmind\Mantle\{
    Forerunner,
    Task,
    Source\Continuation,
};
use Innmind\OperatingSystem\{
    Factory,
    OperatingSystem,
};
use Innmind\Filesystem\Name;
use Innmind\HttpTransport\Success;
use Innmind\Http\{
    Request,
    Method,
    ProtocolVersion,
};
use Innmind\Url\{
    Url,
    Path,
};
use Innmind\Immutable\Sequence;

$run = Forerunner::of(Factory::build());
[$users] = $run(
    [0, 0, false],
    static function(array $carry, OperatingSystem $os, Continuation $continuation, Sequence $results): Continuation {
        [$users, $finished, $launched] = $carry;

        if (!$launched) {
            return $continuation
                ->carryWith([$users, $finished, true])
                ->launch(Sequence::of(
                    Task::of(
                        static fn(OperatingSystem $os): int => $os
                            ->remote()
                            ->http()(Request::of(
                                Url::of('http://some-service.tld/users/count'),
                                Method::get,
                                ProtocolVersion::v11,
                            ))
                            ->map(static fn(Success $success): string => $success->response()->body()->toString())
                            ->match(
                                static fn(string $response): int => (int) $response,
                                static fn() => throw new \RuntimeException('Failed to count the users'),
                            ),
                    ),
                    Task::of(
                        static fn(OperatingSystem $os): int => $os
                            ->filesystem()
                            ->mount(Path::of('some/directory/'))
                            ->get(Name::of('users.csv'))
                            ->map(static fn($file) => $file->content()->lines())
                            ->match(
                                static fn(Sequence $lines) => $lines->reduce(
                                    0,
                                    static fn(int $total): int => $total + 1,
                                ),
                                static fn() => throw new \RuntimeException('Users file not found'),
                            ),
                    ),
                ));
        }

        $finished += $results->size();
        $users = $results->reduce(
            $users,
            static fn(int $total, int $result): int => $total + $result,
        );
        $continuation = $continuation->carryWith([$users, $finished, $launched]);

        if ($finished === 2) {
            $continuation = $continuation->terminate();
        }

        return $continuation;
    },
);

此示例计算来自 2 个来源的 $users 数量。

Forerunner 对象的行为类似于 reduce 操作,因此它有两个参数:一个携带的值和一个减少器(在本包中称为源)。

这里的携带值是一个数组,包含获取的用户数量、完成任务的次数以及是否已启动任务。

如果尚未完成,源将启动 2 个任务;第一个任务进行 HTTP 调用,第二个任务计算文件中的行数。一旦任务完成,源将再次被调用,并将结果放入第四个参数 $results 中,它将完成任务的次数和用户数添加到携带值数组中。如果两个任务都已完成,则源调用 $continuation->terminate() 以指示循环停止。

当源调用 ->terminate() 且所有任务都已完成时,$run() 返回携带的值。这里,它将两个任务结果的聚合分配给值 $users

注意 只要您使用作为参数传递的 $os 抽象,系统就会在需要时自动挂起您的代码。这意味着您甚至不需要考虑它。

注意callable 也异步运行。这意味着您可以使用它构建一个套接字服务器,无限期地等待新连接,而不会影响已启动任务的执行。

警告 不要在任务或源外部返回 $os 变量,因为它可能会破坏您的代码。

注意 由于此包仅通过传递参数(无全局状态)设计,这意味着您可以使用 Forerunner 的使用进行组合,这意味着您可以在任务内部运行 Forerunner 的新实例,并且它将透明地执行。(尽管此功能尚未经过测试!)

限制

信号

通常通过 $os->process()->signals() 处理的信号,如 SIGINTSIGTERM 等,尚不支持。这可能导致不期望的行为。

HTTP 调用

目前,HTTP 调用通过 curl 完成,但不能与其他流集成在同一循环中。为了在执行 HTTP 调用时协调多个任务,系统使用 10ms 的超时,并在该最大速率之间切换任务。

为了解决这个问题,需要创建一个完全基于 PHP 流的新实现。

同时,如果您的目标是进行多个并发 HTTP 调用,则不需要此包。 innmind/http-transport 已经支持并发调用(没有上面提到的限制)。

SQL 查询

通过$os->remote()->sql()执行的SQL查询仍然是以同步方式执行的。

为了解决这个问题,需要创建一个完全基于 PHP 流的新实现。

任务数量

似乎这个包的当前实现有一个大约10K个并发任务的限制,在此之后它会急剧减慢。