amphp / websocket-server
Amp HTTP 服务器的 Websocket 服务器。
Requires
- php: >=8.1
- amphp/amp: ^3
- amphp/byte-stream: ^2.1
- amphp/http: ^2.1
- amphp/http-server: ^3.2
- amphp/socket: ^2.2
- amphp/websocket: ^2
- psr/log: ^1|^2|^3
- revolt/event-loop: ^1
Requires (Dev)
Suggests
- ext-zlib: Required for compression
This package is auto-updated.
Last update: 2024-09-17 00:25:57 UTC
README
AMPHP 是一个为 PHP 设计的事件驱动库集合,考虑到纤维和并发。这个库提供了一个 RequestHandler 来轻松使用 amphp/http-server 处理 WebSocket 连接。
要求
- PHP 8.1+
安装
此包可以作为 Composer 依赖安装。
composer require amphp/websocket-server
文档
此库的主要组件是 Websocket 类,它是来自 amphp/http-server 的 RequestHandler 接口的实现。使用 Websocket 请求处理器端点的端点会将传入的请求升级为 WebSocket 连接。
创建 Websocket 端点需要用户指定一些参数
- 将使用
Amp\Http\Server\HttpServer实例 - 一个 PSR-3 日志记录器实例
- 一个
WebsocketAcceptor来接受客户端连接 - 一个
WebsocketClientHandler来处理已接受的客户端连接 - 如果服务器上启用了压缩,则可选的
WebsocketCompressionContextFactory - 如果创建
WebsocketClient实例时需要自定义逻辑,则可选的WebsocketClientFactory
接受客户端连接
接受客户端连接是通过 WebsocketAcceptor 实例完成的。此库提供了两个实现
Rfc6455Acceptor:根据 RFC6455 接受客户端连接,没有进一步的限制。AllowOriginAcceptor:要求 HTTP 请求的"Origin"标头与构造函数提供的允许来源之一匹配。然后,接受连接将委托给另一个WebsocketAcceptor实现(默认为Rfc6455Acceptor)。
处理客户端连接
一旦建立连接,WebSocket 连接将由 WebsocketClientHandler 的实现来处理。您的应用程序逻辑将在这个接口的实现中。
WebsocketClientHandler 有一个必须实现的方法,即 handleClient()。
public function handleClient( WebsocketClient $client, Request $request, Response $response, ): void;
接受客户端连接后,WebsocketClientHandler::handleClient() 将使用 WebsocketClient 实例以及用于建立连接的 Request 和 Response 实例调用。
此方法应在客户端连接应关闭之前返回。不应从此方法中抛出异常。任何抛出的异常都将关闭连接,并带有 UNEXPECTED_SERVER_ERROR(1011)错误代码,并将异常转发到 HTTP 服务器记录器。有一个例外:当接收或发送消息到连接失败,因为连接已关闭时抛出的 WebsocketClosedException。如果从 handleClient() 中抛出 WebsocketClosedException,则忽略该异常。
网关
一个 WebsocketGateway 提供了一种将 WebSocket 客户端收集到相关组中的方法,以便有效地(和异步地)向多个客户端广播单个消息。本库提供的 WebsocketClientGateway 可以被一个或多个客户端处理器用来将来自一个或多个端点(如果需要,也可以在一个端点上使用多个)的客户端进行分组。下面是关于客户端处理器中网关基本用法的示例服务器。示例服务器。当客户端连接关闭时,添加到网关中的客户端将自动移除。
压缩
可以通过向 Websocket 构造函数传递一个 WebsocketCompressionContextFactory 实例来在单个 WebSocket 端点上选择性地启用消息压缩。目前,可用的唯一实现是 Rfc7692CompressionFactory,它基于 RFC-7692 进行压缩。
示例服务器
下面的服务器创建了一个简单的 WebSocket 端点,该端点将所有接收到的消息广播给所有其他已连接的客户端。使用 amphp/http-server-router 和 amphp/http-server-static-content 将 Websocket 处理器附加到特定路由,并在路由在路由器中未定义时从 /public 目录提供静态文件。
<?php // Note that this example requires amphp/http-server-router, // amphp/http-server-static-content and amphp/log to be installed. use Amp\Http\Server\DefaultErrorHandler; use Amp\Http\Server\Request; use Amp\Http\Server\Response; use Amp\Http\Server\Router; use Amp\Http\Server\SocketHttpServer; use Amp\Http\Server\StaticContent\DocumentRoot; use Amp\Log\ConsoleFormatter; use Amp\Log\StreamHandler; use Amp\Socket; use Amp\Websocket\Server\AllowOriginAcceptor; use Amp\Websocket\Server\Websocket; use Amp\Websocket\Server\WebsocketClientGateway; use Amp\Websocket\Server\WebsocketClientHandler; use Amp\Websocket\Server\WebsocketGateway; use Amp\Websocket\WebsocketClient; use Monolog\Logger; use function Amp\trapSignal; use function Amp\ByteStream\getStdout; require __DIR__ . '/../../vendor/autoload.php'; $logHandler = new StreamHandler(getStdout()); $logHandler->setFormatter(new ConsoleFormatter()); $logger = new Logger('server'); $logger->pushHandler($logHandler); $server = SocketHttpServer::createForDirectAccess($logger); $server->expose(new Socket\InternetAddress('127.0.0.1', 1337)); $server->expose(new Socket\InternetAddress('[::1]', 1337)); $errorHandler = new DefaultErrorHandler(); $acceptor = new AllowOriginAcceptor( ['https://:1337', 'http://127.0.0.1:1337', 'http://[::1]:1337'], ); $clientHandler = new class implements WebsocketClientHandler { public function __construct( private readonly WebsocketGateway $gateway = new WebsocketClientGateway(), ) { } public function handleClient( WebsocketClient $client, Request $request, Response $response, ): void { $this->gateway->addClient($client); foreach ($client as $message) { $this->gateway->broadcastText(sprintf( '%d: %s', $client->getId(), (string) $message, )); } } }; $websocket = new Websocket($server, $logger, $acceptor, $clientHandler); $router = new Router($server, $logger, $errorHandler); $router->addRoute('GET', '/broadcast', $websocket); $router->setFallback(new DocumentRoot($server, $errorHandler, __DIR__ . '/public')); $server->start($router, $errorHandler); // Await SIGINT or SIGTERM to be received. $signal = trapSignal([SIGINT, SIGTERM]); $logger->info(sprintf("Received signal %d, stopping HTTP server", $signal)); $server->stop();
版本控制
与所有其他 amphp 包一样,amphp/websocket-server 遵循 semver 语义版本规范。
安全
如果您发现任何安全问题,请使用私有安全问题报告员,而不是使用公共问题跟踪器。
许可证
MIT 许可证 (MIT)。有关更多信息,请参阅 LICENSE。