chenxchen/laravel-kafka-php

laravel 的 kafka-php

dev-master 2019-03-15 11:53 UTC

This package is auto-updated.

Last update: 2024-09-16 03:31:26 UTC


README

如何使用

1. 安装 CHENXCHEN/kafka-php

composer require chenxchen/kafka-php:dev-baoshi

以下是一个 composer.json 文件的简单示例

{
	"require": {
		"chenxchen/kafka-php": "dev-baoshi"
	}
}

2. 在 config/queue.php 中定义你的队列

return [
// ...
    'connections' => [
        'kafka-demo' => [
            'driver' => 'd-kafka',
            'metadataBrokerList' => '127.0.0.1:9092', // If the producer and consumer do not configure this configuration item, this configuration item will be used by producers and consumers.
            'topics' => ['local_test', ], // If the producer and consumer do not configure this configuration item, this configuration item will be used by producers and consumers.
            'brokerVersion' => '2.0.0', // If the producer and consumer do not configure this configuration item, this configuration item will be used by producers and consumers.
            'producer' => [
//                 'topics' => ['local_test', 'local_test_2', ], 
                'compression' => \Kafka\Protocol\Produce::COMPRESSION_GZIP,
                'metadataRefreshIntervalMs' => 10000,
//                'metadataBrokerList' => '',
//                'brokerVersion' => '2.0.0',
                'requiredAck' => '1',
                'produceInterval' => 500,
                'timeout' => 5000,
            ],
            'consumer' => [
//                'topics' => ['local_test', 'local_test_2', ],
                'metadataRefreshIntervalMs' => 10000,
                'consumeMode' => \Kafka\ConsumerConfig::CONSUME_BEFORE_COMMIT_OFFSET,
//                'metadataBrokerList' => '',
                'groupId' => 'test-chc',
//                'brokerVersion' => '2.0.0',
//                'sessionTimeout' => 30000,
//                'rebalanceTimeout' => 30000,
                'maxBytes' => 65536,
                'maxWaitTime' => 100,
                'executeHandle' => \App\Jobs\KafkaDemo\KafkaDemoHandle::class, // consumer handle
            ],
        ]
    ],
// ...
];

3. 定义你的作业

<?php

namespace App\Jobs\KafkaDemo;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;

class KafkaDemoJob implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    /**
     * 内容
     * @var array
     */
    protected $content = null;

    /**
     * Create a new job instance.
     *
     * @param array $content  
     */
    public function __construct($content)
    {
        $this->content = $content;

        // connection name define in queue.php
        $this->connection = 'kafka-demo';
    }

    /**
     * You must implement this function for get data
     * @return array
     */
    public function getData() {
        return $this->content;
    }
}

然后你可以调度这个作业

$job = new KafkaDemoJob([33,567]);
dispatch($job);

4. 定义你的消费者处理程序

<?php

namespace App\Jobs\KafkaDemo;

use CHENXCHEN\LaravelQueueKafka\Message\TopicMessage;

class KafkaDemoHandle
{
    /**
     * @param TopicMessage[] $messages
     */
    public function executes($messages) {
        printf("%s\n", json_encode($messages));
    }
}

5. 启动消费者

php artisan queue:kafka kafka-demo

支持的 Laravel 版本

测试于: [5.3]