mgid/kafka-bundle

Symfony Kafka Bundle

安装次数: 10,933

依赖者: 0

建议者: 0

安全: 0

星标: 0

关注者: 7

分支: 2

开放问题: 1

类型:symfony-bundle

1.0.0 2020-12-24 16:49 UTC

This package is auto-updated.

Last update: 2024-08-25 00:44:47 UTC


README

Scrutinizer Code Quality Code Coverage Total Downloads Latest Stable Version License

使用说明

  • 安装包
composer req mgid/kafka-bundle
  • 创建消费者。示例(src/Consumer/EmailSendConsumer.php)
<?php

namespace App\Consumer;

use Swift_Mailer;
use Mgid\KafkaBundle\Command\Consumer;

class EmailSendConsumer extends Consumer
{
    public const QUEUE_NAME = 'email_send_queue';

    /**
     * @var Swift_Mailer
     */
    private $mailer;

    /**
     * @required
     *
     * @param Swift_Mailer $mailer
     */
    public function setMailer(Swift_Mailer $mailer)
    {
        $this->mailer = $mailer;
    }

    /**
     * {@inheritdoc}
     */
    protected function onMessage(array $data): void
    {
        $message = (new \Swift_Message($data['subject']))
            ->setFrom($data['sender'])
            ->setTo($data['recipient'])
            ->setBody($data['body']);

        $this->mailer->send($message);
    }
}
  • 生产消息。示例(src/Service/EmailService.php)
<?php

namespace App\Service;

use App\Consumer\DemoConsumer;
use Mgid\KafkaBundle\DependencyInjection\Traits\ProducerTrait;

class EmailService
{
    use ProducerTrait;

    /**
     * @param array $data
     */
    public function send(array $data): void
    {
        $this->producer->send(DemoConsumer::QUEUE_NAME, $data);
    }
}
  • 运行消费者。示例
php bin/console app:consumer:email-send

默认配置

# config/packages/mgid_kafka.yaml
mgid_kafka:
    producers:
        configuration:
            group.id: 'main_group'
            log.connection.close: 'false'
            metadata.broker.list: '%env(KAFKA_BROKERS)%'
            queue.buffering.max.messages: 100000

    consumers:
        configuration:
            group.id: 'main_group'
            auto.offset.reset: 'smallest'
            log.connection.close: 'false'
            metadata.broker.list: '%env(KAFKA_BROKERS)%'

了解更多关于支持的配置属性: librdkafka 配置