influxdata/influxdb-client-php

PHP的InfluxDB (v2+) 客户端库

3.6.0 2024-06-24 10:01 UTC

README

CircleCI codecov Packagist Version License GitHub issues GitHub pull requests PHP from Packagist Slack Status

本仓库包含InfluxDB 2.x的参考PHP客户端。

注意:使用此客户端库与InfluxDB 2.x和InfluxDB 1.8+(查看详情)一起使用。对于连接到InfluxDB 1.7或更早版本的实例,请使用influxdb-php客户端库。

文档

本节包含指向客户端库文档的链接。

安装

客户端不与HTTP客户端库(如Guzzle、Buzz等)紧密耦合。客户端使用通用抽象(PSR-7 - HTTP消息PSR-17 - HTTP工厂PSR-18 - HTTP客户端),这给您使用您喜欢的库的自由。

安装库

InfluxDB 2客户端捆绑在https://packagist.org.cn/上,并可以通过composer安装

composer require influxdata/influxdb-client-php guzzlehttp/guzzle

用法

创建客户端

使用InfluxDB2\Client创建连接到运行中的InfluxDB 2实例的客户端。

$client = new InfluxDB2\Client([
    "url" => "http://localhost:8086",
    "token" => "my-token",
    "bucket" => "my-bucket",
    "org" => "my-org",
    "precision" => InfluxDB2\Model\WritePrecision::NS
]);

客户端选项

自定义HTTP客户端

以下代码展示了如何使用和配置cURL HTTP客户端

通过composer安装依赖项
composer require influxdata/influxdb-client-php nyholm/psr7 php-http/curl-client
配置cURL客户端
$curlOptions = [
    CURLOPT_CONNECTTIMEOUT => 30, // The number of seconds to wait while trying to connect.
];
$curlClient = new Http\Client\Curl\Client(
    Http\Discovery\Psr17FactoryDiscovery::findRequestFactory(),
    Http\Discovery\Psr17FactoryDiscovery::findStreamFactory(),
    $curlOptions
);
初始化InfluxDB客户端
$client = new Client([
    "url" => "http://localhost:8086",
    "token" => "my-token",
    "bucket" => "my-bucket",
    "org" => "my-org",
    "httpClient" => $curlClient
]);

查询

QueryApi检索的结果可以格式化为

  1. 原始查询响应
  2. Flux数据结构:FluxTableFluxColumnFluxRecord
  3. FluxRecord

查询原始数据

同步执行Flux查询并返回未经处理的String结果

$this->client = new Client([
    "url" => "http://localhost:8086",
    "token" => "my-token",
    "bucket" => "my-bucket",
    "precision" => WritePrecision::NS,
    "org" => "my-org",
    "debug" => false
]);

$this->queryApi = $this->client->createQueryApi();

$result = $this->queryApi->queryRaw(
            'from(bucket:"my-bucket") |> range(start: 1970-01-01T00:00:00.000000001Z) |> last()');

同步查询

同步执行Flux查询并返回FluxTables的数组

$this->client = new Client([
    "url" => "http://localhost:8086",
    "token" => "my-token",
    "bucket" => "my-bucket",
    "precision" => WritePrecision::NS,
    "org" => "my-org",
    "debug" => false
]);

$this->queryApi = $this->client->createQueryApi();

$result = $this->queryApi->query(
            'from(bucket:"my-bucket") |> range(start: 1970-01-01T00:00:00.000000001Z) |> last()');

然后可以轻松使用json_encode进行编码

header('Content-type:application/json;charset=utf-8');
echo json_encode( $result, JSON_PRETTY_PRINT ) ;

查询流

同步执行Flux查询并返回FluxRecord

$this->client = new Client([
    "url" => "http://localhost:8086",
    "token" => "my-token",
    "bucket" => "my-bucket",
    "precision" => WritePrecision::NS,
    "org" => "my-org",
    "debug" => false
]);

$this->queryApi = $this->client->createQueryApi();

$parser = $this->queryApi->queryStream(
            'from(bucket:"my-bucket") |> range(start: 1970-01-01T00:00:00.000000001Z) |> last()');

foreach ($parser->each() as $record)
{
    ...
}

参数化查询

