alpari / kafka-client
PHP 实现的 Kafka 客户端
0.10.2
2017-09-18 11:39 UTC
Requires
- php: ~5.6
This package is auto-updated.
Last update: 2024-09-25 20:12:25 UTC
README
alpari/kafka-client
是一个 PHP 库,实现了 Apache Kafka 协议,包含生产者和消费者支持。它被设计得尽可能接近 PHP,尽可能保持 API 和配置与原始版本一致。
安装
alpari/kafka-client
可以使用 composer 安装。安装非常简单,只需运行以下命令让 Composer 下载库及其依赖项:
$ composer require alpari/kafka-client
此库包含多个分支,每个分支都支持 Apache Kafka 的特定版本,以下是映射关系:
- 分支 0.8.x 适用于 Kafka 0.8.0 版本
- 分支 0.9.x 适用于 Kafka 0.9.0 版本
- 分支 0.10.x 适用于 Kafka 0.10.0 版本
- 分支 master 适用于 Kafka 0.11.0 版本
生产者 API
生产者 API 允许应用程序将数据流发送到 Kafka 集群的 topics 中。
以下是如何使用生产者的示例。
use Alpari\Kafka\DTO\Message; use Alpari\Kafka\Producer\Config; use Alpari\Kafka\Producer\KafkaProducer; include __DIR__ . '/vendor/autoload.php'; $producer = new KafkaProducer([ Config::BOOTSTRAP_SERVERS => ['tcp://'], ]); $result = $producer->send('test', Message::fromValue('foo'));
所需选项仅为 Config::BOOTSTRAP_SERVERS
,它应描述用于引导连接到 Kafka 的 Kafka 服务器列表。
有关其他选项的详细信息,请参阅 Alpari\Kafka\Producer\Config
常量描述和 生产者配置。
消费者 API
消费者 API 允许应用程序从 Kafka 集群的 topics 中读取数据流。
以下是如何使用消费者的示例。
use Alpari\Kafka; use Alpari\Kafka\Consumer\Config; use Alpari\Kafka\Consumer\KafkaConsumer; $consumer = new KafkaConsumer([ Config::BOOTSTRAP_SERVERS => ['tcp://'], Config::GROUP_ID => 'Kafka-Daemon', Config::FETCH_MAX_WAIT_MS => 5000, Config::AUTO_OFFSET_RESET => Kafka\Consumer\OffsetResetStrategy::LATEST, Config::SESSION_TIMEOUT_MS => 30000, Config::AUTO_COMMIT_INTERVAL_MS => 10000, Config::METADATA_CACHE_FILE => '/tmp/metadata.php', ]); $consumer->subscribe(['test']); for ($i=0; $i<100; $i++) { $data = $consumer->poll(1000); echo json_encode($data), PHP_EOL; }
有关配置的详细描述,请访问 消费者配置。
PHP 特定配置
为了与 PHP 一起快速工作,此库引入了一些特定的配置选项。
metadata.cache.file
存储元数据的文件名,此文件将在生产环境中被有效缓存到 Opcode 缓存中。stream.async.connect
客户端是否应使用异步连接到代理。stream.persistent.connection
客户端是否应使用持久连接到集群。
对于从 web 请求发布事件,建议启用持久连接并配置元数据缓存路径。在这种情况下,发布事件的速度会尽可能快。