consilience/laravel-message-flow

使用 Laravel 队列在应用程序之间推送消息。

2.0.0 2023-06-23 17:28 UTC

This package is auto-updated.

Last update: 2024-08-23 19:49:50 UTC


README

Latest Stable Version Total Downloads Latest Unstable Version License

Laravel Message Flow

背景

此包在两个(或更多)Laravel 应用程序之间提供管道,将一个应用程序上的模型消息发送到另一个应用程序上的模型。它使用 Laravel 队列在应用程序之间传递消息。它试图以健壮、可靠和灵活的方式完成此操作。

如果您可以在 Laravel 应用程序之间共享单个队列数据库,则此包将支持在这些应用程序之间传递消息。

您可以混合任意数量的队列连接和队列,以在多个应用程序之间路由消息。

此包的使用场景是替换一系列应用程序之间的许多 webhooks。这些 webhooks 设置起来变得困难,难以维护和监控。此包旨在解决这些问题。

消息

消息是可以以可移植方式序列化为 JSON 的任何数据或对象。可移植性意味着对象可以在不参考源应用程序中的任何模型或对象的情况下反序列化。它只是可以独立存在的数据。

每个消息都分配了一个 UUID,该 UUID 与其一起传递,并且分配了一个名称,该名称在发送时用于将消息路由到特定队列,或在接收时路由到特定作业。

在其最基本实现中,此包将从一个应用程序上的模型中保存的消息移动到另一个应用程序上的模型。它使用 laravel 队列作为其通信渠道。

Basic Message

可靠性

此包确保消息被分发到队列。一旦分发,责任就转移到队列代理,并且被视为已发送。尽管可以通过简单地在相反方向发送消息轻松实现端到端确认,但不会进行端到端确认。

出站消息状态

此包将监视 MessageFlowOut 模型。当一个实例的状态达到 new 状态时,它将被分发到其路由到的队列。如果成功分发,则其状态将移动到 processed。如果无法将其分发到队列,则其状态将为 failed

您可以使用其他状态的其他状态使用附加状态,并将被忽略。

可选地,可以自动删除已处理的出站 MessageFlowOut 模型实例。失败的实例永远不会被删除,因此可以重试。这里采取的方法是一旦消息安全地被队列接受,则不再由发送模型负责。

但是,如果知道消息已被接收方安全接收并处理,则可以设置一个响应通道来确认此信息,只需配置发送方和接收方应用程序在相反方向发送消息即可。将使用不同的消息名称来区分原始消息及其确认响应的不同内容和目的。

Basic Message

灵活性

要发送消息,将创建一个 MessageFlowOut 模型实例作为有效载荷。然后,一系列操作将消息路由到连接两个应用程序的正确队列,并在其安全地进入队列后删除它。

尽管如此,您对该管道拥有完全的控制权,可以根据需要添加或删除操作。例如,您可以删除 DeleteCompleteMessage 操作,这样队列中的消息就可以在发送应用程序上保留更长时间。您可以添加自定义路由规则,或者将消息“tee”到多个目的地(假设这不是您在队列代理中可以做的事情)。您可能只想添加额外的日志记录。灵活性是有的。

出站路由

待办事项:描述消息名称如何路由到队列和连接。

配置

以下是配置步骤概述

  1. 在发送方和接收方上,创建和配置一个将同时由两个应用程序共享的 Laravel 队列。以下是一个共享 Redis 队列的示例配置,但可以使用任何驱动程序。
  2. 配置消息流包以使用共享队列。
  3. 在接收方创建一个观察者来处理传入的消息。

安装

要求

  • Laravel 8 或更高版本(计划支持 laravel 7)
  • PHP 7.4 或更高版本

此包目前尚未在 packagist 上注册。在此期间,请将此条目添加到您的 composer.json 仓库块中。

{
    "repositories": [
        {
            "type": "vcs",
            "url": "https://github.com/consilience/laravel-message-flow"
        }
    ]
}

使用 Composer 安装

composer require consilience/laravel-message-flow

发布迁移和配置

