aliftech/laravel-kafka

laravel 的 kafka 驱动

v1.0.5 2021-12-13 11:35 UTC

This package is auto-updated.

Last update: 2024-09-20 13:38:54 UTC


README

Latest Version On Packagist Total Downloads MIT Licensed PHP Version Require

你想在 laravel 项目中使用 kafka 吗?我所见过的多数包,都没有提供易懂的语法。

此包提供了一种在 Laravel 项目中发布(发布)和消费(订阅 & 处理)kafka 消息的好方法。

遵循以下文档来安装此包并在你的 laravel 项目中使用 kafka。

安装

要安装此包,你必须已经安装了 PHP RdKafka 扩展。首先,按照以下步骤这里安装系统中的 rdkafka 库,然后按照这里安装 PHP RdKafka。

现在,你可以使用 Composer 包管理器将 Laravel Kafka 安装到你的项目中

composer require aliftech/laravel-kafka

安装 Laravel Kafka 后,使用 kafka:install Artisan 命令发布其资产

php artisan kafka:install

配置

发布 Kafka 的资产后,其主配置文件将位于 config/kafka.php。(稍后详细介绍)

发布消息

可以将消息发布到主题。为此,你需要创建一个主题模型和一个消息 DTO

发布消息的概念

kafka 中 pub/sub 的概念通过生产者和消费者实现。生产者将消息发布到主题,消费者将订阅主题并监听新消息。主题就像事件,消息就像 DTO 对象,用于将数据从一个源传递到另一个源。

当消息发送到同一主题时,应具有相同的数据结构。为了解决这个问题,你的消息应该作为一个具有数据结构的独立类创建,并且只能发布到唯一的主题。这意味着,单个主题不应该接收具有不同数据结构的两个消息。

将主题视为表(在数据库中)。你不能向表中插入具有不同列结构的两个记录。并将记录视为你的消息。你可以向一个主题发送具有不同数据结构的两个消息。但这通常会制造问题而不是解决问题!

主题

使用以下 artisan 命令在 ./app/Kafka/Topics 文件夹中创建主题

php artisan make:topic MyTopic

主题生成后,必须设置相应的 kafka 主题键,这里为 my_topic

/**
 * Put down the correct Topic Key.
 *
 * @var string $topic_key
 */
public static string $topic_key = 'my_topic';

消息

使用以下 artisan 命令在 ./app/Kafka/Messages 文件夹中创建消息

php artisan make:message MyMessage

现在,你必须连接到相应的主题

/**
 * Topic that the message will be sent to.
 */
protected string $topic_class = MyTopic::class;

在此之后,必须在类中设置 DTO 属性

/**
 * Message variables to be transfered to Kafka.
 * they should be public only.
 */
public int $id;
public string $name;
public string $file_url;
public bool $is_checked;

此外,你也可以创建一个构造函数来在创建对象时设置属性

public function __contruct(int $id, string $name, string $file_url, bool $is) {
    $this->id = $id;
    $this->name = $name;
    $this->file_url = $file_url;
    $this->is_checked = $is_checked;
}

发布

为了将消息发布到主题,必须在消息对象上调用 publish() 方法

use App\Kafka\Messages\MyMessage;

// you can create a message object
$message = MyMessage::create(1, 'My name changed', 'file2.jpg', false);
// or
$message = new MyMessage(1, 'My name changed', 'file2.jpg', false);

// you can set properties like this too
$message->id = 1;
$message->name = 'My name changed';
$message->file_url = 'file2.jpg';
$message->is_checked = false;

// now that message object is created and
// filled with data, it's ready to be published
$message->publish();

现在,你的消息(MyMessage)已经发送到主题(MyTopic)。

订阅主题

主题可以被消费,就像事件可以被监听。当主题有新消息发布时,消费者会运行某些处理器。

消费主题的概念

要使用消费者,你需要创建主题模型和处理器。然后,你应该将处理器附加到主题上。你可以在处理器内部处理新消息。在设置好主题和处理器后,可以使用Artisan命令kafka:consume来订阅主题。此命令将在有新消息时自动按照你提供的顺序运行你的处理器。

消息处理器

消息处理器作为独立的类来处理消息。

创建处理器

可以使用以下Artisan命令在./app/Kafka/Handlers文件夹中创建处理器:

php artisan make:handler MyHandler

处理逻辑

在生成处理器(例如MyHandler)后,你应该将你的逻辑放在处理器的handle函数中。

/**
 * Handle the topic message.
 *
 * @param $message
 * @return void
 */
public function handle($message): void
{
    // handle new messages here. Put your logic here
}

使用中间件

此外,如果你想在消息到达handle函数之前对其进行过滤,你应该在处理器中使用中间件。为此,你应该使用一个特殊的函数叫做middleware

/**
 * Middleware to be passed before handling the message
 *
 * @param $message
 * @return void
 */
public function middleware($message, Closure $next): void
{
    // here put your filters
    $next($message);
}

如果你想让中间件传递并调用handle函数,就像调用函数一样调用第二个参数$next,并在其中包含消息。如果不调用$next,则表示$message无法通过你的过滤器或类似情况。当你调用$next时,确保以$next($message)的形式将$message传递给函数。

你可能想要使用中间件的情况有很多。其中一种情况是,如果你想使用多个处理器来订阅单个主题。你可能希望新消息(到达单个主题)由多个处理器处理,并且每个处理器可能通过其元数据(消息头中的元数据)来过滤消息。

注册处理器

你已经创建了你的主题和处理器。现在,你应该设置当有新消息发布到某个主题时调用哪些处理器。这设置在服务提供者(称为KafkaServiceProvider)中,该服务提供者在调用Artisan命令kafka:install时发布。

use App\Kafka\Topics\MyTopic;
use App\Kafka\Handlers\MyHandler;

/**
 * The topic consumer mappings for the application.
 *
 * @var array
 */
protected $subscribe = [
    MyTopic::class => [
        MyHandler::class,
        // MyHandler2::class,
        // MyHandler3::class,
        // ...
        // You could put a lot of handlers here
    ],
];

消费主题

现在,你已经设置了主题和处理器之间的关系。为了持续处理新消息,你应该调用以下Artisan命令:

php artisan kafka:consume

此命令将保持运行并监听新消息。但是,当此命令停止时,你的项目也将停止处理新消息。但是,Kafka是智能的,它会保存消费者的偏移量。当你再次调用Artisan命令时,消费者将从停止工作的偏移量处开始。

**在生产环境中,如果没有进程管理器,不要运行此Artisan命令,因为它可能会停止工作。推荐使用Supervisor!

使用守护进程运行消费者

相关文档即将推出!

主题命名的概念

相关文档即将推出!