codememory / ws-server-bundle
该软件包最新版本(v2.0)没有提供许可信息。
v2.0
2023-09-18 17:52 UTC
Requires
- php: >=8.1
- ext-openswoole: *
- openswoole/core: ^22.1
- symfony/console: 6.2.*
- symfony/event-dispatcher: 6.2.*
Requires (Dev)
This package is auto-updated.
Last update: 2024-09-18 19:56:59 UTC
README
这个pandle是一个简单的解决方案,可以在几秒钟内启动WebSocket服务器,默认情况下,它支持Swoole作为最强大的框架之一
安装
$ composer require codememory/ws-server-bundle
如果symfony flex没有为您注册此捆绑包,请注册此捆绑包
// config/bundles.php <?php return [ ... Codememory\WebSocketServerBundle\WebSocketServerBundle::class => ['all' => true] ];
配置
-
服务器:
- 适配器:如果您决定实现自己的服务器,则使用服务适配器。默认值:“Swoole”,默认服务:“WebSocketServerBundle::DEFAULT_SERVER_SERVICE”
- 协议:服务器协议。默认值:“websocket”
- 主机:服务器主机。默认值:“127.0.0.1”
- 端口:服务器端口。默认值:“8079”
-
事件监听器:消息监听器集合
- { event: "TEST", listener: "App\WebSocketEventListeners\TestHandler" }:示例事件监听器
-
配置:服务器配置,根据服务器不同,默认为Swoole,因此请查看swoole文档。默认值:“[]”
默认等待消息
{ "event": "MESSAGE_EVENT_NAME", "data": {} }
事件监听器处理程序的示例实现
namespace App\WebSocket\EventListeners; use Codememory\WebSocketServerBundle\Interfaces\MessageEventListenerInterface; use Codememory\WebSocketServerBundle\Interfaces\MessageInterface; use Codememory\WebSocketServerBundle\Interfaces\ServerInterface; final class TestHandler implements MessageEventListenerInterface { public function handle(ServerInterface $server, MessageInterface $message) : void { // Reply to a message with event "RESPONSE_EVENT" $server->sendMessage($message->getSenderConnectionID(), 'RESPONSE_EVENT', [ 'message' => 'Hello World' ]); } } // Don't forget to register this listeners in the bundle configuration
注册事件监听器
# config/packages/codememory_ws_server.yaml codememory_ws_server: event_listeners: - { event: 'TEST', listener: App\WebSocket\EventListeners\TestHandler }
事件
-
codememory.ws_server.connection_open:连接打开
- class: Codememory\WebSocketServerBundle\Event\ConnectionOpenEvent
-
codememory.ws_server.connection_closed:连接关闭
- class: Codememory\WebSocketServerBundle\Event\ConnectionClosedEvent
-
codememory.ws_server.message:从连接收到新消息
- class: Codememory\WebSocketServerBundle\Event\MessageEvent
-
codememory.ws_server.message_handler_exception:在消息监听器调用期间处理异常
- class: Codememory\WebSocketServerBundle\Event\MessageHandlerExceptionEvent
-
codememory.ws_server.message_sent:已向连接发送消息
- class: Codememory\WebSocketServerBundle\Event\MessageSentEvent
-
codememory.ws_server.start_server:服务器启动。通常,在此事件中,添加必要的子进程
- class: Codememory\WebSocketServerBundle\Event\StartServerEvent
-
codememory.ws_server.start_worker:服务器启动工作进程开始启动
- class: Codememory\WebSocketServerBundle\Event\StartWorkerEvent
让我们实现通过数据库中的ID向特定用户发送消息的任务
首先,让我们创建一个用于打开连接事件的监听器,并将所有连接保存到redis中
<?php use Codememory\WebSocketServerBundle\Event\ConnectionOpenEvent; use Predis\Client; use Symfony\Component\EventDispatcher\Attribute\AsEventListener; #[AsEventListener(ConnectionOpenEvent::NAME, 'onOpen')] readonly class SaveConnectionToRedisEventListener { public function __construct( private Client $client ) { } public function onOpen(ConnectionOpenEvent $event): void { // We save the new connection in the hash table $this->client->hset('websocket:connections', $event->connectionID, json_encode([ 'connection_id' => $event->connectionID, 'websocket_sec_key' => $event->secWebsocketKey ])); } }
现在让我们在CONNECT消息上创建一个事件监听器
请注意,在当前入门示例中,我们不会使用JWT,而是立即获取用户ID。在你的例子中,你可以在传递user_id时,传递JWT令牌,检查其有效性,并从其中获取user_id
<?php use Codememory\WebSocketServerBundle\Interfaces\MessageEventListenerInterface; use Codememory\WebSocketServerBundle\Interfaces\MessageInterface; use Codememory\WebSocketServerBundle\Interfaces\ServerInterface; use Predis\Client; readonly class ConnectEventListener implements MessageEventListenerInterface { public function __construct( private Client $client ) { } public function handle(ServerInterface $server, MessageInterface $message): void { $data = $message->getData(); if (array_key_exists('user_id', $data) && is_int($data['user_id'])) { // Here we bind the user to the ws connection and save it to a new hash table $this->client->hset($this->buildKey($data['user_id']), $message->getSenderConnectionID(), json_encode([ 'timestamp' => time() ])); } } private function buildKey(int $userId): string { return "websocket:user:$userId:connections"; } } // Don't worry about registering this EventListener in codememory_ws_server.yaml
现在让我们创建一个管理器,将需要发送给特定用户的消息保存到队列中
<?php use Predis\Client; final readonly class WebSocketMessageQueueManager { public const HASH_TABLE_NAME = 'websocket:queue:messages'; public function __construct( private Client $client ) { } public function sendMessage(int $userId, string $event, array $data): void { // We get all ws connections by user ID $connections = $this->client->hgetall("websocket:user:$userId:connections"); foreach ($connections as $id => $userConnectionData) { // Receiving information about the connection by connection identifier $connection = $this->client->hget('websocket:connections', $id); if (null !== $connection) { $connectionData = json_decode($connection, true); // We save the message in a hash table, as the key we indicate the connection ID to which we need to send and its websocket-sec-key (to ensure security) $this->client->hset( self::HASH_TABLE_NAME, $this->buildMessageField($id, $connectionData['websocket_sec_key']), json_encode([ 'event' => $event, 'data' => $data ]) ); } } } private function buildMessageField(int $connectionId, string $webSocketSecKey): string { return "{$connectionId}_{$webSocketSecKey}"; } }
最后一步,我们将添加一个进程,该进程将监视redis并检查是否存在需要发送给用户的消息
<?php use App\Services\WebSocketMessageQueueManager; use Codememory\WebSocketServerBundle\Event\StartServerEvent; use Predis\Client; use Psr\Log\LoggerInterface; use Symfony\Component\EventDispatcher\Attribute\AsEventListener; use Throwable; #[AsEventListener(StartServerEvent::NAME, 'onStart')] final readonly class ProcessForSendingMessagesFromQueueEventListener { public function __construct( private Client $client, private LoggerInterface $logger ) { } public function onStart(StartServerEvent $event): void { try { $event->server->addProcess(function () use ($event) { // Receive all messages from the queue $messages = $this->client->hgetall(WebSocketMessageQueueManager::HASH_TABLE_NAME); foreach ($messages as $for => $message) { [$connectionID, $webSocketSecKey] = explode('_', $for); // We check that the message that was added to the queue belongs to the same connection that is connected if ($this->connectionCheck($connectionID, $webSocketSecKey)) { $message = json_decode($message, true); $event->server->sendMessage($connectionID, $message['event'], $message['data']); // We remove the message from the queue so that it is not sent again $this->client->hdel(WebSocketMessageQueueManager::HASH_TABLE_NAME, [$for]); } } }); } catch (Throwable $e) { $this->logger->critical($e, [ 'origin' => self::class, 'detail' => 'An error occurred while adding a process to send messages from a queue.' ]); } } private function connectionCheck(int $connectionID, string $webSocketSecKey): bool { $connection = $this->client->hget('websocket:connections', $connectionID); if (null !== $connection) { $connectionData = json_decode($connection, true); if ($connectionData['websocket_sec_key'] === $webSocketSecKey) { return true; } } return false; } }
这就结束了,这个示例并不是理想的,需要进行修改,并取决于你的需求
现在,如果我们想向ID为500的用户发送消息,我们只需在任何代码位置使用我们的管理器并调用sendMessage方法即可