16 changed files with 480 additions and 84 deletions
@ -0,0 +1,238 @@ |
|||
/* |
|||
* Copyright © 2016-2017 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
|
|||
export default class DataAggregator { |
|||
|
|||
constructor(onDataCb, limit, aggregationType, timeWindow, types, $timeout, $filter) { |
|||
this.onDataCb = onDataCb; |
|||
this.aggregationType = aggregationType; |
|||
this.types = types; |
|||
this.$timeout = $timeout; |
|||
this.$filter = $filter; |
|||
this.dataReceived = false; |
|||
this.noAggregation = aggregationType === types.aggregation.none.value; |
|||
var interval = Math.floor(timeWindow / limit); |
|||
if (!this.noAggregation) { |
|||
this.interval = Math.max(interval, 1000); |
|||
this.limit = Math.ceil(interval/this.interval * limit); |
|||
this.timeWindow = this.interval * this.limit; |
|||
} else { |
|||
this.limit = limit; |
|||
this.timeWindow = interval * this.limit; |
|||
this.interval = 1000; |
|||
} |
|||
this.aggregationTimeout = this.interval; |
|||
switch (aggregationType) { |
|||
case types.aggregation.min.value: |
|||
this.aggFunction = min; |
|||
break; |
|||
case types.aggregation.max.value: |
|||
this.aggFunction = max |
|||
break; |
|||
case types.aggregation.avg.value: |
|||
this.aggFunction = avg; |
|||
break; |
|||
case types.aggregation.sum.value: |
|||
this.aggFunction = sum; |
|||
break; |
|||
case types.aggregation.count.value: |
|||
this.aggFunction = count; |
|||
break; |
|||
case types.aggregation.none.value: |
|||
this.aggFunction = none; |
|||
break; |
|||
default: |
|||
this.aggFunction = avg; |
|||
} |
|||
} |
|||
|
|||
onData(data) { |
|||
if (!this.dataReceived) { |
|||
this.elapsed = 0; |
|||
this.dataReceived = true; |
|||
this.startTs = data.serverStartTs; |
|||
this.endTs = this.startTs + this.timeWindow; |
|||
this.aggregationMap = processAggregatedData(data.data, this.aggregationType === this.types.aggregation.count.value, this.noAggregation); |
|||
this.onInterval(currentTime()); |
|||
} else { |
|||
updateAggregatedData(this.aggregationMap, this.aggregationType === this.types.aggregation.count.value, |
|||
this.noAggregation, this.aggFunction, data.data, this.interval, this.startTs); |
|||
} |
|||
} |
|||
|
|||
onInterval(startedTime) { |
|||
var now = currentTime(); |
|||
this.elapsed += now - startedTime; |
|||
if (this.intervalTimeoutHandle) { |
|||
this.$timeout.cancel(this.intervalTimeoutHandle); |
|||
this.intervalTimeoutHandle = null; |
|||
} |
|||
var delta = Math.floor(this.elapsed / this.interval); |
|||
if (delta || !this.data) { |
|||
this.startTs += delta * this.interval; |
|||
this.endTs += delta * this.interval; |
|||
this.data = toData(this.aggregationMap, this.startTs, this.endTs, this.$filter, this.limit); |
|||
this.elapsed = this.elapsed - delta * this.interval; |
|||
} |
|||
if (this.onDataCb) { |
|||
this.onDataCb(this.data, this.startTs, this.endTs); |
|||
} |
|||
|
|||
var self = this; |
|||
this.intervalTimeoutHandle = this.$timeout(function() { |
|||
self.onInterval(now); |
|||
}, this.aggregationTimeout, false); |
|||
} |
|||
|
|||
reset() { |
|||
this.destroy(); |
|||
this.dataReceived = false; |
|||
} |
|||
|
|||
destroy() { |
|||
if (this.intervalTimeoutHandle) { |
|||
this.$timeout.cancel(this.intervalTimeoutHandle); |
|||
this.intervalTimeoutHandle = null; |
|||
} |
|||
this.aggregationMap = null; |
|||
} |
|||
|
|||
} |
|||
|
|||
/* eslint-disable */ |
|||
function currentTime() { |
|||
return window.performance && window.performance.now ? |
|||
window.performance.now() : Date.now(); |
|||
} |
|||
/* eslint-enable */ |
|||
|
|||
function processAggregatedData(data, isCount, noAggregation) { |
|||
var aggregationMap = {}; |
|||
for (var key in data) { |
|||
var aggKeyData = aggregationMap[key]; |
|||
if (!aggKeyData) { |
|||
aggKeyData = {}; |
|||
aggregationMap[key] = aggKeyData; |
|||
} |
|||
var keyData = data[key]; |
|||
for (var i in keyData) { |
|||
var kvPair = keyData[i]; |
|||
var timestamp = kvPair[0]; |
|||
var value = convertValue(kvPair[1], noAggregation); |
|||
var aggKey = timestamp; |
|||
var aggData = { |
|||
count: isCount ? value : 1, |
|||
sum: value, |
|||
aggValue: value |
|||
} |
|||
aggKeyData[aggKey] = aggData; |
|||
} |
|||
} |
|||
return aggregationMap; |
|||
} |
|||
|
|||
function updateAggregatedData(aggregationMap, isCount, noAggregation, aggFunction, data, interval, startTs) { |
|||
for (var key in data) { |
|||
var aggKeyData = aggregationMap[key]; |
|||
if (!aggKeyData) { |
|||
aggKeyData = {}; |
|||
aggregationMap[key] = aggKeyData; |
|||
} |
|||
var keyData = data[key]; |
|||
for (var i in keyData) { |
|||
var kvPair = keyData[i]; |
|||
var timestamp = kvPair[0]; |
|||
var value = convertValue(kvPair[1], noAggregation); |
|||
var aggTimestamp = noAggregation ? timestamp : (startTs + Math.floor((timestamp - startTs) / interval) * interval + interval/2); |
|||
var aggData = aggKeyData[aggTimestamp]; |
|||
if (!aggData) { |
|||
aggData = { |
|||
count: 1, |
|||
sum: value, |
|||
aggValue: isCount ? 1 : value |
|||
} |
|||
aggKeyData[aggTimestamp] = aggData; |
|||
} else { |
|||
aggFunction(aggData, value); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
function toData(aggregationMap, startTs, endTs, $filter, limit) { |
|||
var data = {}; |
|||
for (var key in aggregationMap) { |
|||
if (!data[key]) { |
|||
data[key] = []; |
|||
} |
|||
var aggKeyData = aggregationMap[key]; |
|||
var keyData = data[key]; |
|||
for (var aggTimestamp in aggKeyData) { |
|||
if (aggTimestamp <= startTs) { |
|||
delete aggKeyData[aggTimestamp]; |
|||
} else if (aggTimestamp <= endTs) { |
|||
var aggData = aggKeyData[aggTimestamp]; |
|||
var kvPair = [aggTimestamp, aggData.aggValue]; |
|||
keyData.push(kvPair); |
|||
} |
|||
} |
|||
keyData = $filter('orderBy')(keyData, '+this[0]'); |
|||
if (keyData.length > limit) { |
|||
keyData = keyData.slice(keyData.length - limit); |
|||
} |
|||
data[key] = keyData; |
|||
} |
|||
return data; |
|||
} |
|||
|
|||
function convertValue(value, noAggregation) { |
|||
if (!noAggregation || value && isNumeric(value)) { |
|||
return Number(value); |
|||
} else { |
|||
return value; |
|||
} |
|||
} |
|||
|
|||
function isNumeric(value) { |
|||
return (value - parseFloat( value ) + 1) >= 0; |
|||
} |
|||
|
|||
function avg(aggData, value) { |
|||
aggData.count++; |
|||
aggData.sum += value; |
|||
aggData.aggValue = aggData.sum / aggData.count; |
|||
} |
|||
|
|||
function min(aggData, value) { |
|||
aggData.aggValue = Math.min(aggData.aggValue, value); |
|||
} |
|||
|
|||
function max(aggData, value) { |
|||
aggData.aggValue = Math.max(aggData.aggValue, value); |
|||
} |
|||
|
|||
function sum(aggData, value) { |
|||
aggData.aggValue = aggData.aggValue + value; |
|||
} |
|||
|
|||
function count(aggData) { |
|||
aggData.count++; |
|||
aggData.aggValue = aggData.count; |
|||
} |
|||
|
|||
function none(aggData, value) { |
|||
aggData.aggValue = value; |
|||
} |
|||
Loading…
Reference in new issue