amphp / cluster
使用 PHP 构建多核网络应用程序。
Requires
- php: >=8.1
- ext-sockets: *
- amphp/amp: ^3
- amphp/byte-stream: ^2
- amphp/log: ^2
- amphp/parallel: ^2.2
- amphp/pipeline: ^1.1
- amphp/process: ^2
- amphp/serialization: ^1
- amphp/socket: ^2
- amphp/sync: ^2
- league/climate: ^3
- monolog/monolog: ^3|^2|^1.23
- psr/log: ^3|^2|^1
- revolt/event-loop: ^1
Requires (Dev)
- ext-pcntl: *
- amphp/file: ^3
- amphp/http-server: ^3
- amphp/php-cs-fixer-config: ^2
- amphp/phpunit-util: ^3
- phpunit/phpunit: ^9
- psalm/phar: ^5.11
Suggests
- ext-sockets: Required for socket transfer on systems that do not support SO_REUSEPORT
- amphp/file: Required for logging to a file
Conflicts
- amphp/file: <3 || >=4
This package is auto-updated.
Last update: 2024-09-17 20:33:01 UTC
README
AMPHP 是一个针对 PHP 的事件驱动库集合,设计时考虑了纤程和并发性。 amphp/cluster
提供了将网络套接字传输到独立 PHP 进程的工具,以及一个轻量级框架来创建多进程服务器集群。
要求
- PHP 8.1+
ext-sockets
安装
此包可以作为 Composer 依赖项安装。
composer require amphp/cluster
文档
传输套接字
通过将套接字从父进程传输到子进程来构建集群,每个子进程都监听连接和处理客户端套接字。此库提供了一些底层组件,可以独立于集群框架使用。这些组件允许您编写自己的服务器逻辑,将服务器套接字或客户端套接字传输到子进程以分配负载或分组相关客户端。
传输客户端套接字
ClientSocketReceivePipe
和 ClientSocketSendPipe
这对类用于通过两个进程之间现有的 IPC 连接将客户端套接字从一个 PHP 进程发送到另一个 PHP 进程。
以下示例演示了在父进程中创建一个新的子进程,然后在父进程和子进程之间建立一个新的 IPC 套接字。该套接字用于在父进程中创建一个 ClientSocketSendPipe
,并在子进程中创建相应的 ClientSocketReceivePipe
。然后父进程创建一个套接字服务器并监听连接。当收到连接时,客户端套接字被传输到子进程进行处理。
// parent.php use Amp\Cluster\ClientSocketSendPipe; use Amp\Parallel\Context\ProcessContextFactory; use Amp\Parallel\Ipc\LocalIpcHub; use Amp\Socket; use Revolt\EventLoop; use function Amp\Socket\listen; $ipcHub = new LocalIpcHub(); // Sharing the IpcHub instance with the context factory isn't required, // but reduces the number of opened sockets. $contextFactory = new ProcessContextFactory(ipcHub: $ipcHub); $context = $contextFactory->start(__DIR__ . '/child.php'); $connectionKey = $ipcHub->generateKey(); $context->send(['uri' => $ipcHub->getUri(), 'key' => $connectionKey]); // $socket will be a bidirectional socket to the child. $socket = $ipcHub->accept($connectionKey); $socketPipe = new ClientSocketSendPipe($socket); $server = listen('127.0.0.1:1337'); // Close server when SIGTERM is received. EventLoop::onSignal(SIGTERM, $server->close(...)); $clientId = 0; while ($client = $server->accept()) { // $clientId is an example of arbitrary data which may be // associated with a transferred socket. $socketPipe->send($client, ++$clientId); }
// child.php use Amp\Cluster\ClientSocketReceivePipe; use Amp\Socket\Socket; use Amp\Sync\Channel; return function (Channel $channel): void { ['uri' => $uri, 'key' => $connectionKey] = $channel->receive(); // $socket will be a bidirectional socket to the parent. $socket = Amp\Parallel\Ipc\connect($uri, $connectionKey); $socketPipe = new ClientSocketReceivePipe($socket); while ($transferredSocket = $socketPipe->receive()) { // Handle client socket in a separate coroutine (fiber). async( function (Socket $client, int $id) { /* ... */ }, $transferredSocket->getSocket(), // Transferred socket $transferredSocket->getData(), // Associated data ); } };
虽然这个例子有些牵强,因为将所有客户端发送到单个进程几乎没有理由,但很容易将这样的例子扩展到父进程,该父进程平衡一组子进程或根据某些其他因素分配客户端。在将套接字传输之前,可以在父进程中的客户端套接字上进行读取和写入。例如,HTTP 服务器可能在将套接字传输到子进程之前建立 WebSocket 连接。请参阅 amphp/http-server
和 amphp/websocket-server
以获取构建此类服务器的其他组件。
传输服务器套接字
以下示例演示了在父进程中创建一个新的子进程,然后在父进程和子进程之间建立一个新的 IPC 套接字。在父进程中,将 IPC 套接字传递给 ServerSocketPipeProvider::provideFor()
,该函数监听 IPC 套接字上的服务器套接字请求。在子进程中,将 IPC 套接字提供给 ServerSocketPipeFactory
的一个实例。当子进程使用 ServerSocketPipeFactory
创建服务器套接字时,服务器套接字在父进程中创建,然后发送到子进程。如果父进程创建了多个子进程,请求相同服务器套接字的任何子进程都会收到对该套接字的另一个引用,允许多个子进程监听相同的地址和端口。操作系统会轮询选择传入的客户端连接。为了更好地控制客户端分配,请考虑在单个进程中接受客户端,然后将客户端套接字传输到子进程。
ServerSocketPipeFactory
实现 ServerSocketFactory
,允许它替代在同一进程内创建服务器套接字的工厂。
// parent.php use Amp\CancelledException; use Amp\Cluster\ClientSocketSendPipe; use Amp\Cluster\ServerSocketPipeProvider; use Amp\Parallel\Context\ProcessContextFactory; use Amp\Parallel\Ipc\LocalIpcHub; use Amp\SignalCancellation; use Revolt\EventLoop; use function Amp\async; use function Amp\Socket\listen; $ipcHub = new LocalIpcHub(); $serverProvider = new ServerSocketPipeProvider(); // Sharing the IpcHub instance with the context factory isn't required, // but reduces the number of opened sockets. $contextFactory = new ProcessContextFactory(ipcHub: $ipcHub); $context = $contextFactory->start(__DIR__ . '/child.php'); $connectionKey = $ipcHub->generateKey(); $context->send(['uri' => $ipcHub->getUri(), 'key' => $connectionKey]); // $socket will be a bidirectional socket to the child. $socket = $ipcHub->accept($connectionKey); // Listen for requests for server sockets on the given socket until cancelled by signal. try { $serverProvider->provideFor($socket, new SignalCancellation(SIGTERM)); } catch (CancelledException) { // Signal cancellation expected. }
// child.php use Amp\Cluster\ClientSocketReceivePipe; use Amp\Cluster\ServerSocketPipeFactory; use Amp\Sync\Channel; return function (Channel $channel): void { ['uri' => $uri, 'key' => $connectionKey] = $channel->receive(); // $socket will be a bidirectional socket to the parent. $socket = Amp\Parallel\Ipc\connect($uri, $connectionKey); $serverFactory = new ServerSocketPipeFactory($socket); // Requests the server socket from the parent process. $server = $serverFactory->listen('127.0.0.1:1337'); while ($client = $server->accept()) { // Handle client socket in a separate coroutine (fiber). async(function () use ($client) { /* ... */ }); } };
集群
集群是由使用 Cluster
的静态方法创建的,这些方法用于创建在作为集群运行时与父监视器进程通信的组件。当直接运行脚本时,某些 Cluster
方法也可能被调用,返回一个不需要监视器进程的独立组件。例如,Cluster::getServerSocketFactory()
在集群内运行时返回一个实例,它从监视器进程中创建并传输服务器套接字,或者当直接运行脚本时返回一个 ResourceSocketServerFactory
。
可以使用包含的可执行文件 vendor/bin/cluster
从命令行运行集群脚本,或者使用 ClusterWatcher
类在应用程序中程序化运行。
vendor/bin/cluster --workers=4 path/to/script.php
将此命令作为项目的依赖项安装后,将启动一个包含 4 个工作进程的集群,每个工作进程都在 path/to/script.php
运行脚本。
或者,您的应用程序可以使用 ClusterWatcher
从代码中启动集群。
use Amp\Cluster\ClusterWatcher; use Revolt\EventLoop; $watcher = new ClusterWatcher('path/to/script.php'); $watcher->start(4); // Start cluster with 4 workers. // Using a signal to stop the cluster for this example. EventLoop::onSignal(SIGTERM, fn () => $watcher->stop()); foreach ($watcher->getMessageIterator() as $message) { // Handle received message from worker. }
创建服务器
必须使用套接字服务器的 AMPHP 组件使用 Amp\Socket\SocketServerFactory
实例来创建这些套接字服务器。其中之一是位于 amphp/http-server
的 Amp\Http\Server\SocketHttpServer
。在集群脚本中,应该使用 Cluster::getServerSocketFactory()
来创建一个套接字工厂,该工厂将本地创建套接字或从集群监视器请求服务器套接字。
以下 示例 HTTP 服务器 展示了使用 Cluster::getServerSocketFactory()
来创建 ServerSocketFactory
实例,并在创建 SocketHttpServer
时提供它。
日志记录
可以使用 Cluster::createLogHandler()
将日志条目发送到集群监视器,以便将其记录到单个流中。此处理程序可以附加到 Monolog\Logger
实例。以下 示例 HTTP 服务器 根据脚本是否为集群工作进程或作为独立脚本运行创建一个日志处理程序。
Cluster::createLogHandler()
仅在作为集群的一部分运行集群脚本时调用。使用 Cluster::isWorker()
检查脚本是否作为集群工作进程运行。
进程终止
集群脚本可以使用 Cluster::awaitTermination()
等待来自信号(其中之一为 SIGTERM
、SIGINT
、SIGQUIT
或 SIGHUP
)的终止。
发送和接收消息
集群监视器和工作进程可以相互发送可序列化数据。集群监视器通过 ClusterWatcher::getMessageIterator()
返回的并发迭代器接收来自工作进程的消息。迭代器发出 ClusterWorkerMessage
实例,其中包含接收到的数据以及发送消息的 ClusterWorker
的引用,可以用于仅向该工作进程发送回复。集群监视器可以使用 ClusterWatcher::broadcast()
向所有工作进程广播消息。
工作进程可以使用从 Cluster::getChannel()
返回的 Channel
发送和接收消息。此方法只能在作为集群的一部分运行集群脚本时调用。使用 Cluster::isWorker()
检查脚本是否作为集群工作进程运行。
重启
可以随时调用 ClusterWatcher::restart()
来停止所有现有工作进程并创建新的工作进程以替换那些已停止的工作进程。当使用进程作为工作进程(即不使用通过 ext-parallel
的线程)时,工作进程中的代码将在新进程启动时重新加载。
IntelliJ / PhpStorm 中的热重载
当使用集群可执行文件(vendor/bin/cluster
)运行集群时,IntelliJ 的文件监视器可以用作触发器,在每次文件保存时自动将 SIGUSR1
信号发送给集群的监视器进程。在启动集群时,需要使用 --pid-file /path/to/file.pid
编写 PID 文件,然后根据以下设置在设置中设置文件监视器
- 程序:
bash
- 参数:
-c "if test -f ~/test-cluster.pid; then kill -10 $(cat ~/test-cluster.pid); fi"
示例 HTTP 服务器
以下示例(可在examples 目录中找到,文件名为simple-http-server.php)使用amphp/http-server
创建一个可以同时在任意数量的进程中运行的 HTTP 服务器。
<?php require __DIR__ . "/vendor/autoload.php"; use Amp\ByteStream; use Amp\Cluster\Cluster; use Amp\Http\Server\Driver\ConnectionLimitingServerSocketFactory; use Amp\Http\Server\Driver\SocketClientFactory; use Amp\Http\Server\RequestHandler\ClosureRequestHandler; use Amp\Http\Server\SocketHttpServer; use Amp\Log\ConsoleFormatter; use Amp\Log\StreamHandler; use Monolog\Logger; $id = Cluster::getContextId() ?? getmypid(); // Creating a log handler in this way allows the script to be run in a cluster or standalone. if (Cluster::isWorker()) { $handler = Cluster::createLogHandler(); } else { $handler = new StreamHandler(ByteStream\getStdout()); $handler->setFormatter(new ConsoleFormatter()); } $logger = new Logger('worker-' . $id); $logger->pushHandler($handler); $logger->useLoggingLoopDetection(false); // Cluster::getServerSocketFactory() will return a factory which creates the socket // locally or requests the server socket from the cluster watcher. $socketFactory = Cluster::getServerSocketFactory(); $clientFactory = new SocketClientFactory($logger); $httpServer = new SocketHttpServer($logger, $socketFactory, $clientFactory); $httpServer->expose('127.0.0.1:1337'); // Start the HTTP server $httpServer->start( new ClosureRequestHandler(function (): Response { return new Response(HttpStatus::OK, [ "content-type" => "text/plain; charset=utf-8", ], "Hello, World!"); }), new DefaultErrorHandler(), ); // Stop the server when the cluster watcher is terminated. Cluster::awaitTermination(); $server->stop();
版本控制
amphp/cluster
遵循与其他所有amphp
包相同的semver语义化版本规范。
安全性
如果您发现任何安全相关的问题,请使用私人安全问题报告者而不是公共问题跟踪器。
许可证
MIT 许可证(MIT)。更多信息请参阅LICENSE
。