greensight/laravel-phprdkafka-consumer

此包已被废弃且不再维护。作者建议使用ensi/laravel-phprdkafka-consumer包。

laravel-phprdkafka的高级消费者,具有观点性

0.1.0 2021-09-30 18:01 UTC

This package is auto-updated.

Last update: 2022-01-11 08:39:07 UTC


README

已弃用,请使用https://github.com/ensi-platform/laravel-php-rdkafka-consumer代替

laravel-phprdkafka的高级消费者,具有观点性,针对greensight/laravel-phprdkafka

安装

首先,您需要安装和配置greensight/laravel-phprdkafka

然后,

composer require greensight/laravel-phprdkafka-consumer

使用以下命令发布配置文件

php artisan vendor:publish --provider="Greensight\LaravelPhpRdKafkaConsumer\LaravelPhpRdKafkaConsumerServiceProvider" --tag="kafka-consumer-config"

现在转到config/kafka-consumer.php并在其中添加处理器。

使用方法

该包提供了php artisan kafka:consume {topic} {consumer=default}命令,该命令执行与给定主题和消费者名称匹配的第一个处理器。消费者名称来自greensight/laravel-phprdkafka配置文件。

配置中的处理器有以下配置选项

[
   /*
   | Optional, defaults to `null`.
   | Here you may specify which topic should be handled by this processor.
   | Processor handles all topics by default.
   */
   'topic' => 'stage.crm.fact.registrations.1',

   /*
   | Optional, defaults to `null`.
   | Here you may specify which greensight/laravel-phprdkafka consumer should be handled by this processor.
   | Processor handles all consumers by default.
   */
   'consumer' => 'default',

   /*
   | Optional, defaults to `action`.
   | Here you may specify processor's type. Defaults to `action`
   | Supported types:
   |  - `action` - a simple class with execute method;
   |  - `job` - Laravel Queue Job. It will be dispatched using `dispatch` or `dispatchSync` method;
   */
   'type' => 'action',

   /*
   | Required.
   | Fully qualified class name of a processor class.
   */
   'class' => \App\Domain\Communication\Actions\SendConfirmationEmailAction::class,
   
   /*
   | Optional, defaults to `false`.
   | Proxy messages to Laravel's queue.
   | Supported values:
   |  - `false` - do not stream message. Execute processor in syncronous mode;
   |  - `true` - stream message to Laravel's default queue;
   |  - `<your-favorite-queue-name-as-string>` - stream message to this queue;
   */
   'queue' => false,

   /*
   | Optional, defaults to 5000.
   | Kafka consume timeout in milliseconds .
   */
   'consume_timeout' => 5000,
]

同步处理器

大多数时候,您只需要一个同步处理器。以下是一个这样的处理器的简单示例

class SendConfirmationEmailAction
{
   public function execute(Message $message): void
   {
      // var_dump($message->payload);
   }
}

可排队处理器

如果您想将消息流到Laravel自己的队列,可以使用spatie/laravel-queueable-action

如果由于某些原因您不想依赖该包,则可以切换到Laravel作业

在这两种情况下,您还需要在包的配置中为给定的处理器指定'queue' => true'queue' => 'my-favorite-queue'

使用Laravel作业的处理器示例

use RdKafka\Message;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;

class ConsumeMessageJob implements ShouldQueue
{
   use Dispatchable, InteractsWithQueue, Queueable;

   public function __construct(protected Message $message)
   {
   }

   public function handle(): void
   {
      // var_dump($this->message->payload);
   }
}

测试

composer test

变更日志

请参阅CHANGELOG了解最近更改的更多信息。

许可证

MIT许可证(MIT)。请参阅许可证文件了解更多信息。