InfluxDB Cloud支持参数化查询,允许您使用InfluxDB API动态更改查询中的值。参数化查询使Flux查询更加可重用,并且也可以用来帮助防止注入攻击。

InfluxDB Cloud将params对象插入到Flux查询中,名为params的Flux记录。在您的Flux查询中使用点或方括号运算符访问params记录中的参数。参数化Flux查询仅支持intfloatstring数据类型。要将支持的数据类型转换为其他Flux基本数据类型,请使用Flux类型转换函数。

参数化查询示例

⚠️ 参数化查询目前仅在InfluxDB Cloud中支持,目前InfluxDB OSS中没有支持。

<?php
require __DIR__ . '/../vendor/autoload.php';

use InfluxDB2\Client;
use InfluxDB2\Model\Query;
use InfluxDB2\Point;
use InfluxDB2\WriteType as WriteType;

$url = "https://us-west-2-1.aws.cloud2.influxdata.com";
$organization = 'my-org';
$bucket = 'my-bucket';
$token = 'my-token';

$client = new Client([
    "url" => $url,
    "token" => $token,
    "bucket" => $bucket,
    "org" => $organization,
    "precision" => InfluxDB2\Model\WritePrecision::NS,
    "debug" => false
]);

$writeApi = $client->createWriteApi(["writeType" => WriteType::SYNCHRONOUS]);
$queryApi = $client->createQueryApi();

$today = new DateTime("now");
$yesterday = $today->sub(new DateInterval("P1D"));

$p = new Point("temperature");
$p->addTag("location", "north")->addField("value", 60)->time($yesterday);
$writeApi->write($p);
$writeApi->close();

//
// Query range start parameter using duration
//
$parameterizedQuery = "from(bucket: params.bucketParam) |> range(start: duration(v: params.startParam))";
$query = new Query();
$query->setQuery($parameterizedQuery);
$query->setParams(["bucketParam" => "my-bucket", "startParam" => "-1d"]);
$tables = $queryApi->query($query);

foreach ($tables as $table) {
    foreach ($table->records as $record) {
        var_export($record->values);
    }
}

//
// Query range start parameter using DateTime
//
$parameterizedQuery = "from(bucket: params.bucketParam) |> range(start: time(v: params.startParam))";
$query->setParams(["bucketParam" => "my-bucket", "startParam" => $yesterday]);
$query->setQuery($parameterizedQuery);
$tables = $queryApi->query($query);

foreach ($tables as $table) {
    foreach ($table->records as $record) {
        var_export($record->values);
    }
}

$client->close();

写入数据

WriteApi支持同步和批量写入InfluxDB 2.x。默认api使用同步写入。要启用批量写入,可以使用WriteOption。

$client = new InfluxDB2\Client(["url" => "http://localhost:8086", "token" => "my-token",
    "bucket" => "my-bucket",
    "org" => "my-org",
    "precision" => InfluxDB2\Model\WritePrecision::NS
]);
$write_api = $client->createWriteApi();
$write_api->write('h2o,location=west value=33i 15');

批量处理

写入将按批次处理,可以通过WriteOptions进行配置。

use InfluxDB2\Client;
use InfluxDB2\WriteType as WriteType;

$client = new Client(["url" => "http://localhost:8086", "token" => "my-token",
    "bucket" => "my-bucket",
    "org" => "my-org",
    "precision" => InfluxDB2\Model\WritePrecision::NS
]);

$writeApi = $client->createWriteApi(
    ["writeType" => WriteType::BATCHING, 'batchSize' => 1000]);

foreach (range(1, 10000) as $number) {
    $writeApi->write("mem,host=aws_europe,type=batch value=1i $number");
}

// flush remaining data
$writeApi->close();

时间精度

配置默认时间精度

$client = new InfluxDB2\Client(["url" => "http://localhost:8086", "token" => "my-token",
    "bucket" => "my-bucket",
    "org" => "my-org",
    "precision" => \InfluxDB2\Model\WritePrecision::NS
]);

配置每次写入的精度

$client = new InfluxDB2\Client([
    "url" => "http://localhost:8086",
    "token" => "my-token",
    "bucket" => "my-bucket",
    "org" => "my-org",
]);

$writeApi = $client->createWriteApi();
$writeApi->write('h2o,location=west value=33i 15', \InfluxDB2\Model\WritePrecision::MS);

