|
|
|
@ -49,7 +49,8 @@ import Timeout = NodeJS.Timeout; |
|
|
|
|
|
|
|
declare type DataKeyFunction = (time: number, prevValue: any) => any; |
|
|
|
declare type DataKeyPostFunction = (time: number, value: any, prevValue: any, timePrev: number, prevOrigValue: any) => any; |
|
|
|
declare type DataUpdatedCb = (data: DataSetHolder, dataIndex: number, dataKeyIndex: number, detectChanges: boolean) => void; |
|
|
|
declare type DataUpdatedCb = (data: DataSetHolder, dataIndex: number, |
|
|
|
dataKeyIndex: number, detectChanges: boolean, isLatest: boolean) => void; |
|
|
|
|
|
|
|
export interface SubscriptionDataKey { |
|
|
|
name: string; |
|
|
|
@ -59,8 +60,10 @@ export interface SubscriptionDataKey { |
|
|
|
postFuncBody: string; |
|
|
|
postFunc?: DataKeyPostFunction; |
|
|
|
index?: number; |
|
|
|
listIndex?: number; |
|
|
|
key?: string; |
|
|
|
lastUpdateTime?: number; |
|
|
|
latest?: boolean; |
|
|
|
} |
|
|
|
|
|
|
|
export interface EntityDataSubscriptionOptions { |
|
|
|
@ -104,9 +107,11 @@ export class EntityDataSubscription { |
|
|
|
private entityIdToDataIndex: {[id: string]: number}; |
|
|
|
|
|
|
|
private frequency: number; |
|
|
|
private latestFrequency: number; |
|
|
|
private tickScheduledTime = 0; |
|
|
|
private tickElapsed = 0; |
|
|
|
private timer: Timeout; |
|
|
|
private timeseriesTimer: Timeout; |
|
|
|
private latestTimer: Timeout; |
|
|
|
|
|
|
|
private dataResolved = false; |
|
|
|
private started = false; |
|
|
|
@ -135,9 +140,9 @@ export class EntityDataSubscription { |
|
|
|
if (this.datasourceType === DatasourceType.entity || this.datasourceType === DatasourceType.entityCount || |
|
|
|
this.entityDataSubscriptionOptions.type === widgetType.timeseries) { |
|
|
|
if (this.datasourceType === DatasourceType.function) { |
|
|
|
key = `${dataKey.name}_${dataKey.index}_${dataKey.type}`; |
|
|
|
key = `${dataKey.name}_${dataKey.index}_${dataKey.type}${dataKey.latest ? '_latest' : ''}`; |
|
|
|
} else { |
|
|
|
key = `${dataKey.name}_${dataKey.type}`; |
|
|
|
key = `${dataKey.name}_${dataKey.type}${dataKey.latest ? '_latest' : ''}`; |
|
|
|
} |
|
|
|
let dataKeysList = this.dataKeys[key] as Array<SubscriptionDataKey>; |
|
|
|
if (!dataKeysList) { |
|
|
|
@ -145,6 +150,7 @@ export class EntityDataSubscription { |
|
|
|
this.dataKeys[key] = dataKeysList; |
|
|
|
} |
|
|
|
dataKeysList.push(dataKey); |
|
|
|
dataKey.listIndex = dataKeysList.length - 1; |
|
|
|
} else { |
|
|
|
key = String(objectHashCode(dataKey)); |
|
|
|
this.dataKeys[key] = dataKey; |
|
|
|
@ -154,9 +160,13 @@ export class EntityDataSubscription { |
|
|
|
} |
|
|
|
|
|
|
|
public unsubscribe() { |
|
|
|
if (this.timer) { |
|
|
|
clearTimeout(this.timer); |
|
|
|
this.timer = null; |
|
|
|
if (this.timeseriesTimer) { |
|
|
|
clearTimeout(this.timeseriesTimer); |
|
|
|
this.timeseriesTimer = null; |
|
|
|
} |
|
|
|
if (this.latestTimer) { |
|
|
|
clearTimeout(this.latestTimer); |
|
|
|
this.latestTimer = null; |
|
|
|
} |
|
|
|
if (this.datasourceType === DatasourceType.entity || this.datasourceType === DatasourceType.entityCount) { |
|
|
|
if (this.subscriber) { |
|
|
|
@ -213,11 +223,20 @@ export class EntityDataSubscription { |
|
|
|
dataKey => ({ type: EntityKeyType.ATTRIBUTE, key: dataKey.name }) |
|
|
|
); |
|
|
|
|
|
|
|
this.tsFields = this.entityDataSubscriptionOptions.dataKeys.filter(dataKey => dataKey.type === DataKeyType.timeseries).map( |
|
|
|
dataKey => ({ type: EntityKeyType.TIME_SERIES, key: dataKey.name }) |
|
|
|
this.tsFields = this.entityDataSubscriptionOptions.dataKeys. |
|
|
|
filter(dataKey => dataKey.type === DataKeyType.timeseries && !dataKey.latest).map( |
|
|
|
dataKey => ({ type: EntityKeyType.TIME_SERIES, key: dataKey.name }) |
|
|
|
); |
|
|
|
|
|
|
|
this.latestValues = this.attrFields.concat(this.tsFields); |
|
|
|
if (this.entityDataSubscriptionOptions.type === widgetType.timeseries) { |
|
|
|
const latestTsFields = this.entityDataSubscriptionOptions.dataKeys. |
|
|
|
filter(dataKey => dataKey.type === DataKeyType.timeseries && dataKey.latest).map( |
|
|
|
dataKey => ({ type: EntityKeyType.TIME_SERIES, key: dataKey.name }) |
|
|
|
); |
|
|
|
this.latestValues = this.attrFields.concat(latestTsFields); |
|
|
|
} else { |
|
|
|
this.latestValues = this.attrFields.concat(this.tsFields); |
|
|
|
} |
|
|
|
|
|
|
|
this.subscriber = new TelemetrySubscriber(this.telemetryService); |
|
|
|
this.dataCommand = new EntityDataCmd(); |
|
|
|
@ -467,6 +486,11 @@ export class EntityDataSubscription { |
|
|
|
}; |
|
|
|
} |
|
|
|
} |
|
|
|
if (this.latestValues.length > 0) { |
|
|
|
cmd.latestCmd = { |
|
|
|
keys: this.latestValues |
|
|
|
}; |
|
|
|
} |
|
|
|
} else if (this.entityDataSubscriptionOptions.type === widgetType.latest) { |
|
|
|
if (this.latestValues.length > 0) { |
|
|
|
cmd.latestCmd = { |
|
|
|
@ -478,21 +502,22 @@ export class EntityDataSubscription { |
|
|
|
|
|
|
|
private startFunction() { |
|
|
|
this.frequency = 1000; |
|
|
|
this.latestFrequency = 1000; |
|
|
|
if (this.entityDataSubscriptionOptions.type === widgetType.timeseries) { |
|
|
|
this.frequency = Math.min(this.entityDataSubscriptionOptions.subscriptionTimewindow.aggregation.interval, 5000); |
|
|
|
} |
|
|
|
this.tickScheduledTime = this.utils.currentPerfTime(); |
|
|
|
if (this.history) { |
|
|
|
this.onTick(true); |
|
|
|
} else { |
|
|
|
this.timer = setTimeout(this.onTick.bind(this, true), 0); |
|
|
|
} |
|
|
|
this.generateData(true); |
|
|
|
} |
|
|
|
|
|
|
|
private prepareData() { |
|
|
|
if (this.timer) { |
|
|
|
clearTimeout(this.timer); |
|
|
|
this.timer = null; |
|
|
|
if (this.timeseriesTimer) { |
|
|
|
clearTimeout(this.timeseriesTimer); |
|
|
|
this.timeseriesTimer = null; |
|
|
|
} |
|
|
|
if (this.latestTimer) { |
|
|
|
clearTimeout(this.latestTimer); |
|
|
|
this.latestTimer = null; |
|
|
|
} |
|
|
|
|
|
|
|
if (this.dataAggregators) { |
|
|
|
@ -509,19 +534,23 @@ export class EntityDataSubscription { |
|
|
|
for (const key of Object.keys(this.dataKeys)) { |
|
|
|
const dataKeysList = this.dataKeys[key] as Array<SubscriptionDataKey>; |
|
|
|
dataKeysList.forEach((subscriptionDataKey) => { |
|
|
|
tsKeyNames.push(`${subscriptionDataKey.name}_${subscriptionDataKey.index}`); |
|
|
|
if (!subscriptionDataKey.latest) { |
|
|
|
tsKeyNames.push(`${subscriptionDataKey.name}_${subscriptionDataKey.index}`); |
|
|
|
} |
|
|
|
}); |
|
|
|
} |
|
|
|
} else { |
|
|
|
tsKeyNames = this.tsFields ? this.tsFields.map(field => field.key) : []; |
|
|
|
} |
|
|
|
for (let dataIndex = 0; dataIndex < this.pageData.data.length; dataIndex++) { |
|
|
|
if (this.datasourceType === DatasourceType.function) { |
|
|
|
this.dataAggregators[dataIndex] = this.createRealtimeDataAggregator(this.subsTw, tsKeyNames, |
|
|
|
DataKeyType.function, dataIndex, this.notifyListener.bind(this)); |
|
|
|
} else if (tsKeyNames.length) { |
|
|
|
this.dataAggregators[dataIndex] = this.createRealtimeDataAggregator(this.subsTw, tsKeyNames, |
|
|
|
DataKeyType.timeseries, dataIndex, this.notifyListener.bind(this)); |
|
|
|
if (tsKeyNames.length) { |
|
|
|
if (this.datasourceType === DatasourceType.function) { |
|
|
|
this.dataAggregators[dataIndex] = this.createRealtimeDataAggregator(this.subsTw, tsKeyNames, |
|
|
|
DataKeyType.function, dataIndex, this.notifyListener.bind(this)); |
|
|
|
} else { |
|
|
|
this.dataAggregators[dataIndex] = this.createRealtimeDataAggregator(this.subsTw, tsKeyNames, |
|
|
|
DataKeyType.timeseries, dataIndex, this.notifyListener.bind(this)); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -540,7 +569,8 @@ export class EntityDataSubscription { |
|
|
|
this.entityDataSubscriptionOptions.type === widgetType.timeseries) { |
|
|
|
const dataKeysList = dataKey as Array<SubscriptionDataKey>; |
|
|
|
for (let index = 0; index < dataKeysList.length; index++) { |
|
|
|
this.datasourceData[dataIndex][key + '_' + index] = { |
|
|
|
const datasourceKey = `${key}_${index}`; |
|
|
|
this.datasourceData[dataIndex][datasourceKey] = { |
|
|
|
data: [] |
|
|
|
}; |
|
|
|
} |
|
|
|
@ -637,19 +667,21 @@ export class EntityDataSubscription { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private notifyListener(data: DataSetHolder, dataIndex: number, dataKeyIndex: number, detectChanges: boolean) { |
|
|
|
private notifyListener(data: DataSetHolder, dataIndex: number, dataKeyIndex: number, detectChanges: boolean, isLatest: boolean) { |
|
|
|
this.listener.dataUpdated(data, |
|
|
|
this.listener.configDatasourceIndex, |
|
|
|
dataIndex, dataKeyIndex, detectChanges); |
|
|
|
dataIndex, dataKeyIndex, detectChanges, isLatest); |
|
|
|
} |
|
|
|
|
|
|
|
private processEntityData(entityData: EntityData, dataIndex: number, isUpdate: boolean, |
|
|
|
dataUpdatedCb: DataUpdatedCb) { |
|
|
|
if (this.entityDataSubscriptionOptions.type === widgetType.latest && entityData.latest) { |
|
|
|
if ((this.entityDataSubscriptionOptions.type === widgetType.latest || |
|
|
|
this.entityDataSubscriptionOptions.type === widgetType.timeseries) && entityData.latest) { |
|
|
|
for (const type of Object.keys(entityData.latest)) { |
|
|
|
const subscriptionData = this.toSubscriptionData(entityData.latest[type], false); |
|
|
|
const dataKeyType = entityKeyTypeToDataKeyType(EntityKeyType[type]); |
|
|
|
this.onData(subscriptionData, dataKeyType, dataIndex, true, dataUpdatedCb); |
|
|
|
this.onData(subscriptionData, dataKeyType, dataIndex, true, |
|
|
|
this.entityDataSubscriptionOptions.type === widgetType.timeseries, dataUpdatedCb); |
|
|
|
} |
|
|
|
} |
|
|
|
if (this.entityDataSubscriptionOptions.type === widgetType.timeseries && entityData.timeseries) { |
|
|
|
@ -660,7 +692,7 @@ export class EntityDataSubscription { |
|
|
|
if (!isUpdate) { |
|
|
|
prevDataCb = dataAggregator.updateOnDataCb((data, detectChanges) => { |
|
|
|
this.onData(data, this.datasourceType === DatasourceType.function ? |
|
|
|
DataKeyType.function : DataKeyType.timeseries, dataIndex, detectChanges, dataUpdatedCb); |
|
|
|
DataKeyType.function : DataKeyType.timeseries, dataIndex, detectChanges, false, dataUpdatedCb); |
|
|
|
}); |
|
|
|
} |
|
|
|
dataAggregator.onData({data: subscriptionData}, false, this.history, true); |
|
|
|
@ -668,16 +700,16 @@ export class EntityDataSubscription { |
|
|
|
dataAggregator.updateOnDataCb(prevDataCb); |
|
|
|
} |
|
|
|
} else if (!this.history && !isUpdate) { |
|
|
|
this.onData(subscriptionData, DataKeyType.timeseries, dataIndex, true, dataUpdatedCb); |
|
|
|
this.onData(subscriptionData, DataKeyType.timeseries, dataIndex, true, false, dataUpdatedCb); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private onData(sourceData: SubscriptionData, type: DataKeyType, dataIndex: number, detectChanges: boolean, |
|
|
|
dataUpdatedCb: DataUpdatedCb) { |
|
|
|
isTsLatest: boolean, dataUpdatedCb: DataUpdatedCb) { |
|
|
|
for (const keyName of Object.keys(sourceData)) { |
|
|
|
const keyData = sourceData[keyName]; |
|
|
|
const key = `${keyName}_${type}`; |
|
|
|
const key = `${keyName}_${type}${isTsLatest ? '_latest' : ''}`; |
|
|
|
const dataKeyList = this.dataKeys[key] as Array<SubscriptionDataKey>; |
|
|
|
for (let keyIndex = 0; dataKeyList && keyIndex < dataKeyList.length; keyIndex++) { |
|
|
|
const datasourceKey = `${key}_${keyIndex}`; |
|
|
|
@ -689,7 +721,7 @@ export class EntityDataSubscription { |
|
|
|
let datasourceKeyData: DataSet; |
|
|
|
let datasourceOrigKeyData: DataSet; |
|
|
|
let update = false; |
|
|
|
if (this.realtime) { |
|
|
|
if (this.realtime && !isTsLatest) { |
|
|
|
datasourceKeyData = []; |
|
|
|
datasourceOrigKeyData = []; |
|
|
|
} else { |
|
|
|
@ -704,7 +736,7 @@ export class EntityDataSubscription { |
|
|
|
prevOrigSeries = [0, 0]; |
|
|
|
} |
|
|
|
this.datasourceOrigData[dataIndex][datasourceKey].data = []; |
|
|
|
if (this.entityDataSubscriptionOptions.type === widgetType.timeseries) { |
|
|
|
if (this.entityDataSubscriptionOptions.type === widgetType.timeseries && !isTsLatest) { |
|
|
|
keyData.forEach((keySeries) => { |
|
|
|
let series = keySeries; |
|
|
|
const time = series[0]; |
|
|
|
@ -719,7 +751,7 @@ export class EntityDataSubscription { |
|
|
|
prevSeries = series; |
|
|
|
}); |
|
|
|
update = true; |
|
|
|
} else if (this.entityDataSubscriptionOptions.type === widgetType.latest) { |
|
|
|
} else if (this.entityDataSubscriptionOptions.type === widgetType.latest || isTsLatest) { |
|
|
|
if (keyData.length > 0) { |
|
|
|
let series = keyData[0]; |
|
|
|
const time = series[0]; |
|
|
|
@ -735,7 +767,7 @@ export class EntityDataSubscription { |
|
|
|
} |
|
|
|
if (update) { |
|
|
|
this.datasourceData[dataIndex][datasourceKey].data = data; |
|
|
|
dataUpdatedCb(this.datasourceData[dataIndex][datasourceKey], dataIndex, dataKey.index, detectChanges); |
|
|
|
dataUpdatedCb(this.datasourceData[dataIndex][datasourceKey], dataIndex, dataKey.index, detectChanges, isTsLatest); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -774,7 +806,7 @@ export class EntityDataSubscription { |
|
|
|
dataUpdatedCb: DataUpdatedCb): DataAggregator { |
|
|
|
return new DataAggregator( |
|
|
|
(data, detectChanges) => { |
|
|
|
this.onData(data, dataKeyType, dataIndex, detectChanges, dataUpdatedCb); |
|
|
|
this.onData(data, dataKeyType, dataIndex, detectChanges, false, dataUpdatedCb); |
|
|
|
}, |
|
|
|
tsKeyNames, |
|
|
|
subsTw, |
|
|
|
@ -783,17 +815,17 @@ export class EntityDataSubscription { |
|
|
|
); |
|
|
|
} |
|
|
|
|
|
|
|
private generateSeries(dataKey: SubscriptionDataKey, index: number, startTime: number, endTime: number): [number, any][] { |
|
|
|
private generateSeries(dataKey: SubscriptionDataKey, startTime: number, endTime: number): [number, any][] { |
|
|
|
const data: [number, any][] = []; |
|
|
|
let prevSeries: [number, any]; |
|
|
|
const datasourceDataKey = `${dataKey.key}_${index}`; |
|
|
|
const datasourceDataKey = `${dataKey.key}_${dataKey.listIndex}`; |
|
|
|
const datasourceKeyData = this.datasourceData[0][datasourceDataKey].data; |
|
|
|
if (datasourceKeyData.length > 0) { |
|
|
|
prevSeries = datasourceKeyData[datasourceKeyData.length - 1]; |
|
|
|
} else { |
|
|
|
prevSeries = [0, 0]; |
|
|
|
} |
|
|
|
for (let time = startTime; time <= endTime && (this.timer || this.history); time += this.frequency) { |
|
|
|
for (let time = startTime; time <= endTime && (this.timeseriesTimer || this.history); time += this.frequency) { |
|
|
|
const value = dataKey.func(time, prevSeries[1]); |
|
|
|
const series: [number, any] = [time, value]; |
|
|
|
data.push(series); |
|
|
|
@ -807,7 +839,8 @@ export class EntityDataSubscription { |
|
|
|
|
|
|
|
private generateLatest(dataKey: SubscriptionDataKey, detectChanges: boolean) { |
|
|
|
let prevSeries: [number, any]; |
|
|
|
const datasourceKeyData = this.datasourceData[0][dataKey.key].data; |
|
|
|
const datasourceKey = dataKey.latest ? `${dataKey.key}_${dataKey.listIndex}` : dataKey.key; |
|
|
|
const datasourceKeyData = this.datasourceData[0][datasourceKey].data; |
|
|
|
if (datasourceKeyData.length > 0) { |
|
|
|
prevSeries = datasourceKeyData[datasourceKeyData.length - 1]; |
|
|
|
} else { |
|
|
|
@ -816,78 +849,100 @@ export class EntityDataSubscription { |
|
|
|
const time = Date.now() + this.latestTsOffset; |
|
|
|
const value = dataKey.func(time, prevSeries[1]); |
|
|
|
const series: [number, any] = [time, value]; |
|
|
|
this.datasourceData[0][dataKey.key].data = [series]; |
|
|
|
this.listener.dataUpdated(this.datasourceData[0][dataKey.key], |
|
|
|
this.datasourceData[0][datasourceKey].data = [series]; |
|
|
|
this.listener.dataUpdated(this.datasourceData[0][datasourceKey], |
|
|
|
this.listener.configDatasourceIndex, |
|
|
|
0, |
|
|
|
dataKey.index, detectChanges); |
|
|
|
dataKey.index, detectChanges, dataKey.latest); |
|
|
|
} |
|
|
|
|
|
|
|
private onTick(detectChanges: boolean) { |
|
|
|
const now = this.utils.currentPerfTime(); |
|
|
|
this.tickElapsed += now - this.tickScheduledTime; |
|
|
|
this.tickScheduledTime = now; |
|
|
|
|
|
|
|
if (this.timer) { |
|
|
|
clearTimeout(this.timer); |
|
|
|
} |
|
|
|
private generateData(detectChanges: boolean) { |
|
|
|
let key: string; |
|
|
|
let tsDataKeys: SubscriptionDataKey[] = []; |
|
|
|
let latestDataKeys: SubscriptionDataKey[] = []; |
|
|
|
if (this.entityDataSubscriptionOptions.type === widgetType.timeseries) { |
|
|
|
let startTime: number; |
|
|
|
let endTime: number; |
|
|
|
let delta: number; |
|
|
|
const generatedData: SubscriptionDataHolder = { |
|
|
|
data: {} |
|
|
|
}; |
|
|
|
if (!this.history) { |
|
|
|
delta = Math.floor(this.tickElapsed / this.frequency); |
|
|
|
} |
|
|
|
const deltaElapsed = this.history ? this.frequency : delta * this.frequency; |
|
|
|
this.tickElapsed = this.tickElapsed - deltaElapsed; |
|
|
|
for (key of Object.keys(this.dataKeys)) { |
|
|
|
const dataKeyList = this.dataKeys[key] as Array<SubscriptionDataKey>; |
|
|
|
for (let index = 0; index < dataKeyList.length && (this.timer || this.history); index ++) { |
|
|
|
const dataKey = dataKeyList[index]; |
|
|
|
if (!startTime) { |
|
|
|
if (this.realtime) { |
|
|
|
if (dataKey.lastUpdateTime) { |
|
|
|
startTime = dataKey.lastUpdateTime + this.frequency; |
|
|
|
endTime = dataKey.lastUpdateTime + deltaElapsed; |
|
|
|
} else { |
|
|
|
startTime = this.entityDataSubscriptionOptions.subscriptionTimewindow.startTs + |
|
|
|
this.entityDataSubscriptionOptions.subscriptionTimewindow.tsOffset; |
|
|
|
endTime = startTime + this.entityDataSubscriptionOptions.subscriptionTimewindow.realtimeWindowMs + this.frequency; |
|
|
|
if (this.entityDataSubscriptionOptions.subscriptionTimewindow.aggregation.type === AggregationType.NONE) { |
|
|
|
const time = endTime - this.frequency * this.entityDataSubscriptionOptions.subscriptionTimewindow.aggregation.limit; |
|
|
|
startTime = Math.max(time, startTime); |
|
|
|
} |
|
|
|
} |
|
|
|
} else { |
|
|
|
startTime = this.entityDataSubscriptionOptions.subscriptionTimewindow.fixedWindow.startTimeMs + |
|
|
|
this.entityDataSubscriptionOptions.subscriptionTimewindow.tsOffset; |
|
|
|
endTime = this.entityDataSubscriptionOptions.subscriptionTimewindow.fixedWindow.endTimeMs + |
|
|
|
this.entityDataSubscriptionOptions.subscriptionTimewindow.tsOffset; |
|
|
|
} |
|
|
|
if (this.entityDataSubscriptionOptions.subscriptionTimewindow.quickInterval) { |
|
|
|
const currentTime = getCurrentTime().valueOf() + this.entityDataSubscriptionOptions.subscriptionTimewindow.tsOffset; |
|
|
|
endTime = Math.min(currentTime, endTime); |
|
|
|
} |
|
|
|
} |
|
|
|
generatedData.data[`${dataKey.name}_${dataKey.index}`] = this.generateSeries(dataKey, index, startTime, endTime); |
|
|
|
} |
|
|
|
} |
|
|
|
if (this.dataAggregators && this.dataAggregators.length) { |
|
|
|
this.dataAggregators[0].onData(generatedData, true, this.history, detectChanges); |
|
|
|
tsDataKeys = tsDataKeys.concat(dataKeyList.filter(dataKey => !dataKey.latest)); |
|
|
|
latestDataKeys = latestDataKeys.concat(dataKeyList.filter(dataKey => dataKey.latest)); |
|
|
|
} |
|
|
|
} else if (this.entityDataSubscriptionOptions.type === widgetType.latest) { |
|
|
|
for (key of Object.keys(this.dataKeys)) { |
|
|
|
this.generateLatest(this.dataKeys[key] as SubscriptionDataKey, detectChanges); |
|
|
|
latestDataKeys.push(this.dataKeys[key] as SubscriptionDataKey); |
|
|
|
} |
|
|
|
} |
|
|
|
if (tsDataKeys.length) { |
|
|
|
this.timeseriesTimer = setTimeout(this.onTimeseriesTick.bind(this, tsDataKeys, true), 0); |
|
|
|
} |
|
|
|
if (latestDataKeys.length) { |
|
|
|
this.onLatestTick(latestDataKeys, detectChanges); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private onTimeseriesTick(tsDataKeys: SubscriptionDataKey[], detectChanges: boolean) { |
|
|
|
const now = this.utils.currentPerfTime(); |
|
|
|
this.tickElapsed += now - this.tickScheduledTime; |
|
|
|
this.tickScheduledTime = now; |
|
|
|
if (this.timeseriesTimer) { |
|
|
|
clearTimeout(this.timeseriesTimer); |
|
|
|
} |
|
|
|
let startTime: number; |
|
|
|
let endTime: number; |
|
|
|
let delta: number; |
|
|
|
const generatedData: SubscriptionDataHolder = { |
|
|
|
data: {} |
|
|
|
}; |
|
|
|
if (!this.history) { |
|
|
|
delta = Math.floor(this.tickElapsed / this.frequency); |
|
|
|
} |
|
|
|
const deltaElapsed = this.history ? this.frequency : delta * this.frequency; |
|
|
|
this.tickElapsed = this.tickElapsed - deltaElapsed; |
|
|
|
for (let index = 0; index < tsDataKeys.length && (this.timeseriesTimer || this.history); index ++) { |
|
|
|
const dataKey = tsDataKeys[index]; |
|
|
|
if (!startTime) { |
|
|
|
if (this.realtime) { |
|
|
|
if (dataKey.lastUpdateTime) { |
|
|
|
startTime = dataKey.lastUpdateTime + this.frequency; |
|
|
|
endTime = dataKey.lastUpdateTime + deltaElapsed; |
|
|
|
} else { |
|
|
|
startTime = this.entityDataSubscriptionOptions.subscriptionTimewindow.startTs + |
|
|
|
this.entityDataSubscriptionOptions.subscriptionTimewindow.tsOffset; |
|
|
|
endTime = startTime + this.entityDataSubscriptionOptions.subscriptionTimewindow.realtimeWindowMs + this.frequency; |
|
|
|
if (this.entityDataSubscriptionOptions.subscriptionTimewindow.aggregation.type === AggregationType.NONE) { |
|
|
|
const time = endTime - this.frequency * this.entityDataSubscriptionOptions.subscriptionTimewindow.aggregation.limit; |
|
|
|
startTime = Math.max(time, startTime); |
|
|
|
} |
|
|
|
} |
|
|
|
} else { |
|
|
|
startTime = this.entityDataSubscriptionOptions.subscriptionTimewindow.fixedWindow.startTimeMs + |
|
|
|
this.entityDataSubscriptionOptions.subscriptionTimewindow.tsOffset; |
|
|
|
endTime = this.entityDataSubscriptionOptions.subscriptionTimewindow.fixedWindow.endTimeMs + |
|
|
|
this.entityDataSubscriptionOptions.subscriptionTimewindow.tsOffset; |
|
|
|
} |
|
|
|
if (this.entityDataSubscriptionOptions.subscriptionTimewindow.quickInterval) { |
|
|
|
const currentTime = getCurrentTime().valueOf() + this.entityDataSubscriptionOptions.subscriptionTimewindow.tsOffset; |
|
|
|
endTime = Math.min(currentTime, endTime); |
|
|
|
} |
|
|
|
} |
|
|
|
generatedData.data[`${dataKey.name}_${dataKey.index}`] = this.generateSeries(dataKey, startTime, endTime); |
|
|
|
} |
|
|
|
if (this.dataAggregators && this.dataAggregators.length) { |
|
|
|
this.dataAggregators[0].onData(generatedData, true, this.history, detectChanges); |
|
|
|
} |
|
|
|
|
|
|
|
if (!this.history) { |
|
|
|
this.timer = setTimeout(this.onTick.bind(this, true), this.frequency); |
|
|
|
this.timeseriesTimer = setTimeout(this.onTimeseriesTick.bind(this, tsDataKeys, true), this.frequency); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private onLatestTick(latestDataKeys: SubscriptionDataKey[], detectChanges: boolean) { |
|
|
|
if (this.latestTimer) { |
|
|
|
clearTimeout(this.latestTimer); |
|
|
|
} |
|
|
|
latestDataKeys.forEach(dataKey => { |
|
|
|
this.generateLatest(dataKey, detectChanges); |
|
|
|
}); |
|
|
|
this.latestTimer = setTimeout(this.onLatestTick.bind(this, latestDataKeys, true), this.latestFrequency); |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|