codememory/ws-server-bundle

该软件包最新版本(v2.0)没有提供许可信息。

安装: 19

依赖项: 0

建议者: 0

安全: 0

星标: 0

关注者: 1

分支: 0

开放问题: 0

类型:symfony-bundle

v2.0 2023-09-18 17:52 UTC

This package is auto-updated.

Last update: 2024-09-18 19:56:59 UTC


README

这个pandle是一个简单的解决方案,可以在几秒钟内启动WebSocket服务器,默认情况下,它支持Swoole作为最强大的框架之一

安装

$ composer require codememory/ws-server-bundle

如果symfony flex没有为您注册此捆绑包,请注册此捆绑包

// config/bundles.php

<?php

return [
    ...
    Codememory\WebSocketServerBundle\WebSocketServerBundle::class => ['all' => true]
];

配置

  • 服务器:

    • 适配器:如果您决定实现自己的服务器,则使用服务适配器。默认值:“Swoole”,默认服务:“WebSocketServerBundle::DEFAULT_SERVER_SERVICE
    • 协议:服务器协议。默认值:“websocket
    • 主机:服务器主机。默认值:“127.0.0.1
    • 端口:服务器端口。默认值:“8079
  • 事件监听器:消息监听器集合

    • { event: "TEST", listener: "App\WebSocketEventListeners\TestHandler" }:示例事件监听器
  • 配置:服务器配置,根据服务器不同,默认为Swoole,因此请查看swoole文档。默认值:“[]

默认等待消息

{
  "event": "MESSAGE_EVENT_NAME",
  "data": {}
}

事件监听器处理程序的示例实现

namespace App\WebSocket\EventListeners;

use Codememory\WebSocketServerBundle\Interfaces\MessageEventListenerInterface;
use Codememory\WebSocketServerBundle\Interfaces\MessageInterface;
use Codememory\WebSocketServerBundle\Interfaces\ServerInterface;

final class TestHandler implements MessageEventListenerInterface 
{
    public function handle(ServerInterface $server, MessageInterface $message) : void
    {
        // Reply to a message with event "RESPONSE_EVENT"
        $server->sendMessage($message->getSenderConnectionID(), 'RESPONSE_EVENT', [
            'message' => 'Hello World'
        ]);
    }
}

// Don't forget to register this listeners in the bundle configuration

注册事件监听器

# config/packages/codememory_ws_server.yaml
codememory_ws_server:
  event_listeners:
    - { event: 'TEST', listener: App\WebSocket\EventListeners\TestHandler }

事件

  • codememory.ws_server.connection_open:连接打开

    • class: Codememory\WebSocketServerBundle\Event\ConnectionOpenEvent
  • codememory.ws_server.connection_closed:连接关闭

    • class: Codememory\WebSocketServerBundle\Event\ConnectionClosedEvent
  • codememory.ws_server.message:从连接收到新消息

    • class: Codememory\WebSocketServerBundle\Event\MessageEvent
  • codememory.ws_server.message_handler_exception:在消息监听器调用期间处理异常

    • class: Codememory\WebSocketServerBundle\Event\MessageHandlerExceptionEvent
  • codememory.ws_server.message_sent:已向连接发送消息

    • class: Codememory\WebSocketServerBundle\Event\MessageSentEvent
  • codememory.ws_server.start_server:服务器启动。通常,在此事件中,添加必要的子进程

    • class: Codememory\WebSocketServerBundle\Event\StartServerEvent
  • codememory.ws_server.start_worker:服务器启动工作进程开始启动

    • class: Codememory\WebSocketServerBundle\Event\StartWorkerEvent

让我们实现通过数据库中的ID向特定用户发送消息的任务

首先,让我们创建一个用于打开连接事件的监听器,并将所有连接保存到redis中

<?php

use Codememory\WebSocketServerBundle\Event\ConnectionOpenEvent;
use Predis\Client;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;

#[AsEventListener(ConnectionOpenEvent::NAME, 'onOpen')]
readonly class SaveConnectionToRedisEventListener
{
    public function __construct(
        private Client $client
    ) {
    }

    public function onOpen(ConnectionOpenEvent $event): void
    {
        // We save the new connection in the hash table
        $this->client->hset('websocket:connections', $event->connectionID, json_encode([
            'connection_id' => $event->connectionID,
            'websocket_sec_key' => $event->secWebsocketKey
        ]));
    }
}

现在让我们在CONNECT消息上创建一个事件监听器

