aliftech / laravel-kafka
laravel 的 kafka 驱动
Requires
- php: ^8.0
- ext-rdkafka: ^5.0|^4.0
- flix-tech/avro-serde-php: ^1.7
- monolog/monolog: ^2.3
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.0
- orchestra/testbench: ^6.20
- phpunit/phpunit: ^9.5
- predis/predis: ^1.1
README
你想在 laravel 项目中使用 kafka 吗?我所见过的多数包,都没有提供易懂的语法。
此包提供了一种在 Laravel 项目中发布(发布)和消费(订阅 & 处理)kafka 消息的好方法。
遵循以下文档来安装此包并在你的 laravel 项目中使用 kafka。
安装
要安装此包,你必须已经安装了 PHP RdKafka 扩展。首先,按照以下步骤这里安装系统中的 rdkafka 库,然后按照这里安装 PHP RdKafka。
现在,你可以使用 Composer 包管理器将 Laravel Kafka 安装到你的项目中
composer require aliftech/laravel-kafka
安装 Laravel Kafka 后,使用 kafka:install
Artisan 命令发布其资产
php artisan kafka:install
配置
发布 Kafka 的资产后,其主配置文件将位于 config/kafka.php
。(稍后详细介绍)
发布消息
可以将消息发布到主题。为此,你需要创建一个主题模型和一个消息 DTO。
发布消息的概念
kafka 中 pub/sub 的概念通过生产者和消费者实现。生产者将消息发布到主题,消费者将订阅主题并监听新消息。主题就像事件,消息就像 DTO 对象,用于将数据从一个源传递到另一个源。
当消息发送到同一主题时,应具有相同的数据结构。为了解决这个问题,你的消息应该作为一个具有数据结构的独立类创建,并且只能发布到唯一的主题。这意味着,单个主题不应该接收具有不同数据结构的两个消息。
将主题视为表(在数据库中)。你不能向表中插入具有不同列结构的两个记录。并将记录视为你的消息。你可以向一个主题发送具有不同数据结构的两个消息。但这通常会制造问题而不是解决问题!
主题
使用以下 artisan 命令在 ./app/Kafka/Topics
文件夹中创建主题
php artisan make:topic MyTopic
主题生成后,必须设置相应的 kafka 主题键,这里为 my_topic
/** * Put down the correct Topic Key. * * @var string $topic_key */ public static string $topic_key = 'my_topic';
消息
使用以下 artisan 命令在 ./app/Kafka/Messages
文件夹中创建消息
php artisan make:message MyMessage
现在,你必须连接到相应的主题
/** * Topic that the message will be sent to. */ protected string $topic_class = MyTopic::class;
在此之后,必须在类中设置 DTO 属性
/** * Message variables to be transfered to Kafka. * they should be public only. */ public int $id; public string $name; public string $file_url; public bool $is_checked;
此外,你也可以创建一个构造函数来在创建对象时设置属性
public function __contruct(int $id, string $name, string $file_url, bool $is) { $this->id = $id; $this->name = $name; $this->file_url = $file_url; $this->is_checked = $is_checked; }
发布
为了将消息发布到主题,必须在消息对象上调用 publish()
方法
use App\Kafka\Messages\MyMessage; // you can create a message object $message = MyMessage::create(1, 'My name changed', 'file2.jpg', false); // or $message = new MyMessage(1, 'My name changed', 'file2.jpg', false); // you can set properties like this too $message->id = 1; $message->name = 'My name changed'; $message->file_url = 'file2.jpg'; $message->is_checked = false; // now that message object is created and // filled with data, it's ready to be published $message->publish();
现在,你的消息(MyMessage
)已经发送到主题(MyTopic
)。
订阅主题
主题可以被消费,就像事件可以被监听。当主题有新消息发布时,消费者会运行某些处理器。
消费主题的概念
要使用消费者,你需要创建主题模型和处理器。然后,你应该将处理器附加到主题上。你可以在处理器内部处理新消息。在设置好主题和处理器后,可以使用Artisan命令kafka:consume
来订阅主题。此命令将在有新消息时自动按照你提供的顺序运行你的处理器。
消息处理器
消息处理器作为独立的类来处理消息。
创建处理器
可以使用以下Artisan命令在./app/Kafka/Handlers
文件夹中创建处理器:
php artisan make:handler MyHandler
处理逻辑
在生成处理器(例如MyHandler
)后,你应该将你的逻辑放在处理器的handle
函数中。
/** * Handle the topic message. * * @param $message * @return void */ public function handle($message): void { // handle new messages here. Put your logic here }
使用中间件
此外,如果你想在消息到达handle
函数之前对其进行过滤,你应该在处理器中使用中间件。为此,你应该使用一个特殊的函数叫做middleware
。
/** * Middleware to be passed before handling the message * * @param $message * @return void */ public function middleware($message, Closure $next): void { // here put your filters $next($message); }
如果你想让中间件传递并调用handle
函数,就像调用函数一样调用第二个参数$next
,并在其中包含消息。如果不调用$next
,则表示$message
无法通过你的过滤器或类似情况。当你调用$next
时,确保以$next($message)
的形式将$message
传递给函数。
你可能想要使用中间件的情况有很多。其中一种情况是,如果你想使用多个处理器来订阅单个主题。你可能希望新消息(到达单个主题)由多个处理器处理,并且每个处理器可能通过其元数据(消息头中的元数据)来过滤消息。
注册处理器
你已经创建了你的主题和处理器。现在,你应该设置当有新消息发布到某个主题时调用哪些处理器。这设置在服务提供者(称为KafkaServiceProvider
)中,该服务提供者在调用Artisan命令kafka:install
时发布。
use App\Kafka\Topics\MyTopic; use App\Kafka\Handlers\MyHandler; /** * The topic consumer mappings for the application. * * @var array */ protected $subscribe = [ MyTopic::class => [ MyHandler::class, // MyHandler2::class, // MyHandler3::class, // ... // You could put a lot of handlers here ], ];
消费主题
现在,你已经设置了主题和处理器之间的关系。为了持续处理新消息,你应该调用以下Artisan命令:
php artisan kafka:consume
此命令将保持运行并监听新消息。但是,当此命令停止时,你的项目也将停止处理新消息。但是,Kafka是智能的,它会保存消费者的偏移量。当你再次调用Artisan命令时,消费者将从停止工作的偏移量处开始。
**在生产环境中,如果没有进程管理器,不要运行此Artisan命令,因为它可能会停止工作。推荐使用Supervisor!
使用守护进程运行消费者
相关文档即将推出!
主题命名的概念
相关文档即将推出!