magnusjt/mongotd

Mongodb时序数据库

安装: 68

依赖项: 0

建议者: 0

安全: 0

星标: 0

关注者: 2

分支: 0

开放问题: 0

类型:项目

2.0.0 2015-12-04 09:25 UTC

This package is not auto-updated.

Last update: 2024-09-28 18:47:40 UTC


README

这是一个基于mongodb的时序数据库的PHP实现。它负责存储时序数据,并使用不同的技术(目前有3-sigma、Holt-winters或Kolmogorov-Smirnov)对这些数据执行异常检测。

时序数据使用sid(传感器ID,即数据表示什么)和nid(节点ID,即数据收集自何处)进行存储。这两个ID共同唯一地表示一个时序。时序可能是仪表或增量型。在后一种情况下,连续的值从彼此中减去以获得变化率。

  • 仪表计数器的示例包括温度、当前访客数等。
  • 增量计数器的示例包括累积访客数、累积请求数等。

为了提高存储效率,数据以一天为一个块进行存储,使用哈希键为午夜以来的秒数。这避免了为每个数据点存储元数据。

在检索数据时,可以在时间(每小时、每天等的总和/平均值/最大值/最小值)、空间(某些节点上的总和/平均值/最大值/最小值)或给定公式上执行聚合。但请注意,没有进行预聚合。这是为了避免时区问题(即某些天可能包含不同的小时,具体取决于查看数据时所在的时区)。

安装

使用composer安装

composer require magnusjt/mongotd

基本用法

首先我们获取一个mongodb连接并初始化mongotd。

$host      = '127.0.0.1'; // Or mongodb connection string
$dbName    = 'mongotd';
$colPrefix = 'mongotd'; // All collections used by mongotd will have this prefix and an underscore

$conn = new \Mongotd\Connection($host, $dbName, $colPrefix);

关于传感器ID和节点ID

传感器ID和节点ID存储为数据库中的字符串。因此,提供的任何ID都必须可转换为字符串。

关于插入的值

插入的值总是整数或双精度浮点数,并且必须可转换为这些数据类型。

插入数据

通过一组存储中间件来执行数据插入。您可以选择所需的中间件,甚至可以创建自己的中间件。在最极端的情况下,实际上可以替换mongodb为另一个数据库。

例如

$storage = new Storage();
$storage->addMiddleware(new FilterCounterValues());
$storage->addMiddleware(new CalculateDeltas($conn, new Logger(null), Resolution::FIVE_MINUTES));
$storage->addMiddleware(new InsertCounterValues($conn));
$storage->addMiddleware(new FindAnomaliesUsingSigmaTest($conn));
$storage->addMiddleware(new StoreAnomalies($conn));

通过CounterValue数组存储时序数据

$storage->store([
    new CounterValue($sid, $nid, $datetime, $value, $isIncremental)
]);

检索数据

由于有那么多处理时序数据的方法,我们使用管道解决方案来处理原始值。管道可以以许多不同的方式组装,并且可以根据您的需求添加新的或修改过的处理步骤。

以下是一些使用默认管道可以执行的操作的示例

通过总和、平均值、最大值、最小值将值汇总为日值

$start = new DateTime('2015-11-01 00:00:00');
$end = new DateTime('2015-12-01 00:00:00');
DateTimeHelper::normalizeTimeRange($start, $end, Resolution::DAY);

$sequence = [
    new Find($conn, $sid, $nid, $start, $end),
    new RollupTime(Resolution::DAY, Aggregation::Sum),
    new Pad(Resolution::DAY, $start, $end)
];

$pipeline = new Pipeline();
$series = $pipeline->run($sequence);

使用总和、平均值、最大值、最小值组合不同的系列

$sequence = [
    [
        [
            new Find($conn, $sid, $nid1, $start, $end),
            new RollupTime(Resolution::FIVE_MINUTES, Aggregation::SUM),
            new Pad(Resolution::FIVE_MINUTES, $start, $end)
        ],
        [
            new Find($conn, $sid, $nid2, $start, $end),
            new RollupTime(Resolution::FIVE_MINUTES, Aggregation::SUM),
            new Pad(Resolution::FIVE_MINUTES, $start, $end)
        ]
    ],
    new RollupSpace(Aggregation::SUM),
    new RollupTime(Resolution::DAY, Aggregation::SUM),
    new Pad(Resolution::DAY, $start, $end)
];
$pipeline = new Pipeline();
$series = $pipeline->run($sequence);

在这个例子中,您可以看到我们首先将每个系列汇总,然后将其组合成一个系列,最后将这个结果汇总为日系列。

在这个例子中,我们只是将所有值相加,因此操作顺序并不重要。然而,如果您使用平均/最大/最小之类的聚合,顺序就很重要了。通过使用管道,您可以对操作顺序有完全的控制。

例如

使用公式组合不同的系列

示例

$formula = '([sid=1,nid=1,agg=1] / [sid=2,nid=1,agg=1]) * 100';

