|
|
|
@ -23,6 +23,8 @@ export default angular.module('thingsboard.api.datasource', [thingsboardApiDevic |
|
|
|
.factory('datasourceService', DatasourceService) |
|
|
|
.name; |
|
|
|
|
|
|
|
const YEAR = 1000 * 60 * 60 * 24 * 365; |
|
|
|
|
|
|
|
/*@ngInject*/ |
|
|
|
function DatasourceService($timeout, $filter, $log, telemetryWebsocketService, types, utils) { |
|
|
|
|
|
|
|
@ -103,7 +105,7 @@ function DatasourceSubscription(datasourceSubscription, telemetryWebsocketServic |
|
|
|
var datasourceType = datasourceSubscription.datasourceType; |
|
|
|
var datasourceData = {}; |
|
|
|
var dataKeys = {}; |
|
|
|
var subscribers = {}; |
|
|
|
var subscribers = []; |
|
|
|
var history = datasourceSubscription.subscriptionTimewindow && |
|
|
|
datasourceSubscription.subscriptionTimewindow.fixedWindow; |
|
|
|
var realtime = datasourceSubscription.subscriptionTimewindow && |
|
|
|
@ -247,7 +249,6 @@ function DatasourceSubscription(datasourceSubscription, telemetryWebsocketServic |
|
|
|
if (tsKeys.length > 0) { |
|
|
|
|
|
|
|
var subscriber; |
|
|
|
var subscriptionCommand; |
|
|
|
|
|
|
|
if (history) { |
|
|
|
|
|
|
|
@ -263,41 +264,103 @@ function DatasourceSubscription(datasourceSubscription, telemetryWebsocketServic |
|
|
|
}; |
|
|
|
|
|
|
|
subscriber = { |
|
|
|
historyCommand: historyCommand, |
|
|
|
historyCommands: [ historyCommand ], |
|
|
|
type: types.dataKeyType.timeseries, |
|
|
|
onData: function (data) { |
|
|
|
if (data.data) { |
|
|
|
for (var key in data.data) { |
|
|
|
subsTw: subsTw |
|
|
|
}; |
|
|
|
|
|
|
|
if (subsTw.aggregation.stateData) { |
|
|
|
subscriber.firstStateHistoryCommand = createFirstStateHistoryCommand(subsTw.fixedWindow.startTimeMs, tsKeys); |
|
|
|
subscriber.historyCommands.push(subscriber.firstStateHistoryCommand); |
|
|
|
} |
|
|
|
|
|
|
|
subscriber.onData = function (data, subscriptionId) { |
|
|
|
if (this.subsTw.aggregation.stateData && |
|
|
|
this.firstStateHistoryCommand && this.firstStateHistoryCommand.cmdId == subscriptionId) { |
|
|
|
if (this.data) { |
|
|
|
onStateHistoryData(data, this.data, this.subsTw.aggregation.limit, |
|
|
|
subsTw.fixedWindow.startTimeMs, this.subsTw.fixedWindow.endTimeMs, |
|
|
|
(data) => { |
|
|
|
onData(data.data, types.dataKeyType.timeseries, true); |
|
|
|
}); |
|
|
|
} else { |
|
|
|
this.firstStateData = data; |
|
|
|
} |
|
|
|
} else { |
|
|
|
if (this.subsTw.aggregation.stateData) { |
|
|
|
if (this.firstStateData) { |
|
|
|
onStateHistoryData(this.firstStateData, data, this.subsTw.aggregation.limit, |
|
|
|
this.subsTw.fixedWindow.startTimeMs, this.subsTw.fixedWindow.endTimeMs, |
|
|
|
(data) => { |
|
|
|
onData(data.data, types.dataKeyType.timeseries, true); |
|
|
|
}); |
|
|
|
} else { |
|
|
|
this.data = data; |
|
|
|
} |
|
|
|
} else { |
|
|
|
for (key in data.data) { |
|
|
|
var keyData = data.data[key]; |
|
|
|
data.data[key] = $filter('orderBy')(keyData, '+this[0]'); |
|
|
|
} |
|
|
|
onData(data.data, types.dataKeyType.timeseries, true); |
|
|
|
} |
|
|
|
}, |
|
|
|
onReconnected: function() {} |
|
|
|
} |
|
|
|
}; |
|
|
|
|
|
|
|
subscriber.onReconnected = function() {}; |
|
|
|
telemetryWebsocketService.subscribe(subscriber); |
|
|
|
subscribers[subscriber.historyCommand.cmdId] = subscriber; |
|
|
|
subscribers.push(subscriber); |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
subscriptionCommand = { |
|
|
|
var subscriptionCommand = { |
|
|
|
entityType: datasourceSubscription.entityType, |
|
|
|
entityId: datasourceSubscription.entityId, |
|
|
|
keys: tsKeys |
|
|
|
}; |
|
|
|
|
|
|
|
subscriber = { |
|
|
|
subscriptionCommand: subscriptionCommand, |
|
|
|
subscriptionCommands: [subscriptionCommand], |
|
|
|
type: types.dataKeyType.timeseries |
|
|
|
}; |
|
|
|
|
|
|
|
if (datasourceSubscription.type === types.widgetType.timeseries.value) { |
|
|
|
subscriber.subsTw = subsTw; |
|
|
|
updateRealtimeSubscriptionCommand(subscriptionCommand, subsTw); |
|
|
|
|
|
|
|
if (subsTw.aggregation.stateData) { |
|
|
|
subscriber.firstStateSubscriptionCommand = createFirstStateHistoryCommand(subsTw.startTs, tsKeys); |
|
|
|
subscriber.historyCommands = [subscriber.firstStateSubscriptionCommand]; |
|
|
|
} |
|
|
|
dataAggregator = createRealtimeDataAggregator(subsTw, tsKeyNames, types.dataKeyType.timeseries); |
|
|
|
subscriber.onData = function(data) { |
|
|
|
dataAggregator.onData(data, false, false, true); |
|
|
|
subscriber.onData = function(data, subscriptionId) { |
|
|
|
if (this.subsTw.aggregation.stateData && |
|
|
|
this.firstStateSubscriptionCommand && this.firstStateSubscriptionCommand.cmdId == subscriptionId) { |
|
|
|
if (this.data) { |
|
|
|
onStateHistoryData(data, this.data, this.subsTw.aggregation.limit, |
|
|
|
this.subsTw.startTs, this.subsTw.startTs + this.subsTw.aggregation.timeWindow, |
|
|
|
(data) => { |
|
|
|
dataAggregator.onData(data, false, false, true); |
|
|
|
}); |
|
|
|
this.stateDataReceived = true; |
|
|
|
} else { |
|
|
|
this.firstStateData = data; |
|
|
|
} |
|
|
|
} else { |
|
|
|
if (this.subsTw.aggregation.stateData && !this.stateDataReceived) { |
|
|
|
if (this.firstStateData) { |
|
|
|
onStateHistoryData(this.firstStateData, data, this.subsTw.aggregation.limit, |
|
|
|
this.subsTw.startTs, this.subsTw.startTs + this.subsTw.aggregation.timeWindow, |
|
|
|
(data) => { |
|
|
|
dataAggregator.onData(data, false, false, true); |
|
|
|
}); |
|
|
|
this.stateDataReceived = true; |
|
|
|
} else { |
|
|
|
this.data = data; |
|
|
|
} |
|
|
|
} else { |
|
|
|
dataAggregator.onData(data, false, false, true); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
subscriber.onReconnected = function() { |
|
|
|
var newSubsTw = null; |
|
|
|
@ -309,7 +372,14 @@ function DatasourceSubscription(datasourceSubscription, telemetryWebsocketServic |
|
|
|
listener.setRealtimeSubscription(newSubsTw); |
|
|
|
} |
|
|
|
} |
|
|
|
updateRealtimeSubscriptionCommand(this.subscriptionCommand, newSubsTw); |
|
|
|
this.subsTw = newSubsTw; |
|
|
|
this.firstStateData = null; |
|
|
|
this.data = null; |
|
|
|
this.stateDataReceived = false; |
|
|
|
updateRealtimeSubscriptionCommand(this.subscriptionCommands[0], this.subsTw); |
|
|
|
if (this.subsTw.aggregation.stateData) { |
|
|
|
updateFirstStateHistoryCommand(this.firstStateSubscriptionCommand, this.subsTw.startTs); |
|
|
|
} |
|
|
|
dataAggregator.reset(newSubsTw.startTs, newSubsTw.aggregation.timeWindow, newSubsTw.aggregation.interval); |
|
|
|
} |
|
|
|
} else { |
|
|
|
@ -322,21 +392,21 @@ function DatasourceSubscription(datasourceSubscription, telemetryWebsocketServic |
|
|
|
} |
|
|
|
|
|
|
|
telemetryWebsocketService.subscribe(subscriber); |
|
|
|
subscribers[subscriber.subscriptionCommand.cmdId] = subscriber; |
|
|
|
subscribers.push(subscriber); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if (attrKeys.length > 0) { |
|
|
|
|
|
|
|
subscriptionCommand = { |
|
|
|
var attrsSubscriptionCommand = { |
|
|
|
entityType: datasourceSubscription.entityType, |
|
|
|
entityId: datasourceSubscription.entityId, |
|
|
|
keys: attrKeys |
|
|
|
}; |
|
|
|
|
|
|
|
subscriber = { |
|
|
|
subscriptionCommand: subscriptionCommand, |
|
|
|
subscriptionCommands: [attrsSubscriptionCommand], |
|
|
|
type: types.dataKeyType.attribute, |
|
|
|
onData: function (data) { |
|
|
|
if (data.data) { |
|
|
|
@ -347,7 +417,7 @@ function DatasourceSubscription(datasourceSubscription, telemetryWebsocketServic |
|
|
|
}; |
|
|
|
|
|
|
|
telemetryWebsocketService.subscribe(subscriber); |
|
|
|
subscribers[subscriber.cmdId] = subscriber; |
|
|
|
subscribers.push(subscriber); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
@ -377,6 +447,49 @@ function DatasourceSubscription(datasourceSubscription, telemetryWebsocketServic |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
function createFirstStateHistoryCommand(startTs, tsKeys) { |
|
|
|
return { |
|
|
|
entityType: datasourceSubscription.entityType, |
|
|
|
entityId: datasourceSubscription.entityId, |
|
|
|
keys: tsKeys, |
|
|
|
startTs: startTs - YEAR, |
|
|
|
endTs: startTs, |
|
|
|
interval: 1000, |
|
|
|
limit: 1, |
|
|
|
agg: types.aggregation.none.value |
|
|
|
}; |
|
|
|
} |
|
|
|
|
|
|
|
function updateFirstStateHistoryCommand(stateHistoryCommand, startTs) { |
|
|
|
stateHistoryCommand.startTs = startTs - YEAR; |
|
|
|
stateHistoryCommand.endTs = startTs; |
|
|
|
} |
|
|
|
|
|
|
|
function onStateHistoryData(firstStateData, data, limit, startTs, endTs, onData) { |
|
|
|
for (var key in data.data) { |
|
|
|
var keyData = data.data[key]; |
|
|
|
data.data[key] = $filter('orderBy')(keyData, '+this[0]'); |
|
|
|
keyData = data.data[key]; |
|
|
|
if (keyData.length < limit) { |
|
|
|
var firstStateKeyData = firstStateData.data[key]; |
|
|
|
if (firstStateKeyData.length) { |
|
|
|
var firstStateDataTsKv = firstStateKeyData[0]; |
|
|
|
firstStateDataTsKv[0] = startTs; |
|
|
|
firstStateKeyData = [ |
|
|
|
[ startTs, firstStateKeyData[0][1] ] |
|
|
|
]; |
|
|
|
keyData.unshift(firstStateDataTsKv); |
|
|
|
} |
|
|
|
} |
|
|
|
if (keyData.length) { |
|
|
|
var lastTsKv = angular.copy(keyData[keyData.length-1]); |
|
|
|
lastTsKv[0] = endTs; |
|
|
|
keyData.push(lastTsKv); |
|
|
|
} |
|
|
|
} |
|
|
|
onData(data); |
|
|
|
} |
|
|
|
|
|
|
|
function createRealtimeDataAggregator(subsTw, tsKeyNames, dataKeyType) { |
|
|
|
return new DataAggregator( |
|
|
|
function(data, apply) { |
|
|
|
@ -388,6 +501,7 @@ function DatasourceSubscription(datasourceSubscription, telemetryWebsocketServic |
|
|
|
subsTw.aggregation.type, |
|
|
|
subsTw.aggregation.timeWindow, |
|
|
|
subsTw.aggregation.interval, |
|
|
|
subsTw.aggregation.stateData, |
|
|
|
types, |
|
|
|
$timeout, |
|
|
|
$filter |
|
|
|
@ -408,14 +522,14 @@ function DatasourceSubscription(datasourceSubscription, telemetryWebsocketServic |
|
|
|
timer = null; |
|
|
|
} |
|
|
|
if (datasourceType === types.datasourceType.entity) { |
|
|
|
for (var cmdId in subscribers) { |
|
|
|
var subscriber = subscribers[cmdId]; |
|
|
|
for (var i=0;i<subscribers.length;i++) { |
|
|
|
var subscriber = subscribers[i]; |
|
|
|
telemetryWebsocketService.unsubscribe(subscriber); |
|
|
|
if (subscriber.onDestroy) { |
|
|
|
subscriber.onDestroy(); |
|
|
|
} |
|
|
|
} |
|
|
|
subscribers = {}; |
|
|
|
subscribers.length = 0; |
|
|
|
} |
|
|
|
if (dataAggregator) { |
|
|
|
dataAggregator.destroy(); |
|
|
|
|