goodway/laravel-nats

Nats jetstream 队列驱动程序用于 Laravel

0.14.0 2024-06-20 23:46 UTC

This package is auto-updated.

Last update: 2024-09-21 00:24:29 UTC


README

License Latest Version Total Downloads

欢迎贡献或提供反馈。

先决条件

Laravel 版本

此包可用于 Laravel 8 或更高版本。所需的最低 PHP 版本是 8.1

配置文件

此包发布了一个 config/nats.php 文件。如果您已经有同名文件,必须重命名或删除它,因为它将与此包冲突。您可以选择合并自己的值与该包所需的值,只要此包期望的键存在。有关更多详细信息,请参阅源文件。

Nats 客户端

作为 Nats 客户端,我们使用一个外部的 basis-company/nats.php 包 - 最受欢迎、编写良好且功能强大的 PHP Nats 客户端。感谢 Dmitry Krokhin (nekufa)!

安装

安装库的推荐方法是使用 Composer

$ composer require goodway/laravel-nats

您应该使用以下命令发布 config/nats.php 配置文件:

$ php artisan vendor:publish --provider="Goodway\LaravelNats\NatsQueueProvider"

配置

客户端连接

客户端连接配置在 config/nats.php 文件中指定。支持多个配置

    'client' => [
        'configurations' => [
            'default' => [
                'host' => env('NATS_HOST', 'localhost'),
                'port' => intval(env('NATS_PORT', 4222)),
                'user' => env('NATS_USER'),
                'password' => env('NATS_PASSWORD'),
                'token' => env('NATS_TOKEN'), // Sets an authorization token for a connection
                'nkey' => env('NATS_NKEY'), // new, highly secure public-key signature system based on Ed25519
                'jwt' => env('NATS_JWT'), // Token for JWT Authentication
                'reconnect' => env('NATS_RECONNECT', true),
                'connection_timeout' => floatval(env('NATS_CONNECTION_TIMEOUT', 1)), // Number of seconds the client will wait for a connection to be established
                'verbose_mode' => env('NATS_VERBOSE_MODE', false), // Turns on +OK protocol acknowledgements
                'inbox_prefix' => env('NATS_INBOX_PREFIX', '_INBOX'), // Sets default prefix for automatically created inboxes
                'ping_interval' => intval(env('NATS_PING_INTERVAL', 2)), // Number of seconds between client-sent pings
            ],
            ...
        ]
    ]

队列连接

在您的 'config/queue.php' 文件中描述队列连接配置。它支持多个客户端配置。

示例

    'nats' => [
        'driver' => 'nats',
        'consumer_client' => 'default', // client configuration name from nats.php config file to listen queue
        'publisher_client' => 'default', // client configuration name from nats.php config file to publish queue
        'jetstream' => env('NATS_JETSTREAM', 'jetstream'),
        'jetstream_retention_policy' => env('NATS_JETSTREAM_RETENTION_POLICY', 'workqueue'),
        'consumer' => env('NATS_CONSUMER_GROUP', 'consumer'),
        'consumer_iterations' => intval(env('NATS_CONSUMER_ITERATIONS', 3)),
        'queue_consumer_create' => (bool)env('NATS_QUEUE_CONSUMER_CREATE', false),
        'queue_consumer_prefix' => env('NATS_QUEUE_CONSUMER_PREFIX', 'con'),
        'queue_separated_clients' => env('NATS_QUEUE_SEPARATE_IDENTICAL_CLIENTS', true), // separate Nats clients with the identical configuration
        //            'queue_handler' => 'someClass',
        'fire_events' => (bool)env('NATS_QUEUE_MESSAGE_EVENTS', true),
        'default_batch_size' => intval(env('NATS_DEFAULT_BATCH_SIZE', 10)),
        'verbose_mode' => (bool)env('NATS_QUEUE_VERBOSE_MODE', true),
    ],

字段描述

  • driver - [字符串] 队列驱动程序名称。使用 'nats'
  • consumer_client - [字符串] 用于监听队列的客户端配置名称
  • publisher_client - [字符串] 用于发布消息的客户端配置名称
  • jetstream - [字符串] Nats jetstream 名称
  • jetstream_retention_policy - [字符串] Nats jetstream 保留策略。用于监听队列时生成正确的消费者名称
  • consumer - [字符串] 消费者组。用于消费者的最终名称
  • consumer_iterations - [整数,可选,2] 消息请求应发送的次数
  • consumer_delay - [浮点数,可选,1] 如果收到空响应,则等待多长时间(以秒为单位)再发送下一个请求
  • queue_consumer_create - [布尔值] 如果为 true,则队列将尝试自动创建一个新消费者,如果未找到。此功能仅在当前连接客户端具有创建消费者的必要权限时才有效
  • queue_consumer_prefix - [字符串] 消费者前缀。用于消费者的最终名称
  • queue_separated_clients - [布尔值] 详见下文描述
  • fire_events - [布尔值] 如果为 true,则在发布和处理接收到的消息过程中将触发事件
  • default_batch_size - [整数,可选,10] 批次大小。从 nats 流请求多少条消息
  • queue_handler - [字符串,可选] 您自定义队列处理器的类名。如果未定义,则将使用标准处理器
  • verbose_mode - [布尔值,可选,false] var_dump 一些额外的信息。例如在发布时

您可以指定用于发布者和消费者的不同连接,或者使用一个连接用于两个角色。

如果发布者客户端配置名称与消费者客户端配置名称匹配,您也可以使用一个或分开的连接。通过 "queue_separated_clients" 布尔属性使用此功能。

向队列发布

向队列发送消息最简单的方式是调用一个扩展了NatsMessageJob的简单类的dispatch()方法。这个类必须包含一个body()方法,该方法将返回消息的内容。body()的返回类型是mixed,可以是字符串或数组。这些数据将被序列化。

示例

use Goodway\LaravelNats\NatsMessageJob;

class TestNatsJob extends NatsMessageJob
{
    public function body(): mixed
    {
        return [
            'data' => Str::random(32),
            'group_random' => random_int(1,3),
        ];
    }
}

您也可以通过构造函数传递内容来使用动态体。

NatsMessageJob类使用了Dispatchable和Queueable关注点,并实现了经典的ShouldQueue接口。示例

dispatch((new TestNatsJob())->onConnection('nats')->onQueue('queue-name');

您也可以使用$subject变量指定消息的主题(默认为"default"),或者在分发作业时设置它。

protected string $subject = 'mySubject';
dispatch((new TestNatsJob())
    ->setSubject('mySubject'))
    ->onConnection('nats')->onQueue('queue-name');

头部信息

您可以使用您类的headers()方法设置消息头部信息。示例

class TestNatsJob extends NatsMessageJob
{
    public function headers(): array
    {
        return [
            'header1_key' => 'header1_value',
            'header2_key' => 'header2_value',
        ];
    }
    ...
}

从队列中监听

您可以使用标准的queue:work机制连接并监听来自队列的消息。示例

php artisan queue:work nats --queue=queue-name

消息对象结构

{
    "body": [string] serialized data of your message body
    "headers": [array] headers array
    "subject": [string] subject value
    "timestamp": [int] timestamp in ms
}

事件

这些事件在发布和监听过程中被触发。

NatsQueueMessageSent

在消息被发送到队列后,将触发此事件

NatsQueueMessageReceived

从队列接收到消息时将触发此事件

很快将添加更多文档...

...