werk365/larakafka

Laravel的Kafka客户端,可以轻松处理消息的发送和接收

1.1.8 2021-08-19 13:10 UTC

This package is auto-updated.

Last update: 2024-09-19 19:59:57 UTC


README

Latest Version on Packagist Total Downloads StyleCI

Laravel使用的Kafka客户端包。基于jobcloud/php-kafka-lib

此包支持扩展以自动生成spatie activitylog活动。使用werk365/larakafka-activity

无需过多配置(只需确保配置中有所需的代理信息和凭据),您就可以在模型上启用Spatie Activity Logging,此包将负责将信息发送到与应用程序名称对应的主题。

除了基本的日志记录功能外,它还允许您生成和消费任何您想要的内容。

生成可以在代码中轻松完成,您可以通过以下方式启动任意数量的消费者:

$ php artisan larakafka:consume {topic}

安装

通过Composer

$ composer require werk365/larakafka

使用以下命令发布配置文件:

$ php artisan vendor:publish --provider="Werk365\LaraKafka\LaraKafkaServiceProvider"

配置

发布的配置文件如下,下面有解释。

<?php

return [
    'client' => [
        'client_name' =>  str_replace(' ', '-', strtolower(env('APP_NAME'))),
        'configs' => [
            'producer' => [
                'client.id' => str_replace(' ', '-', strtolower(env('APP_NAME'))),
                'compression.codec' => 'snappy',
                'security.protocol' => 'SASL_SSL',
                'sasl.mechanisms' => 'PLAIN',
                'sasl.username' => '',
                'sasl.password' => ''
            ],
            'consumer' => [
                'client.id' => str_replace(' ', '-', strtolower(env('APP_NAME'))),
                'security.protocol' => 'SASL_SSL',
                'sasl.mechanisms' => 'PLAIN',
                'sasl.username' => '',
                'sasl.password' => '',
                'auto.offset.reset' => 'earliest'
            ]
        ],
        'broker' => '',
    ],
    'functions' => [
        'consumer' => [
            'example' => [
                'function' => 'ingest',
                'namespace' => '\App\Services\KafkaService'
            ]
        ],
    ],
    'maps' => [
        'consumer' => [
            'example' => [
                'model' => 'App\Models\Example',
                'event_id' => 'uuid',
                'model_id' => 'id',
                'attributes' => [
                    'uuid' => 'id',
                    'first_name' => 'name',
                ]
            ]
        ]
    ]
];

并非所有上述配置对于每个用例都是必需的。如果您只想使用活动日志功能,只需确保配置了client.configs.producerclient.broker

客户端

生产者和消费者配置都在这里。这应该是相当直接的。请注意,生产者默认设置的主题将是client.client_name的值。

函数

这目前仅用于消费者,如果您想消费一个主题,当收到消息时将调用相应的函数。在这种情况下,当消费example主题时,将调用静态函数\App\Services\KafkaService::ingest($key, $headers, $body)。然后由您的应用程序处理这些数据。

映射

最后,这是storeMessage()函数的配置。此函数可以用来帮助轻松处理从Kafka接收到的数据。此函数期望一个数据数组,并将其映射和存储到数据库中。有关此函数的更多使用信息,但以下是一些配置点:

model = 用于存储数据的模型

event_id = 属于对象的唯一ID的关键名称

model_id = 在模型中调用的唯一ID的关键名称

attributes = 应为此模型存储的属性。并非所有在此配置中配置的属性都必须在消费的消息中存在,因为只能发送更新属性。其中key代表事件中的属性键名称,value代表在模型中调用的键。

用法

生成

use Werk365\LaraKafka\LaraKafka;

$kafka = new LaraKafka();
$kafka->setTopic("string") //optional, defaults to application name
    ->setKey("string") // optional, default will be the caller classname
    ->setHeaders(["key" => "value"]) // optional, default will contain more information about caller
    ->setBody("string") // Body can also be set like: $kafka = new LaraKafka("body")
    ->produce();

其他可用方法

->setBroker("string") 设置除配置中定义的以外的代理

->setProducerConfig([]) 覆盖配置设置

->addProducerConfig("key", "value") 向设置配置添加值

->addHeaders([]) 合并添加的头部数组到设置的一个中

Octane消费者

当使用Laravel Octane时,您可以选择通过Swoole工作进程运行消费者,或者通过下一步骤中描述的控制台命令正常通过PHP进程运行。要使用Octane版本,首先使用以下命令创建一个新的消费者:

$ php artisan kafka:consumer topic

这将在 App\Consumers 命名空间中创建一个新的消费者类。例如,当使用 test 作为主题名称时,为 App\Consumers\TestConsumer

在这个消费者中,您将找到一个 handleMessage() 方法,其中包含您开始处理消息所需的一切。为了确保此消费者与您的 octane 应用程序一起启动,请将消费者添加到 octane 配置文件中的监听器中。我建议将其添加为 TickReceived 事件的监听器,如下所示

 'listeners' => [
       // ...

        TickReceived::class => [
            ...Octane::prepareApplicationForNextOperation(),
            \App\Consumers\TestConsumer::class
        ],

        // ...
    ],

消费者将只启动一次并持续运行,但将其放在这里意味着它将在每个 octane-tick(每秒)启动,这样消费者可以确保它仍在运行。如果初始消费者在 60 秒内没有发出任何活动迹象,将启动一个新的消费者。

控制台消费者

要运行消费者,您只需运行

$ php artisan larakafka:consume {topic}

在读取消息时,您的配置中定义的函数将被调用。如果我们有示例配置中给出的函数,它可能看起来像这样

namespace App\Services;

use Illuminate\Support\Facades\Log;

class KafkaService

{
    public static function ingest($key, $headers, $body)
    {

        Log::info(json_encode($body));
    }

}

如果您希望存储接收到的消息正文中的数据,您可以使用 storeMessage 方法。此方法接受一个属性数组和类型数组。这意味着一个属性数组可以映射并存储到不同的模型(类型)中。在这个示例中,我们将只存储一个模型,假设示例配置,并且假设正文有 event_attributes,它是一个包含属性的数组。

namespace App\Services;

use Werk365\LaraKafka\LaraKafka;

class KafkaService

{
    public static function ingest($key, $headers, $body)
    {
        $kafka = new LaraKafka();
        $kafka->storeMessage($body->event_attributes, ["user"]);
    }

}

变更日志

请参阅 变更日志 了解最近有哪些更改。

测试

工作进度中(WIP)

$ composer test

安全

如果您发现任何与安全相关的问题,请通过作者的电子邮件而不是使用问题跟踪器来联系。

致谢

许可协议

许可协议。有关更多信息,请参阅 许可文件