goodway / laravel-nats
Nats jetstream 队列驱动程序用于 Laravel
Requires
- php: >=8.1
- basis-company/nats: ^0.23.0
- nesbot/carbon: ^2.67
Requires (Dev)
- laravel/framework: ^8.0|^9.0|^10.0|^11.0
README
欢迎贡献或提供反馈。
先决条件
Laravel 版本
此包可用于 Laravel 8 或更高版本。所需的最低 PHP 版本是 8.1
配置文件
此包发布了一个 config/nats.php 文件。如果您已经有同名文件,必须重命名或删除它,因为它将与此包冲突。您可以选择合并自己的值与该包所需的值,只要此包期望的键存在。有关更多详细信息,请参阅源文件。
Nats 客户端
作为 Nats 客户端,我们使用一个外部的 basis-company/nats.php 包 - 最受欢迎、编写良好且功能强大的 PHP Nats 客户端。感谢 Dmitry Krokhin (nekufa)!
安装
安装库的推荐方法是使用 Composer
$ composer require goodway/laravel-nats
您应该使用以下命令发布 config/nats.php 配置文件:
$ php artisan vendor:publish --provider="Goodway\LaravelNats\NatsQueueProvider"
配置
客户端连接
客户端连接配置在 config/nats.php 文件中指定。支持多个配置
'client' => [
'configurations' => [
'default' => [
'host' => env('NATS_HOST', 'localhost'),
'port' => intval(env('NATS_PORT', 4222)),
'user' => env('NATS_USER'),
'password' => env('NATS_PASSWORD'),
'token' => env('NATS_TOKEN'), // Sets an authorization token for a connection
'nkey' => env('NATS_NKEY'), // new, highly secure public-key signature system based on Ed25519
'jwt' => env('NATS_JWT'), // Token for JWT Authentication
'reconnect' => env('NATS_RECONNECT', true),
'connection_timeout' => floatval(env('NATS_CONNECTION_TIMEOUT', 1)), // Number of seconds the client will wait for a connection to be established
'verbose_mode' => env('NATS_VERBOSE_MODE', false), // Turns on +OK protocol acknowledgements
'inbox_prefix' => env('NATS_INBOX_PREFIX', '_INBOX'), // Sets default prefix for automatically created inboxes
'ping_interval' => intval(env('NATS_PING_INTERVAL', 2)), // Number of seconds between client-sent pings
],
...
]
]
队列连接
在您的 'config/queue.php' 文件中描述队列连接配置。它支持多个客户端配置。
示例
'nats' => [
'driver' => 'nats',
'consumer_client' => 'default', // client configuration name from nats.php config file to listen queue
'publisher_client' => 'default', // client configuration name from nats.php config file to publish queue
'jetstream' => env('NATS_JETSTREAM', 'jetstream'),
'jetstream_retention_policy' => env('NATS_JETSTREAM_RETENTION_POLICY', 'workqueue'),
'consumer' => env('NATS_CONSUMER_GROUP', 'consumer'),
'consumer_iterations' => intval(env('NATS_CONSUMER_ITERATIONS', 3)),
'queue_consumer_create' => (bool)env('NATS_QUEUE_CONSUMER_CREATE', false),
'queue_consumer_prefix' => env('NATS_QUEUE_CONSUMER_PREFIX', 'con'),
'queue_separated_clients' => env('NATS_QUEUE_SEPARATE_IDENTICAL_CLIENTS', true), // separate Nats clients with the identical configuration
// 'queue_handler' => 'someClass',
'fire_events' => (bool)env('NATS_QUEUE_MESSAGE_EVENTS', true),
'default_batch_size' => intval(env('NATS_DEFAULT_BATCH_SIZE', 10)),
'verbose_mode' => (bool)env('NATS_QUEUE_VERBOSE_MODE', true),
],
字段描述
- driver - [字符串] 队列驱动程序名称。使用 'nats'
- consumer_client - [字符串] 用于监听队列的客户端配置名称
- publisher_client - [字符串] 用于发布消息的客户端配置名称
- jetstream - [字符串] Nats jetstream 名称
- jetstream_retention_policy - [字符串] Nats jetstream 保留策略。用于监听队列时生成正确的消费者名称
- consumer - [字符串] 消费者组。用于消费者的最终名称
- consumer_iterations - [整数,可选,2] 消息请求应发送的次数
- consumer_delay - [浮点数,可选,1] 如果收到空响应,则等待多长时间(以秒为单位)再发送下一个请求
- queue_consumer_create - [布尔值] 如果为 true,则队列将尝试自动创建一个新消费者,如果未找到。此功能仅在当前连接客户端具有创建消费者的必要权限时才有效
- queue_consumer_prefix - [字符串] 消费者前缀。用于消费者的最终名称
- queue_separated_clients - [布尔值] 详见下文描述
- fire_events - [布尔值] 如果为 true,则在发布和处理接收到的消息过程中将触发事件
- default_batch_size - [整数,可选,10] 批次大小。从 nats 流请求多少条消息
- queue_handler - [字符串,可选] 您自定义队列处理器的类名。如果未定义,则将使用标准处理器
- verbose_mode - [布尔值,可选,false] var_dump 一些额外的信息。例如在发布时
您可以指定用于发布者和消费者的不同连接,或者使用一个连接用于两个角色。
如果发布者客户端配置名称与消费者客户端配置名称匹配,您也可以使用一个或分开的连接。通过 "queue_separated_clients" 布尔属性使用此功能。
向队列发布
向队列发送消息最简单的方式是调用一个扩展了NatsMessageJob的简单类的dispatch()方法。这个类必须包含一个body()方法,该方法将返回消息的内容。body()的返回类型是mixed,可以是字符串或数组。这些数据将被序列化。
示例
use Goodway\LaravelNats\NatsMessageJob;
class TestNatsJob extends NatsMessageJob
{
public function body(): mixed
{
return [
'data' => Str::random(32),
'group_random' => random_int(1,3),
];
}
}
您也可以通过构造函数传递内容来使用动态体。
NatsMessageJob类使用了Dispatchable和Queueable关注点,并实现了经典的ShouldQueue接口。示例
dispatch((new TestNatsJob())->onConnection('nats')->onQueue('queue-name');
您也可以使用$subject变量指定消息的主题(默认为"default"),或者在分发作业时设置它。
protected string $subject = 'mySubject';
dispatch((new TestNatsJob())
->setSubject('mySubject'))
->onConnection('nats')->onQueue('queue-name');
头部信息
您可以使用您类的headers()方法设置消息头部信息。示例
class TestNatsJob extends NatsMessageJob
{
public function headers(): array
{
return [
'header1_key' => 'header1_value',
'header2_key' => 'header2_value',
];
}
...
}
从队列中监听
您可以使用标准的queue:work机制连接并监听来自队列的消息。示例
php artisan queue:work nats --queue=queue-name
消息对象结构
{
"body": [string] serialized data of your message body
"headers": [array] headers array
"subject": [string] subject value
"timestamp": [int] timestamp in ms
}
事件
这些事件在发布和监听过程中被触发。
NatsQueueMessageSent
在消息被发送到队列后,将触发此事件
NatsQueueMessageReceived
从队列接收到消息时将触发此事件
很快将添加更多文档...
...