请注意,在当前入门示例中,我们不会使用JWT,而是立即获取用户ID。在你的例子中,你可以在传递user_id时,传递JWT令牌,检查其有效性,并从其中获取user_id

<?php

use Codememory\WebSocketServerBundle\Interfaces\MessageEventListenerInterface;
use Codememory\WebSocketServerBundle\Interfaces\MessageInterface;
use Codememory\WebSocketServerBundle\Interfaces\ServerInterface;
use Predis\Client;

readonly class ConnectEventListener implements MessageEventListenerInterface
{
    public function __construct(
        private Client $client
    ) {
    }

    public function handle(ServerInterface $server, MessageInterface $message): void
    {
        $data = $message->getData();
        
        if (array_key_exists('user_id', $data) && is_int($data['user_id'])) {
          // Here we bind the user to the ws connection and save it to a new hash table
          $this->client->hset($this->buildKey($data['user_id']), $message->getSenderConnectionID(), json_encode([
              'timestamp' => time()
          ]));
        }
    }
    
    private function buildKey(int $userId): string
    {
        return "websocket:user:$userId:connections";
    }
}

// Don't worry about registering this EventListener in codememory_ws_server.yaml

现在让我们创建一个管理器,将需要发送给特定用户的消息保存到队列中

<?php

use Predis\Client;

final readonly class WebSocketMessageQueueManager
{
    public const HASH_TABLE_NAME = 'websocket:queue:messages';

    public function __construct(
        private Client $client
    ) {
    }

    public function sendMessage(int $userId, string $event, array $data): void
    {
        // We get all ws connections by user ID
        $connections = $this->client->hgetall("websocket:user:$userId:connections");

        foreach ($connections as $id => $userConnectionData) {
            // Receiving information about the connection by connection identifier
            $connection = $this->client->hget('websocket:connections', $id);

            if (null !== $connection) {
                $connectionData = json_decode($connection, true);

                // We save the message in a hash table, as the key we indicate the connection ID to which we need to send and its websocket-sec-key (to ensure security)
                $this->client->hset(
                    self::HASH_TABLE_NAME,
                    $this->buildMessageField($id, $connectionData['websocket_sec_key']),
                    json_encode([
                      'event' => $event,
                      'data' => $data
                    ])
                );
            }
        }
    }
    
    private function buildMessageField(int $connectionId, string $webSocketSecKey): string
    {
        return "{$connectionId}_{$webSocketSecKey}";
    }
}

最后一步,我们将添加一个进程,该进程将监视redis并检查是否存在需要发送给用户的消息

<?php

use App\Services\WebSocketMessageQueueManager;
use Codememory\WebSocketServerBundle\Event\StartServerEvent;
use Predis\Client;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
use Throwable;

#[AsEventListener(StartServerEvent::NAME, 'onStart')]
final readonly class ProcessForSendingMessagesFromQueueEventListener
{
    public function __construct(
        private Client $client,
        private LoggerInterface $logger
    ) {
    }

    public function onStart(StartServerEvent $event): void
    {
        try {
            $event->server->addProcess(function () use ($event) {
                // Receive all messages from the queue
                $messages = $this->client->hgetall(WebSocketMessageQueueManager::HASH_TABLE_NAME);
                
                foreach ($messages as $for => $message) {
                    [$connectionID, $webSocketSecKey] = explode('_', $for);
                    
                    // We check that the message that was added to the queue belongs to the same connection that is connected
                    if ($this->connectionCheck($connectionID, $webSocketSecKey)) {
                        $message = json_decode($message, true);

                        $event->server->sendMessage($connectionID, $message['event'], $message['data']);
                        
                        // We remove the message from the queue so that it is not sent again
                        $this->client->hdel(WebSocketMessageQueueManager::HASH_TABLE_NAME, [$for]);
                    }
                }
            });
        } catch (Throwable $e) {
            $this->logger->critical($e, [
                'origin' => self::class,
                'detail' => 'An error occurred while adding a process to send messages from a queue.'
            ]);
        }
    }

    private function connectionCheck(int $connectionID, string $webSocketSecKey): bool
    {
        $connection = $this->client->hget('websocket:connections', $connectionID);

        if (null !== $connection) {
            $connectionData = json_decode($connection, true);

            if ($connectionData['websocket_sec_key'] === $webSocketSecKey) {
                return true;
            }
        }

        return false;
    }
}

这就结束了,这个示例并不是理想的,需要进行修改,并取决于你的需求

现在,如果我们想向ID为500的用户发送消息,我们只需在任何代码位置使用我们的管理器并调用sendMessage方法即可