amphp/websocket-server

Amp HTTP 服务器的 Websocket 服务器。

资助包维护!
amphp

安装次数: 154,804

依赖: 19

建议者: 1

安全: 0

星标: 114

关注者: 11

分支: 17

开放问题: 1

v4.0.0 2023-12-29 00:58 UTC

README

AMPHP 是一个为 PHP 设计的事件驱动库集合,考虑到纤维和并发。这个库提供了一个 RequestHandler 来轻松使用 amphp/http-server 处理 WebSocket 连接。

要求

  • PHP 8.1+

安装

此包可以作为 Composer 依赖安装。

composer require amphp/websocket-server

文档

此库的主要组件是 Websocket 类,它是来自 amphp/http-serverRequestHandler 接口的实现。使用 Websocket 请求处理器端点的端点会将传入的请求升级为 WebSocket 连接。

创建 Websocket 端点需要用户指定一些参数

  • 将使用 Amp\Http\Server\HttpServer 实例
  • 一个 PSR-3 日志记录器实例
  • 一个 WebsocketAcceptor 来接受客户端连接
  • 一个 WebsocketClientHandler 来处理已接受的客户端连接
  • 如果服务器上启用了压缩,则可选的 WebsocketCompressionContextFactory
  • 如果创建 WebsocketClient 实例时需要自定义逻辑,则可选的 WebsocketClientFactory

接受客户端连接

接受客户端连接是通过 WebsocketAcceptor 实例完成的。此库提供了两个实现

  • Rfc6455Acceptor:根据 RFC6455 接受客户端连接,没有进一步的限制。
  • AllowOriginAcceptor:要求 HTTP 请求的 "Origin" 标头与构造函数提供的允许来源之一匹配。然后,接受连接将委托给另一个 WebsocketAcceptor 实现(默认为 Rfc6455Acceptor)。

处理客户端连接

一旦建立连接,WebSocket 连接将由 WebsocketClientHandler 的实现来处理。您的应用程序逻辑将在这个接口的实现中。

WebsocketClientHandler 有一个必须实现的方法,即 handleClient()

public function handleClient(
    WebsocketClient $client,
    Request $request,
    Response $response,
): void;

接受客户端连接后,WebsocketClientHandler::handleClient() 将使用 WebsocketClient 实例以及用于建立连接的 RequestResponse 实例调用。

此方法应在客户端连接应关闭之前返回。不应从此方法中抛出异常。任何抛出的异常都将关闭连接,并带有 UNEXPECTED_SERVER_ERROR(1011)错误代码,并将异常转发到 HTTP 服务器记录器。有一个例外:当接收或发送消息到连接失败,因为连接已关闭时抛出的 WebsocketClosedException。如果从 handleClient() 中抛出 WebsocketClosedException,则忽略该异常。

网关

一个 WebsocketGateway 提供了一种将 WebSocket 客户端收集到相关组中的方法,以便有效地(和异步地)向多个客户端广播单个消息。本库提供的 WebsocketClientGateway 可以被一个或多个客户端处理器用来将来自一个或多个端点(如果需要,也可以在一个端点上使用多个)的客户端进行分组。下面是关于客户端处理器中网关基本用法的示例服务器。示例服务器。当客户端连接关闭时,添加到网关中的客户端将自动移除。

压缩

可以通过向 Websocket 构造函数传递一个 WebsocketCompressionContextFactory 实例来在单个 WebSocket 端点上选择性地启用消息压缩。目前,可用的唯一实现是 Rfc7692CompressionFactory,它基于 RFC-7692 进行压缩。

示例服务器

下面的服务器创建了一个简单的 WebSocket 端点,该端点将所有接收到的消息广播给所有其他已连接的客户端。使用 amphp/http-server-routeramphp/http-server-static-contentWebsocket 处理器附加到特定路由,并在路由在路由器中未定义时从 /public 目录提供静态文件。

<?php

// Note that this example requires amphp/http-server-router,
// amphp/http-server-static-content and amphp/log to be installed.

use Amp\Http\Server\DefaultErrorHandler;
use Amp\Http\Server\Request;
use Amp\Http\Server\Response;
use Amp\Http\Server\Router;
use Amp\Http\Server\SocketHttpServer;
use Amp\Http\Server\StaticContent\DocumentRoot;
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Amp\Socket;
use Amp\Websocket\Server\AllowOriginAcceptor;
use Amp\Websocket\Server\Websocket;
use Amp\Websocket\Server\WebsocketClientGateway;
use Amp\Websocket\Server\WebsocketClientHandler;
use Amp\Websocket\Server\WebsocketGateway;
use Amp\Websocket\WebsocketClient;
use Monolog\Logger;
use function Amp\trapSignal;
use function Amp\ByteStream\getStdout;

require __DIR__ . '/../../vendor/autoload.php';

$logHandler = new StreamHandler(getStdout());
$logHandler->setFormatter(new ConsoleFormatter());
$logger = new Logger('server');
$logger->pushHandler($logHandler);

$server = SocketHttpServer::createForDirectAccess($logger);

$server->expose(new Socket\InternetAddress('127.0.0.1', 1337));
$server->expose(new Socket\InternetAddress('[::1]', 1337));

$errorHandler = new DefaultErrorHandler();

$acceptor = new AllowOriginAcceptor(
    ['http://localhost:1337', 'http://127.0.0.1:1337', 'http://[::1]:1337'],
);

$clientHandler = new class implements WebsocketClientHandler {
    public function __construct(
        private readonly WebsocketGateway $gateway = new WebsocketClientGateway(),
    ) {
    }

    public function handleClient(
        WebsocketClient $client,
        Request $request,
        Response $response,
    ): void {
        $this->gateway->addClient($client);

        foreach ($client as $message) {
            $this->gateway->broadcastText(sprintf(
                '%d: %s',
                $client->getId(),
                (string) $message,
            ));
        }
    }
};

$websocket = new Websocket($server, $logger, $acceptor, $clientHandler);

$router = new Router($server, $logger, $errorHandler);
$router->addRoute('GET', '/broadcast', $websocket);
$router->setFallback(new DocumentRoot($server, $errorHandler, __DIR__ . '/public'));

$server->start($router, $errorHandler);

// Await SIGINT or SIGTERM to be received.
$signal = trapSignal([SIGINT, SIGTERM]);

$logger->info(sprintf("Received signal %d, stopping HTTP server", $signal));

$server->stop();

版本控制

与所有其他 amphp 包一样,amphp/websocket-server 遵循 semver 语义版本规范。

安全

如果您发现任何安全问题,请使用私有安全问题报告员,而不是使用公共问题跟踪器。

许可证

MIT 许可证 (MIT)。有关更多信息,请参阅 LICENSE