machinateur/php-sse

此包以框架无关、低依赖和面向对象的方式实现了PHP中的服务器发送事件。

1.0.0 2022-05-22 14:13 UTC

This package is auto-updated.

Last update: 2024-09-29 06:03:19 UTC


README

此包以框架无关、低依赖和面向对象的方式实现了PHP中的服务器发送事件。

先决条件

是的,你没看错,PHP 5.6是这个包能使用的最低版本要求。这样,即使是旧项目也可以为客户端应用程序提供SSE支持。

也许查看MDN网络文档中的SSE信息也是个好主意,以了解这项技术。我发现它是在这个主题上的一个极好的起点,同时还有底层示例。

安装

通过composer

# install the latest version
composer require "machinateur/php-sse"

用法

以下是一个纯PHP的简单用例。有关完整示例(带有客户端代码),请查看demo目录。

<?php

namespace App;

use Machinateur\SSE\MessageStream;
use Machinateur\SSE\Exception\TimeoutException;

// ...

$stream = new MessageStream();

// ...

// Count to 10, then quit at 5.
$stream->run(function () {
    foreach (\range(1, 10, 1) as $i) {
        if ($i > 5) {
            throw TimeoutException::toTriggerStreamShutdown();
        }

        yield $i;

        \sleep(1);
    }
});

请务必阅读有关何时为什么使用此包的内容,以确保它符合您的用例。

要查看演示,请在终端中执行php -S 127.0.0.1:8001 -t ./demo,然后在新的标签页中打开http://127.0.0.1:8001/message_stream.html。请确保检查浏览器开发者工具的网络标签。

以下是常见用例场景的附加说明。

推荐头部

推荐头部可以从\Machinateur\SSE\MessageStreamInterface的默认实现中获取。

$recommendedHeaders = \Machinateur\SSE\MessageStream::getRecommendedHeaders();

日志支持

可以使用setter设置自定义日志记录器。

$stream = new \Machinateur\SSE\MessageStream();
$stream->setLogger($myLogger);
// ...

关于日志支持的一个说明:\Machinateur\SSE\MessageStream类支持psr日志记录器,以通知关闭信号(notice)和发送到客户端的任何输出(debug)。作为demo应用程序的一部分,提供了一个将日志记录到消息流的日志记录器实现。

自定义消息流

通过实现\Machinateur\SSE\MessageStreamInterface接口,可以轻松地创建自定义消息流。

<?php

namespace App;

use Machinateur\SSE\Exception\TimeoutException;
use Machinateur\SSE\MessageStream;
use Machinateur\SSE\MessageStreamInterface;
use Machinateur\SSE\Message\MessageInterface;

/**
 * A naive implementation of {@see MessageStreamInterface}.
 */
class CustomMessageStream extends MessageStream implements MessageStreamInterface
{
    /**
     * @inheritDoc
     */
    public function run(callable $callback)
    {
        try {
            foreach ($callback() as $message) {
                \assert($message instanceof MessageInterface);

                $this->printOutput($message->getMessageFormat());
                $this->checkConnection();
            }
        } catch (TimeoutException $exception) {
        }
    }
}

// ...

$stream = new CustomMessageStream();

在实现自定义消息流时,请注意接口强制的支持数组结果和生成器函数的要求。

Any implementation must support passing a callback function with array result or a generator function in its
 stead. In the latter case, a {@see \Machinateur\SSE\Exception\TimeoutException} throw must shut down the
 stream (due to time-out). A proper client implementation will resume the connection after its `retry` period.

自定义消息

通过简单地实现\Machinateur\SSE\Message\MessageInterface接口,也可以创建自定义消息。

<?php

namespace App;

use Machinateur\SSE\Format\StreamFormat;
use Machinateur\SSE\Message\MessageInterface;

/**
 * Custom message to be yielded by a generator function. It holds a data array, which is given to `json_encode()`
 *  when sent by the message stream.
 */
class CustomMessage implements MessageInterface
{
    const FLAGS = \JSON_PRETTY_PRINT | \JSON_UNESCAPED_SLASHES | \JSON_UNESCAPED_UNICODE | \JSON_PRESERVE_ZERO_FRACTION;

    /** @var array */
    private $data = array();
    
    /**
     * @param array $data
     */
    public function setData($data)
    {
        $this->data = $data;
    }

    /**
     * @inheritDoc
     */
    public function getStreamFormat()
    {
        return [
            StreamFormat::FIELD_COMMENT => 'source: ' . self::class,
            StreamFormat::FIELD_DATA => \json_encode($this->data, self::FLAGS)
        ];
    }
}

数组与生成器

传递给\Machinateur\SSE\MessageStreamInterface::run()方法的回调函数可以返回一个数组,或者它本身就是一个生成器函数。这种实现的自由度自然会引发是否使用生成器函数的问题。

这完全取决于您的用例。对于服务器端的轮询情况(例如,消费消息队列),后者显然是更好的选择,因为收集大量消息并一次性返回它们,会使SSE变得多余。为什么事件支持数组返回值呢?支持两者可以简化集成和采用,同时使测试变得容易一些。

值得一提的是,\Machinateur\SSE\MessageStream(接口的简单但足够的实现)内部将任何数组返回值转换为生成器。

框架集成

为了保持简单,该库故意不与框架自动集成。但这并不意味着无法与框架一起使用。