$parser = new Parser();
$ast = $parser->parse($formula);

$astEvaluator = new AstEvaluator();
$astEvaluator->setVariableEvaluatorCallback(function($options) use($conn, $start, $end)){
    $pipeline = new Pipeline();
    return $pipeline->run([
        new Find($conn, $options['sid'], $options['nid'], $start, $end),
        new RollupTime(Resolution::FIVE_MINUTES, $options['agg']),
        new Pad(Resolution::FIVE_MINUTES, $start, $end)
    ])->vals;
]);

$sequence = [
    new Formula($ast, $astEvaluator),
    new RollupTime(Resolution::DAY, Aggregation::SUM),
    new Pad(Resolution::DAY, $start, $end)
];
$pipeline = new Pipeline();
$series = $pipeline->run($sequence);

管道工厂

可以使用Mongotd\Pipeline\Factory类创建一些默认的管道序列。当然,也可以创建自定义工厂。

关于时区怎么办?

时区在处理时间序列时是个大麻烦。仅仅确定时间序列的时区偏移量是不够的,因为夏令时可能会在序列持续期间改变偏移量。我们可以使用一些日期时间库来减轻一些痛苦,但问题变成了性能。因此,在这个项目中,我们尽可能地使用时间戳,只有在绝对需要时才考虑时区。我们需要考虑时区最常见的地方是在时间汇总时。在这里,我们在计算中对每个时间戳进行偏移,使得当结果转换为给定时区的日期时间时,值将正好落在期望的分辨率步骤上。

寻找异常数据

在插入新数据时会发现异常。为了实现这一点,我们需要决定使用哪种异常测试。

使用3-sigma方法的示例

$sigmaTest = new FindAnomaliesUsingSigmaTest($conn);
$sigmaTest->setDaysToScan(20); // How many days in the past to use for comparison

// Instead of looking at individual values,
// look at the average value within 300 seconds
$sigmaTest->setWindowLength(300);

$sigmaTest->setMinPrevDataPoints(14); // How many data points are required before we bother doing the scan
$sigmaTest->setMinCurrDataPoints(1); // How many data points we need for evaluation

// The treshold for 3 sigma.
// If the data is normally distributed,
// a value of 3 gives anomalies if the value is outside of the 99.7 percentile
$sigmaTest->setScoreTreshold(3);

// Add sigma test as middleware
$storage->addMiddleware($sigmaTest);

// Store the anomalies as well
$storage->addMiddleware(new StoreAnomalies($conn));

3-sigma方法取了与当前时间相同时间点的所有值的平均值和标准差。它将这个平均值与当前值进行比较,使用3*STD作为相对于平均值的允许偏差。

3-sigma方法的缺点是它假设值是正态分布的(这并不总是如此)。在这种情况下,它还假设值遵循每日轮廓,即同一时间点的值很可能相同。最后,该方法需要回溯很长时间并检索值,这可能会很慢。

寻找异常的另一种方法是Holt-Winters算法。它和3-sigma方法有同样的缺点,即数据应该是正态分布的,并遵循时间轮廓。然而,它使用指数平滑而不是精确计算平均值和标准差。因此,它应该要快得多。

使用Holt-Winters的示例

$hwTest = new FindAnomaliesUsingHwTest($conn);
$hwTest->setMinDaysScanned(20);
$hwTest->setHwAlpha(0.05); // Smoothing factor for average
$hwTest->setHwBeta(0.005); // Smoothing factor for linear increase
$hwTest->setHwGamma(0.1);  // Smoothing factor std
$hwTest->setScoreTreshold(3); // Same as 3-sigma
$hwTest->setSeasonLength(1440); // Season length in minutes (daily season by default)

$storage->addMiddleware($hwTest);
$storage->addMiddleware(new StoreAnomalies($conn));

检索异常

使用管道,我们还可以找到在插入时检测到的已存储异常。

此外,我们还有一个辅助序列,用于在给定时间段内找到异常状态。状态是一个时间戳 => 0|1的数组,汇总到期望的分辨率。

$from = new DateTime('-1 day');
$to = new DateTime('now');
$nids = array(); // Limit results to these nids, no limit if empty
$sids = array(); // Limit results to these sids, no limit if empty
$minNumberOfAnomalies = 1;
$maxResults = 20;

$sequence = [
    new FindAnomalies($conn, $start, $end, $nids, $sids, $minCount, $limit),
    new AddAnomalyState($start, $end, Resolution::FIVE_MINUTES)
];

$pipeline = new Pipeline();
$result = $pipeline->run($sequence);

/*
$results is now an array of the following form:

$result[] = array(
    'nid' => $nid,
    'sid' => $sid,
    'count' => $count,
    'anomalies' => $anomalies,
    'state' => $state // Added by Mongotd\Pipeline\AddAnomalyState
);

Where $anomalies is an array of objects of the class Mongotd\Anomaly
$count is just the length of the $anomalies array.
*/