alpari/kafka-client

PHP 实现的 Kafka 客户端

0.10.2 2017-09-18 11:39 UTC

README

alpari/kafka-client 是一个 PHP 库,实现了 Apache Kafka 协议,包含生产者和消费者支持。它被设计得尽可能接近 PHP,尽可能保持 API 和配置与原始版本一致。

安装

alpari/kafka-client 可以使用 composer 安装。安装非常简单,只需运行以下命令让 Composer 下载库及其依赖项:

$ composer require alpari/kafka-client

此库包含多个分支,每个分支都支持 Apache Kafka 的特定版本,以下是映射关系:

  • 分支 0.8.x 适用于 Kafka 0.8.0 版本
  • 分支 0.9.x 适用于 Kafka 0.9.0 版本
  • 分支 0.10.x 适用于 Kafka 0.10.0 版本
  • 分支 master 适用于 Kafka 0.11.0 版本

生产者 API

生产者 API 允许应用程序将数据流发送到 Kafka 集群的 topics 中。

以下是如何使用生产者的示例。

use Alpari\Kafka\DTO\Message;
use Alpari\Kafka\Producer\Config;
use Alpari\Kafka\Producer\KafkaProducer;

include __DIR__ . '/vendor/autoload.php';

$producer = new KafkaProducer([
    Config::BOOTSTRAP_SERVERS => ['tcp://'],
]);
$result = $producer->send('test', Message::fromValue('foo'));

所需选项仅为 Config::BOOTSTRAP_SERVERS,它应描述用于引导连接到 Kafka 的 Kafka 服务器列表。

有关其他选项的详细信息,请参阅 Alpari\Kafka\Producer\Config 常量描述和 生产者配置

消费者 API

消费者 API 允许应用程序从 Kafka 集群的 topics 中读取数据流。

以下是如何使用消费者的示例。

use Alpari\Kafka;
use Alpari\Kafka\Consumer\Config;
use Alpari\Kafka\Consumer\KafkaConsumer;

$consumer = new KafkaConsumer([
    Config::BOOTSTRAP_SERVERS       => ['tcp://'],
    Config::GROUP_ID                => 'Kafka-Daemon',
    Config::FETCH_MAX_WAIT_MS       => 5000,
    Config::AUTO_OFFSET_RESET       => Kafka\Consumer\OffsetResetStrategy::LATEST,
    Config::SESSION_TIMEOUT_MS      => 30000,
    Config::AUTO_COMMIT_INTERVAL_MS => 10000,
    Config::METADATA_CACHE_FILE     => '/tmp/metadata.php',
]);
$consumer->subscribe(['test']);
for ($i=0; $i<100; $i++) {
    $data = $consumer->poll(1000);
    echo json_encode($data), PHP_EOL;
}

有关配置的详细描述,请访问 消费者配置

PHP 特定配置

为了与 PHP 一起快速工作,此库引入了一些特定的配置选项。

  • metadata.cache.file 存储元数据的文件名,此文件将在生产环境中被有效缓存到 Opcode 缓存中。
  • stream.async.connect 客户端是否应使用异步连接到代理。
  • stream.persistent.connection 客户端是否应使用持久连接到集群。

对于从 web 请求发布事件,建议启用持久连接并配置元数据缓存路径。在这种情况下,发布事件的速度会尽可能快。