roadrunner-php/centrifugo

RoadRunner: Centrifugo 通道

v2.2.0 2024-07-22 07:42 UTC

This package is auto-updated.

Last update: 2024-09-22 08:08:26 UTC


README

RoadRunner Centrifugo 通道

PHP Version Require Latest Stable Version phpunit psalm Codecov Total Downloads

此存储库包含使用 RoadRunner centrifuge 插件的 PHP 通道代码库。

安装

要安装应用程序服务器和作业代码库

composer require roadrunner-php/centrifugo

您可以使用方便的安装程序下载 RoadRunner 集成最新兼容版本

composer require spiral/roadrunner-cli --dev
vendor/bin/rr get

代理 Centrifugo 请求到 PHP 应用程序

可以代理一些客户端连接事件从 Centrifugo 到 RoadRunner 应用程序服务器,并以自定义方式对其做出反应。例如,可以通过从 Centrifugo 到应用程序后端的请求进行连接身份验证,刷新客户端会话,并响应客户端通过双向连接发送的 RPC 调用。

可以代理的事件列表

  • connect – 当客户端连接到 Centrifugo 时调用,因此可以身份验证用户,向客户端返回自定义数据,订阅多个通道,附加元信息到连接等。适用于双向和单向传输。
  • refresh - 当客户端会话即将过期时调用,因此可以延长其有效期或让其过期。也可以仅用作 Centrifugo 到应用程序后端的双向连接的周期性连接存活回调。适用于双向和单向传输。
  • sub_refresh - 当需要刷新订阅时调用。Centrifugo 本身将询问您的后端关于订阅的有效性,而不是在客户端进行订阅刷新工作流程。
  • subscribe - 当客户端尝试订阅通道时调用,因此可以检查权限并返回自定义初始订阅数据。仅适用于双向传输。
  • publish - 当客户端尝试将内容发布到通道时调用,因此可以检查权限并可选地修改发布数据。仅适用于双向传输。
  • rpc - 当客户端发送 RPC 时调用,您可以根据客户端提供的 RPC 方法及其参数执行所需的任何逻辑。仅适用于双向传输。

首先,您需要将 centrifuge 部分添加到您的 RoadRunner 配置中。例如,以下配置是可行的

rpc:
  listen: tcp://127.0.0.1:6001

server:
  command: "php app.php"
  relay: pipes

centrifuge:
  proxy_address: "tcp://0.0.0.0:10001" # Centrifugo address

和 centrifugo 配置

{
  "admin": true,
  "api_key": "secret",
  "admin_password": "password",
  "admin_secret": "admin_secret",
  "allowed_origins": [
    "*"
  ],
  "token_hmac_secret_key": "test",
  "publish": true,
  "proxy_publish": true,
  "proxy_subscribe": true,
  "proxy_connect": true,
  "allow_subscribe_for_client": true,
  "proxy_connect_endpoint": "grpc://127.0.0.1:10001",
  "proxy_connect_timeout": "10s",
  "proxy_publish_endpoint": "grpc://127.0.0.1:10001",
  "proxy_publish_timeout": "10s",
  "proxy_subscribe_endpoint": "grpc://127.0.0.1:10001",
  "proxy_subscribe_timeout": "10s",
  "proxy_refresh_endpoint": "grpc://127.0.0.1:10001",
  "proxy_refresh_timeout": "10s",
  "proxy_sub_refresh_endpoint": "grpc://127.0.0.1:10001",
  "proxy_sub_refresh_timeout": "1s",
  "proxy_rpc_endpoint": "grpc://127.0.0.1:10001",
  "proxy_rpc_timeout": "10s"
}

注意 proxy_connect_endpointproxy_publish_endpointproxy_subscribe_endpointproxy_refresh_endpointproxy_sub_refresh_endpointproxy_rpc_endpoint - 激活 centrifuge 插件的 RoadRunner 服务器的端点地址。

初始化抽象 RoadRunner 工作进程

<?php

require __DIR__ . '/vendor/autoload.php';

use RoadRunner\Centrifugo\CentrifugoWorker;
use RoadRunner\Centrifugo\Payload;
use RoadRunner\Centrifugo\Request;
use RoadRunner\Centrifugo\Request\RequestFactory;
use Spiral\RoadRunner\Worker;

$worker = Worker::create();
$requestFactory = new RequestFactory($worker);

// Create a new Centrifugo Worker from global environment
$centrifugoWorker = new CentrifugoWorker($worker, $requestFactory);

while ($request = $centrifugoWorker->waitRequest()) {

    if ($request instanceof Request\Invalid) {
        $errorMessage = $request->getException()->getMessage();

        if ($request->getException() instanceof \RoadRunner\Centrifugo\Exception\InvalidRequestTypeException) {
            $payload = $request->getException()->payload;
        }

        // Handle invalid request
        // $logger->error($errorMessage, $payload ?? []);

        continue;
    }

    if ($request instanceof Request\Refresh) {
        try {
            // Do something
            $request->respond(new Payload\RefreshResponse(
                // ...
            ));
        } catch (\Throwable $e) {
            $request->error($e->getCode(), $e->getMessage());
        }

        continue;
    }

    if ($request instanceof Request\Subscribe) {
        try {
            // Do something
            $request->respond(new Payload\SubscribeResponse(
                // ...
            ));

            // You can also disconnect connection
            $request->disconnect('500', 'Connection is not allowed.');
        } catch (\Throwable $e) {
            $request->error($e->getCode(), $e->getMessage());
        }

        continue;
    }

    if ($request instanceof Request\Publish) {
        try {
            // Do something
            $request->respond(new Payload\PublishResponse(
                // ...
            ));

            // You can also disconnect connection
            $request->disconnect('500', 'Connection is not allowed.');
        } catch (\Throwable $e) {
            $request->error($e->getCode(), $e->getMessage());
        }

        continue;
    }

    if ($request instanceof Request\RPC) {
        try {
            $response = $router->handle(
                new Request(uri: $request->method, data: $request->data),
            ); // ['user' => ['id' => 1, 'username' => 'john_smith']]

            $request->respond(new Payload\RPCResponse(
                data: $response
            ));
        } catch (\Throwable $e) {
            $request->error($e->getCode(), $e->getMessage());
        }

        continue;
    }
}

注意 您可以在 这里 找到更多信息。

try Spiral Framework

许可证

MIT 许可证 (MIT)。有关更多信息,请参阅 LICENSE。由 Spiral Scout 维护。