greensight/laravel-phprdkafka-consumer
0.1.0
2021-09-30 18:01 UTC
Requires
- php: ^8.0
- ext-rdkafka: *
- greensight/laravel-phprdkafka: ^0.1.4
- illuminate/support: ^7 || ^8
Requires (Dev)
- brianium/paratest: ^6.2
- nunomaduro/collision: ^5.3
- orchestra/testbench: ^6.15
- phpunit/phpunit: ^9.3
- spatie/laravel-ray: ^1.9
This package is auto-updated.
Last update: 2022-01-11 08:39:07 UTC
README
已弃用,请使用https://github.com/ensi-platform/laravel-php-rdkafka-consumer代替
laravel-phprdkafka的高级消费者,具有观点性,针对greensight/laravel-phprdkafka
安装
首先,您需要安装和配置greensight/laravel-phprdkafka
然后,
composer require greensight/laravel-phprdkafka-consumer
使用以下命令发布配置文件
php artisan vendor:publish --provider="Greensight\LaravelPhpRdKafkaConsumer\LaravelPhpRdKafkaConsumerServiceProvider" --tag="kafka-consumer-config"
现在转到config/kafka-consumer.php
并在其中添加处理器。
使用方法
该包提供了php artisan kafka:consume {topic} {consumer=default}
命令,该命令执行与给定主题和消费者名称匹配的第一个处理器。消费者名称来自greensight/laravel-phprdkafka配置文件。
配置中的处理器有以下配置选项
[ /* | Optional, defaults to `null`. | Here you may specify which topic should be handled by this processor. | Processor handles all topics by default. */ 'topic' => 'stage.crm.fact.registrations.1', /* | Optional, defaults to `null`. | Here you may specify which greensight/laravel-phprdkafka consumer should be handled by this processor. | Processor handles all consumers by default. */ 'consumer' => 'default', /* | Optional, defaults to `action`. | Here you may specify processor's type. Defaults to `action` | Supported types: | - `action` - a simple class with execute method; | - `job` - Laravel Queue Job. It will be dispatched using `dispatch` or `dispatchSync` method; */ 'type' => 'action', /* | Required. | Fully qualified class name of a processor class. */ 'class' => \App\Domain\Communication\Actions\SendConfirmationEmailAction::class, /* | Optional, defaults to `false`. | Proxy messages to Laravel's queue. | Supported values: | - `false` - do not stream message. Execute processor in syncronous mode; | - `true` - stream message to Laravel's default queue; | - `<your-favorite-queue-name-as-string>` - stream message to this queue; */ 'queue' => false, /* | Optional, defaults to 5000. | Kafka consume timeout in milliseconds . */ 'consume_timeout' => 5000, ]
同步处理器
大多数时候,您只需要一个同步处理器。以下是一个这样的处理器的简单示例
class SendConfirmationEmailAction { public function execute(Message $message): void { // var_dump($message->payload); } }
可排队处理器
如果您想将消息流到Laravel自己的队列,可以使用spatie/laravel-queueable-action
如果由于某些原因您不想依赖该包,则可以切换到Laravel作业
在这两种情况下,您还需要在包的配置中为给定的处理器指定'queue' => true
或'queue' => 'my-favorite-queue'
。
使用Laravel作业的处理器示例
use RdKafka\Message; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; class ConsumeMessageJob implements ShouldQueue { use Dispatchable, InteractsWithQueue, Queueable; public function __construct(protected Message $message) { } public function handle(): void { // var_dump($this->message->payload); } }
测试
composer test
变更日志
请参阅CHANGELOG了解最近更改的更多信息。
许可证
MIT许可证(MIT)。请参阅许可证文件了解更多信息。