php artisan vendor:publish \
    --provider="Consilience\Laravel\MessageFlow\MessageFlowServiceProvider"

然后,您可以通过运行 php artisan migrate 来迁移数据库。

使用 Redis 的示例配置

我们将展示如何使用 Redis 设置发送者和接收者应用程序的包示例。首先,我们需要一个在发送者和接收者应用程序之间共享的队列连接。我们将使用 redis 作为队列驱动程序。

Laravel 默认为所有 Redis 键设置了一个前缀,该前缀对应用程序是唯一的。这允许多个应用程序使用单个 Redis 数据库而不会发生键冲突。对于我们的目的,我们希望一个前缀在应用程序之间是共享的。

添加到 Redis 键的应用程序范围前缀在 config/database.php 中定义

    'redis' => [

        'client' => env('REDIS_CLIENT', 'phpredis'),

        'options' => [
            'cluster' => env('REDIS_CLUSTER', 'redis'),

            // Remove this default global prefix option:

            'prefix' => env('REDIS_PREFIX', Str::slug(env('APP_NAME', 'laravel'), '_').'_database_'),
        ],

        // ...
    ],

然后,需要将前缀添加到 defaultcache 数据库条目中。这将恢复前缀以防止它们与其他使用相同 Redis 数据库的应用程序冲突

config/database.php:

    'redis' => [

        'client' => env('REDIS_CLIENT', 'phpredis'),

        'options' => [
            'cluster' => env('REDIS_CLUSTER', 'redis'),
        ],

        'default' => [
            'url' => env('REDIS_URL'),
            'host' => env('REDIS_HOST', '127.0.0.1'),
            'password' => env('REDIS_PASSWORD', null),
            'port' => env('REDIS_PORT', '6379'),
            'database' => env('REDIS_DB', '0'),

            // Prefix needed here:

            'prefix' => env('REDIS_PREFIX', Str::slug(env('APP_NAME', 'laravel'), '_').'_database_'),
        ],

        'cache' => [
            'url' => env('REDIS_URL'),
            'host' => env('REDIS_HOST', '127.0.0.1'),
            'password' => env('REDIS_PASSWORD', null),
            'port' => env('REDIS_PORT', '6379'),
            'database' => env('REDIS_CACHE_DB', '1'),
            
            // Prefix needed here:

            'prefix' => env('REDIS_PREFIX', Str::slug(env('APP_NAME', 'laravel'), '_').'_database_'),
        ],
    ],

现在,添加共享连接。这提供了对两个应用程序将共享的 Redis 数据库的访问。

config/database.php:

    'redis' => [
        'client' => env('REDIS_CLIENT', 'phpredis'),

        // ...

        // Give this connection any name other than your application name.
        // You may need to set different credentials if the shared redis
        // database is not the default database.

        'message-flow-database' => [
            'url' => env('REDIS_URL'),
            'host' => env('REDIS_HOST', '127.0.0.1'),
            'password' => env('REDIS_PASSWORD', null),
            'port' => env('REDIS_PORT', '6379'),
            'database' => env('REDIS_CACHE_DB', '0'),
            'prefix' => 'message-flow:',
        ],
    ],

使用 Redis 的共享队列连接现在应该完成。我们可以使用共享连接配置一个队列

config/queue.php:

    'connections' => [
        // ...

        'message-flow-queue-connection' => [
            'driver' => 'redis',
            'connection' => 'message-flow-database',
            'queue' => env('REDIS_QUEUE', 'default'),
            'retry_after' => 90,
            'block_for' => null,
        ],
    ],

现在,我们配置 Message Flow 使用此连接和数据库。

config/message-flow.php:

    'name-mappings' => [
        'default' => [
            'queue-connection' => 'message-flow-queue-connection',
            'queue-name' => 'message-flow',
        ],
    ],

发送者和接收者应用程序将具有相同的设置。接收者应用程序将监听队列以处理传入的消息

php artisan queue:work message-flow-queue-connection --queue=message-flow