精度的允许值有

  • WritePrecision::NS表示纳秒
  • WritePrecision::US表示微秒
  • WritePrecision::MS表示毫秒
  • WritePrecision::S表示秒

配置目的地

默认的bucketorganization目的地通过InfluxDB2\Client配置

$client = new InfluxDB2\Client([
    "url" => "http://localhost:8086",
    "token" => "my-token",
    "bucket" => "my-bucket",
]);

但也可以在每次写入时覆盖配置

$client = new InfluxDB2\Client(["url" => "http://localhost:8086", "token" => "my-token"]);

$writeApi = $client->createWriteApi();
$writeApi->write('h2o,location=west value=33i 15', \InfluxDB2\Model\WritePrecision::MS, "production-bucket", "customer-1");

数据格式

数据可以写入为以下格式:

  1. string,格式化为InfluxDB的行协议
  2. array,包含键:name、tags、fields和时间
  3. 数据点结构
  4. 上述项的Array
$client = new InfluxDB2\Client([
    "url" => "http://localhost:8086",
    "token" => "my-token",
    "bucket" => "my-bucket",
    "org" => "my-org",
    "precision" => InfluxDB2\Model\WritePrecision::US
]);

$writeApi = $client->createWriteApi();

//data in Point structure
$point=InfluxDB2\Point::measurement("h2o")
    ->addTag("location", "europe")
    ->addField("level",2)
    ->time(microtime(true));

$writeApi->write($point);

//data in array structure
$dataArray = ['name' => 'cpu', 
    'tags' => ['host' => 'server_nl', 'region' => 'us'],
    'fields' => ['internal' => 5, 'external' => 6],
    'time' => microtime(true)];

$writeApi->write($dataArray);

//write lineprotocol
$writeApi->write('h2o,location=west value=33i 15');

默认标签

有时在每次测量中存储相同的信息很有用,例如hostnamelocationcustomer。客户端能够使用静态值、应用程序设置或环境变量作为标签值。

表达式

  • California Miner - 静态值
  • ${env.hostname} - 环境属性
通过API
$this->client = new Client([
    "url" => "http://localhost:8086",
    "token" => "my-token",
    "bucket" => "my-bucket",
    "precision" => WritePrecision::NS,
    "org" => "my-org",
    "tags" => ['id' => '132-987-655', 
        'hostname' => '${env.Hostname}']
]);

$writeApi = $this->client->createWriteApi(null, ['data_center' => '${env.data_center}']);
    
$writeApi->pointSettings->addDefaultTag('customer', 'California Miner');

$point = Point::measurement('h2o')
            ->addTag('location', 'europe')
            ->addField('level', 2);

$this->writeApi->write($point);

高级用法

检查服务器状态

可以使用$client->ping();方法检查服务器可用性。这与influx ping等效。

InfluxDB 1.8 API 兼容性

InfluxDB 1.8.0引入了与InfluxDB 2.x兼容的API,这使得您能够轻松地从InfluxDB 1.x迁移到InfluxDB 2.x Cloud或开源版本。

以下是与InfluxDB 2.x兼容的API

详细信息请参阅InfluxDB 1.8示例

InfluxDB 2.x 管理API

InfluxDB 2.x API客户端使用influxdb-clients-apigen生成。源代码位于InfluxDB2\Service\InfluxDB2\Model\包中。

以下示例显示了如何使用OrganizationServiceBucketService创建新的bucket。

require __DIR__ . '/../vendor/autoload.php';

use InfluxDB2\Client;
use InfluxDB2\Model\BucketRetentionRules;
use InfluxDB2\Model\Organization;
use InfluxDB2\Model\PostBucketRequest;
use InfluxDB2\Service\BucketsService;
use InfluxDB2\Service\OrganizationsService;

$organization = 'my-org';
$bucket = 'my-bucket';
$token = 'my-token';

$client = new Client([
    "url" => "http://localhost:8086",
    "token" => $token,
    "bucket" => $bucket,
    "org" => $organization,
    "precision" => InfluxDB2\Model\WritePrecision::S
]);

function findMyOrg($client): ?Organization
{
    /** @var OrganizationsService $orgService */
    $orgService = $client->createService(OrganizationsService::class);
    $orgs = $orgService->getOrgs()->getOrgs();
    foreach ($orgs as $org) {
        if ($org->getName() == $client->options["org"]) {
            return $org;
        }
    }
    return null;
}

