流和套接字的 高级抽象

2.7.0 2024-03-09 16:55 UTC

This package is auto-updated.

Last update: 2024-09-09 18:03:27 UTC


README

Build Status codecov Type Coverage

innmind/stream之上提供高级抽象,以便以更函数式的方式处理流。

安装

composer require innmind/io

用法

注意

以下示例使用innmind/operating-system

按块读取流

use Innmind\IO\IO;
use Innmind\OperatingSystem\Factory;
use Innmind\TimeContinuum\Earth\ElapsedPeriod;
use Innmind\Stream\Streams;
use Innmind\Immutable\Str;

$os = Factory::build();
$streams = Streams::fromAmbienAuthority();
$io = IO::of($os->sockets()->watch(...));
$chunks = $io
    ->readable()
    ->wrap(
        $streams
            ->readable()
            ->acquire(\fopen('/some/file.ext', 'r')),
    )
    ->toEncoding(Str\Encoding::ascii)
    // or call ->watch() to wait forever for the stream to be ready before
    // reading from it
    ->timeoutAfter(ElapsedPeriod::of(1_000))
    ->chunks(8192) // max length of each chunk
    ->lazy()
    ->sequence();

$chunks变量是一个包含Innmind\Innmutable\Sequence的变量,其中包含Innmind\Immutable\Str值,每个值的最大长度为8192字节。在产生值之前,它会确保在从流中读取之前数据可用。如果在一秒钟内没有数据可用,则Sequence将抛出一个异常,表示它不能从流中读取,如果你不希望它抛出异常,请用watch()替换timeoutAfter(),这样它将等待所需的时间。

按行读取流

use Innmind\IO\IO;
use Innmind\OperatingSystem\Factory;
use Innmind\TimeContinuum\Earth\ElapsedPeriod;
use Innmind\Stream\Streams;
use Innmind\Immutable\Str;

$os = Factory::build();
$streams = Streams::fromAmbienAuthority();
$io = IO::of($os->sockets()->watch(...));
$lines = $io
    ->readable()
    ->wrap(
        $streams
            ->readable()
            ->acquire(\fopen('/some/file.ext', 'r')),
    )
    ->toEncoding(Str\Encoding::ascii)
    // or call ->watch() to wait forever for the stream to be ready before
    // reading from it
    ->timeoutAfter(ElapsedPeriod::of(1_000))
    ->lines()
    ->lazy()
    ->sequence();

这与按块读取(如上所述)相同,只不过分隔符是行结束字符\n

带有周期性心跳的套接字读取

use Innmind\IO\{
    IO,
    Readable\Frame,
};
use Innmind\OperatingSystem\Factory;
use Innmind\TimeContinuum\Earth\ElapsedPeriod;
use Innmind\Socket\{
    Address,
    Client,
};
use Innmind\Stream\Streams;
use Innmind\Immutable\{
    Str,
    Sequence,
};

$socket = Client\Unix::of(Address\Unix::of('/tmp/foo'))->match(
    static fn($socket) => $socket,
    static fn() => throw new \RuntimeException;
);
$os = Factory::build();
$io = IO::of($os->sockets()->watch(...));
$frame = $io
    ->sockets()
    ->clients()
    ->wrap($socket)
    ->toEncoding(Str\Encoding::ascii)
    ->timeoutAfter(ElapsedPeriod::of(1_000))
    ->heartbeatWith(static fn() => Sequence::of(Str::of('heartbeat')))
    ->frames(Frame\Line::new())
    ->one()
    ->match(
        static fn($line) => $line,
        static fn() => throw new \RuntimeException,
    );

此示例将等待从套接字/tmp/foo.sock读取单个值,并且每秒发送一个heartbeat消息,直到接收到预期的行。

从流中读取

use Innmind\IO\IO;
use Innmind\OperatingSystem\Factory;
use Innmind\TimeContinuum\Earth\ElapsedPeriod;
use Innmind\Stream\Streams;
use Innmind\Socket\Address\Unix;
use Innmind\Immutable\{
    Str,
    Fold,
    Either,
};

$os = Factory::build();
$streams = Streams::fromAmbienAuthority();
$io = IO::of($os->sockets()->watch(...));
$io
    ->readable()
    ->wrap(
        $os
            ->sockets()
            ->connectTo(Unix::of('/some/socket')),
    )
    ->toEncoding('ASCII')
    // or call ->watch() to wait forever for the stream to be ready before
    // reading from it
    ->timeoutAfter(ElapsedPeriod::of(1_000))
    ->chunks(8192) // max length of each chunk
    ->fold(
        Fold::with([]),
        static function(array $chunks, Str $chunk) {
            $chunks[] = $chunk->toString();

            if ($chunk->contains('quit')) {
                return Fold::result($chunks);
            }

            if ($chunk->contains('throw')) {
                return Fold::fail('some error');
            }

            return Fold::with($chunks);
        },
    )
    ->match(
        static fn(Either $result) => $result->match(
            static fn(array $chunks) => doStuff($chunks),
            static fn(string $error) => throw new \Exception($error), // $error === 'some error'
        ),
        static fn() => throw new \RuntimeException('Failed to read from the stream or it timed out'),
    );

此示例将

  • 打开本地套接字/some/socket
  • 在每次尝试从它读取之前,监视套接字准备好1秒,然后超时
  • 读取最大长度为8192的块
  • 使用编码ASCII
  • 每次读取块时调用传递给->fold()的函数
  • 它将一直从流中读取,直到某个块包含quitthrow
  • 返回一个Maybe<Either<string, list<string>>>
    • 它为空时表示它无法从流中读取或超时
    • string是传递给Fold::fail()的值
    • list<string>是传递给Fold::result()的值

您可以认为这个fold操作是一个reduce,您可以通过返回Fold::fail()Fold::result()来控制何时停止迭代。