注意:对于每个共享队列,只有一个应用程序将订阅它并处理推送到它的消息。相反,任何数量的应用程序都可以将消息推送到该队列。如果有多个应用程序相互发送消息,最好为每个共享队列命名,并参考订阅它的应用程序。

发送示例消息

您可以通过从您的发送者应用程序创建一个新的 MessageFlowOut 模型来简单发送消息

use Consilience\Laravel\MessageFlow\Models\MessageFlowOut;

MessageFlowOut::create(["payload" => ["data" => "test data here"]]);

MessageFlowOut::create(["payload" => $myModel]);

要从接收者应用程序检索消息,可以将监听器指向传入的模型。创建一个观察者

php artisan make:observer MessageFlowObserver \
    --model='Consilience\Laravel\MessageFlow\Models\MessageFlowIn'

一个示例观察者可能设置如下

<?php

namespace App\Observers;

use Consilience\Laravel\MessageFlow\Models\MessageFlowIn;

class MessageFlowObserver
{
    /**
     * Handle the MessageFlowIn "created" event.
     *
     * @param  \Consilience\Laravel\MessageFlow\Models\MessageFlowIn $messageFlowIn
     * @return void
     */
    public function created(MessageFlowIn $messageFlowIn)
    {
        if ($messageFlowIn->isNew()) {
            // Process the message.

            // ...

            // A number of options once processed, either here or in
            // a dispatched job:

            $messageFlowIn->setComplete()->save(); // Set it as processed
            $messageFlowIn->setFailed()->save(); // Set it as unprocessed
            $messageFlowIn->delete(); // Delete the message (not before a dispatched job is processed)
            // or a custom action or status.
        }
    }
}

观察者需要在例如 App\Providers\EventServiceProvider 中进行注册

<?php

namespace App\Providers;

use Illuminate\Foundation\Support\Providers\EventServiceProvider as ServiceProvider;
use Consilience\Laravel\MessageFlow\Models\MessageFlowIn;
use App\Observers\MessageFlowObserver;

class EventServiceProvider extends ServiceProvider
{
    //...

    /**
     * Register any events for your application.
     *
     * @return void
     */
    public function boot()
    {
        MessageFlowIn::observe(MessageFlowObserver::class);
    }
}

Artisan 命令

此包引入了一些新的 artisan 命令

创建消息

此命令允许您创建一个新的出站消息。

php artisan message-flow:create-message \
    --name='routing-name' \
    --payload='{"json":"payload"}' \
    --status=new

如果没有提供选项,名称将为 default,状态为 new,有效载荷为空对象。

列出消息

此命令将列出当前在缓存表中。这些是正在发送或已发送但尚未删除的消息。它们也是已接收但尚未删除的消息。

php artisan message-flow:list-messages \
    --direction={inbound|outbound} \
    --status={new|complete|failed|other} \
    --uuid={uuid-of-message} \
    --limit=20 \
    --page=1 \
    --process

statusuuid 选项可以接受多个值。

limit 选项设置返回的记录数。这实际上是页面大小。

page” 选项指定要显示的页(大小为 limit)。页码从第一页开始计数为1。

process” 选项将为尚未处理的消息调度作业。对于处于“new”或“failed”状态的外出消息。这通常仅用于测试或启动失败观察者。对于处于“new”状态的内联消息,这将触发优雅的“created”事件来启动自定义观察者。

使用“-v”选项,将在列表中包含负载。一些负载可能很大。

测试

可以使用 docker-compose 运行单元测试,请参阅此处出色的说明:https://thephp.website/en/issue/php-docker-quick-setup/

测试设置为在 PHP 7.4 下运行。要执行单元测试,请运行 phpunit 服务。

docker-compose run phpunit

如果需要,可以提供额外的选项。composer 服务为在同一PHP版本下运行composer提供支持。

docker-compose run composer list
docker-compose run composer update

待办事项

  • 状态概述。
  • 名称和路由(高级配置)。
  • 外出管道(高级配置)。
  • 待完成的测试。
  • 支持针对多个框架版本(7和8)和PHP版本进行测试。