werk365 / larakafka
Laravel的Kafka客户端,可以轻松处理消息的发送和接收
Requires
- illuminate/support: ~7|~8
- jobcloud/php-kafka-lib: ^1
Requires (Dev)
- orchestra/testbench: ~5|~6
- phpunit/phpunit: ~9.0
This package is auto-updated.
Last update: 2024-09-19 19:59:57 UTC
README
Laravel使用的Kafka客户端包。基于jobcloud/php-kafka-lib。
此包支持扩展以自动生成spatie activitylog活动。使用werk365/larakafka-activity
。
无需过多配置(只需确保配置中有所需的代理信息和凭据),您就可以在模型上启用Spatie Activity Logging,此包将负责将信息发送到与应用程序名称对应的主题。
除了基本的日志记录功能外,它还允许您生成和消费任何您想要的内容。
生成可以在代码中轻松完成,您可以通过以下方式启动任意数量的消费者:
$ php artisan larakafka:consume {topic}
安装
通过Composer
$ composer require werk365/larakafka
使用以下命令发布配置文件:
$ php artisan vendor:publish --provider="Werk365\LaraKafka\LaraKafkaServiceProvider"
配置
发布的配置文件如下,下面有解释。
<?php return [ 'client' => [ 'client_name' => str_replace(' ', '-', strtolower(env('APP_NAME'))), 'configs' => [ 'producer' => [ 'client.id' => str_replace(' ', '-', strtolower(env('APP_NAME'))), 'compression.codec' => 'snappy', 'security.protocol' => 'SASL_SSL', 'sasl.mechanisms' => 'PLAIN', 'sasl.username' => '', 'sasl.password' => '' ], 'consumer' => [ 'client.id' => str_replace(' ', '-', strtolower(env('APP_NAME'))), 'security.protocol' => 'SASL_SSL', 'sasl.mechanisms' => 'PLAIN', 'sasl.username' => '', 'sasl.password' => '', 'auto.offset.reset' => 'earliest' ] ], 'broker' => '', ], 'functions' => [ 'consumer' => [ 'example' => [ 'function' => 'ingest', 'namespace' => '\App\Services\KafkaService' ] ], ], 'maps' => [ 'consumer' => [ 'example' => [ 'model' => 'App\Models\Example', 'event_id' => 'uuid', 'model_id' => 'id', 'attributes' => [ 'uuid' => 'id', 'first_name' => 'name', ] ] ] ] ];
并非所有上述配置对于每个用例都是必需的。如果您只想使用活动日志功能,只需确保配置了client.configs.producer
和client.broker
。
客户端
生产者和消费者配置都在这里。这应该是相当直接的。请注意,生产者默认设置的主题将是client.client_name
的值。
函数
这目前仅用于消费者,如果您想消费一个主题,当收到消息时将调用相应的函数。在这种情况下,当消费example
主题时,将调用静态函数\App\Services\KafkaService::ingest($key, $headers, $body)
。然后由您的应用程序处理这些数据。
映射
最后,这是storeMessage()
函数的配置。此函数可以用来帮助轻松处理从Kafka接收到的数据。此函数期望一个数据数组,并将其映射和存储到数据库中。有关此函数的更多使用信息,但以下是一些配置点:
model
= 用于存储数据的模型
event_id
= 属于对象的唯一ID的关键名称
model_id
= 在模型中调用的唯一ID的关键名称
attributes
= 应为此模型存储的属性。并非所有在此配置中配置的属性都必须在消费的消息中存在,因为只能发送更新属性。其中key
代表事件中的属性键名称,value
代表在模型中调用的键。
用法
生成
use Werk365\LaraKafka\LaraKafka; $kafka = new LaraKafka(); $kafka->setTopic("string") //optional, defaults to application name ->setKey("string") // optional, default will be the caller classname ->setHeaders(["key" => "value"]) // optional, default will contain more information about caller ->setBody("string") // Body can also be set like: $kafka = new LaraKafka("body") ->produce();
其他可用方法
->setBroker("string")
设置除配置中定义的以外的代理
->setProducerConfig([])
覆盖配置设置
->addProducerConfig("key", "value")
向设置配置添加值
->addHeaders([])
合并添加的头部数组到设置的一个中
Octane消费者
当使用Laravel Octane时,您可以选择通过Swoole工作进程运行消费者,或者通过下一步骤中描述的控制台命令正常通过PHP进程运行。要使用Octane版本,首先使用以下命令创建一个新的消费者:
$ php artisan kafka:consumer topic
这将在 App\Consumers
命名空间中创建一个新的消费者类。例如,当使用 test
作为主题名称时,为 App\Consumers\TestConsumer
。
在这个消费者中,您将找到一个 handleMessage()
方法,其中包含您开始处理消息所需的一切。为了确保此消费者与您的 octane 应用程序一起启动,请将消费者添加到 octane 配置文件中的监听器中。我建议将其添加为 TickReceived
事件的监听器,如下所示
'listeners' => [ // ... TickReceived::class => [ ...Octane::prepareApplicationForNextOperation(), \App\Consumers\TestConsumer::class ], // ... ],
消费者将只启动一次并持续运行,但将其放在这里意味着它将在每个 octane-tick(每秒)启动,这样消费者可以确保它仍在运行。如果初始消费者在 60
秒内没有发出任何活动迹象,将启动一个新的消费者。
控制台消费者
要运行消费者,您只需运行
$ php artisan larakafka:consume {topic}
在读取消息时,您的配置中定义的函数将被调用。如果我们有示例配置中给出的函数,它可能看起来像这样
namespace App\Services; use Illuminate\Support\Facades\Log; class KafkaService { public static function ingest($key, $headers, $body) { Log::info(json_encode($body)); } }
如果您希望存储接收到的消息正文中的数据,您可以使用 storeMessage
方法。此方法接受一个属性数组和类型数组。这意味着一个属性数组可以映射并存储到不同的模型(类型)中。在这个示例中,我们将只存储一个模型,假设示例配置,并且假设正文有 event_attributes
,它是一个包含属性的数组。
namespace App\Services; use Werk365\LaraKafka\LaraKafka; class KafkaService { public static function ingest($key, $headers, $body) { $kafka = new LaraKafka(); $kafka->storeMessage($body->event_attributes, ["user"]); } }
变更日志
请参阅 变更日志 了解最近有哪些更改。
测试
工作进度中(WIP)
$ composer test
安全
如果您发现任何与安全相关的问题,请通过作者的电子邮件而不是使用问题跟踪器来联系。
致谢
许可协议
许可协议。有关更多信息,请参阅 许可文件。