amphp / websocket-server
Amp HTTP 服务器的 Websocket 服务器。
Requires
- php: >=8.1
- amphp/amp: ^3
- amphp/byte-stream: ^2.1
- amphp/http: ^2.1
- amphp/http-server: ^3.2
- amphp/socket: ^2.2
- amphp/websocket: ^2
- psr/log: ^1|^2|^3
- revolt/event-loop: ^1
Requires (Dev)
Suggests
- ext-zlib: Required for compression
This package is auto-updated.
Last update: 2024-09-17 00:25:57 UTC
README
AMPHP 是一个为 PHP 设计的事件驱动库集合,考虑到纤维和并发。这个库提供了一个 RequestHandler
来轻松使用 amphp/http-server
处理 WebSocket 连接。
要求
- PHP 8.1+
安装
此包可以作为 Composer 依赖安装。
composer require amphp/websocket-server
文档
此库的主要组件是 Websocket
类,它是来自 amphp/http-server
的 RequestHandler
接口的实现。使用 Websocket
请求处理器端点的端点会将传入的请求升级为 WebSocket 连接。
创建 Websocket
端点需要用户指定一些参数
- 将使用
Amp\Http\Server\HttpServer
实例 - 一个 PSR-3 日志记录器实例
- 一个
WebsocketAcceptor
来接受客户端连接 - 一个
WebsocketClientHandler
来处理已接受的客户端连接 - 如果服务器上启用了压缩,则可选的
WebsocketCompressionContextFactory
- 如果创建
WebsocketClient
实例时需要自定义逻辑,则可选的WebsocketClientFactory
接受客户端连接
接受客户端连接是通过 WebsocketAcceptor
实例完成的。此库提供了两个实现
Rfc6455Acceptor
:根据 RFC6455 接受客户端连接,没有进一步的限制。AllowOriginAcceptor
:要求 HTTP 请求的"Origin"
标头与构造函数提供的允许来源之一匹配。然后,接受连接将委托给另一个WebsocketAcceptor
实现(默认为Rfc6455Acceptor
)。
处理客户端连接
一旦建立连接,WebSocket 连接将由 WebsocketClientHandler
的实现来处理。您的应用程序逻辑将在这个接口的实现中。
WebsocketClientHandler
有一个必须实现的方法,即 handleClient()
。
public function handleClient( WebsocketClient $client, Request $request, Response $response, ): void;
接受客户端连接后,WebsocketClientHandler::handleClient()
将使用 WebsocketClient
实例以及用于建立连接的 Request
和 Response
实例调用。
此方法应在客户端连接应关闭之前返回。不应从此方法中抛出异常。任何抛出的异常都将关闭连接,并带有 UNEXPECTED_SERVER_ERROR
(1011)错误代码,并将异常转发到 HTTP 服务器记录器。有一个例外:当接收或发送消息到连接失败,因为连接已关闭时抛出的 WebsocketClosedException
。如果从 handleClient()
中抛出 WebsocketClosedException
,则忽略该异常。
网关
一个 WebsocketGateway
提供了一种将 WebSocket 客户端收集到相关组中的方法,以便有效地(和异步地)向多个客户端广播单个消息。本库提供的 WebsocketClientGateway
可以被一个或多个客户端处理器用来将来自一个或多个端点(如果需要,也可以在一个端点上使用多个)的客户端进行分组。下面是关于客户端处理器中网关基本用法的示例服务器。示例服务器。当客户端连接关闭时,添加到网关中的客户端将自动移除。
压缩
可以通过向 Websocket
构造函数传递一个 WebsocketCompressionContextFactory
实例来在单个 WebSocket 端点上选择性地启用消息压缩。目前,可用的唯一实现是 Rfc7692CompressionFactory
,它基于 RFC-7692 进行压缩。
示例服务器
下面的服务器创建了一个简单的 WebSocket 端点,该端点将所有接收到的消息广播给所有其他已连接的客户端。使用 amphp/http-server-router
和 amphp/http-server-static-content
将 Websocket
处理器附加到特定路由,并在路由在路由器中未定义时从 /public
目录提供静态文件。
<?php // Note that this example requires amphp/http-server-router, // amphp/http-server-static-content and amphp/log to be installed. use Amp\Http\Server\DefaultErrorHandler; use Amp\Http\Server\Request; use Amp\Http\Server\Response; use Amp\Http\Server\Router; use Amp\Http\Server\SocketHttpServer; use Amp\Http\Server\StaticContent\DocumentRoot; use Amp\Log\ConsoleFormatter; use Amp\Log\StreamHandler; use Amp\Socket; use Amp\Websocket\Server\AllowOriginAcceptor; use Amp\Websocket\Server\Websocket; use Amp\Websocket\Server\WebsocketClientGateway; use Amp\Websocket\Server\WebsocketClientHandler; use Amp\Websocket\Server\WebsocketGateway; use Amp\Websocket\WebsocketClient; use Monolog\Logger; use function Amp\trapSignal; use function Amp\ByteStream\getStdout; require __DIR__ . '/../../vendor/autoload.php'; $logHandler = new StreamHandler(getStdout()); $logHandler->setFormatter(new ConsoleFormatter()); $logger = new Logger('server'); $logger->pushHandler($logHandler); $server = SocketHttpServer::createForDirectAccess($logger); $server->expose(new Socket\InternetAddress('127.0.0.1', 1337)); $server->expose(new Socket\InternetAddress('[::1]', 1337)); $errorHandler = new DefaultErrorHandler(); $acceptor = new AllowOriginAcceptor( ['http://localhost:1337', 'http://127.0.0.1:1337', 'http://[::1]:1337'], ); $clientHandler = new class implements WebsocketClientHandler { public function __construct( private readonly WebsocketGateway $gateway = new WebsocketClientGateway(), ) { } public function handleClient( WebsocketClient $client, Request $request, Response $response, ): void { $this->gateway->addClient($client); foreach ($client as $message) { $this->gateway->broadcastText(sprintf( '%d: %s', $client->getId(), (string) $message, )); } } }; $websocket = new Websocket($server, $logger, $acceptor, $clientHandler); $router = new Router($server, $logger, $errorHandler); $router->addRoute('GET', '/broadcast', $websocket); $router->setFallback(new DocumentRoot($server, $errorHandler, __DIR__ . '/public')); $server->start($router, $errorHandler); // Await SIGINT or SIGTERM to be received. $signal = trapSignal([SIGINT, SIGTERM]); $logger->info(sprintf("Received signal %d, stopping HTTP server", $signal)); $server->stop();
版本控制
与所有其他 amphp
包一样,amphp/websocket-server
遵循 semver 语义版本规范。
安全
如果您发现任何安全问题,请使用私有安全问题报告员,而不是使用公共问题跟踪器。
许可证
MIT 许可证 (MIT)。有关更多信息,请参阅 LICENSE
。