chencongbao / laravel-kafka
Laravel 的 Kafka 驱动器
Requires
- php: ^7.3||^8.0
- ext-rdkafka: ^5.0|^4.0
- flix-tech/avro-serde-php: ^1.7
- monolog/monolog: ^2.3
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.0
- orchestra/testbench: ^6.20
- phpunit/phpunit: ^9.5
- predis/predis: ^1.1
This package is not auto-updated.
Last update: 2024-09-28 20:03:52 UTC
README
你在你的 Laravel 项目中使用 Kafka 吗?今天之前我看到的所有包,包括我自己构建的一些包,都没有提供良好的语法用法,或者如果提供了,使用这些包的测试过程非常痛苦。
此包提供了一种在 Laravel 项目中生产和消费 Kafka 消息的好方法。
按照以下文档安装此包,并在你的 Laravel 项目中使用 Kafka。
安装
要安装此包,您必须已安装 PHP RdKafka 扩展。您可以通过以下步骤在系统中安装 rdkafka:此处。
安装 RdKafka 后,使用 composer 需要此包
composer require mateusjunges/laravel-kafka
您可以使用以下命令发布包配置:
php artisan vendor:publish --tag=laravel-kafka-config
现在您可以开始使用了!
使用
安装包后,您就可以开始生产和消费 Kafka 消息。
生产 Kafka 消息
要将消息发布到 Kafka,您可以使用 Junges\Kafka\Facades\Kafka
类的 publishOn
方法
use Junges\Kafka\Facades\Kafka;
Kafka::publishOn('topic');
您还可以指定要发布消息的代理
use Junges\Kafka\Facades\Kafka;
Kafka::publishOn('topic', 'broker');
此方法返回一个 Junges\Kafka\Producers\ProducerBuilder::class
实例,您可以配置您的消息。
ProducerBuilder
类包含一些方法来配置您的 Kafka 生产者。以下行描述了这些方法。
ProducerBuilder 配置方法
withConfigOption
方法设置一个 \RdKafka\Conf::class
选项。您可以在此处检查所有可用选项:此处。此方法每次调用设置一个配置,您可以使用 withConfigOptions
方法传递一个配置名称和配置值的数组作为参数。以下是一个示例
use Junges\Kafka\Facades\Kafka;
Kafka::publishOn('topic')
->withConfigOption('property-name', 'property-value')
->withConfigOptions([
'property-name' => 'property-value'
]);
在开发应用程序时,您可以使用 withDebugEnabled
方法启用调试。要禁用调试模式,您可以使用 ->withDebugEnabled(false)
或 withDebugDisabled
方法。
use Junges\Kafka\Facades\Kafka;
Kafka::publishOn('topic')
->withConfigOption('property-name', 'property-value')
->withConfigOptions([
'property-name' => 'property-value'
])
->withDebugEnabled() // To enable debug mode
->withDebugDisabled() // To disable debug mode
->withDebugEnabled(false) // Also to disable debug mode
使用自定义序列化器
要使用自定义序列化器,您必须使用 usingSerializer
方法
$producer = \Junges\Kafka\Facades\Kafka::publishOn('topic')->usingSerializer(new MyCustomSerializer());
使用 AVRO 序列化器
要使用 AVRO 序列化器,请添加 AVRO 序列化器
use FlixTech\AvroSerializer\Objects\RecordSerializer;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use GuzzleHttp\Client;
$cachedRegistry = new CachedRegistry(
new BlockingRegistry(
new PromisingRegistry(
new Client(['base_uri' => 'kafka-schema-registry:9081'])
)
),
new AvroObjectCacheAdapter()
);
$registry = new AvroSchemaRegistry($cachedRegistry);
$recordSerializer = new RecordSerializer($cachedRegistry);
//if no version is defined, latest version will be used
//if no schema definition is defined, the appropriate version will be fetched form the registry
$registry->addBodySchemaMappingForTopic(
'test-topic',
new \Junges\Kafka\Message\KafkaAvroSchema('bodySchemaName' /*, int $version, AvroSchema $definition */)
);
$registry->addKeySchemaMappingForTopic(
'test-topic',
new \Junges\Kafka\Message\KafkaAvroSchema('keySchemaName' /*, int $version, AvroSchema $definition */)
);
$serializer = new \Junges\Kafka\Message\Serializers\AvroSerializer($registry, $recordSerializer /*, AvroEncoderInterface::ENCODE_BODY */);
$producer = \Junges\Kafka\Facades\Kafka::publishOn('topic')->usingSerializer($serializer);
配置 Kafka 消息有效负载
在 Kafka 中,您可以使用消息、消息头和消息键配置您的有效负载。所有这些配置都在 ProducerBuilder
类中可用。
配置消息头
要配置消息头,请使用withHeaders
方法
use Junges\Kafka\Facades\Kafka;
Kafka::publishOn('topic')
->withHeaders([
'header-key' => 'header-value'
])
配置消息正文
您可以使用withMessage
或withBodyKey
方法来配置消息。
withMessage
设置整个消息,并接受一个Junges\Kafka\Message\Message::class
实例作为参数。
这是您应该这样使用它
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;
$message = new Message(
headers: ['header-key' => 'header-value'],
body: ['key' => 'value'],
key: 'kafka key here'
)
Kafka::publishOn('topic')->withMessage($message);
withBodyKey
方法仅设置消息中的一个键。
use Junges\Kafka\Facades\Kafka;
Kafka::publishOn('topic')->withBodyKey('key', 'value');
使用Kafka键
在Kafka中,键用于确定日志中消息附加到的分区。如果您想在消息中使用键,应使用withKafkaKey
方法。
use Junges\Kafka\Facades\Kafka;
Kafka::publishOn('topic')->withKafkaKey('your-kafka-key');
将消息发送到Kafka
配置完所有消息选项后,您必须使用send
方法将消息发送到Kafka。
use Junges\Kafka\Facades\Kafka;
/** @var \Junges\Kafka\Producers\ProducerBuilder $producer */
$producer = Kafka::publishOn('topic')
->withConfigOptions(['key' => 'value'])
->withKafkaKey('your-kafka-key')
->withKafkaKey('kafka-key')
->withHeaders(['header-key' => 'header-value']);
$producer->send();
消费Kafka消息
如果您的应用程序需要从Kafka主题中读取消息,您必须创建一个消费者对象,订阅适当的主题并开始接收消息。
要使用此包创建消费者,您可以在Kafka外观上使用createConsumer
方法。
use Junges\Kafka\Facades\Kafka;
$consumer = Kafka::createConsumer();
此方法还允许您指定要消费的topics
、broker
和消费者group id
。
use Junges\Kafka\Facades\Kafka;
$consumer = Kafka::createConsumer(['topic-1', 'topic-2'], 'group-id', 'broker');
此方法返回一个Junges\Kafka\Consumers\ConsumerBuilder::class
实例,您可以使用它来配置您的消费者。
订阅主题
创建消费者后,您可以使用subscribe
方法订阅Kafka主题。
use Junges\Kafka\Facades\Kafka;
$consumer = Kafka::createConsumer()->subscribe('topic');
当然,您可以一次性订阅多个主题,无论是使用主题数组还是逐个指定。
use Junges\Kafka\Facades\Kafka;
$consumer = Kafka::createConsumer()->subscribe('topic-1', 'topic-2', 'topic-n');
// Or, using array:
$consumer = Kafka::createConsumer()->subscribe([
'topic-1',
'topic-2',
'topic-n'
]);
配置消费者组
属于同一消费者组的Kafka消费者共享一个组ID。组内的消费者通过确定每个分区只由组中的一个消费者消费来尽可能公平地将主题分区分配给自己。
要将您的消费者附加到消费者组,您可以使用withConsumerGroupId
方法来指定消费者组ID。
use Junges\Kafka\Facades\Kafka;
$consumer = Kafka::createConsumer()->withConsumerGroupId('foo');
配置消息处理器
现在您已经创建了一个Kafka消费者,您必须为该消费者接收的消息创建一个处理器。默认情况下,消费者是一个callable
。您可以使用可调用的类或简单的回调。使用withHandler
方法指定您的处理器。
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer();
// Using callback:
$consumer->withHandler(function(\Junges\Kafka\Contracts\KafkaConsumerMessage $message) {
// Handle your message here
});
或者,使用可调用的类
class Handler
{
public function __invoke(\Junges\Kafka\Contracts\KafkaConsumerMessage $message){
// Handle your message here
}
}
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->withHandler(new Handler)
KafkaConsumerMessage
合约为您提供了一些方便的方法来获取消息属性。
getKey()
:返回Kafka消息键getTopicName()
:返回发布消息的主题getPartition()
:返回发布消息的Kafka分区getHeaders()
:返回Kafka消息头getBody()
:返回消息正文getOffset()
:返回发布消息的偏移量
配置要消费的最大消息数
如果您想消费有限数量的消息,您可以使用withMaxMessages
方法设置Kafka消费者要消费的最大消息数。
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->withMaxMessages(2);
使用pcntl停止消费者
停止消费者非常有用,如果您想确保在处理已消费消息的过程中不会杀死进程。
要优雅地停止消费者,请在消费者实例上调用stopConsume
方法。
这对于使用信号处理程序特别有用。注意 您需要安装进程控制扩展才能利用pcntl方法。
function gracefulShutdown(Consumer $consumer) {
$consumer->stopConsume(function() {
echo 'Stopped consuming';
exit(0);
});
}
$consumer = Kafka::createConsumer(['topic'])
->withConsumerGroupId('group')
->withHandler(new Handler)
->build();
pcntl_signal(SIGINT, fn() => gracefulShutdown($consumer));
$consumer->consume();
配置死信队列
在Kafka中,死信队列(或DLQ)是Kafka集群中的一个简单Kafka主题,作为无法到达预期目标的消息的接收目的地。
在包中创建一个dlq
,您可以使用withDlq
方法。如果您没有指定DLQ主题名称,它将基于您正在消费的主题创建,主题名称后添加-dlq
后缀。
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->withDlq();
//Or, specifying the dlq topic name:
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->withDlq('your-dlq-topic-name')
使用SASL
SASL允许您的生产者和消费者对您的Kafka集群进行身份验证,从而验证他们的身份。它也是客户端声明身份的安全方式。要提供SASL配置,您可以使用withSasl
方法,传递一个Junges\Kafka\Config\Sasl
实例作为参数
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()
->withSasl(new \Junges\Kafka\Config\Sasl(
password: 'password',
username: 'username'
mechanisms: 'authentication mechanism'
));
您还可以设置与sasl一起使用的安全协议。这是可选的,默认使用SASL_PLAINTEXT
,但您可以将它设置为SASL_SSL
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()
->withSasl(new \Junges\Kafka\Config\Sasl(
password: 'password',
username: 'username'
mechanisms: 'authentication mechanism',
securityProtocol: 'SASL_SSL',
));
使用中间件
中间件提供了一种方便的方式来过滤和检查您的Kafka消息。要为该包编写中间件,您可以使用withMiddleware
方法。中间件是一个可调用的函数,其中第一个参数是消息本身,第二个参数是下一个处理程序。中间件按定义的顺序执行
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()
->withMiddleware(function($message, callable $next) {
// Perform some work here
return $next($message);
});
使用自定义反序列化器
要设置您想要使用的反序列化器,请使用usingDeserializer
方法
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->usingDeserializer(new MyCustomDeserializer());
注意:反序列化器类必须使用与用于生产此消息的序列化器相同的算法。
使用AVRO反序列化器
要在您的消费者上使用AVRO反序列化器,请添加Avro反序列化器
use FlixTech\AvroSerializer\Objects\RecordSerializer;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use GuzzleHttp\Client;
$cachedRegistry = new CachedRegistry(
new BlockingRegistry(
new PromisingRegistry(
new Client(['base_uri' => 'kafka-schema-registry:9081'])
)
),
new AvroObjectCacheAdapter()
);
$registry = new \Junges\Kafka\Message\Registry\AvroSchemaRegistry($cachedRegistry);
$recordSerializer = new RecordSerializer($cachedRegistry);
//if no version is defined, latest version will be used
//if no schema definition is defined, the appropriate version will be fetched form the registry
$registry->addBodySchemaMappingForTopic(
'test-topic',
new \Junges\Kafka\Message\KafkaAvroSchema('bodySchema' , 9 /* , AvroSchema $definition */)
);
$registry->addKeySchemaMappingForTopic(
'test-topic',
new \Junges\Kafka\Message\KafkaAvroSchema('keySchema' , 9 /* , AvroSchema $definition */)
);
// if you are only decoding key or value, you can pass that mode as additional third argument
// per default both key and body will get decoded
$deserializer = new \Junges\Kafka\Message\Deserializers\AvroDeserializer($registry, $recordSerializer /*, AvroDecoderInterface::DECODE_BODY */);
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->usingDeserializer($deserializer);
使用自动提交
自动提交检查在每次poll调用时都会调用,并检查经过的时间是否大于配置的时间。要启用自动提交,请使用withAutoCommit
方法
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->withAutoCommit();
使用自定义提交者
默认情况下,由DefaultCommitterFactory
提供的提交者被提供。
要在您的消费者上设置自定义提交者,请通过实现CommitterFactory
接口的工厂添加提交者
use \RdKafka\KafkaConsumer;
use \RdKafka\Message;
use \Junges\Kafka\Commit\Contracts\Committer;
use \Junges\Kafka\Commit\Contracts\CommitterFactory;
use \Junges\Kafka\Config\Config;
class MyCommitter implements Committer
{
public function commitMessage(Message $message, bool $success) : void {
// ...
}
public function commitDlq(Message $message) : void {
// ...
}
}
class MyCommitterFactory implements CommitterFactory
{
public function make(KafkaConsumer $kafkaConsumer, Config $config) : Committer {
// ...
}
}
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()
->usingCommitterFactory(new MyCommitterFactory())
->build();
设置Kafka配置选项
要设置配置选项,您可以使用两种方法:传递选项和选项值的数组使用withOptions
方法,或者使用withOption
方法并传递两个参数,即选项名称和选项值。
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()
->withOptions([
'option-name' => 'option-value'
]);
// Or:
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()
->withOption('option-name', 'option-value');
构建消费者
当您完成消费者的配置后,您必须调用build
方法,该方法返回一个Junges\Kafka\Consumers\Consumer
实例。
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()
// Configure your consumer here
->build();
消费kafka消息
构建消费者后,您必须调用consume
方法来消费消息
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->build();
$consumer->consume();
使用内置消费者命令
此库为您提供了一个内置的消费者命令,您可以使用它来消费消息。
要使用此命令,您必须创建一个扩展Junges\Kafka\Contracts\Consumer
的Consumer
类。
然后,只需使用以下命令
php artisan kafka:consume --consumer=\\App\\Path\\To\\Your\\Consumer --topics=topic-to-consume
使用自定义序列化器/反序列化器
序列化是将消息转换为字节数据的过程。反序列化是其逆过程——将字节流转换为对象。简而言之,它将内容转换为可读和可解释的信息。基本上,为了准备消息从生产者传输,我们使用序列化器。此包支持三种序列化器
- NullSerializer / NullDeserializer
- JsonSerializer / JsonDeserializer
- AvroSerializer / JsonDeserializer
更改默认序列化器和反序列化器
默认序列化器是通过使用MessageSerializer
和MessageDeserializer
契约解决的。默认情况下,使用Json
序列化器。
要设置默认序列化器,您可以将MessageSerializer
和MessageDeserializer
契约绑定到实现此接口的任何类。
打开您的AppServiceProvider
类,并在register
方法中添加以下行
$this->app->bind(\Junges\Kafka\Contracts\MessageSerializer::class, function () {
return new MyCustomSerializer();
});
$this->app->bind(\Junges\Kafka\Contracts\MessageDeserializer::class, function() {
return new MyCustomDeserializer();
});
创建自定义序列化器
要创建一个自定义序列化器,您需要创建一个实现了 \Junges\Kafka\Contracts\MessageSerializer
协议的类。此接口强制您声明 serialize
方法。
创建自定义反序列化器
要创建一个自定义反序列化器,您需要创建一个实现了 \Junges\Kafka\Contracts\MessageDeserializer
协议的类。此接口强制您声明 deserialize
方法。
动态替换序列化和反序列化器
序列化和反序列化器需要在生产者和消费者类中同时设置。要设置生产者序列化器,必须在 ProducerBuilder
类上使用 usingSerializer
方法。要设置消费者反序列化器,必须在 ConsumerBuilder
类上使用 usingDeserializer
方法。
$producer = \Junges\Kafka\Facades\Kafka::publishOn('topic')->usingSerializer(new MyCustomSerializer());
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->usingDeserializer(new MyCustomDeserializer());
使用 Kafka::fake()
当测试您的应用程序时,您可能希望“模拟”应用程序的某些方面,以便在给定的测试中它们实际上不会执行。此包提供方便的助手,以默认方式模拟 Kafka 生产者。这些助手主要提供了一个便利层,覆盖了 Mockery,这样您就不必手动进行复杂的 Mockery 方法调用。
Kafka 门面也提供了执行发布消息断言的方法,例如 assertPublished
、assertPublishedOn
和 assertNothingPublished
。
assertPublished
方法
use Junges\Kafka\Facades\Kafka;
use PHPUnit\Framework\TestCase;
class MyTest extends TestCase
{
public function testMyAwesomeApp()
{
Kafka::fake();
$producer = Kafka::publishOn('topic')
->withHeaders(['key' => 'value'])
->withBodyKey('foo', 'bar');
$producer->send();
Kafka::assertPublished($producer->getMessage());
}
}
您还可以在不传递消息参数的情况下使用 assertPublished
。
use Junges\Kafka\Facades\Kafka;
use PHPUnit\Framework\TestCase;
class MyTest extends TestCase
{
public function testMyAwesomeApp()
{
Kafka::fake();
Kafka::publishOn('topic')
->withHeaders(['key' => 'value'])
->withBodyKey('foo', 'bar');
Kafka::assertPublished();
}
}
assertPublishedOn
方法
如果您想断言消息被发布在特定的 Kafka 主题上,您可以使用 assertPublishedOn
方法。
use PHPUnit\Framework\TestCase;
use Junges\Kafka\Facades\Kafka;
class MyTest extends TestCase
{
public function testWithSpecificTopic()
{
Kafka::fake();
$producer = Kafka::publishOn('some-kafka-topic')
->withHeaders(['key' => 'value'])
->withBodyKey('key', 'value');
$producer->send();
Kafka::assertPublishedOn('some-kafka-topic', $producer->getMessage());
// Or:
Kafka::assertPublishedOn('some-kafka-topic');
}
}
您还可以使用回调函数,在消息中使用回调函数执行断言,其中参数是发布的消息本身。
use PHPUnit\Framework\TestCase;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;
class MyTest extends TestCase
{
public function testWithSpecificTopic()
{
Kafka::fake();
$producer = Kafka::publishOn('some-kafka-topic')
->withHeaders(['key' => 'value'])
->withBodyKey('key', 'value');
$producer->send();
Kafka::assertPublishedOn('some-kafka-topic', $producer->getMessage(), function(Message $message) {
return $message->getHeaders()['key'] === 'value';
});
// Or:
Kafka::assertPublishedOn('some-kafka-topic', null, function(Message $message) {
return $message->getHeaders()['key'] === 'value';
});
}
}
assertNothingPublished
方法
您还可以使用 assertNothingPublished
断言没有任何消息被发布。
use PHPUnit\Framework\TestCase;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;
class MyTest extends TestCase
{
public function testWithSpecificTopic()
{
Kafka::fake();
if (false) {
$producer = Kafka::publishOn('some-kafka-topic')
->withHeaders(['key' => 'value'])
->withBodyKey('key', 'value');
$producer->send();
}
Kafka::assertNothingPublished();
}
}
assertPublishedTimes
方法
有时,您需要断言 Kafka 发布了给定数量的消息。为此,您可以使用 assertPublishedTimes
方法。
use PHPUnit\Framework\TestCase;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;
class MyTest extends TestCase
{
public function testWithSpecificTopic()
{
Kafka::fake();
Kafka::publishOn('some-kafka-topic')
->withHeaders(['key' => 'value'])
->withBodyKey('key', 'value');
Kafka::publishOn('some-kafka-topic')
->withHeaders(['key' => 'value'])
->withBodyKey('key', 'value');
Kafka::assertPublishedTimes(2);
}
}
assertPublishedOnTimes
方法
要断言消息在特定主题上被发布了指定次数,您可以使用 assertPublishedOnTimes
方法。
use PHPUnit\Framework\TestCase;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;
class MyTest extends TestCase
{
public function testWithSpecificTopic()
{
Kafka::fake();
Kafka::publishOn('some-kafka-topic')
->withHeaders(['key' => 'value'])
->withBodyKey('key', 'value');
Kafka::publishOn('some-kafka-topic')
->withHeaders(['key' => 'value'])
->withBodyKey('key', 'value');
Kafka::assertPublishedOnTimes('some-kafka-topic', 2);
}
}
测试
运行 composer test
以测试此包。
贡献
感谢您考虑为 Laravel Kafka 包做出贡献!贡献指南可以在 这里 找到。