ensi / laravel-phprdkafka-consumer
为 laravel-phprdkafka 提供的具有偏见的最高级别消费者
Requires
- php: ^8.1
- ext-rdkafka: *
- ensi/laravel-phprdkafka: ^0.4.0
- laravel/framework: ^10.0 || ^11.0
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.2
- nunomaduro/collision: ^6.0 || ^7.0 || ^8.1
- orchestra/testbench: ^7.0 || ^8.0 || ^9.0
- pestphp/pest: ^1.22 || ^2.0
- pestphp/pest-plugin-laravel: ^1.1 || ^2.0
- phpstan/extension-installer: ^1.3
- phpstan/phpstan: ^1.11
- spaze/phpstan-disallowed-calls: ^2.15
README
为 ensi/laravel-phprdkafka 提供具有偏见的最高级别消费者
安装
首先,您必须安装并配置 ensi/laravel-phprdkafka
然后,
composer require ensi/laravel-phprdkafka-consumer
使用以下命令发布配置文件:
php artisan vendor:publish --provider="Ensi\LaravelPhpRdKafkaConsumer\LaravelPhpRdKafkaConsumerServiceProvider" --tag="kafka-consumer-config"
现在转到 config/kafka-consumer.php
并在那里添加处理器。
版本兼容性
基本用法
此软件包提供 php artisan kafka:consume {topic} {consumer=default} {--max-events=0} {--max-time=0} {--once}
命令,该命令执行第一个匹配给定主题和消费者名称的处理器。消费者名称来自 ensi/laravel-phprdkafka
配置文件。
配置中的处理器有以下配置选项
[ /* | Optional, defaults to `null`. | Here you may specify which topic should be handled by this processor. | Processor handles all topics by default. */ 'topic' => 'stage.crm.fact.registrations.1', /* | Optional, defaults to `null`. | Here you may specify which ensi/laravel-phprdkafka consumer should be handled by this processor. | Processor handles all consumers by default. */ 'consumer' => 'default', /* | Optional, defaults to `action`. | Here you may specify processor's type. Defaults to `action` | Supported types: | - `action` - a simple class with execute method; | - `job` - Laravel Queue Job. It will be dispatched using `dispatch` or `dispatchSync` method; */ 'type' => 'action', /* | Required. | Fully qualified class name of a processor class. */ 'class' => \App\Domain\Communication\Actions\SendConfirmationEmailAction::class, /* | Optional, defaults to `false`. | Proxy messages to Laravel's queue. | Supported values: | - `false` - do not stream message. Execute processor in syncronous mode; | - `true` - stream message to Laravel's default queue; | - `<your-favorite-queue-name-as-string>` - stream message to this queue; */ 'queue' => false, /* | Optional, defaults to 5000. | Kafka consume timeout in milliseconds . */ 'consume_timeout' => 5000, ]
重要!某些主题必须具有不同的消费者设置,例如从主题开始读取或不存在时不创建主题。
对于此类情况,您需要配置多个消费者并使用合适的消费者。
同步处理器
大多数时候您只需要一个同步处理器。以下是一个此类处理器的简单示例
use RdKafka\Message; class SendConfirmationEmailAction { public function execute(Message $message): void { // var_dump($message->payload); } }
可排队的处理器
如果您想将消息流式传输到 Laravel 的自用队列,可以使用 spatie/laravel-queueable-action
如果由于某些原因您不想依赖该软件包,您可以将切换到 Laravel Jobs
在两种情况下,您还需要在软件包的配置中指定 'queue' => true
或 'queue' => 'my-favorite-queue'
以针对给定处理器。
使用 Laravel Job 的处理器示例
use RdKafka\Message; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; class ConsumeMessageJob implements ShouldQueue { use Dispatchable, InteractsWithQueue, Queueable; public function __construct(protected Message $message) { } public function handle(): void { // var_dump($this->message->payload); } }
处理信号
php artisan kafka:consume ...
命令可以配置为在接收到某些操作系统信号后优雅地停止。
可以在软件包配置的 stop_signals
键中设置这些信号,例如 'stop_signals' => [SIGINT, SIGQUIT]
。
您可以使用 pcntl 扩展定义的任何常量 https://php.ac.cn/manual/en/pcntl.constants.php
贡献
有关详细信息,请参阅 CONTRIBUTING
消费者模拟
已添加测试工具以测试开发的手柄。您可以创建一个模拟的消费者并调用监听主题的命令
use Ensi\LaravelPhpRdKafkaConsumer\Commands\KafkaConsumeCommand; use Ensi\LaravelPhpRdKafkaConsumer\Tests\ConsumerFaker; use RdKafka\Message; ConsumerFaker::new('test-model') ->addMessage(new Message()) ->addMessage(new Message()) ->consume();
测试
测试
- composer install
- composer test
安全漏洞
请审查我们的安全策略,了解如何报告安全漏洞 我们的安全策略
许可
MIT 许可证(MIT)。有关更多信息,请参阅 许可文件