$bucketsService = $client->createService(BucketsService::class);

$rule = new BucketRetentionRules();
$rule->setEverySeconds(3600);

$bucketName = "example-bucket-" . microtime();
$bucketRequest = new PostBucketRequest();
$bucketRequest->setName($bucketName)
    ->setRetentionRules([$rule])
    ->setOrgId(findMyOrg($client)->getId());

//create bucket
$respBucket = $bucketsService->postBuckets($bucketRequest);
print $respBucket;

$client->close();

通过UDP写入

在执行时间至关重要以避免将指标发送到InfluxDB时潜在的延迟(甚至超时)的问题时,通过UDP发送将非常有用。
众所周知,与TCP(HTTP)不同,通过UDP发送不会等待响应。

UDP Writer需求

  1. 已安装ext-sockets
  2. 由于Influxdb 2.0+版本不支持UDP协议,您需要安装并配置Telegraf插件:https://docs.influxdb.org.cn/telegraf/v1.16/plugins/#socket_listener
  3. 传递给客户端的额外配置选项:udpPort。您可以选择指定udpHost,否则udpHost将解析自url选项
  4. 传递给客户端的额外配置选项:ipVersion。您可以选择指定IP版本,默认为IPv4
$client = new InfluxDB2\Client(["url" => "http://localhost:8086", "token" => "my-token",
    "bucket" => "my-bucket",
    "org" => "my-org",
    "precision" => InfluxDB2\Model\WritePrecision::NS,
    "udpPort" => 8094,
    "ipVersion" => 6,
]);
$writer = $client->createUdpWriter();
$writer->write('h2o,location=west value=33i 15');
$writer->close();

删除数据

DefaultService.php 支持从InfluxDB桶中删除

<?php
/**
 * Shows how to delete data from InfluxDB by client
 */
use InfluxDB2\Client;
use InfluxDB2\Model\DeletePredicateRequest;
use InfluxDB2\Service\DeleteService;

$url = 'http://localhost:8086';
$token = 'my-token';
$org = 'my-org';
$bucket = 'my-bucket';

$client = new Client([
    "url" => $url,
    "token" => $token,
    "bucket" => $bucket,
    "org" => $org,
    "precision" => InfluxDB2\Model\WritePrecision::S
]);

//
// Delete data by measurement and tag value
//
/** @var DeleteService $service */
$service = $client->createService(DeleteService::class);

$predicate = new DeletePredicateRequest();
$predicate->setStart(DateTime::createFromFormat('Y', '2020'));
$predicate->setStop(new DateTime());
$predicate->setPredicate("_measurement=\"mem\" AND host=\"host1\"");

$service->postDelete($predicate, null, $org, $bucket);

$client->close();

更多详情请参阅 DeleteDataExample.php

代理和重定向

您可以通过两种方式在代理后面配置InfluxDB PHP客户端

1. 使用环境变量

根据您的服务器URL的方案设置环境变量 HTTP_PROXYHTTPS_PROXY。更多信息请参阅Guzzle文档 - 环境变量

2. 通过选项配置客户端以使用代理

您可以在创建客户端时传递一个 proxy 配置

$client = new InfluxDB2\Client([
  "url" => "http://localhost:8086", 
  "token" => "my-token",
  "bucket" => "my-bucket",
  "org" => "my-org",
  "proxy" => "http://192.168.16.1:10",
]);

更多信息请参阅Guzzle文档 - 代理

重定向

客户端会自动遵循HTTP重定向。您可以通过 allow_redirects 配置来配置重定向行为

$client = new InfluxDB2\Client([
  "url" => "http://localhost:8086", 
  "token" => "my-token",
  "bucket" => "my-bucket",
  "org" => "my-org",
  "allow_redirects" => false,
]);

更多信息请参阅重定向插件文档 - allow_redirects

本地测试

运行一次以安装依赖项

make deps

运行单元测试和集成测试

make test

贡献

欢迎在GitHub上提交错误报告和拉取请求,网址为 https://github.com/influxdata/influxdb-client-php

许可

该软件包在MIT许可协议下作为开源软件提供。