innmind / io
流和套接字的 高级抽象
2.7.0
2024-03-09 16:55 UTC
Requires
- php: ~8.2
- innmind/immutable: ~5.2
- innmind/socket: ~6.1
- innmind/stream: ~4.0
Requires (Dev)
- innmind/black-box: ~5.5
- innmind/coding-standard: ~2.0
- phpunit/phpunit: ~10.2
- vimeo/psalm: ~5.15
README
在innmind/stream
之上提供高级抽象,以便以更函数式的方式处理流。
安装
composer require innmind/io
用法
注意
以下示例使用innmind/operating-system
按块读取流
use Innmind\IO\IO; use Innmind\OperatingSystem\Factory; use Innmind\TimeContinuum\Earth\ElapsedPeriod; use Innmind\Stream\Streams; use Innmind\Immutable\Str; $os = Factory::build(); $streams = Streams::fromAmbienAuthority(); $io = IO::of($os->sockets()->watch(...)); $chunks = $io ->readable() ->wrap( $streams ->readable() ->acquire(\fopen('/some/file.ext', 'r')), ) ->toEncoding(Str\Encoding::ascii) // or call ->watch() to wait forever for the stream to be ready before // reading from it ->timeoutAfter(ElapsedPeriod::of(1_000)) ->chunks(8192) // max length of each chunk ->lazy() ->sequence();
$chunks
变量是一个包含Innmind\Innmutable\Sequence
的变量,其中包含Innmind\Immutable\Str
值,每个值的最大长度为8192
字节。在产生值之前,它会确保在从流中读取之前数据可用。如果在一秒钟内没有数据可用,则Sequence
将抛出一个异常,表示它不能从流中读取,如果你不希望它抛出异常,请用watch()
替换timeoutAfter()
,这样它将等待所需的时间。
按行读取流
use Innmind\IO\IO; use Innmind\OperatingSystem\Factory; use Innmind\TimeContinuum\Earth\ElapsedPeriod; use Innmind\Stream\Streams; use Innmind\Immutable\Str; $os = Factory::build(); $streams = Streams::fromAmbienAuthority(); $io = IO::of($os->sockets()->watch(...)); $lines = $io ->readable() ->wrap( $streams ->readable() ->acquire(\fopen('/some/file.ext', 'r')), ) ->toEncoding(Str\Encoding::ascii) // or call ->watch() to wait forever for the stream to be ready before // reading from it ->timeoutAfter(ElapsedPeriod::of(1_000)) ->lines() ->lazy() ->sequence();
这与按块读取(如上所述)相同,只不过分隔符是行结束字符\n
。
带有周期性心跳的套接字读取
use Innmind\IO\{ IO, Readable\Frame, }; use Innmind\OperatingSystem\Factory; use Innmind\TimeContinuum\Earth\ElapsedPeriod; use Innmind\Socket\{ Address, Client, }; use Innmind\Stream\Streams; use Innmind\Immutable\{ Str, Sequence, }; $socket = Client\Unix::of(Address\Unix::of('/tmp/foo'))->match( static fn($socket) => $socket, static fn() => throw new \RuntimeException; ); $os = Factory::build(); $io = IO::of($os->sockets()->watch(...)); $frame = $io ->sockets() ->clients() ->wrap($socket) ->toEncoding(Str\Encoding::ascii) ->timeoutAfter(ElapsedPeriod::of(1_000)) ->heartbeatWith(static fn() => Sequence::of(Str::of('heartbeat'))) ->frames(Frame\Line::new()) ->one() ->match( static fn($line) => $line, static fn() => throw new \RuntimeException, );
此示例将等待从套接字/tmp/foo.sock
读取单个值,并且每秒发送一个heartbeat
消息,直到接收到预期的行。
从流中读取
use Innmind\IO\IO; use Innmind\OperatingSystem\Factory; use Innmind\TimeContinuum\Earth\ElapsedPeriod; use Innmind\Stream\Streams; use Innmind\Socket\Address\Unix; use Innmind\Immutable\{ Str, Fold, Either, }; $os = Factory::build(); $streams = Streams::fromAmbienAuthority(); $io = IO::of($os->sockets()->watch(...)); $io ->readable() ->wrap( $os ->sockets() ->connectTo(Unix::of('/some/socket')), ) ->toEncoding('ASCII') // or call ->watch() to wait forever for the stream to be ready before // reading from it ->timeoutAfter(ElapsedPeriod::of(1_000)) ->chunks(8192) // max length of each chunk ->fold( Fold::with([]), static function(array $chunks, Str $chunk) { $chunks[] = $chunk->toString(); if ($chunk->contains('quit')) { return Fold::result($chunks); } if ($chunk->contains('throw')) { return Fold::fail('some error'); } return Fold::with($chunks); }, ) ->match( static fn(Either $result) => $result->match( static fn(array $chunks) => doStuff($chunks), static fn(string $error) => throw new \Exception($error), // $error === 'some error' ), static fn() => throw new \RuntimeException('Failed to read from the stream or it timed out'), );
此示例将
- 打开本地套接字
/some/socket
- 在每次尝试从它读取之前,监视套接字准备好
1
秒,然后超时 - 读取最大长度为
8192
的块 - 使用编码
ASCII
- 每次读取块时调用传递给
->fold()
的函数 - 它将一直从流中读取,直到某个块包含
quit
或throw
- 返回一个
Maybe<Either<string, list<string>>>
- 它为空时表示它无法从流中读取或超时
string
是传递给Fold::fail()
的值list<string>
是传递给Fold::result()
的值
您可以认为这个fold
操作是一个reduce,您可以通过返回Fold::fail()
或Fold::result()
来控制何时停止迭代。