使用 \Symfony\Component\HttpFoundation\Response (symfony) 或 \Illuminate\Http\Response (laravel) 非常简单。

  • 示例 #1: \Symfony\Component\HttpFoundation\Response
use Machinateur\SSE\MessageStream;

// ...

$headers = array();

foreach (MessageStream::getRecommendedHeaders() as $header) {
    list($key, $value) = \explode(':', $header, 2);
    $headers[$key] = $value;
}

$response->headers->add($headers);
  • 示例 #2: \Illuminate\Http\Response (继承自 \Symfony\Component\HttpFoundation\Response)
use Machinateur\SSE\MessageStream;

// ...

$headers = array();

foreach (MessageStream::getRecommendedHeaders() as $header) {
    list($key, $value) = \explode(':', $header, 2);
    $headers[$key] = $value;
}

$response->withHeaders($headers);
// or
$response->headers->add($headers);

与其他 SSE 实现的使用

该库受到 hhxsv5/php-sse 的启发和影响,因此下面是如何实现两者之间的互操作性的说明。此示例基于 来自 hhxsv5/php-sse 的现有 php-fpm 示例

<?php

namespace App;

use Hhxsv5\SSE\Event;
use Hhxsv5\SSE\StopSSEException;
use Machinateur\SSE\Exception\TimeoutException;
use Machinateur\SSE\Format\StreamFormat;
use Machinateur\SSE\MessageStream;

foreach (MessageStream::getRecommendedHeaders() as $header) {
    \header($header);
}

// The callback for `hhxsv5/php-sse`.
$callback = function () {
    $id = \mt_rand(1, 1000);

    // Get news from database or service.
    $news = [
        [
            'id' => $id,
            'title' => 'title ' . $id,
            'content' => 'content ' . $id,
        ],
    ];

    // Stop here when no news available.
    if (empty($news)) {
        return false;
    }
    
    // In case something went wrong.
    $shouldStop = false;
    if ($shouldStop) {
        throw new StopSSEException();
    }
    
    return \json_encode(\compact('news'));
    // return ['event' => 'ping', 'data' => 'ping data'];
    // return ['id' => uniqid(), 'data' => json_encode(compact('news'))];
};

$event = new Event($callback, 'news');
unset($callback);

/**
 * Wrapper for better access to protected fields of `\Hhxsv5\SSE\Event`.
 * 
 * @property string $id
 * @property string $event
 * @property string $data
 * @property string $retry
 * @property string $comment
 */
class EventWrapper extends Event
{
    /** @var Event */
    protected $eventObject;

    public function __construct(Event $event)
    {
        $this->eventObject = $event;
    }

    /**
     * @inheritDoc
     */
    public function __get($name)
    {
        if (\in_array($name, ['id', 'event', 'data', 'retry', 'comment']) && \property_exists($this, $name)) {
            return $this->eventObject->{$name};
        }

        throw new LogicException(\sprintf('Unknown property: %s', $name));
    }

    /**
     * @inheritDoc
     */
    public function fill()
    {
        $this->eventObject->fill();
    }

    public function __toString()
    {
        return $this->eventObject->__toString();
    }
}

$event = new EventWrapper($event);

// The callback for `machinateur/php-sse`.
$callback = function () use ($event) {
    try {
        $event->fill();
        yield [
            StreamFormat::FIELD_COMMENT => $event->comment;
            StreamFormat::FIELD_ID => $event->id;
            StreamFormat::FIELD_RETRY => $event->retry;
            StreamFormat::FIELD_EVENT => $event->event;
            StreamFormat::FIELD_DATA => $event->data;
        ];
    } catch (StopSSEException $exception) {
        throw TimeoutException::toTriggerStreamShutdown();
    }
};

$messageStream = new MessageStream();
$messageStream->setLogger($myLogger);
$messageStream->run($callback);

关于

以下是关于此包本身及其背后的意图的一些基本信息。

SSE 究竟是什么?

传统上,网页必须向服务器发送请求以接收新数据;也就是说,页面从服务器请求数据。使用服务器发送事件,服务器可以在任何时候通过向网页推送消息将新数据发送到网页。

(摘自 MDN 网络文档中的 "Server-sent events" "Server-sent events")

[...] 服务器发送事件是单向的;也就是说,数据消息在一个方向上传递,从服务器到客户端(例如用户的网页浏览器)。这使得它们在没有需要以消息形式从客户端发送数据到服务器时是一个非常好的选择。例如,EventSource 是处理社交媒体状态更新、新闻源或将数据传输到客户端存储等事物的有用方法 [...]

(摘自 MDN 网络文档中的 "EventSource" "EventSource")

简而言之,SSE 允许我们从服务器向客户端网页发送事件。

为什么使用这个包?

此包...

  • ... 无框架依赖。
  • ... 无依赖。
  • ... 使用面向对象的方法。
  • ... 可扩展。
  • ... 兼容 PHP >= 5.6

何时使用此包?

此包可用于在服务器端实现简单的 SSE。

客户端可能使用 Remy Sharp 的 EventSource polyfill) 或 Yaffle 的 EventSource polyfill) 或浏览器的本地实现 native browser implementation (参见 caniuse)。

对于更复杂的用例,更灵活的替代方案如 Mercure 可能更合适。

有用的阅读

许可证

这是 MIT 许可。