magnusjt / mongotd
Mongodb时序数据库
Requires
- psr/log: 1.0.0
Requires (Dev)
- monolog/monolog: 1.10.0
- phpunit/phpunit: 4.2.*
This package is not auto-updated.
Last update: 2024-09-28 18:47:40 UTC
README
这是一个基于mongodb的时序数据库的PHP实现。它负责存储时序数据,并使用不同的技术(目前有3-sigma、Holt-winters或Kolmogorov-Smirnov)对这些数据执行异常检测。
时序数据使用sid(传感器ID,即数据表示什么)和nid(节点ID,即数据收集自何处)进行存储。这两个ID共同唯一地表示一个时序。时序可能是仪表或增量型。在后一种情况下,连续的值从彼此中减去以获得变化率。
- 仪表计数器的示例包括温度、当前访客数等。
- 增量计数器的示例包括累积访客数、累积请求数等。
为了提高存储效率,数据以一天为一个块进行存储,使用哈希键为午夜以来的秒数。这避免了为每个数据点存储元数据。
在检索数据时,可以在时间(每小时、每天等的总和/平均值/最大值/最小值)、空间(某些节点上的总和/平均值/最大值/最小值)或给定公式上执行聚合。但请注意,没有进行预聚合。这是为了避免时区问题(即某些天可能包含不同的小时,具体取决于查看数据时所在的时区)。
安装
使用composer安装
composer require magnusjt/mongotd
基本用法
首先我们获取一个mongodb连接并初始化mongotd。
$host = '127.0.0.1'; // Or mongodb connection string $dbName = 'mongotd'; $colPrefix = 'mongotd'; // All collections used by mongotd will have this prefix and an underscore $conn = new \Mongotd\Connection($host, $dbName, $colPrefix);
关于传感器ID和节点ID
传感器ID和节点ID存储为数据库中的字符串。因此,提供的任何ID都必须可转换为字符串。
关于插入的值
插入的值总是整数或双精度浮点数,并且必须可转换为这些数据类型。
插入数据
通过一组存储中间件来执行数据插入。您可以选择所需的中间件,甚至可以创建自己的中间件。在最极端的情况下,实际上可以替换mongodb为另一个数据库。
例如
$storage = new Storage();
$storage->addMiddleware(new FilterCounterValues());
$storage->addMiddleware(new CalculateDeltas($conn, new Logger(null), Resolution::FIVE_MINUTES));
$storage->addMiddleware(new InsertCounterValues($conn));
$storage->addMiddleware(new FindAnomaliesUsingSigmaTest($conn));
$storage->addMiddleware(new StoreAnomalies($conn));
通过CounterValue数组存储时序数据
$storage->store([
new CounterValue($sid, $nid, $datetime, $value, $isIncremental)
]);
检索数据
由于有那么多处理时序数据的方法,我们使用管道解决方案来处理原始值。管道可以以许多不同的方式组装,并且可以根据您的需求添加新的或修改过的处理步骤。
以下是一些使用默认管道可以执行的操作的示例
通过总和、平均值、最大值、最小值将值汇总为日值
$start = new DateTime('2015-11-01 00:00:00');
$end = new DateTime('2015-12-01 00:00:00');
DateTimeHelper::normalizeTimeRange($start, $end, Resolution::DAY);
$sequence = [
new Find($conn, $sid, $nid, $start, $end),
new RollupTime(Resolution::DAY, Aggregation::Sum),
new Pad(Resolution::DAY, $start, $end)
];
$pipeline = new Pipeline();
$series = $pipeline->run($sequence);
使用总和、平均值、最大值、最小值组合不同的系列
$sequence = [
[
[
new Find($conn, $sid, $nid1, $start, $end),
new RollupTime(Resolution::FIVE_MINUTES, Aggregation::SUM),
new Pad(Resolution::FIVE_MINUTES, $start, $end)
],
[
new Find($conn, $sid, $nid2, $start, $end),
new RollupTime(Resolution::FIVE_MINUTES, Aggregation::SUM),
new Pad(Resolution::FIVE_MINUTES, $start, $end)
]
],
new RollupSpace(Aggregation::SUM),
new RollupTime(Resolution::DAY, Aggregation::SUM),
new Pad(Resolution::DAY, $start, $end)
];
$pipeline = new Pipeline();
$series = $pipeline->run($sequence);
在这个例子中,您可以看到我们首先将每个系列汇总,然后将其组合成一个系列,最后将这个结果汇总为日系列。
在这个例子中,我们只是将所有值相加,因此操作顺序并不重要。然而,如果您使用平均/最大/最小之类的聚合,顺序就很重要了。通过使用管道,您可以对操作顺序有完全的控制。
例如
使用公式组合不同的系列
示例
$formula = '([sid=1,nid=1,agg=1] / [sid=2,nid=1,agg=1]) * 100';
$parser = new Parser();
$ast = $parser->parse($formula);
$astEvaluator = new AstEvaluator();
$astEvaluator->setVariableEvaluatorCallback(function($options) use($conn, $start, $end)){
$pipeline = new Pipeline();
return $pipeline->run([
new Find($conn, $options['sid'], $options['nid'], $start, $end),
new RollupTime(Resolution::FIVE_MINUTES, $options['agg']),
new Pad(Resolution::FIVE_MINUTES, $start, $end)
])->vals;
]);
$sequence = [
new Formula($ast, $astEvaluator),
new RollupTime(Resolution::DAY, Aggregation::SUM),
new Pad(Resolution::DAY, $start, $end)
];
$pipeline = new Pipeline();
$series = $pipeline->run($sequence);
管道工厂
可以使用Mongotd\Pipeline\Factory类创建一些默认的管道序列。当然,也可以创建自定义工厂。
关于时区怎么办?
时区在处理时间序列时是个大麻烦。仅仅确定时间序列的时区偏移量是不够的,因为夏令时可能会在序列持续期间改变偏移量。我们可以使用一些日期时间库来减轻一些痛苦,但问题变成了性能。因此,在这个项目中,我们尽可能地使用时间戳,只有在绝对需要时才考虑时区。我们需要考虑时区最常见的地方是在时间汇总时。在这里,我们在计算中对每个时间戳进行偏移,使得当结果转换为给定时区的日期时间时,值将正好落在期望的分辨率步骤上。
寻找异常数据
在插入新数据时会发现异常。为了实现这一点,我们需要决定使用哪种异常测试。
使用3-sigma方法的示例
$sigmaTest = new FindAnomaliesUsingSigmaTest($conn);
$sigmaTest->setDaysToScan(20); // How many days in the past to use for comparison
// Instead of looking at individual values,
// look at the average value within 300 seconds
$sigmaTest->setWindowLength(300);
$sigmaTest->setMinPrevDataPoints(14); // How many data points are required before we bother doing the scan
$sigmaTest->setMinCurrDataPoints(1); // How many data points we need for evaluation
// The treshold for 3 sigma.
// If the data is normally distributed,
// a value of 3 gives anomalies if the value is outside of the 99.7 percentile
$sigmaTest->setScoreTreshold(3);
// Add sigma test as middleware
$storage->addMiddleware($sigmaTest);
// Store the anomalies as well
$storage->addMiddleware(new StoreAnomalies($conn));
3-sigma方法取了与当前时间相同时间点的所有值的平均值和标准差。它将这个平均值与当前值进行比较,使用3*STD作为相对于平均值的允许偏差。
3-sigma方法的缺点是它假设值是正态分布的(这并不总是如此)。在这种情况下,它还假设值遵循每日轮廓,即同一时间点的值很可能相同。最后,该方法需要回溯很长时间并检索值,这可能会很慢。
寻找异常的另一种方法是Holt-Winters算法。它和3-sigma方法有同样的缺点,即数据应该是正态分布的,并遵循时间轮廓。然而,它使用指数平滑而不是精确计算平均值和标准差。因此,它应该要快得多。
使用Holt-Winters的示例
$hwTest = new FindAnomaliesUsingHwTest($conn);
$hwTest->setMinDaysScanned(20);
$hwTest->setHwAlpha(0.05); // Smoothing factor for average
$hwTest->setHwBeta(0.005); // Smoothing factor for linear increase
$hwTest->setHwGamma(0.1); // Smoothing factor std
$hwTest->setScoreTreshold(3); // Same as 3-sigma
$hwTest->setSeasonLength(1440); // Season length in minutes (daily season by default)
$storage->addMiddleware($hwTest);
$storage->addMiddleware(new StoreAnomalies($conn));
检索异常
使用管道,我们还可以找到在插入时检测到的已存储异常。
此外,我们还有一个辅助序列,用于在给定时间段内找到异常状态。状态是一个时间戳 => 0|1的数组,汇总到期望的分辨率。
$from = new DateTime('-1 day'); $to = new DateTime('now'); $nids = array(); // Limit results to these nids, no limit if empty $sids = array(); // Limit results to these sids, no limit if empty $minNumberOfAnomalies = 1; $maxResults = 20; $sequence = [ new FindAnomalies($conn, $start, $end, $nids, $sids, $minCount, $limit), new AddAnomalyState($start, $end, Resolution::FIVE_MINUTES) ]; $pipeline = new Pipeline(); $result = $pipeline->run($sequence); /* $results is now an array of the following form: $result[] = array( 'nid' => $nid, 'sid' => $sid, 'count' => $count, 'anomalies' => $anomalies, 'state' => $state // Added by Mongotd\Pipeline\AddAnomalyState ); Where $anomalies is an array of objects of the class Mongotd\Anomaly $count is just the length of the $anomalies array. */