roadrunner-php / centrifugo
RoadRunner: Centrifugo 通道
v2.2.0
2024-07-22 07:42 UTC
Requires
- php: >=8.1
- ext-json: *
- google/protobuf: ^3.7 || ^4.0
- roadrunner-php/roadrunner-api-dto: ^1.0
- spiral/goridge: ^4.0
- spiral/roadrunner: ^2023.1 || ^2024.1
- spiral/roadrunner-worker: ^3.0
Requires (Dev)
- mockery/mockery: ^1.5
- phpunit/phpunit: ^10.0
- vimeo/psalm: >= 5.8
README
RoadRunner Centrifugo 通道
此存储库包含使用 RoadRunner centrifuge 插件的 PHP 通道代码库。
安装
要安装应用程序服务器和作业代码库
composer require roadrunner-php/centrifugo
您可以使用方便的安装程序下载 RoadRunner 集成最新兼容版本
composer require spiral/roadrunner-cli --dev vendor/bin/rr get
代理 Centrifugo 请求到 PHP 应用程序
可以代理一些客户端连接事件从 Centrifugo 到 RoadRunner 应用程序服务器,并以自定义方式对其做出反应。例如,可以通过从 Centrifugo 到应用程序后端的请求进行连接身份验证,刷新客户端会话,并响应客户端通过双向连接发送的 RPC 调用。
可以代理的事件列表
connect
– 当客户端连接到 Centrifugo 时调用,因此可以身份验证用户,向客户端返回自定义数据,订阅多个通道,附加元信息到连接等。适用于双向和单向传输。refresh
- 当客户端会话即将过期时调用,因此可以延长其有效期或让其过期。也可以仅用作 Centrifugo 到应用程序后端的双向连接的周期性连接存活回调。适用于双向和单向传输。sub_refresh
- 当需要刷新订阅时调用。Centrifugo 本身将询问您的后端关于订阅的有效性,而不是在客户端进行订阅刷新工作流程。subscribe
- 当客户端尝试订阅通道时调用,因此可以检查权限并返回自定义初始订阅数据。仅适用于双向传输。publish
- 当客户端尝试将内容发布到通道时调用,因此可以检查权限并可选地修改发布数据。仅适用于双向传输。rpc
- 当客户端发送 RPC 时调用,您可以根据客户端提供的 RPC 方法及其参数执行所需的任何逻辑。仅适用于双向传输。
首先,您需要将 centrifuge
部分添加到您的 RoadRunner 配置中。例如,以下配置是可行的
rpc: listen: tcp://127.0.0.1:6001 server: command: "php app.php" relay: pipes centrifuge: proxy_address: "tcp://0.0.0.0:10001" # Centrifugo address
和 centrifugo 配置
{ "admin": true, "api_key": "secret", "admin_password": "password", "admin_secret": "admin_secret", "allowed_origins": [ "*" ], "token_hmac_secret_key": "test", "publish": true, "proxy_publish": true, "proxy_subscribe": true, "proxy_connect": true, "allow_subscribe_for_client": true, "proxy_connect_endpoint": "grpc://127.0.0.1:10001", "proxy_connect_timeout": "10s", "proxy_publish_endpoint": "grpc://127.0.0.1:10001", "proxy_publish_timeout": "10s", "proxy_subscribe_endpoint": "grpc://127.0.0.1:10001", "proxy_subscribe_timeout": "10s", "proxy_refresh_endpoint": "grpc://127.0.0.1:10001", "proxy_refresh_timeout": "10s", "proxy_sub_refresh_endpoint": "grpc://127.0.0.1:10001", "proxy_sub_refresh_timeout": "1s", "proxy_rpc_endpoint": "grpc://127.0.0.1:10001", "proxy_rpc_timeout": "10s" }
注意
proxy_connect_endpoint
、proxy_publish_endpoint
、proxy_subscribe_endpoint
、proxy_refresh_endpoint
、proxy_sub_refresh_endpoint
、proxy_rpc_endpoint
- 激活centrifuge
插件的 RoadRunner 服务器的端点地址。
初始化抽象 RoadRunner 工作进程
<?php require __DIR__ . '/vendor/autoload.php'; use RoadRunner\Centrifugo\CentrifugoWorker; use RoadRunner\Centrifugo\Payload; use RoadRunner\Centrifugo\Request; use RoadRunner\Centrifugo\Request\RequestFactory; use Spiral\RoadRunner\Worker; $worker = Worker::create(); $requestFactory = new RequestFactory($worker); // Create a new Centrifugo Worker from global environment $centrifugoWorker = new CentrifugoWorker($worker, $requestFactory); while ($request = $centrifugoWorker->waitRequest()) { if ($request instanceof Request\Invalid) { $errorMessage = $request->getException()->getMessage(); if ($request->getException() instanceof \RoadRunner\Centrifugo\Exception\InvalidRequestTypeException) { $payload = $request->getException()->payload; } // Handle invalid request // $logger->error($errorMessage, $payload ?? []); continue; } if ($request instanceof Request\Refresh) { try { // Do something $request->respond(new Payload\RefreshResponse( // ... )); } catch (\Throwable $e) { $request->error($e->getCode(), $e->getMessage()); } continue; } if ($request instanceof Request\Subscribe) { try { // Do something $request->respond(new Payload\SubscribeResponse( // ... )); // You can also disconnect connection $request->disconnect('500', 'Connection is not allowed.'); } catch (\Throwable $e) { $request->error($e->getCode(), $e->getMessage()); } continue; } if ($request instanceof Request\Publish) { try { // Do something $request->respond(new Payload\PublishResponse( // ... )); // You can also disconnect connection $request->disconnect('500', 'Connection is not allowed.'); } catch (\Throwable $e) { $request->error($e->getCode(), $e->getMessage()); } continue; } if ($request instanceof Request\RPC) { try { $response = $router->handle( new Request(uri: $request->method, data: $request->data), ); // ['user' => ['id' => 1, 'username' => 'john_smith']] $request->respond(new Payload\RPCResponse( data: $response )); } catch (\Throwable $e) { $request->error($e->getCode(), $e->getMessage()); } continue; } }
注意 您可以在 这里 找到更多信息。
许可证
MIT 许可证 (MIT)。有关更多信息,请参阅 LICENSE
。由 Spiral Scout 维护。