prwnr/laravel-streamer

使用 Redis 5.0 流的 Laravel 事件流包

4.2.0 2024-08-29 06:19 UTC

README

Streamer 是一个 Laravel 包,用于不同应用程序之间的事件功能,由 Redis Streams 提供支持。此包利用 Redis 5.0 Streams 的所有主要命令,提供将 Streams 作为事件简单使用的功能。

此包的主要概念是提供一种简单的方式从您的应用程序中发出新事件,并允许在您的其他应用程序中监听它们,这些应用程序正在使用相同的 Redis 服务器。

安装

  1. 通过 composer 命令 composer require prwnr/laravel-streamer 安装包,或者通过在 composer.json 文件中添加它并指定版本。
  2. 发现包
  3. 使用 vendor:publish 命令发布配置。
  4. 确保您有一个运行的 Redis 5.0 实例,并且 Laravel 已配置为使用它
  5. 确保您已安装 PHPRedis 扩展

用法

此包的主要用法有两个:发出新事件和监听事件。虽然发出事件需要一些工作才能使用,例如创建自己的 Event 类,但监听事件可以通过 artisan 命令轻松实现,而无需太多工作。

版本兼容性

发出新事件

为了发出新事件,需要做一些事情。首先,您将需要一个有效的类,该类实现了 Prwnr\Streamer\Contracts\Event,如下所示

class ExampleStreamerEvent implements Prwnr\Streamer\Contracts\Event {
    /**
     * Require name method, must return a string.
     * Event name can be anything, but remember that it will be used for listening
     */
    public function name(): string 
    {
        return 'example.streamer.event';
    }
    /**
     * Required type method, must return a string.
     * Type can be any string or one of predefined types from Event
     */
    public function type(): string
    {
        return Event::TYPE_EVENT;
    }
    /**
     * Required payload method, must return array
     * This array will be your message data content
     */
    public function payload(): array
    {
        return ['message' => 'content'];
    }
}

然后,在应用程序的任何位置,您只需要通过使用 Streamer 实例或 Streamer 门面来发出该事件。

$event = new ExampleStreamerEvent();
$id = \Prwnr\Streamer\Facades\Streamer::emit($event);

这将创建一个名为(如果不存在)example.streamer.event 的流上的消息。如果发出事件成功,发出方法将返回一个 ID。

监听事件的新消息

为了监听事件,您必须正确配置 config/streamer.php(或使用 ListenerStack::add 方法)并运行 php artisan streamer:listen 命令。在配置文件中,您将找到 Application listeners 配置,其中包含默认值,如果您想使用 streamer listen 命令开始监听,则需要更改这些值。添加事件监听器的另一种方法是使用 ListenersStack 静态类。此类在配置文件中使用监听器启动,然后由命令使用以获取所有监听器。此类的添加使得可以通过配置文件以及编程方式添加监听器。

请记住,本地监听器应该实现 MessageReceiver 合同,以确保它有一个接受 ReceivedMessage 作为参数的 handle 方法。

/*
|--------------------------------------------------------------------------
| Application listeners
|--------------------------------------------------------------------------
|
| Listeners classes that should be invoked with Streamer listen command
| based on streamer.event.name => [local_handlers] pairs
|
| Local listeners should implement MessageReceiver contract
|
*/
'listen_and_fire' => [
    'example.streamer.event' => [
        //List of local listeners that should be invoked
        //\App\Listeners\ExampleListener::class
    ],
],

上面的配置是包含与 Streamer 事件相关联的本地监听器的数组。当监听 example.streamer.event 时,将根据其配置定义创建所有本地监听器,并使用从流接收的消息调用其 handle 方法。因此,此包使用 Laravel 容器创建监听器实例,因此您可以在监听器构造函数中指定任何类型提示以使用 Laravel 依赖注入。

要开始监听事件,请使用 listen 命令。

命令

监听

streamer:listen example.streamer.event 

此命令将从“现在”开始监听指定的流(或由逗号分隔的流)。它将以阻塞方式监听,这意味着它将一直运行,直到Redis超时或崩溃。所有与监听器相关的错误都被捕获并记录到控制台,同时也存储在失败消息列表中,以便稍后进行调试和/或重试。

这是此命令的基本用法,其中事件名称是必需的参数(除非提供--all选项)。在这种情况下,它只简单地开始监听新事件。然而,此命令有几个选项可以扩展其用法,这些选项是

