ensi/laravel-phprdkafka-consumer

为 laravel-phprdkafka 提供的具有偏见的最高级别消费者

2.0.2 2024-09-05 09:00 UTC

This package is auto-updated.

Last update: 2024-09-05 09:00:45 UTC


README

Latest Version on Packagist Tests Total Downloads

ensi/laravel-phprdkafka 提供具有偏见的最高级别消费者

安装

首先,您必须安装并配置 ensi/laravel-phprdkafka

然后,

composer require ensi/laravel-phprdkafka-consumer

使用以下命令发布配置文件:

php artisan vendor:publish --provider="Ensi\LaravelPhpRdKafkaConsumer\LaravelPhpRdKafkaConsumerServiceProvider" --tag="kafka-consumer-config"

现在转到 config/kafka-consumer.php 并在那里添加处理器。

版本兼容性

基本用法

此软件包提供 php artisan kafka:consume {topic} {consumer=default} {--max-events=0} {--max-time=0} {--once} 命令,该命令执行第一个匹配给定主题和消费者名称的处理器。消费者名称来自 ensi/laravel-phprdkafka 配置文件。

配置中的处理器有以下配置选项

[
   /*
   | Optional, defaults to `null`.
   | Here you may specify which topic should be handled by this processor.
   | Processor handles all topics by default.
   */
   'topic' => 'stage.crm.fact.registrations.1',

   /*
   | Optional, defaults to `null`.
   | Here you may specify which ensi/laravel-phprdkafka consumer should be handled by this processor.
   | Processor handles all consumers by default.
   */
   'consumer' => 'default',

   /*
   | Optional, defaults to `action`.
   | Here you may specify processor's type. Defaults to `action`
   | Supported types:
   |  - `action` - a simple class with execute method;
   |  - `job` - Laravel Queue Job. It will be dispatched using `dispatch` or `dispatchSync` method;
   */
   'type' => 'action',

   /*
   | Required.
   | Fully qualified class name of a processor class.
   */
   'class' => \App\Domain\Communication\Actions\SendConfirmationEmailAction::class,
   
   /*
   | Optional, defaults to `false`.
   | Proxy messages to Laravel's queue.
   | Supported values:
   |  - `false` - do not stream message. Execute processor in syncronous mode;
   |  - `true` - stream message to Laravel's default queue;
   |  - `<your-favorite-queue-name-as-string>` - stream message to this queue;
   */
   'queue' => false,

   /*
   | Optional, defaults to 5000.
   | Kafka consume timeout in milliseconds .
   */
   'consume_timeout' => 5000,
]

重要!某些主题必须具有不同的消费者设置,例如从主题开始读取或不存在时不创建主题。
对于此类情况,您需要配置多个消费者并使用合适的消费者。

同步处理器

大多数时候您只需要一个同步处理器。以下是一个此类处理器的简单示例

use RdKafka\Message;

class SendConfirmationEmailAction
{
   public function execute(Message $message): void
   {
      // var_dump($message->payload);
   }
}

可排队的处理器

如果您想将消息流式传输到 Laravel 的自用队列,可以使用 spatie/laravel-queueable-action

如果由于某些原因您不想依赖该软件包,您可以将切换到 Laravel Jobs

在两种情况下,您还需要在软件包的配置中指定 'queue' => true'queue' => 'my-favorite-queue' 以针对给定处理器。

使用 Laravel Job 的处理器示例

use RdKafka\Message;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;

class ConsumeMessageJob implements ShouldQueue
{
   use Dispatchable, InteractsWithQueue, Queueable;

   public function __construct(protected Message $message)
   {
   }

   public function handle(): void
   {
      // var_dump($this->message->payload);
   }
}

处理信号

php artisan kafka:consume ... 命令可以配置为在接收到某些操作系统信号后优雅地停止。
可以在软件包配置的 stop_signals 键中设置这些信号,例如 'stop_signals' => [SIGINT, SIGQUIT]
您可以使用 pcntl 扩展定义的任何常量 https://php.ac.cn/manual/en/pcntl.constants.php

贡献

有关详细信息,请参阅 CONTRIBUTING

消费者模拟

已添加测试工具以测试开发的手柄。您可以创建一个模拟的消费者并调用监听主题的命令

use Ensi\LaravelPhpRdKafkaConsumer\Commands\KafkaConsumeCommand;
use Ensi\LaravelPhpRdKafkaConsumer\Tests\ConsumerFaker;
use RdKafka\Message;

ConsumerFaker::new('test-model')
    ->addMessage(new Message())
    ->addMessage(new Message())
    ->consume();

测试

测试

  1. composer install
  2. composer test

安全漏洞

请审查我们的安全策略,了解如何报告安全漏洞 我们的安全策略

许可

MIT 许可证(MIT)。有关更多信息,请参阅 许可文件