hauptmedia/rdkafka

This package is abandoned and no longer maintained. No replacement package was suggested.

dev-master 2018-08-19 21:43 UTC

This package is auto-updated.

Last update: 2022-12-15 23:59:47 UTC


README

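PHP boilerplate for the php-rdkafka extension.

See https://github.com/arnaud-lb/php-rdkafka

Wrapper/facade objects in the Kafka namespace

You can use the wrapper objects provided in the Kafka namespace to get a more PHP-like interface to the RdKafka objects.

Producer example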

include_once('vendor/autoload.php');

use Kafka\Configuration\ProducerConfiguration;
use Kafka\Manager;

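// acks = 0: do not wait for any broker acknowledgement (fire and forget)
$producerConfiguration = (new ProducerConfiguration())
    ->setRequestRequiredAcks(0);

// 'hostname' is a placeholder for a Kafka broker address
$manager = new Manager(['hostname']);
$topic = $manager->createProducerTopicFacade("test", $producerConfiguration);

// Send 1000 messages to partition 0 of the "test" topic
for ($i = 1; $i <= 1000; $i++) {
    $topic->produce(0 /* partition */, "test" . time());
}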

Consumer example

Implement \Kafka\ConsumerInterface and add an instance of that class to the ConsumerTopicFacade object.

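include_once('../vendor/autoload.php');

// ConsumerConfiguration is assumed to live next to ProducerConfiguration in Kafka\Configuration
use Kafka\Configuration\ConsumerConfiguration;
use Kafka\Manager;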

class MyConsumer implements \Kafka\ConsumerInterface {
    /**
     * @param string $topic Topic name
     * @param int $partition Partition
     * @param int $offset Message offset
     * @param string $key Optional message key
     * @param string $payload Message payload
     * @return mixed
     */
    public function consume($topic, $partition, $offset, $key, $payload)
    {
        // Terminate each line so consecutive messages do not run together
        echo "Received message with payload " . $payload . PHP_EOL;
    }
}

$consumerConfiguration = (new ConsumerConfiguration())
    ->setAutoCommitIntervalMs(1000)
    ->setOffsetStoreSyncIntervalMs(60);

$manager = new Manager(['hostname']);

$consumerTopic = $manager->createConsumerTopicFacade("test", $consumerConfiguration);
$consumerTopic->addConsumer(new MyConsumer());

$consumerTopic->consumeStart(0 /* partition */, 0 /* offset */);
$consumerTopic->consume(0 /* partition */, 1000 /* timeout in ms */);
$consumerTopic->consumeStop(0 /* partition */);
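
The consumer above only echoes each payload. As a minimal sketch of a consumer that keeps state, the class below buffers every message it receives so the messages can be inspected afterwards. The names `Example`, `CollectingConsumer` and `messages()` are hypothetical and not part of the package. The interface stub at the top is an assumption that mirrors the `consume()` signature shown above; it is declared only when the real `\Kafka\ConsumerInterface` has not been loaded, so the sketch can be run without the package or a broker. The two namespaces share one file purely to keep the sketch self-contained.

```php
<?php
namespace Kafka;

// Stand-in for the package's interface (assumption), used only if the real one is absent.
if (!interface_exists(ConsumerInterface::class)) {
    interface ConsumerInterface
    {
        public function consume($topic, $partition, $offset, $key, $payload);
    }
}

namespace Example;

// Hypothetical consumer that stores messages instead of printing them.
class CollectingConsumer implements \Kafka\ConsumerInterface
{
    /** @var array[] Messages received so far, in arrival order. */
    private $messages = [];

    public function consume($topic, $partition, $offset, $key, $payload)
    {
        $this->messages[] = [
            'topic'     => $topic,
            'partition' => $partition,
            'offset'    => $offset,
            'key'       => $key,
            'payload'   => $payload,
        ];
    }

    /** @return array[] All buffered messages. */
    public function messages()
    {
        return $this->messages;
    }
}

// Exercise the consumer directly by calling consume() by hand, without a broker.
$consumer = new CollectingConsumer();
$consumer->consume('test', 0, 0, null, 'first');
$consumer->consume('test', 0, 1, 'k1', 'second');

echo count($consumer->messages()), " messages buffered\n";
```

In the consumer example above, an instance of such a class could be passed to `addConsumer()` in place of `MyConsumer`.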