--all= : Will trigger listener mode to start listening on all events that are registered with local listeners classes (from the ListenersStack). Event name argument is no longer required in this case.
--group= : Name of your streaming group. Only when group is provided listener will listen on group as consumer
--consumer= : Name of your group consumer. If not provided a name will be created as groupname-timestamp
--reclaim= : Milliseconds of pending messages idle time, that should be reclaimed for current consumer in this group. Can be only used with group listening
--last_id= : ID from which listener should start reading messages (using 0-0 will process all old messages)
--keep-alive : Will keep listener alive when any unexpected non-listener related error will occur by simply restarting listening
--max-attempts= : Number of maximum attempts to restart a listener on an unexpected non-listener related error (requires --keep-alive to be used)
--purge : Will remove message from the stream if it will be processed successfully by all local listeners in the current stack.
--archive : Will remove message from the stream and store it in database if it will be processed successfully by all local listeners in the current stack.

当使用consumergroup选项时,流上的每条消息都将标记为给定消费者的已确认,因此它不会被具有相同选项的后续streamer:listen命令调用处理。请注意,如果没有设置consumer和group,则从特定ID进行监听将忽略确认。

purgearchive选项(自v2.6以来可用)旨在用于释放Redis实例的内存或存储(在有大量流消息或负载很大且Redis耗尽内存/存储的情况下)。在使用这些选项时,请注意,它们不会考虑在其他实例或其他服务器上运行的监听器 - 也就是说,当第一个连接到特定事件的监听器处理其消息时,purgearchive选项将删除等待其他监听器完成的消息。要完全使用archive选项,请参阅[流存档][#stream-archive]以获取更多详细信息和说明。

在参数中使用多个事件或使用任何其他选项(如group、consumer、last_id)与--all选项一起将应用这些选项到每个正在使用的流事件。

失败列表

streamer:failed:list

此命令将显示无法由其监听器处理的流消息列表。它将提供有关它们的所有重要信息,例如:ID、流名称、监听器类、导致它失败的错误消息以及发生日期。

表格示例

+-----+-----------+---------------------------+-------+---------------------+
| ID  | Stream    | Receiver                  | Error | Date                |
+-----+-----------+---------------------------+-------+---------------------+
| 123 | foo.bar   | Tests\Stubs\LocalListener | error | 2021-12-12 12:12:12 |
| 321 | other.bar | Tests\Stubs\LocalListener | error | 2021-12-12 12:15:12 |
+-----+-----------+---------------------------+-------+---------------------+

此命令有一个附加选项,称为--compact,它将限制表格输出仅包含ID、流和错误列。

失败重试

streamer:failed:retry

此命令旨在再次尝试失败的监听。它简单地从流中读取消息,并尝试通过最初处理它的监听器再次处理它。

如果监听器再次处理消息失败,则消息失败信息将被重新存储(带有较新的日期和更新的错误消息),并且可以再次尝试。消息可以被处理无数次。除非使用flush命令,否则它将在每次失败后保持可用。

此命令有几个选项可用

--all : retries all existing failed messages
--id= : retries only those messages that are matching given ID
--stream= : retries only those messages that are matching given stream name
--receiver= : retries only those messages that are matching given listener full class name (may require to be in quotation)

至少需要使用这些选项之一与命令一起处理失败消息。all选项只能单独使用,而其他三个选项可以一起使用或单独使用。这意味着,可以使用任何组合的idstreamreceiver来匹配任意数量的失败消息并重试它们。例如,可以同时使用streamid,或者使用idreceiver,或者只使用其中之一,或者同时使用所有三个,一切取决于用例。

失败清除

streamer:failed:flush

此命令将从消息存储中删除所有现有的失败消息。可以用于删除无法由监听器处理的条目。

此命令不会从流本身中删除消息 - 消息将保持原样,但由原始消费者确认(如果使用)。

列表

streamer:list

此命令将列出所有已注册的事件及其关联的监听器。选项--compact将仅提供事件列表,跳过监听器列。

此命令可用于查看实际由监听器处理的事件,这有助于找出缺失的部分。此列表也可以用于通过第三方应用程序开始监听可用的事件。

表格示例

+------------------------+------------------------------------+
| Event                  | Listeners                          |
+------------------------+------------------------------------+
| example.streamer.event | none                               |
| foo.bar                | Tests\Stubs\LocalListener          |
| other.foo.bar          | Tests\Stubs\LocalListener          |
|                        | Tests\Stubs\AnotherLocalListener   |
+------------------------+------------------------------------+

归档

streamer:archive

此命令将归档从选定流中过时的消息(天数/周数等)。它将处理所有流消息,验证它们的created时间戳,并尝试归档(从redis中删除它们并尝试将它们存储在相关的归档存储)。

此命令有两个必选项

--streams : list of streams separated by comma to archive messages from
--older_than= : information how old messages should be to archive them. The suggested format is: 60 min, 1 day, 1 week, 5 days, 2 weeks etc.

请注意使用此命令,因为它不会考虑监听器是否已处理它尝试归档的消息。应谨慎使用,并且仅用于旧消息,这样就可以更确定所有监听器已处理其消息。

清除

streamer:purge

此命令将清除选定流中过时的消息(天数/周数等)。它将处理所有流消息,验证它们的created时间戳,并尝试清除它们(从redis中完全删除)。

此命令有两个必选项

--streams : list of streams separated by comma to purge messages from
--older_than= : information how old messages should be to purge them. The suggested format is: 60 min, 1 day, 1 week, 5 days, 2 weeks etc.

请注意使用此命令,因为它不会考虑监听器是否已处理它尝试清除的消息。应谨慎使用,并且仅用于旧消息,这样就可以更确定所有监听器已处理其消息。

归档还原

streamer:archive:restore

此命令将恢复从相关归档存储中归档的流消息。它基本上会检索消息(全部或选择),并尝试将它们放回流中,同时从归档中删除。此操作将触发连接到已恢复流的监听器!

此命令有几个选项可用

--all : restores all archived messages back to the stream.
--id= : restores archived message back to the stream by ID. Requires --stream option to be used as well.
--stream= : restores all archived messages from a selected stream.

至少需要这些选项之一才能尝试恢复消息。如果在恢复消息时发生任何错误,它将报告该特定尝试,但不会阻止其他消息被处理。

恢复消息将其放回具有新ID的流中 - 这是Redis的要求和限制,即任何添加到流中的消息,其ID都需要高于最后生成的ID。正在恢复的消息的原始ID将存储在original_id字段的元信息中。

流归档

从2.6版本开始,流归档允许将处理过的消息存储在任何类型的存储中,以释放Redis内存和/或空间。归档允许恢复这些消息,将它们释放回流中。

要完全使用归档存储,需要编写并添加新的存储驱动程序到管理器。为此,需要完成几个快速步骤

  1. 定义您的存储驱动程序类,并使其实现\Prwnr\Streamer\Contracts\ArchiveStorage合同。此接口是强制性的。
  2. 以这种方式扩展管理器,以使用您的驱动程序
$manager = $this->app->make(StorageManager::class);
$manager->extend('your_driver_name', static function () {
    return new YourDriver();
});
  1. 在流配置文件中将您的驱动程序定义为默认值
'archive' => [
    'storage_driver' => 'your_driver_name'
]   

重放事件

从2.2版本开始,流事件可以“重放”。这意味着,可以“重建”具有唯一标识符的特定消息,直到“现在”(或直到选定的日期)。

“重放”消息实际上意味着什么?这意味着,流中的所有消息都将从开始处读取,每个条目的负载将“合并”成消息的最终版本 - 如果在历史中存在,则每个字段都将被其“较新”的值替换。

这对于事件非常有用,这些事件可能不包含它们所代表资源的所有信息,但只包含有关已更改字段的 数据。

例如,假设有一个具有namesurname字段的资源,我们将发出3个不同的事件

  • 首先为其创建填充两个字段的值(name: foo; surname: bar
  • 第二个事件只会将 name 改为 foo bar
  • 第三个事件再次将名称改为 bar foo

在重放这组消息时(请记住,每个消息都有一个相同的唯一标识符),我们最终重放的资源将是:name: bar foo; surname: bar。如果我们重放事件直到第三个更改之前的时间,我们将有 name: foo bar; surname: bar

用法

为了使事件可重放,需要实现 Prwnr\Streamer\Contracts\Replayable 接口。这将强制添加一个 getIdentifier 方法,该方法应返回资源的唯一标识符(例如,该事件所代表的资源的UUID)。通过满足此接口,所有将通过 Streamer 的 emit 方法传递的事件也将“标记”为可重放。

要实际重放消息,需要使用 Hsitory 接口实现。

应该使用的方法是:replay(string $event, string $identifier, Carbon $until = null): array。此方法将返回事件的“当前状态”,从其历史中重建。如方法定义所示,它需要事件字符串名称和资源标识符(由 Replayable 接口应用)。第三个参数是可选的,如果使用,则将在遇到第一个匹配日期的消息时停止重放消息。

Eloquent 模型事件

通过使用 EmitsStreamerEvents 特性,您可以轻松地使您的 Eloquent 模型发出基本事件。此特性将您的模型与 Streamer 集成,并在诸如 savecreatedelete 等操作上发出事件。它将发出以模型名称加上操作后缀的事件,以及发生的操作的负载。在 createsave 操作中,负载将包含已更改的字段列表以及每个字段的 before/after(在 create 操作中,字段之前的值基本上都设置为 null),在 delete 操作中,负载将简单地声明模型已被删除。每个负载都包括一个 [key_name => key_value] 对,这是您的模型 ID。

默认情况下,事件将使用其模型的名称加上操作后缀作为名称,但可以通过将其分配给 baseEventName 属性来更改名称。此名称将替换模型名称,但将保留所执行的操作的后缀。

查看此包中的示例目录,了解如何使用 Stream 和 Consumer 实例确切地使用每个命令。