phasync/phasync

phasync is an asyncio library for PHP, providing seamless and efficient coroutines via PHP Fibers.


phasync

phasync: high-concurrency PHP

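Asynchronous programming should not be hard. phasync is a new microframework for asynchronous programming in PHP. It tries to do for PHP what the asyncio package does for Python, and what Go does by default. What sets phasync apart from larger async libraries such as reactphp and amphp is that it does not try to redesign the way you program. You can use phasync inside a single function of a large application, exactly where you want to speed up some tasks by running them concurrently.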

What can it do for my existing codebase?

Installation

The only requirement for phasync is PHP >= 8.1. It works well both under php-fpm and on the command line. Install it with Composer, or download it from GitHub.

composer require phasync/phasync

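Documentation

We have started focusing more on documentation, and the code itself is well documented. The INTRO document gives you everything you need to get started.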

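About phasync

The article What Color is Your Function? explains some of the approaches that have been used for asynchronous programming in languages that were not designed for it. With Fibers, PHP 8.1 gained native support for asynchronous IO. This library simplifies working with Fibers and is highly optimized for doing so.

phasync brings Go-inspired concurrency to PHP, using native, very fast coroutines to manage thousands of concurrent operations efficiently. By building on modern PHP features such as Fibers, phasync simplifies asynchronous programming and lets you write clean, maintainable code that performs many tasks at once with minimal overhead.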
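
Since phasync is built on PHP 8.1 Fibers, it may help to see that primitive on its own. The sketch below uses only the native Fiber class and SplQueue (no phasync at all) to show two tasks interleaving through cooperative suspension. The task function and the small round-robin loop are illustrative assumptions, not phasync's scheduler.

```php
<?php
// Native PHP 8.1 Fibers only; phasync is not used here.
// Each task logs a step and then voluntarily suspends, letting the loop
// below resume the next task in round-robin order.

$log = [];

$task = function (string $name, int $steps) use (&$log): string {
    for ($i = 1; $i <= $steps; $i++) {
        $log[] = "$name:$i";
        Fiber::suspend(); // hand control back to whoever resumed us
    }
    return "$name done";
};

$a = new Fiber($task);
$b = new Fiber($task);
$a->start('A', 2); // runs until the first Fiber::suspend()
$b->start('B', 2);

$queue = new SplQueue();
$queue->enqueue($a);
$queue->enqueue($b);

// Hypothetical minimal scheduler: resume fibers until all have returned.
while (!$queue->isEmpty()) {
    $fiber = $queue->dequeue();
    $fiber->resume();
    if (!$fiber->isTerminated()) {
        $queue->enqueue($fiber);
    }
}

echo implode(' ', $log), "\n";                     // A:1 B:1 A:2 B:2
echo $a->getReturn(), ', ', $b->getReturn(), "\n"; // A done, B done
```

The tasks never block each other; they simply take turns. phasync adds the missing pieces on top of this idea, such as resuming a fiber when its IO is ready or a timer expires.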

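Making your code coroutine-friendly

phasync does not take over your application and force you to refactor it. It is simply an efficient tool for running functions concurrently within a limited context. Exceptions are thrown just as you would expect, without any of that annoying ->then()->catch() stuff.

For example, sending the two HTTP requests concurrently can make this roughly twice as fast: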

function do_some_requesting() {
    return phasync::run(function() {
        $httpClient = new phasync\HttpClient\HttpClient();
        return [
            // These use an internal coroutine via phasync::go()
            $httpClient->get('https://www.vg.no/'),
            $httpClient->get('https://github.com/')
        ];
    });
}

You can even run more complex workflows concurrently:

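function crawl_for_urls(string $baseUrl): array {
    return phasync::run(function() use ($baseUrl) {
        phasync::channel($reader, $writer);
        $client = new phasync\HttpClient\HttpClient();
        $queue = new SplQueue();
        $queue->enqueue($baseUrl);
        $inFlight = 0; // URLs handed to a worker but not yet processed

        // Launch 3 concurrent workers, each waiting for URLs from the
        // `$reader` channel. read() returns null once the channel is closed.
        for ($i = 0; $i < 3; $i++) {
            phasync::go(function() use ($reader, $client, $queue, &$inFlight) {
                while (null !== ($url = $reader->read())) {
                    try {
                        $body = (string) $client->get($url)->getBody();
                        // extract_urls_from_body() is your own helper
                        foreach (extract_urls_from_body($body) as $foundUrl) {
                            $queue->enqueue($foundUrl);
                        }
                    } finally {
                        $inFlight--;
                    }
                }
            });
        }

        $alreadyCrawled = [];
        // Continue while URLs are queued or a worker is still fetching
        while (!$queue->isEmpty() || $inFlight > 0) {
            if ($queue->isEmpty()) {
                phasync::sleep(0.01); // yield so the workers can make progress
                continue;
            }
            $nextUrl = $queue->dequeue();
            if (isset($alreadyCrawled[$nextUrl])) {
                continue;
            }
            $alreadyCrawled[$nextUrl] = true;
            $inFlight++;
            $writer->write($nextUrl);
        }
        $writer->close(); // the workers' read() calls now return null
        return array_keys($alreadyCrawled);
    });
}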

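Easily make any existing code phasync-aware

A non-blocking execution model changes how your application handles IO-bound and CPU-bound operations. Whether you are building high-traffic web applications, data processing systems or real-time APIs, phasync gives you the tools to scale your operations smoothly and reliably.

To make disk IO inside coroutines transparently asynchronous, install:

composer require phasync/file-streamwrapper

With it installed, even class loading will not block other coroutines, and you can keep using functions such as file_get_contents() and file_put_contents() as usual.

To make network sockets asynchronous, use the patterns below. They are also safe to use outside of coroutines and should not change how your software behaves; when they run inside a coroutine, however, they use asynchronous IO so that other coroutines can work concurrently.

Reading from a network or file stream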

// Instead of:
$chunk = fread($fp, 4096);

// Do this:
phasync::readable($fp);
$chunk = fread($fp, 4096);

Writing to a file or network stream

// Instead of:
$written = fwrite($fp, "Some data");

// Do this:
phasync::writable($fp);
$written = fwrite($fp, "Some data");

Waiting for incoming network connections

// Instead of:
$resource = stream_socket_accept($socket);

// Do this:
phasync::readable($socket);
$resource = stream_socket_accept($socket);
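
phasync::readable() is described above as suspending the coroutine until the stream can be read without blocking. One plausible way to build such a helper on native Fibers is to suspend with the stream and let an event loop call stream_select(). The sketch below illustrates that general technique under stated assumptions: it is not phasync's code, wait_readable() is a hypothetical name, and stream_socket_pair() with STREAM_PF_UNIX is used only to get a connected pair of streams (it is not available on Windows).

```php
<?php
// Hypothetical sketch: how a readable() helper *could* work on native Fibers.
// The fiber suspends with the stream it waits for; the loop resumes it once
// stream_select() reports that stream as readable. Not phasync's implementation.

function wait_readable($stream): void
{
    Fiber::suspend($stream); // hand the stream to the loop and pause
}

[$left, $right] = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
$received = null;

$reader = new Fiber(function () use ($right, &$received) {
    wait_readable($right);           // pauses until data is available
    $received = fread($right, 4096); // will not block at this point
});

$waiting = [];                       // resource id => [stream, fiber]
$stream = $reader->start();          // fiber runs until wait_readable()
$waiting[(int) $stream] = [$stream, $reader];

fwrite($left, "hello");              // some other task produces data

while ($waiting) {
    $read = array_column($waiting, 0);
    $write = $except = null;
    if (stream_select($read, $write, $except, 1) > 0) {
        foreach ($read as $ready) {
            [, $fiber] = $waiting[(int) $ready];
            unset($waiting[(int) $ready]);
            $next = $fiber->resume();
            if ($next !== null) {    // the fiber now waits on another stream
                $waiting[(int) $next] = [$next, $fiber];
            }
        }
    }
}

echo $received, "\n"; // hello
```

The same shape works for writability (pass the stream in stream_select()'s write array) and for stream_socket_accept(), since a listening socket becomes readable when a connection is pending.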

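Performing expensive blocking operations

Note! phasync::idle() does not behave like sleep(). It waits until your application has nothing else to do and only then lets the slow function run; the argument is just an upper bound on that wait. Most of the time it will not actually sleep at all.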

// Instead of:
$n = fibonacci(32);

// Do this:
phasync::idle(0.1); // Wait at most 0.1 seconds for the application to become idle
$n = fibonacci(32);

// Instead of:
$files = glob("*.txt");

// Do this:
phasync::idle(0.1);
$files = glob("*.txt");
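
The idea behind phasync::idle() can be sketched with native Fibers: fibers that ask to run "when idle" are parked in a separate queue that the loop only drains when no other fiber is ready. MiniLoop and the 'idle'/'yield' suspend values are hypothetical names for this illustration, and the timeout that phasync::idle() accepts is left out for brevity.

```php
<?php
// Hypothetical sketch (native Fibers only, not phasync's scheduler):
// a fiber that suspends with 'idle' is parked in a separate queue and is
// resumed only when no other fiber is ready to run.

final class MiniLoop
{
    private SplQueue $ready;
    private SplQueue $idle;
    public array $log = [];

    public function __construct()
    {
        $this->ready = new SplQueue();
        $this->idle = new SplQueue();
    }

    public function go(Closure $fn): void
    {
        $this->ready->enqueue(new Fiber($fn));
    }

    public function run(): void
    {
        while (!$this->ready->isEmpty() || !$this->idle->isEmpty()) {
            // Prefer ready fibers; take an idle one only when nothing else can run.
            $fiber = $this->ready->isEmpty() ? $this->idle->dequeue() : $this->ready->dequeue();
            $reason = $fiber->isStarted() ? $fiber->resume() : $fiber->start();
            if ($fiber->isTerminated()) {
                continue;
            }
            if ($reason === 'idle') {
                $this->idle->enqueue($fiber);
            } else {
                $this->ready->enqueue($fiber);
            }
        }
    }
}

$loop = new MiniLoop();
$loop->go(function () use ($loop) {
    Fiber::suspend('idle');          // wait until the loop has nothing else to do
    $loop->log[] = 'expensive work'; // stand-in for fibonacci(32) or glob()
});
$loop->go(function () use ($loop) {
    for ($i = 1; $i <= 2; $i++) {
        $loop->log[] = "io step $i";
        Fiber::suspend('yield');     // ordinary yield: stays in the ready queue
    }
});
$loop->run();

echo implode(', ', $loop->log), "\n"; // io step 1, io step 2, expensive work
```

Even though the expensive task was launched first, it only runs after the IO-style task has no more work queued, which is the ordering the note above describes.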

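Utilities

Channels

Channels are used for communication between coroutines. They are special primitives created with phasync::channel($readableChannel, $writableChannel, $bufferSize=0). The readable channel has a read() method that returns the values written to the writable channel; if no data is available, the coroutine is suspended until a writer writes to the channel. Similarly, the writable channel has a write(Serializable|scalar $value) method, which suspends the coroutine if the buffer is full or no reader is waiting for data. The buffer size lets you queue values in the channel.

Channels are highly optimized and can resume coroutines immediately, so they are an efficient way to distribute work between coroutines. When one side is garbage collected or $channel->close() is called, the other side is closed automatically. Once the channel is closed, read() on the readable channel returns null.

For example, if you have one coroutine dedicated to writing to disk or updating a database, and several coroutines crawling websites, you can do this: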

phasync::run(function() {
    phasync::channel($reader, $writer);

    // The logger coroutine
    phasync::go(function() use ($reader) {
        $fp = fopen('some-log.txt', 'a');
        while (null !== ($line = $reader->read())) {
            // this is non-blocking if you install phasync/file-streamwrapper
            fwrite($fp, trim($line) . "\n");
        }
        fclose($fp);
    });

    $writerNumber = 1;
    // $writerNumber is captured by reference so each writer gets its own number
    phasync::go(concurrent: 5, fn: function() use ($writer, &$writerNumber) {
        $number = $writerNumber++;
        for ($i = 0; $i < 10; $i++) {
            $writer->write("From writer $number: This is message $i");
        }
    });
});
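
To make the channel semantics described above concrete, here is a toy buffered channel written against native Fibers only. It is a hypothetical sketch: ToyChannel and run_all() are illustrative names, blocked fibers simply yield and re-check on their next turn (phasync instead resumes the waiting side directly), and closing is explicit rather than triggered by garbage collection. As with phasync's readable channel, read() returns null once the channel is closed and drained.

```php
<?php
// Toy buffered channel on native Fibers (PHP >= 8.1). Hypothetical, not
// phasync's implementation: blocked fibers yield and re-check on their next turn.

final class ToyChannel
{
    private SplQueue $buffer;
    private bool $closed = false;

    public function __construct(private int $capacity = 1)
    {
        $this->buffer = new SplQueue();
    }

    public function write(mixed $value): void
    {
        while (count($this->buffer) >= $this->capacity) {
            Fiber::suspend();        // buffer full: let a reader run
        }
        $this->buffer->enqueue($value);
    }

    public function read(): mixed
    {
        while ($this->buffer->isEmpty()) {
            if ($this->closed) {
                return null;         // closed and drained
            }
            Fiber::suspend();        // nothing to read yet
        }
        return $this->buffer->dequeue();
    }

    public function close(): void
    {
        $this->closed = true;
    }
}

// Round-robin until every fiber has terminated.
function run_all(Fiber ...$fibers): void
{
    $queue = new SplQueue();
    foreach ($fibers as $fiber) {
        $queue->enqueue($fiber);
    }
    while (!$queue->isEmpty()) {
        $fiber = $queue->dequeue();
        if ($fiber->isStarted()) {
            $fiber->resume();
        } else {
            $fiber->start();
        }
        if (!$fiber->isTerminated()) {
            $queue->enqueue($fiber);
        }
    }
}

$channel = new ToyChannel(capacity: 1);
$lines = [];

$logger = new Fiber(function () use ($channel, &$lines) {
    while (null !== ($line = $channel->read())) {
        $lines[] = $line;            // stand-in for fwrite() to a log file
    }
});
$producer = new Fiber(function () use ($channel) {
    for ($i = 0; $i < 3; $i++) {
        $channel->write("message $i");
    }
    $channel->close();
});

run_all($logger, $producer);
echo implode(', ', $lines), "\n"; // message 0, message 1, message 2
```

With a capacity of 1 the producer can only run one message ahead of the logger, which is how the buffer size bounds the amount of queued work.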

WaitGroups

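WaitGroups are a small utility for waiting until several coroutines have finished their work. For example, if you make 10 concurrent HTTP requests, you can use a WaitGroup to make sure all 10 coroutines have completed their tasks.

Example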

phasync::run(function() {
    $wg = new phasync\WaitGroup();

    phasync::go(concurrent: 5, fn: function() use ($wg) {
        $wg->add(); // Inform the WaitGroup that this coroutine will be performing some work
        try {
            // Do the work
            phasync::sleep(0.5);
        } finally {
            $wg->done();
        }
    });

    // Wait until the 5 coroutines have finished their work
    $wg->wait();
});
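
A WaitGroup is essentially a counter: add() increments it, done() decrements it, and wait() suspends until it reaches zero. The toy version below shows that on native Fibers. ToyWaitGroup and the round-robin loop are hypothetical illustrations, not phasync's WaitGroup, and wait() here polls by yielding rather than being woken directly.

```php
<?php
// Hypothetical toy WaitGroup on native Fibers (not phasync's class):
// wait() keeps yielding until every add() has been matched by a done().

final class ToyWaitGroup
{
    private int $pending = 0;

    public function add(): void
    {
        $this->pending++;
    }

    public function done(): void
    {
        $this->pending--;
    }

    public function wait(): void
    {
        while ($this->pending > 0) {
            Fiber::suspend(); // let the workers make progress
        }
    }
}

$wg = new ToyWaitGroup();
$finished = [];
$fibers = [];

for ($i = 1; $i <= 3; $i++) {
    $worker = new Fiber(function (int $id) use ($wg, &$finished) {
        $wg->add();               // register before doing any work
        try {
            for ($step = 0; $step < $id; $step++) {
                Fiber::suspend(); // simulate work that takes $id turns
            }
            $finished[] = $id;
        } finally {
            $wg->done();          // always signal completion, even on error
        }
    });
    $worker->start($i);           // runs until its first suspension
    $fibers[] = $worker;
}

$waiter = new Fiber(function () use ($wg, &$finished) {
    $wg->wait();
    $finished[] = 'all done';
});
$waiter->start();
$fibers[] = $waiter;

// Round-robin over the fibers that have not finished yet.
while ($fibers = array_filter($fibers, fn (Fiber $f) => !$f->isTerminated())) {
    foreach ($fibers as $fiber) {
        $fiber->resume();
    }
}

echo implode(', ', $finished), "\n"; // 1, 2, 3, all done
```

The try/finally around the work mirrors the example above: if done() were skipped, the counter would never reach zero and wait() would spin forever.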

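Publishers

To make sure messages and events are received by multiple coroutines, even while some of those coroutines are blocked, you can use a publisher. A publisher is an implementation of the publish/subscribe pattern: every message written to the publisher is received, in order, by all subscribers. Like channels, publishers are phasync primitives, created with phasync::publisher($delivery, $publisher).

Example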

phasync::run(function() {
    phasync::publisher($source, $writeChannel);

    // Launch 10 subscribers:
    phasync::go(concurrent: 10, fn: function() use ($source) {
        $readChannel = $source->subscribe();
        while (null !== ($line = $readChannel->read())) {
            echo "Subscriber got " . trim($line) . "\n";
        }
    });

    $writeChannel->write("First");
    $writeChannel->write("Second");
});
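
The key difference from a channel is that each subscriber has its own queue, so one message is delivered to every subscriber instead of to the first available reader. The toy version below, built on native Fibers only, illustrates that fan-out. ToyPublisher, its closure-based read function and the explicit close() are hypothetical choices for this sketch, not phasync's Publisher API.

```php
<?php
// Hypothetical toy publisher on native Fibers (not phasync's Publisher):
// every subscriber gets its own queue, so each message reaches all of them
// in the order it was written.

final class ToyPublisher
{
    /** @var SplQueue[] one queue per subscriber */
    private array $queues = [];
    private bool $closed = false;

    /** Returns a read function for a new subscriber. */
    public function subscribe(): Closure
    {
        $queue = new SplQueue();
        $this->queues[] = $queue;
        return function () use ($queue): mixed {
            while ($queue->isEmpty()) {
                if ($this->closed) {
                    return null;  // publisher closed and queue drained
                }
                Fiber::suspend(); // no message yet: let others run
            }
            return $queue->dequeue();
        };
    }

    public function write(mixed $message): void
    {
        foreach ($this->queues as $queue) {
            $queue->enqueue($message);
        }
    }

    public function close(): void
    {
        $this->closed = true;
    }
}

$publisher = new ToyPublisher();
$received = [];
$fibers = [];

foreach (['s1', 's2'] as $name) {
    $read = $publisher->subscribe(); // subscribe before anything is written
    $subscriber = new Fiber(function () use ($name, $read, &$received) {
        while (null !== ($message = $read())) {
            $received[] = "$name got $message";
        }
    });
    $subscriber->start();
    $fibers[] = $subscriber;
}

$publisher->write('First');
$publisher->write('Second');
$publisher->close();

while ($fibers = array_filter($fibers, fn (Fiber $f) => !$f->isTerminated())) {
    foreach ($fibers as $fiber) {
        $fiber->resume();
    }
}

echo implode('; ', $received), "\n";
// s1 got First; s1 got Second; s2 got First; s2 got Second
```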

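Work in progress

While the library appears to be stable, it has not yet been battle-tested. We would love enthusiasts to help build tests and documentation, and to join discussions about the architecture.

Please contribute! We would like to see the following async tools:

  • A Process class for running background processes with proc_open(). With a Process class, child PHP processes could run functions that cannot be made non-blocking, such as directory scanning and DNS lookups. It could also help applications scale across multiple CPU cores. Another use would be to run the sqlite3 command as a separate process, allowing sqlite3 databases to be queried asynchronously.

  • An asynchronous MySQL driver built on mysqli, supporting everything needed for asynchronous database access.

  • A TcpServer class for developing fast, concurrent TCP servers with stream_socket_server.

    The TcpServer class should make developing TCP servers easy. See the work in progress in phasync\TcpServer. Combined with channels, WaitGroups and publishers, it makes many powerful services possible.

    This class would be the foundation for serving phasync applications as standalone, concurrent applications, such as an HttpServer or a FastCGIServer.

  • A TcpClient class that simplifies developing asynchronous clients for important systems such as redis and memcached.

  • An HttpClient for asynchronous, concurrent requests, using TcpClient or curl_multi_init.

  • Stream wrappers for http://, https:// and file://, so that these operations become asynchronous.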

Example: processing files asynchronously in a web controller

<?php
require '../vendor/autoload.php';
use function phasync\file_get_contents; // non-blocking drop-in for the native function

class MyController {

    #[Route("/", "index")]
    public function index() {
        // Initiate the event loop within your existing controller method
        phasync::run(function() {
            // Process each text file asynchronously
            foreach (glob('/some/path/*.txt') as $file) {
                phasync::go(function() use ($file) {
                    $data = file_get_contents($file);  // Non-blocking file read
                    do_something($data);  // Replace with your processing logic
                });
            }
        });
        // The run function will wait here until all file operations are complete
    }
}

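Benefits

  • Non-intrusive: integrate asynchronous functionality without disrupting the structure of existing PHP projects.

  • Performance gains: handle file operations, database queries and network calls more efficiently with non-blocking IO.

  • Easy adoption: turn synchronous tasks into asynchronous ones with minimal changes to function calls.

This approach not only preserves your application's existing architecture, but also improves responsiveness and scalability by offloading heavy IO operations to phasync's non-blocking routines. It is ideal for applications that need to handle large amounts of data or heavy user interaction better, without a complete rewrite.

Testers, documentation writers and contributors wanted!

Compared with promise-based implementations such as reactphp or amphp, phasync is faster and simpler, and in particular has sane, easy-to-understand exception handling. It is, however, still evolving, and we invite testers and contributors to help expand its features and ecosystem.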

Example

This comprehensive example demonstrates many of phasync's features. The script can be found in the examples/ folder of this project.

<?php
require '../vendor/autoload.php';

/**
 * Channel is an efficient method for coordinating coroutines.
 * The writer will pause after writing, allowing the reader to
 * read the message. When the reader becomes blocked again (for
 * example waiting for the next message, or because it tries to
 * read a file), the writer resumes and can add a new message.
 *
 * The Channel class supports multiple readers and multiple writers,
 * but messages will only be read once by the first available reader.
 *
 * A channel can be buffered (via the buffer argument in the constructor),
 * which allows messages to be temporarily held allowing the writer to
 * resume working. This can be leveraged to design a queue system.
 */
use phasync\Channel;

/**
 * Publisher is similar to Channel, but it is always buffered, and
 * any message will be delivered in order to all of the readers.
 *
 * This can be used to "multicast" the same data to many clients,
 * or as an event dispatcher. The readers will block whenever there
 * are no events pending. The read operation will return a null value
 * if the publisher goes away.
 */
use phasync\Publisher;

/**
 * WaitGroup is a mechanism to simplify waiting for many simultaneous
 * processes to complete. It is analogous to Promise.all([]) known from
 * promise based designs. Each process will invoke the $waitGroup->add()
 * method, and finally they must invoke $waitGroup->done() when they are
 * finished.
 *
 * While all the simultaneous processes perform their task, you can call
 * $waitGroup->wait() to pause until all coroutines have invoked
 * $waitGroup->done().
 *
 * WARNING! You must ensure that the $waitGroup->done() method is invoked,
 * or the $waitGroup->wait() method will block forever.
 */
use phasync\WaitGroup;

/**
 * The library is primarily used via functions defined in the `phasync\`
 * namespace:
 */

use function phasync\run;
/**
 * `run(Closure $coroutine, mixed ...$args): mixed`
 *
 * This function will launch the coroutine and wait for it
 * to either throw an exception or return with a value.
 * When this function is used from inside another run()
 * coroutine, it will block until all the coroutines that
 * were launched inside it are done.
 */

use function phasync\go;
/**
 * `go(Closure $coroutine, mixed ...$args): Fiber`
 *
 * This function will launch a coroutine and return a value
 * that may be resolved in the future. You can wait for a
 * fiber to finish by using {@see phasync\await()}, which
 * effectively is identical to using `run()`.
 */

use function phasync\await;
/**
 * `await(Fiber $coroutine): mixed`
 *
 * This function will block the calling fiber until the
 * provided $coroutine either fails and throws an exception,
 * or returns a value.
 */

use function phasync\defer;
/**
 * `defer(Closure $cleanupFunction): void`
 *
 * This is a method for scheduling cleanup or other tasks to
 * run after the coroutine completes. The deferred functions
 * will run in reverse order of when they were scheduled. The
 * functions will run immediately after the coroutine finishes,
 * unless an exception occurs and then they will be run when
 * the coroutine is garbage collected.
 */

use function phasync\sleep;
/**
 * `sleep(float $seconds=0): void`
 *
 * This method will pause the coroutine for a number of seconds.
 * By invoking `sleep()` without arguments, your coroutine will
 * yield to allow other coroutines to work, but resume immediately.
 */

use function phasync\wait_idle;
/**
 * `wait_idle(): void`
 *
 * This function will pause the coroutine and allow it to resume only
 * when there is nothing else to do immediately.
 */

use function phasync\file_get_contents;
/**
 * `file_get_contents(string $filename): string|false`
 *
 * This function will use non-blocking file operations to read the entire
 * file from disk. While the application is waiting for the disk to provide
 * data, other coroutines are allowed to continue working.
 */

use function phasync\file_put_contents;
/**
 * `file_put_contents(string $filename, mixed $data, int $flags = 0): void`
 *
 * This function is also non-blocking but has an API identical to the native
 * `file_put_contents()` function in PHP.
 */

/**
 * Other functions not documented here, but which are designed after the native
 * PHP standard library while being non-blocking. The functions *behave* as if
 * they are blocking, but will allow other coroutines to work in the time they
 * block.
 *
 * `stream_get_contents($stream, ?int $maxLength = null, int $offset = 0): string|false`
 * `fread($stream, int $length): string|false`
 * `fgets($stream, ?int $length = null): string|false`
 * `fgetc($stream): string|false`
 * `fgetcsv($stream, ?int $length = null, string $separator = ",", string $enclosure = "\"", string $escape = "\\"): array|false`
 * `fwrite($stream, string $data): int|false`
 * `ftruncate($stream, int $size): int|false`
 * `flock($stream, int $operation, int &$would_block = null): bool`
 */


// Launch your asynchronous application:
try {
    run(function() {

        $keep_running = true;
        $maintenance_events = new Publisher();

        // launch a background task
        $count = go(function() use (&$keep_running, $maintenance_events) {
            $count = 0;

            while ($keep_running) {
                // do some maintenance work
                $data = file_get_contents(__FILE__); // this is asynchronous
                $maintenance_events->write(md5($data) . " step $count");
                $count++;
                // wait a while before repeating
                sleep(0.7); // allows other tasks to do some work
            }

            return $count;
        });

        $wait_group = new WaitGroup();
        [$reader, $writer] = Channel::create(0);

        go(function() use ($reader) {
            echo "Waiting for completion messages\n";
            while ($message = $reader->read()) {
                echo "Completed: " . $message . "\n";
            }
            echo "No more completion messages\n";
        });

        $futureWithException = go(function() {
            throw new Exception("Just an exception");
        });

        // launch various workers
        for ($i = 0; $i < 3; $i++) {
            // Create a subscription for the events
            $subscription = $maintenance_events->subscribe();
            go(function() use ($i, $subscription, $wait_group, $writer) {
                // Register with the $waitGroup
                $wait_group->add();
                defer(function() use ($wait_group) {
                    $wait_group->done();
                });

                echo "Worker $i waiting for events...\n";

                // This worker will handle 4 events
                for ($count = 0; $count < 4; $count++) {
                    sleep(1 * $i);
                    $writer->write("Worker $i received: {$subscription->read()}");
                }

                /**
                 * If an exception is thrown here, it will appear to have been
                 * thrown from the outer coroutine while the $waitGroup->wait()
                 * function is blocking.
                 */

                echo "Worker $i done\n";

            });
        }

        echo "Waitgroup waiting\n";

        // wait for all workers to complete
        $wait_group->wait();
        echo "Waitgroup done\n";


        // stop the background maintenance
        $keep_running = false;

        echo "A total of " . await($count) . " maintenance steps were completed\n";

        echo "Trying to resolve the error value:\n";
        try {
            await($futureWithException);
        } catch (Throwable $e) {
            echo "Could not resolve the value: \n$e\n";
        }

    });
} catch (Throwable $e) {
    echo "I successfully caught the missed exception in Worker 1:\n";
    echo " " . $e->getMessage() . "\n";
}
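
The defer() function described in the example's comments runs cleanup callbacks in reverse order of registration. That LIFO behaviour can be sketched in plain PHP with a stack and try/finally. with_defer() and the $defer callback are hypothetical names; unlike phasync, which defers cleanup until garbage collection when an exception occurs, this sketch always runs the callbacks in finally.

```php
<?php
// Hypothetical sketch of defer()-style cleanup (plain PHP, not phasync):
// callbacks are pushed onto a stack and run last-in, first-out when the
// body finishes, whether it returns normally or throws.

function with_defer(Closure $body): mixed
{
    $stack = [];
    $defer = function (Closure $cleanup) use (&$stack): void {
        $stack[] = $cleanup;
    };
    try {
        return $body($defer);
    } finally {
        // Last scheduled, first run
        while ($cleanup = array_pop($stack)) {
            $cleanup();
        }
    }
}

$events = [];
$result = with_defer(function (Closure $defer) use (&$events) {
    $defer(function () use (&$events) { $events[] = 'close file'; });
    $defer(function () use (&$events) { $events[] = 'release lock'; });
    $events[] = 'work';
    return 42;
});

echo implode(', ', $events), "\n"; // work, release lock, close file
echo $result, "\n";                // 42
```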

Getting started

Install phasync via Composer and start enhancing your PHP applications with powerful asynchronous capabilities:

composer require phasync/phasync

Compatibility

License

phasync is open-source software licensed under the MIT License.