phasync / phasync
phasync 是 PHP 的 asyncio 库,通过 PHP 纤程提供无缝且高效的协程
Requires
- php: ^8.2
- charm/options: ^1.1.5
- laravel/serializable-closure: ^1.3
- psr/http-factory: ^1.1
- psr/http-message: ^1.0 || ^2.0
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.58
- pestphp/pest: ^2.34
Provides
- dev-main
- 1.1.0-rc4
- 1.1.0-rc3
- 1.1.0-rc2
- 1.1.0-rc1
- 1.1.0-rc0
- 1.0.7
- 1.0.6
- 1.0.5
- 1.0.4
- 1.0.3
- 1.0.2
- 1.0.1
- 1.0.0
- 0.2.0-rc1
- 0.1.7-rc1
- 0.1.6-rc1
- 0.1.5-rc3
- 0.1.5-rc2
- 0.1.5-rc1
- 0.1.4-rc1
- 0.1.3-rc1
- 0.1.2-rc3
- 0.1.2-rc2
- 0.1.2-rc1
- 0.1.1-rc5
- 0.1.1-rc4
- 0.1.1-rc3
- 0.1.1-rc2
- 0.1.1-rc1
- 0.1.0-rc1
- 0.0.3-rc1
- 0.0.2-rc2
- 0.0.2-rc1
- 0.0.1-rc3
- 0.0.1-rc2
- 0.0.1-rc1
- 0.0.1-beta6
- 0.0.1-beta5
- 0.0.1-beta4
- dev-dependabot/composer/pestphp/pest-tw-2.34or-tw-3.0
This package is auto-updated.
Last update: 2024-09-09 17:47:44 UTC
README
phasync:高并发 PHP
异步编程不应该困难。这是一个用于在 PHP 中进行异步编程的新型微框架。它试图为 PHP 做的事情,与 asyncio 包为 Python 做的事情,以及 Go 默认所做的事情相同。关于 phasync 与其他异步大库(如 reactphp 和 amphp)的不同之处在于,phasync 并不试图重新设计你的编程方式。phasync 可以在你的大型应用程序的某个函数中使用,只需在你想通过并行执行来加速某些任务的地方。
安装
phasync 的唯一要求是 PHP >= 8.1。它在 php-fpm 和命令行中运行良好。使用 composer 安装,或从 github 下载。
composer install phasync/phasync
文档
我们已经开始更多地关注文档。代码也经过了良好的文档说明。INTRO 文档提供了你开始所需的所有信息。
- INTRO:
phasync::run() 和 phasync::go()
- 基本示例和实现
- 异步 IO 核心功能
- 在现有项目中使用 phasync
- 使用 CurlMulti 进行并发 HTTP 请求
- 使用 RateLimiter 类进行节流
- 使用 WaitGroup 协调多个任务
- 使用
phasync::preempt()
确保CPU密集型代码不会阻塞整个应用程序 - 编写基本的 Web 服务器
关于 phasync
文章 你的函数是什么颜色? 解释了在未为异步编程设计的一些语言中使用的一些方法。使用纤程,PHP 8.1 内置了原生的异步 IO。这个库简化了与它们的协同工作,并且为此进行了高度优化。
phasync 将 Go 启发的并发引入 PHP,利用原生和超快的协程来高效地管理数千个并发操作。通过利用现代 PHP 特性,如纤程,phasync 简化了异步编程,允许编写出既干净又易于维护的代码,在最小开销的同时执行多个任务。
使你的代码适合协程
phasync 不会接管你的应用程序并强迫你重构它。它只是在一个有限的上下文中同时运行函数的高效工具。异常会像你预期的那样抛出 - 但 没有 那些令人讨厌的 ->then()
和 ->catch()
东西。
例如,通过并发发送多个 HTTP 请求可以使这快两倍
function do_some_requesting() { return phasync::run(function() { $httpClient = new phasync\HttpClient\HttpClient(); return [ // These use an internal coroutine via phasync::go() $httpClient->get('https://www.vg.no/'), $httpClient->get('https://github.com/') ]; }); }
你甚至可以并行化更复杂的流程
function crawl_for_urls(string $baseUrl) { return phasync::run(function() { phasync::channel($reader, $writer); $client = new HttpClient; $queue = new SplQueue; $queue->enqueue($baseUrl); // Launch 3 parallel workers, each waiting for messages from // the `$reader` channel. for ($i = 0; $i < 3; $i++) { phasync::go(function() use ($reader, $client, $queue) { while ($url = $reader->read()) { $body = (string) $client->get($url)->getBody(); foreach (extract_urls_from_body($body) as $foundUrl) { $queue->enqueue($foundUrl); } } }); } $alreadyCrawled = []; while (!$queue->isEmpty()) { $nextUrl = $queue->dequeue(); if (in_array($alreadyCrawled)) { continue; } $alreadyCrawled[] = $nextUrl; $writer->write($nextUrl); } }); }
轻松使任何现有代码成为 phasync 感知
使用非阻塞执行模式转换你的应用程序处理 IO 密集型和 CPU 密集型操作的方式。无论是构建高流量 Web 应用程序、数据处理系统还是实时 API,phasync 都为你提供了平滑可靠地扩展操作的工具。
要使磁盘 IO 操作在协程中异步,透明地使用
composer require phasync/file-streamwrapper
这使得甚至类加载也不会阻塞其他协程,你可以继续使用 file_get_contents()
、file_put_contents()
等函数。
如果您希望使网络套接字异步,可以按照以下方法进行。您还可以在协程之外安全地使用这些方法。它们不应干扰您的软件的工作方式,但当函数在协程内部使用时,它们将使用异步IO,以便其他协程可以并发工作。
读取网络或文件流
// Instead of: $chunk = fread($fp, 4096); // Do this: phasync::readable($fp); $chunk = fread($fp, 4096);
写入文件或网络流
// Instead of: $chunk = fwrite($fp, "Some data"); // Do this: phasync::writable($fp); $chunk = fwrite($fp, "Some data");
等待网络请求
// Instead of: $resource = stream_socket_accept($socket); // Do this: phasync::readable($socket); $resource = stream_socket_accept($socket);
执行昂贵的阻塞操作
注意!当没有任何其他事情可做时,phasync::idle()
才会休眠。实际上,它将等待您的应用程序没有其他事情可做,然后再运行慢速函数。大多数情况下,它根本不会休眠。
// Instead of: $n = fibonacci(32); // Do this: phasync::idle(0.1); // Wait at most 0.1 seconds for the application to become idle $n = fibonacci(32); // Instead of: $files = glob("*.txt"); // Do this: phasync::idle(0.1); $files = glob("*.txt");
实用工具
通道
通道用于在协程之间进行通信。通道是特殊的原语,使用phasync::channel($readableChannel, $writableChannel, $bufferSize=0)
创建。可读通道有一个read()
方法,它将返回写入可写通道的值。如果没有可用数据,协程将被挂起,直到写入器将数据写入通道。类似地,可写通道有一个write(Serializable|scalar $value)
方法,如果缓冲区已满或没有等待数据的读取器,它将挂起协程。缓冲区大小允许您在通道中排队值。
通道高度优化,并能够立即恢复协程,因此可以用于在协程之间高效地调度工作。当另一侧垃圾回收或调用$channel->close()
时,它会自动关闭另一个通道。当通道关闭时,可读通道将返回null
。
例如,如果您有一个协程专门设计用于写入磁盘或更新数据库,并且有10个协程正在爬取网站,您可以这样做
phasync::run(function() { phasync::channel($reader, $writer); // The logger coroutine phasync::go(function() use ($reader) { $fp = fopen('some-log.txt', 'a'); while (null !== ($line = $reader->read())) { // this is non-blocking if you install phasync/file-streamwrapper fwrite($fp, trim($line) . "\n"); } fclose($fp); }); $writerNumber = 1; phasync::go(concurrent: 5, fn: function() use ($writer, $writerNumber) { $number = $writerNumber++; for ($i = 0; $i < 10; $i++) { $writer->write("From writer $writerNumber: This is message $i"); } }); });
WaitGroups
WaitGroups提供了一种小工具,允许多个不同的协程完成其工作。例如,如果您发出10个并发HTTP请求,您可以使用WaitGroup确保所有10个协程都完成了其任务。
示例
phasync::run(function() { $wg = new WaitGroup(); phasync::go(concurrent: 5, fn: function() use ($wg) { $wg->add(); // Inform the WaitGroup that this coroutine will be performing some work try { // Do the work phasync::sleep(0.5); } finally { $wg->done(); } }); // Wait until the 5 coroutines have finished their work $wg->wait(); });
发布者
为确保消息和事件被多个协程接收,即使在协程阻塞的情况下,您可以使用发布者。发布者是对发布者/订阅者的实现,因此任何写入发布者的消息都将按顺序由所有订阅者接收。类似于通道,这些是phasync原语,您可以使用phasync::publisher($delivery, $publisher)
创建。
示例
phasync::run(function() { phasync::publisher($source, $writeChannel); // Launch 10 subscribers: phasync::go(concurrent: 10, fn: function() use ($source) { $readChannel = $delivery->subscribe(); while ($line = $readChannel->read()) { echo "Subscriber got " . trim($line) . "\n"; } }); $writeChannel->write("First"); $writeChannel->write("Second"); });
工作进行中
虽然库似乎很稳定,但尚未经过实战测试。我们希望爱好者参与构建测试、文档和围绕架构的讨论。
请贡献力量;我们希望使用异步工具
-
Process
类,用于使用proc_open()
运行后台进程。使用Process
类,我们可以使用子php
进程来运行无法转换为非阻塞的函数,如目录扫描、DNS查找等。它还可以用于扩展应用程序以利用多个CPU核心。此类另一个用途是作为单独的进程运行sqlite3
命令,允许异步查询sqlite3数据库。 -
基于
mysqli
构建的异步MySQL
驱动程序,支持异步数据库访问所需的一切。 -
TcpServer
类,用于使用stream_socket_server
开发快速和并发的TCP服务器。TcpServer
类应该使开发TCP服务器变得容易。请参阅phasync\TcpServer
中正在进行的工作。结合通道、WaitGroups和发布者,可以设计出许多强大的服务。此类将是为phasync应用程序作为独立和并发应用程序提供服务的基础;如
HttpServer
或FastCGIServer
。 -
TcpClient
类,它将简化开发用于重要系统(如redis
或memcached
)的客户端,这些系统也是异步的。 -
使用
TcpClient
实现异步和并发请求的HttpClient
,通过curl_multi_init
。 -
为
http://
、https://
和file://
提供流包装器,以便实现这些异步操作。
示例:在 Web 控制器中异步处理文件
<?php require '../vendor/autoload.php'; use phasync\{run, go, file_get_contents}; class MyController { #[Route("/", "index")] public function index() { // Initiate the event loop within your existing controller method phasync::run(function() { // Process each text file asynchronously foreach (glob('/some/path/*.txt') as $file) { phasync::go(function() use ($file) { $data = file_get_contents($file); // Non-blocking file read do_something($data); // Replace with your processing logic }); } }); // The run function will wait here until all file operations are complete } }
优点
非侵入性:在不破坏现有 PHP 项目结构的情况下集成异步功能。性能提升:利用非阻塞 IO 处理文件操作、数据库查询和网络调用,更高效。易于采用:通过最小化对函数调用的更改,可以将同步任务转换为异步任务。
这种方法不仅保留了应用程序的现有架构,还通过将重 IO 操作卸载到 phasync 的非阻塞例程来提高响应性和可伸缩性。非常适合需要改进大量数据或高用户交互处理,但无需完全重写的应用程序。
需要测试人员、文档编写人员和贡献者!
虽然与基于 Promise 的实现(如 reactphp 或 amphp)相比,phasync 更快、更简单,特别是具有合理且易于理解的异常处理,但它仍在不断发展。我们邀请测试人员和贡献者帮助扩展其功能和生态系统。
<<<<<<< 上游更新
示例
这个综合示例记录了 phasync 的许多功能。脚本位于本项目的 examples/
文件夹中。
<?php require '../vendor/autoload.php'; /** * Channel is an efficient method for coordinating coroutines. * The writer will pause after writing, allowing the reader to * read the message. When the reader becomes blocked again (for * example waiting for the next message, or because it tries to * read a file, the write resumes and can add a new message). * * The Channel class supports multiple readers and multiple writers, * but messages will only be read once by the first available reader. * * A channel can be buffered (via the buffer argument in the constructor), * which allows messages to be temporarily held allowing the writer to * resume working. This can be leveraged to design a queue system. */ use phasync\Channel; /** * Publisher is similar to Channel, but it is always buffered, and * any message will be delivered in order to all of the readers. * * This can be used to "multicast" the same data to many clients, * or as an event dispatcher. The readers will block whenever there * are no events pending. The read operation will return a null value * if the publisher goes away. */ use phasync\Publisher; /** * WaitGroup is a mechanism to simplify waiting for many simultaneous * processes to complete. It is analogous to Promise.all([]) known from * promise based designs. Each process will invoke the $waitGroup->add() * method, and finally they must invoke $waitGroup->done() when they are * finished. * * While all the simultaneous processes perform their task, you can call * $waitGroup->wait() to pause until the all coroutines have invoked * $waitGroup->done(). * * WARNING! You must ensure that the $waitGroup->done() method is invoked, * or the $waitGroup->wait() method will block forever. */ use phasync\WaitGroup; /** * The library is primarily used via functions defined in the `phasync\` * namespace: */ use function phasync\run; /** * `run(Closure $coroutine, mixed ...$args): mixed` * * This function will launch the coroutine and wait for it * to either throw an exception or return with a value. * When this function is used from inside another run() * coroutine, it will block until all the coroutines that * were launched inside it are done. */ use function phasync\go; /** * `go(Closure $coroutine, mixed ...$args): Fiber` * * This function will launch a coroutine and return a value * that may be resolved in the future. You can wait for a * fiber to finish by using {@see phasync\await()}, which * effectively is identical to using `run()`. */ use function phasync\await; /** * `await(Fiber $coroutine): mixed` * * This function will block the calling fiber until the * provided $coroutine either fails and throws an exception, * or returns a value. */ use function phasync\defer; /** * `defer(Closure $cleanupFunction): void` * * This is a method for scheduling cleanup or other tasks to * run after the coroutine completes. The deferred functions * will run in reverse order of when they were scheduled. The * functions will run immediately after the coroutine finishes, * unless an exception occurs and then they will be run when * the coroutine is garbage collected. */ use function phasync\sleep; /** * `sleep(float $seconds=0): void` * * This method will pause the coroutine for a number of seconds. * By invoking `sleep()` without arguments, your coroutine will * yield to allow other coroutines to work, but resume immediately. */ use function phasync\wait_idle; /** * `wait_idle(): void` * * This function will pause the coroutine and allow it to resume only * when there is nothing else to do immediately. */ use function phasync\file_get_contents; /** * `file_get_contents(string $filename): string|false` * * This function will use non-blocking file operations to read the entire * file from disk. While the application is waiting for the disk to provide * data, other coroutines are allowed to continue working. */ use function phasync\file_put_contents; /** * `file_put_contents(string $filename, mixed $data, int $flags = 0): void` * * This function is also non-blocking but has an API identical to the native * `file_put_contents()` function in PHP. */ /** * Other functions not documented here, but which are designed after the native * PHP standard library while being non-blocking. The functions *behave* as if * they are blocking, but will allow other coroutines to work in the time they * block. * * `stream_get_contents($stream, ?int $maxLength = null, int $offset = 0): string|false` * `fread($stream, int $length): string|false` * `fgets($stream, ?int $length = null): string|false` * `fgetc($stream): string|false` * `fgetcsv($stream, ?int $length = null, string $separator = ",", string $enclosure = "\"", string $escape = "\\"): array|false` * `fwrite($stream, string $data): int|false` * `ftruncate($stream, int $size): int|false` * `flock($stream, int $operation, int &$would_block = null): bool` */ // Launch your asynchronous application: try { run(function() { $keep_running = true; $maintenance_events = new Publisher(); // launch a background task $count = go(function() use (&$keep_running, $maintenance_events) { $count = 0; while ($keep_running) { // do some maintenance work $data = file_get_contents(__FILE__); // this is asynchronous $maintenance_events->write(md5($data) . " step $count"); $count++; // wait a while before repeating sleep(0.7); // allows other tasks to do some work } return $count; }); $wait_group = new WaitGroup(); [$reader, $writer] = Channel::create(0); go(function() use ($reader) { echo "Waiting for completion messages\n"; while ($message = $reader->read()) { echo "Completed: " . $message . "\n"; } echo "No more completion messages\n"; }); $futureWithException = go(function() { throw new Exception("Just an exception"); }); // launch various workers for ($i = 0; $i < 3; $i++) { // Create a subscription for the events $subscription = $maintenance_events->subscribe(); go(function() use ($i, $subscription, $wait_group, $writer) { // Register with the $waitGroup $wait_group->add(); defer(function() use ($wait_group) { $wait_group->done(); }); echo "Worker $i waiting for events...\n"; // This worker will handle at most 10 events for ($count = 0; $count < 4; $count++) { sleep(1 * $i); $writer->write("Worker $i received: {$subscription->read()}"); } /** * If an exception is thrown here, it will appear to have been * thrown from the outer coroutine while the $waitGroup->wait() * function is blocking. */ echo "Worker $i done\n"; }); } echo "Waitgroup waiting\n"; // wait for all workers to complete $wait_group->wait(); echo "Waitgroup done\n"; // stop the background maintenance $keep_running = false; echo "A total of " . await($count) . " maintenance steps were completed\n"; echo "Trying to resolve the error value:\n"; try { await($futureWithException); } catch (Throwable $e) { echo "Could not resolve the value: \n$e\n"; } }); } catch (Throwable $e) { echo "I successfully caught the missed exception in Worker 1:\n"; echo " " . $e->getMessage() . "\n"; }
入门指南
通过 Composer 安装 phasync,并开始利用强大的异步能力增强您的 PHP 应用程序
composer require phasync/phasync
兼容性
许可证
phasync 是开源软件,许可证为 MIT 许可证。