|
|
|
@ -29,11 +29,11 @@ import { |
|
|
|
} from '@shared/models/query/query.models'; |
|
|
|
import { |
|
|
|
AggKey, |
|
|
|
AggSubscriptionData, |
|
|
|
DataKeyType, |
|
|
|
EntityCountCmd, |
|
|
|
EntityDataCmd, |
|
|
|
SubscriptionData, |
|
|
|
SubscriptionDataHolder, |
|
|
|
TelemetryService, |
|
|
|
TelemetrySubscriber |
|
|
|
} from '@shared/models/telemetry/telemetry.models'; |
|
|
|
@ -46,8 +46,8 @@ import { NULL_UUID } from '@shared/models/id/has-uuid'; |
|
|
|
import { EntityType } from '@shared/models/entity-type.models'; |
|
|
|
import { Observable, of, ReplaySubject, Subject } from 'rxjs'; |
|
|
|
import { EntityId } from '@shared/models/id/entity-id'; |
|
|
|
import Timeout = NodeJS.Timeout; |
|
|
|
import _ from 'lodash'; |
|
|
|
import Timeout = NodeJS.Timeout; |
|
|
|
|
|
|
|
declare type DataKeyFunction = (time: number, prevValue: any) => any; |
|
|
|
declare type DataKeyPostFunction = (time: number, value: any, prevValue: any, timePrev: number, prevOrigValue: any) => any; |
|
|
|
@ -94,6 +94,7 @@ export class EntityDataSubscription { |
|
|
|
private entityDataSubscriptionOptions = this.listener.subscriptionOptions; |
|
|
|
private datasourceType: DatasourceType = this.entityDataSubscriptionOptions.datasourceType; |
|
|
|
private history: boolean; |
|
|
|
private isFloatingTimewindow: boolean; |
|
|
|
private realtime: boolean; |
|
|
|
|
|
|
|
private subscriber: TelemetrySubscriber; |
|
|
|
@ -108,6 +109,7 @@ export class EntityDataSubscription { |
|
|
|
|
|
|
|
private entityDataResolveSubject: Subject<EntityDataLoadResult>; |
|
|
|
private pageData: PageData<EntityData>; |
|
|
|
private data: Array<Array<DataSetHolder>>; |
|
|
|
private subsTw: SubscriptionTimewindow; |
|
|
|
private latestTsOffset: number; |
|
|
|
private dataAggregators: Array<DataAggregator>; |
|
|
|
@ -304,19 +306,28 @@ export class EntityDataSubscription { |
|
|
|
this.subscriber.reconnect$.subscribe(() => { |
|
|
|
if (this.started) { |
|
|
|
const targetCommand = this.entityDataSubscriptionOptions.isPaginatedDataSubscription ? this.dataCommand : this.subsCommand; |
|
|
|
if (this.entityDataSubscriptionOptions.type === widgetType.timeseries && |
|
|
|
!this.history && this.tsFields.length) { |
|
|
|
if (!this.history && (this.entityDataSubscriptionOptions.type === widgetType.timeseries && this.tsFields.length || |
|
|
|
this.aggTsValues.length > 0 && !this.isFloatingTimewindow)) { |
|
|
|
const newSubsTw = this.listener.updateRealtimeSubscription(); |
|
|
|
this.subsTw = newSubsTw; |
|
|
|
targetCommand.tsCmd.startTs = this.subsTw.startTs; |
|
|
|
targetCommand.tsCmd.timeWindow = this.subsTw.aggregation.timeWindow; |
|
|
|
targetCommand.tsCmd.interval = this.subsTw.aggregation.interval; |
|
|
|
targetCommand.tsCmd.limit = this.subsTw.aggregation.limit; |
|
|
|
targetCommand.tsCmd.agg = this.subsTw.aggregation.type; |
|
|
|
targetCommand.tsCmd.fetchLatestPreviousPoint = this.subsTw.aggregation.stateData; |
|
|
|
this.dataAggregators.forEach((dataAggregator) => { |
|
|
|
dataAggregator.reset(newSubsTw); |
|
|
|
}); |
|
|
|
if (this.entityDataSubscriptionOptions.type === widgetType.timeseries && this.tsFields.length) { |
|
|
|
targetCommand.tsCmd.startTs = this.subsTw.startTs; |
|
|
|
targetCommand.tsCmd.timeWindow = this.subsTw.aggregation.timeWindow; |
|
|
|
targetCommand.tsCmd.interval = this.subsTw.aggregation.interval; |
|
|
|
targetCommand.tsCmd.limit = this.subsTw.aggregation.limit; |
|
|
|
targetCommand.tsCmd.agg = this.subsTw.aggregation.type; |
|
|
|
targetCommand.tsCmd.fetchLatestPreviousPoint = this.subsTw.aggregation.stateData; |
|
|
|
this.dataAggregators.forEach((dataAggregator) => { |
|
|
|
dataAggregator.reset(newSubsTw); |
|
|
|
}); |
|
|
|
} |
|
|
|
if (this.aggTsValues.length > 0 && !this.isFloatingTimewindow) { |
|
|
|
targetCommand.aggTsCmd.startTs = this.subsTw.startTs; |
|
|
|
targetCommand.aggTsCmd.timeWindow = this.subsTw.aggregation.timeWindow; |
|
|
|
this.tsLatestDataAggregators.forEach((dataAggregator) => { |
|
|
|
dataAggregator.reset(newSubsTw); |
|
|
|
}); |
|
|
|
} |
|
|
|
} |
|
|
|
if (this.entityDataSubscriptionOptions.type === widgetType.timeseries) { |
|
|
|
this.subscriber.setTsOffset(this.subsTw.tsOffset); |
|
|
|
@ -485,9 +496,12 @@ export class EntityDataSubscription { |
|
|
|
isObject(this.entityDataSubscriptionOptions.subscriptionTimewindow.fixedWindow); |
|
|
|
this.realtime = this.entityDataSubscriptionOptions.subscriptionTimewindow && |
|
|
|
isDefinedAndNotNull(this.entityDataSubscriptionOptions.subscriptionTimewindow.realtimeWindowMs); |
|
|
|
this.isFloatingTimewindow = this.entityDataSubscriptionOptions.subscriptionTimewindow && |
|
|
|
!this.entityDataSubscriptionOptions.subscriptionTimewindow.quickInterval && !this.history; |
|
|
|
} |
|
|
|
|
|
|
|
private prepareSubscriptionCommands(cmd: EntityDataCmd) { |
|
|
|
let latestValuesKeys: EntityKey[] = []; |
|
|
|
if (this.entityDataSubscriptionOptions.type === widgetType.timeseries) { |
|
|
|
if (this.tsFields.length > 0) { |
|
|
|
if (this.history) { |
|
|
|
@ -512,17 +526,9 @@ export class EntityDataSubscription { |
|
|
|
}; |
|
|
|
} |
|
|
|
} |
|
|
|
if (this.latestValues.length > 0) { |
|
|
|
cmd.latestCmd = { |
|
|
|
keys: this.latestValues |
|
|
|
}; |
|
|
|
} |
|
|
|
latestValuesKeys = this.latestValues; |
|
|
|
} else if (this.entityDataSubscriptionOptions.type === widgetType.latest) { |
|
|
|
if (this.latestValues.length > 0) { |
|
|
|
cmd.latestCmd = { |
|
|
|
keys: this.latestValues |
|
|
|
}; |
|
|
|
} |
|
|
|
latestValuesKeys = this.latestValues; |
|
|
|
} |
|
|
|
if (this.aggTsValues.length > 0) { |
|
|
|
if (this.history) { |
|
|
|
@ -531,15 +537,24 @@ export class EntityDataSubscription { |
|
|
|
startTs: this.subsTw.fixedWindow.startTimeMs, |
|
|
|
endTs: this.subsTw.fixedWindow.endTimeMs |
|
|
|
}; |
|
|
|
} else { |
|
|
|
} else if (!this.isFloatingTimewindow) { |
|
|
|
cmd.aggTsCmd = { |
|
|
|
keys: this.aggTsValues, |
|
|
|
startTs: this.subsTw.startTs, |
|
|
|
timeWindow: this.subsTw.aggregation.timeWindow, |
|
|
|
floating: !this.subsTw.quickInterval |
|
|
|
timeWindow: this.subsTw.aggregation.timeWindow |
|
|
|
}; |
|
|
|
if (latestValuesKeys.length > 0) { |
|
|
|
const tsKeys = this.aggTsValues.map(key => key.key); |
|
|
|
latestValuesKeys = latestValuesKeys.filter(latestKey => latestKey.type !== EntityKeyType.TIME_SERIES |
|
|
|
|| !tsKeys.includes(latestKey.key)); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
if (latestValuesKeys.length > 0) { |
|
|
|
cmd.latestCmd = { |
|
|
|
keys: latestValuesKeys |
|
|
|
}; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private startFunction() { |
|
|
|
@ -595,30 +610,50 @@ export class EntityDataSubscription { |
|
|
|
for (let dataIndex = 0; dataIndex < this.pageData.data.length; dataIndex++) { |
|
|
|
if (this.datasourceType === DatasourceType.function) { |
|
|
|
this.dataAggregators[dataIndex] = this.createRealtimeDataAggregator(this.subsTw, aggKeys, |
|
|
|
false, false, DataKeyType.function, dataIndex, this.notifyListener.bind(this)); |
|
|
|
false, DataKeyType.function, dataIndex, this.notifyListener.bind(this)); |
|
|
|
} else { |
|
|
|
this.dataAggregators[dataIndex] = this.createRealtimeDataAggregator(this.subsTw, aggKeys, |
|
|
|
false, false, DataKeyType.timeseries, dataIndex, this.notifyListener.bind(this)); |
|
|
|
false, DataKeyType.timeseries, dataIndex, this.notifyListener.bind(this)); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
if (this.aggTsValues && this.aggTsValues.length) { |
|
|
|
const aggLatestTimewindow = deepClone(this.subsTw); |
|
|
|
aggLatestTimewindow.aggregation.stateData = false; |
|
|
|
const isFloatingLatestDataAgg = !aggLatestTimewindow.quickInterval && !this.history; |
|
|
|
if (!isFloatingLatestDataAgg) { |
|
|
|
if (!this.isFloatingTimewindow) { |
|
|
|
const aggLatestTimewindow = deepClone(this.subsTw); |
|
|
|
aggLatestTimewindow.aggregation.stateData = false; |
|
|
|
aggLatestTimewindow.aggregation.interval = aggLatestTimewindow.aggregation.timeWindow; |
|
|
|
} |
|
|
|
for (let dataIndex = 0; dataIndex < this.pageData.data.length; dataIndex++) { |
|
|
|
this.tsLatestDataAggregators[dataIndex] = this.createRealtimeDataAggregator(aggLatestTimewindow, this.aggTsValues, |
|
|
|
true, isFloatingLatestDataAgg, |
|
|
|
DataKeyType.timeseries, dataIndex, this.notifyListener.bind(this)); |
|
|
|
for (let dataIndex = 0; dataIndex < this.pageData.data.length; dataIndex++) { |
|
|
|
this.tsLatestDataAggregators[dataIndex] = this.createRealtimeDataAggregator(aggLatestTimewindow, this.aggTsValues, |
|
|
|
true, DataKeyType.timeseries, dataIndex, this.notifyListener.bind(this)); |
|
|
|
} |
|
|
|
} else { |
|
|
|
const tsKeysByAggType = _.groupBy(this.aggTsValues, value => value.agg); |
|
|
|
const aggSubscriptionData: AggSubscriptionData = {}; |
|
|
|
for (const aggTypeString of Object.keys(tsKeysByAggType)) { |
|
|
|
const tsKeys = tsKeysByAggType[aggTypeString]; |
|
|
|
const latestTsAggSubsciptionData: SubscriptionData = {}; |
|
|
|
for (const tsKey of tsKeys) { |
|
|
|
latestTsAggSubsciptionData[tsKey.key] = [[0, 'Not supported!']]; |
|
|
|
} |
|
|
|
aggSubscriptionData[aggTypeString] = latestTsAggSubsciptionData; |
|
|
|
} |
|
|
|
for (let dataIndex = 0; dataIndex < this.pageData.data.length; dataIndex++) { |
|
|
|
this.onAggData(aggSubscriptionData, DataKeyType.timeseries, dataIndex, true, |
|
|
|
this.entityDataSubscriptionOptions.type === widgetType.timeseries, true, |
|
|
|
(data1, dataIndex1, dataKeyIndex) => { |
|
|
|
if (!this.data[dataIndex1]) { |
|
|
|
this.data[dataIndex1] = []; |
|
|
|
} |
|
|
|
this.data[dataIndex1][dataKeyIndex] = data1; |
|
|
|
}); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private resetData() { |
|
|
|
this.data = []; |
|
|
|
this.datasourceData = []; |
|
|
|
this.entityIdToDataIndex = {}; |
|
|
|
for (let dataIndex = 0; dataIndex < this.pageData.data.length; dataIndex++) { |
|
|
|
@ -675,15 +710,14 @@ export class EntityDataSubscription { |
|
|
|
} else if (isInitialData) { |
|
|
|
this.resetData(); |
|
|
|
} |
|
|
|
const data: Array<Array<DataSetHolder>> = []; |
|
|
|
for (let dataIndex = 0; dataIndex < pageData.data.length; dataIndex++) { |
|
|
|
const entityData = pageData.data[dataIndex]; |
|
|
|
this.processEntityData(entityData, dataIndex, false, |
|
|
|
(data1, dataIndex1, dataKeyIndex) => { |
|
|
|
if (!data[dataIndex1]) { |
|
|
|
data[dataIndex1] = []; |
|
|
|
if (!this.data[dataIndex1]) { |
|
|
|
this.data[dataIndex1] = []; |
|
|
|
} |
|
|
|
data[dataIndex1][dataKeyIndex] = data1; |
|
|
|
this.data[dataIndex1][dataKeyIndex] = data1; |
|
|
|
} |
|
|
|
); |
|
|
|
} |
|
|
|
@ -692,7 +726,7 @@ export class EntityDataSubscription { |
|
|
|
this.entityDataResolveSubject.next( |
|
|
|
{ |
|
|
|
pageData, |
|
|
|
data, |
|
|
|
data: this.data, |
|
|
|
datasourceIndex: this.listener.configDatasourceIndex, |
|
|
|
pageLink: this.entityDataSubscriptionOptions.pageLink |
|
|
|
} |
|
|
|
@ -700,7 +734,7 @@ export class EntityDataSubscription { |
|
|
|
this.entityDataResolveSubject.complete(); |
|
|
|
} else { |
|
|
|
if (isInitialData || this.entityDataSubscriptionOptions.isPaginatedDataSubscription) { |
|
|
|
this.listener.dataLoaded(pageData, data, |
|
|
|
this.listener.dataLoaded(pageData, this.data, |
|
|
|
this.listener.configDatasourceIndex, this.entityDataSubscriptionOptions.pageLink); |
|
|
|
} |
|
|
|
if (this.entityDataSubscriptionOptions.isPaginatedDataSubscription && isInitialData) { |
|
|
|
@ -710,7 +744,7 @@ export class EntityDataSubscription { |
|
|
|
this.entityDataResolveSubject.next( |
|
|
|
{ |
|
|
|
pageData, |
|
|
|
data, |
|
|
|
data: this.data, |
|
|
|
datasourceIndex: this.listener.configDatasourceIndex, |
|
|
|
pageLink: this.entityDataSubscriptionOptions.pageLink |
|
|
|
} |
|
|
|
@ -740,46 +774,20 @@ export class EntityDataSubscription { |
|
|
|
if (this.entityDataSubscriptionOptions.type === widgetType.latest || |
|
|
|
this.entityDataSubscriptionOptions.type === widgetType.timeseries) { |
|
|
|
if (entityData.aggLatest) { |
|
|
|
for (const aggTypeString of Object.keys(entityData.aggLatest)) { |
|
|
|
const subscriptionData = this.toSubscriptionData(entityData.aggLatest[aggTypeString], false); |
|
|
|
if (this.tsLatestDataAggregators && this.tsLatestDataAggregators[dataIndex]) { |
|
|
|
const dataAggregator = this.tsLatestDataAggregators[dataIndex]; |
|
|
|
let prevDataCb; |
|
|
|
if (!isUpdate) { |
|
|
|
prevDataCb = dataAggregator.updateOnDataCb((aggType, data, detectChanges) => { |
|
|
|
this.onData(aggType, data, DataKeyType.timeseries, dataIndex, detectChanges, |
|
|
|
this.entityDataSubscriptionOptions.type === widgetType.timeseries, dataUpdatedCb); |
|
|
|
}); |
|
|
|
} |
|
|
|
dataAggregator.onData(AggregationType[aggTypeString], {data: subscriptionData}, false, this.history, true); |
|
|
|
if (prevDataCb) { |
|
|
|
dataAggregator.updateOnDataCb(prevDataCb); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
if (entityData.aggFloating) { |
|
|
|
const subscriptionData = this.toSubscriptionData(entityData.aggFloating, true); |
|
|
|
const keys: string[] = Object.keys(subscriptionData); |
|
|
|
const aggTsKeys = this.aggTsValues.filter(key => keys.includes(key.key)); |
|
|
|
if (aggTsKeys.length && this.tsLatestDataAggregators && this.tsLatestDataAggregators[dataIndex]) { |
|
|
|
if (this.tsLatestDataAggregators && this.tsLatestDataAggregators[dataIndex]) { |
|
|
|
const dataAggregator = this.tsLatestDataAggregators[dataIndex]; |
|
|
|
const aggSubscriptionData: AggSubscriptionData = {}; |
|
|
|
for (const aggTypeString of Object.keys(entityData.aggLatest)) { |
|
|
|
aggSubscriptionData[aggTypeString] = this.toSubscriptionData(entityData.aggLatest[aggTypeString], false); |
|
|
|
} |
|
|
|
let prevDataCb; |
|
|
|
if (!isUpdate) { |
|
|
|
prevDataCb = dataAggregator.updateOnDataCb((aggType, data, detectChanges) => { |
|
|
|
this.onData(aggType, data, DataKeyType.timeseries, dataIndex, detectChanges, |
|
|
|
this.entityDataSubscriptionOptions.type === widgetType.timeseries, dataUpdatedCb); |
|
|
|
prevDataCb = dataAggregator.updateOnDataCb((data, detectChanges) => { |
|
|
|
this.onAggData(data, DataKeyType.timeseries, dataIndex, detectChanges, |
|
|
|
this.entityDataSubscriptionOptions.type === widgetType.timeseries, true, dataUpdatedCb); |
|
|
|
}); |
|
|
|
} |
|
|
|
const tsKeysByAggType = _.groupBy(aggTsKeys, value => value.agg); |
|
|
|
for (const aggTypeString of Object.keys(tsKeysByAggType)) { |
|
|
|
const tsKeys = tsKeysByAggType[aggTypeString]; |
|
|
|
const latestTsAggSubsciptionData: SubscriptionData = {}; |
|
|
|
for (const tsKey of tsKeys) { |
|
|
|
latestTsAggSubsciptionData[tsKey.key] = subscriptionData[tsKey.key]; |
|
|
|
} |
|
|
|
dataAggregator.onData(AggregationType[aggTypeString], {data: latestTsAggSubsciptionData}, false, this.history, true); |
|
|
|
} |
|
|
|
dataAggregator.onData(aggSubscriptionData, false, this.history, true); |
|
|
|
if (prevDataCb) { |
|
|
|
dataAggregator.updateOnDataCb(prevDataCb); |
|
|
|
} |
|
|
|
@ -797,25 +805,27 @@ export class EntityDataSubscription { |
|
|
|
for (const latestTsKey of latestTsKeys) { |
|
|
|
latestTsSubsciptionData[latestTsKey.key] = subscriptionData[latestTsKey.key]; |
|
|
|
} |
|
|
|
this.onData(null, latestTsSubsciptionData, dataKeyType, dataIndex, true, |
|
|
|
this.entityDataSubscriptionOptions.type === widgetType.timeseries, dataUpdatedCb); |
|
|
|
this.onData(latestTsSubsciptionData, dataKeyType, dataIndex, true, |
|
|
|
this.entityDataSubscriptionOptions.type === widgetType.timeseries, false, dataUpdatedCb); |
|
|
|
} |
|
|
|
const aggTsKeys = this.aggTsValues.filter(key => keys.includes(key.key)); |
|
|
|
if (aggTsKeys.length && this.tsLatestDataAggregators && this.tsLatestDataAggregators[dataIndex]) { |
|
|
|
if (!this.history && aggTsKeys.length && this.tsLatestDataAggregators && this.tsLatestDataAggregators[dataIndex]) { |
|
|
|
const dataAggregator = this.tsLatestDataAggregators[dataIndex]; |
|
|
|
const tsKeysByAggType = _.groupBy(aggTsKeys, value => value.agg); |
|
|
|
const aggSubscriptionData: AggSubscriptionData = {}; |
|
|
|
for (const aggTypeString of Object.keys(tsKeysByAggType)) { |
|
|
|
const tsKeys = tsKeysByAggType[aggTypeString]; |
|
|
|
const latestTsAggSubsciptionData: SubscriptionData = {}; |
|
|
|
for (const tsKey of tsKeys) { |
|
|
|
latestTsAggSubsciptionData[tsKey.key] = subscriptionData[tsKey.key]; |
|
|
|
} |
|
|
|
dataAggregator.onData(AggregationType[aggTypeString], {data: latestTsAggSubsciptionData}, true, false, true); |
|
|
|
aggSubscriptionData[aggTypeString] = latestTsAggSubsciptionData; |
|
|
|
} |
|
|
|
dataAggregator.onData(aggSubscriptionData, true, false, true); |
|
|
|
} |
|
|
|
} else { |
|
|
|
this.onData(null, subscriptionData, dataKeyType, dataIndex, true, |
|
|
|
this.entityDataSubscriptionOptions.type === widgetType.timeseries, dataUpdatedCb); |
|
|
|
this.onData(subscriptionData, dataKeyType, dataIndex, true, |
|
|
|
this.entityDataSubscriptionOptions.type === widgetType.timeseries, false, dataUpdatedCb); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -824,89 +834,99 @@ export class EntityDataSubscription { |
|
|
|
const subscriptionData = this.toSubscriptionData(entityData.timeseries, true); |
|
|
|
if (this.dataAggregators && this.dataAggregators[dataIndex]) { |
|
|
|
const dataAggregator = this.dataAggregators[dataIndex]; |
|
|
|
const aggSubscriptionData: AggSubscriptionData = {}; |
|
|
|
aggSubscriptionData[this.subsTw.aggregation.type] = subscriptionData; |
|
|
|
let prevDataCb; |
|
|
|
if (!isUpdate) { |
|
|
|
prevDataCb = dataAggregator.updateOnDataCb((aggType, data, detectChanges) => { |
|
|
|
this.onData(null, data, this.datasourceType === DatasourceType.function ? |
|
|
|
DataKeyType.function : DataKeyType.timeseries, dataIndex, detectChanges, false, dataUpdatedCb); |
|
|
|
prevDataCb = dataAggregator.updateOnDataCb((data, detectChanges) => { |
|
|
|
this.onAggData(data, this.datasourceType === DatasourceType.function ? |
|
|
|
DataKeyType.function : DataKeyType.timeseries, dataIndex, detectChanges, false, false, dataUpdatedCb); |
|
|
|
}); |
|
|
|
} |
|
|
|
dataAggregator.onData(this.subsTw.aggregation.type, {data: subscriptionData}, false, this.history, true); |
|
|
|
dataAggregator.onData(aggSubscriptionData, false, this.history, true); |
|
|
|
if (prevDataCb) { |
|
|
|
dataAggregator.updateOnDataCb(prevDataCb); |
|
|
|
} |
|
|
|
} else if (!this.history && !isUpdate) { |
|
|
|
this.onData(null, |
|
|
|
subscriptionData, DataKeyType.timeseries, dataIndex, true, false, dataUpdatedCb); |
|
|
|
this.onData(subscriptionData, DataKeyType.timeseries, dataIndex, true, false, false, dataUpdatedCb); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private onData(aggType: AggregationType | undefined | null, |
|
|
|
sourceData: SubscriptionData, type: DataKeyType, dataIndex: number, detectChanges: boolean, |
|
|
|
isTsLatest: boolean, dataUpdatedCb: DataUpdatedCb) { |
|
|
|
for (const keyName of Object.keys(sourceData)) { |
|
|
|
const keyData = sourceData[keyName]; |
|
|
|
const aggSuffix = aggType && aggType !== AggregationType.NONE ? `_${aggType.toLowerCase()}` : ''; |
|
|
|
const key = `${keyName}_${type}${aggSuffix}${isTsLatest ? '_latest' : ''}`; |
|
|
|
const dataKeyList = this.dataKeys[key] as Array<SubscriptionDataKey>; |
|
|
|
for (let keyIndex = 0; dataKeyList && keyIndex < dataKeyList.length; keyIndex++) { |
|
|
|
const datasourceKey = `${key}_${keyIndex}`; |
|
|
|
if (this.datasourceData[dataIndex][datasourceKey].data) { |
|
|
|
const dataKey = dataKeyList[keyIndex]; |
|
|
|
const data: DataSet = []; |
|
|
|
let prevSeries: [number, any]; |
|
|
|
let prevOrigSeries: [number, any]; |
|
|
|
let datasourceKeyData: DataSet; |
|
|
|
let datasourceOrigKeyData: DataSet; |
|
|
|
let update = false; |
|
|
|
if (this.realtime && !isTsLatest) { |
|
|
|
datasourceKeyData = []; |
|
|
|
datasourceOrigKeyData = []; |
|
|
|
} else { |
|
|
|
datasourceKeyData = this.datasourceData[dataIndex][datasourceKey].data; |
|
|
|
datasourceOrigKeyData = this.datasourceOrigData[dataIndex][datasourceKey].data; |
|
|
|
} |
|
|
|
if (datasourceKeyData.length > 0) { |
|
|
|
prevSeries = datasourceKeyData[datasourceKeyData.length - 1]; |
|
|
|
prevOrigSeries = datasourceOrigKeyData[datasourceOrigKeyData.length - 1]; |
|
|
|
} else { |
|
|
|
prevSeries = [0, 0]; |
|
|
|
prevOrigSeries = [0, 0]; |
|
|
|
} |
|
|
|
this.datasourceOrigData[dataIndex][datasourceKey].data = []; |
|
|
|
if (this.entityDataSubscriptionOptions.type === widgetType.timeseries && !isTsLatest) { |
|
|
|
keyData.forEach((keySeries) => { |
|
|
|
let series = keySeries; |
|
|
|
const time = series[0]; |
|
|
|
this.datasourceOrigData[dataIndex][datasourceKey].data.push([series[0], series[1]]); |
|
|
|
let value = EntityDataSubscription.convertValue(series[1]); |
|
|
|
if (dataKey.postFunc) { |
|
|
|
value = dataKey.postFunc(time, value, prevSeries[1], prevOrigSeries[0], prevOrigSeries[1]); |
|
|
|
} |
|
|
|
prevOrigSeries = [series[0], series[1]]; |
|
|
|
series = [series[0], value]; |
|
|
|
data.push([series[0], series[1]]); |
|
|
|
prevSeries = [series[0], series[1]]; |
|
|
|
}); |
|
|
|
update = true; |
|
|
|
} else if (this.entityDataSubscriptionOptions.type === widgetType.latest || isTsLatest) { |
|
|
|
if (keyData.length > 0) { |
|
|
|
let series = keyData[0]; |
|
|
|
const time = series[0]; |
|
|
|
this.datasourceOrigData[dataIndex][datasourceKey].data.push([series[0], series[1]]); |
|
|
|
let value = EntityDataSubscription.convertValue(series[1]); |
|
|
|
if (dataKey.postFunc) { |
|
|
|
value = dataKey.postFunc(time, value, prevSeries[1], prevOrigSeries[0], prevOrigSeries[1]); |
|
|
|
private onData(sourceData: SubscriptionData, type: DataKeyType, dataIndex: number, detectChanges: boolean, |
|
|
|
isTsLatest: boolean, isAggLatest: boolean, dataUpdatedCb: DataUpdatedCb) { |
|
|
|
const aggSubscriptionData: AggSubscriptionData = {}; |
|
|
|
aggSubscriptionData[AggregationType.NONE] = sourceData; |
|
|
|
this.onAggData(aggSubscriptionData, type, dataIndex, detectChanges, isTsLatest, isAggLatest, dataUpdatedCb); |
|
|
|
} |
|
|
|
|
|
|
|
private onAggData(sourceData: AggSubscriptionData, type: DataKeyType, dataIndex: number, detectChanges: boolean, |
|
|
|
isTsLatest: boolean, isAggLatest: boolean, dataUpdatedCb: DataUpdatedCb) { |
|
|
|
for (const aggTypeString of Object.keys(sourceData)) { |
|
|
|
const aggType = AggregationType[aggTypeString]; |
|
|
|
const aggSuffix = isAggLatest ? (aggType !== AggregationType.NONE ? `_${aggType.toLowerCase()}` : '') : ''; |
|
|
|
for (const keyName of Object.keys(sourceData[aggType])) { |
|
|
|
const keyData = sourceData[aggType][keyName]; |
|
|
|
const key = `${keyName}_${type}${aggSuffix}${isTsLatest ? '_latest' : ''}`; |
|
|
|
const dataKeyList = this.dataKeys[key] as Array<SubscriptionDataKey>; |
|
|
|
for (let keyIndex = 0; dataKeyList && keyIndex < dataKeyList.length; keyIndex++) { |
|
|
|
const datasourceKey = `${key}_${keyIndex}`; |
|
|
|
if (this.datasourceData[dataIndex][datasourceKey].data) { |
|
|
|
const dataKey = dataKeyList[keyIndex]; |
|
|
|
const data: DataSet = []; |
|
|
|
let prevSeries: [number, any]; |
|
|
|
let prevOrigSeries: [number, any]; |
|
|
|
let datasourceKeyData: DataSet; |
|
|
|
let datasourceOrigKeyData: DataSet; |
|
|
|
let update = false; |
|
|
|
if (this.realtime && !isTsLatest) { |
|
|
|
datasourceKeyData = []; |
|
|
|
datasourceOrigKeyData = []; |
|
|
|
} else { |
|
|
|
datasourceKeyData = this.datasourceData[dataIndex][datasourceKey].data; |
|
|
|
datasourceOrigKeyData = this.datasourceOrigData[dataIndex][datasourceKey].data; |
|
|
|
} |
|
|
|
if (datasourceKeyData.length > 0) { |
|
|
|
prevSeries = datasourceKeyData[datasourceKeyData.length - 1]; |
|
|
|
prevOrigSeries = datasourceOrigKeyData[datasourceOrigKeyData.length - 1]; |
|
|
|
} else { |
|
|
|
prevSeries = [0, 0]; |
|
|
|
prevOrigSeries = [0, 0]; |
|
|
|
} |
|
|
|
this.datasourceOrigData[dataIndex][datasourceKey].data = []; |
|
|
|
if (this.entityDataSubscriptionOptions.type === widgetType.timeseries && !isTsLatest) { |
|
|
|
keyData.forEach((keySeries) => { |
|
|
|
let series = keySeries; |
|
|
|
const time = series[0]; |
|
|
|
this.datasourceOrigData[dataIndex][datasourceKey].data.push([series[0], series[1]]); |
|
|
|
let value = EntityDataSubscription.convertValue(series[1]); |
|
|
|
if (dataKey.postFunc) { |
|
|
|
value = dataKey.postFunc(time, value, prevSeries[1], prevOrigSeries[0], prevOrigSeries[1]); |
|
|
|
} |
|
|
|
prevOrigSeries = [series[0], series[1]]; |
|
|
|
series = [series[0], value]; |
|
|
|
data.push([series[0], series[1]]); |
|
|
|
prevSeries = [series[0], series[1]]; |
|
|
|
}); |
|
|
|
update = true; |
|
|
|
} else if (this.entityDataSubscriptionOptions.type === widgetType.latest || isTsLatest) { |
|
|
|
if (keyData.length > 0) { |
|
|
|
let series = keyData[0]; |
|
|
|
const time = series[0]; |
|
|
|
this.datasourceOrigData[dataIndex][datasourceKey].data.push([series[0], series[1]]); |
|
|
|
let value = EntityDataSubscription.convertValue(series[1]); |
|
|
|
if (dataKey.postFunc) { |
|
|
|
value = dataKey.postFunc(time, value, prevSeries[1], prevOrigSeries[0], prevOrigSeries[1]); |
|
|
|
} |
|
|
|
series = [time, value]; |
|
|
|
data.push([series[0], series[1]]); |
|
|
|
} |
|
|
|
series = [time, value]; |
|
|
|
data.push([series[0], series[1]]); |
|
|
|
update = true; |
|
|
|
} |
|
|
|
if (update) { |
|
|
|
this.datasourceData[dataIndex][datasourceKey].data = data; |
|
|
|
dataUpdatedCb(this.datasourceData[dataIndex][datasourceKey], dataIndex, dataKey.index, detectChanges, isTsLatest); |
|
|
|
} |
|
|
|
update = true; |
|
|
|
} |
|
|
|
if (update) { |
|
|
|
this.datasourceData[dataIndex][datasourceKey].data = data; |
|
|
|
dataUpdatedCb(this.datasourceData[dataIndex][datasourceKey], dataIndex, dataKey.index, detectChanges, isTsLatest); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -934,18 +954,16 @@ export class EntityDataSubscription { |
|
|
|
private createRealtimeDataAggregator(subsTw: SubscriptionTimewindow, |
|
|
|
tsKeys: Array<AggKey>, |
|
|
|
isLatestDataAgg: boolean, |
|
|
|
isFloatingLatestDataAgg: boolean, |
|
|
|
dataKeyType: DataKeyType, |
|
|
|
dataIndex: number, |
|
|
|
dataUpdatedCb: DataUpdatedCb): DataAggregator { |
|
|
|
return new DataAggregator( |
|
|
|
(aggType, data, detectChanges) => { |
|
|
|
this.onData(isLatestDataAgg ? aggType : null, data, dataKeyType, dataIndex, detectChanges, |
|
|
|
isLatestDataAgg && (this.entityDataSubscriptionOptions.type === widgetType.timeseries), dataUpdatedCb); |
|
|
|
(data, detectChanges) => { |
|
|
|
this.onAggData(data, dataKeyType, dataIndex, detectChanges, |
|
|
|
isLatestDataAgg && (this.entityDataSubscriptionOptions.type === widgetType.timeseries), isLatestDataAgg, dataUpdatedCb); |
|
|
|
}, |
|
|
|
tsKeys, |
|
|
|
isLatestDataAgg, |
|
|
|
isFloatingLatestDataAgg, |
|
|
|
subsTw, |
|
|
|
this.utils, |
|
|
|
this.entityDataSubscriptionOptions.ignoreDataUpdateOnIntervalTick |
|
|
|
@ -1030,9 +1048,9 @@ export class EntityDataSubscription { |
|
|
|
let startTime: number; |
|
|
|
let endTime: number; |
|
|
|
let delta: number; |
|
|
|
const generatedData: SubscriptionDataHolder = { |
|
|
|
data: {} |
|
|
|
}; |
|
|
|
const aggType = this.entityDataSubscriptionOptions.subscriptionTimewindow.aggregation.type; |
|
|
|
const generatedData: AggSubscriptionData = {}; |
|
|
|
generatedData[aggType] = {}; |
|
|
|
if (!this.history) { |
|
|
|
delta = Math.floor(this.tickElapsed / this.frequency); |
|
|
|
} |
|
|
|
@ -1065,11 +1083,10 @@ export class EntityDataSubscription { |
|
|
|
endTime = Math.min(currentTime, endTime); |
|
|
|
} |
|
|
|
} |
|
|
|
generatedData.data[`${dataKey.name}_${dataKey.index}`] = this.generateSeries(dataKey, startTime, endTime); |
|
|
|
generatedData[aggType][`${dataKey.name}_${dataKey.index}`] = this.generateSeries(dataKey, startTime, endTime); |
|
|
|
} |
|
|
|
if (this.dataAggregators && this.dataAggregators.length) { |
|
|
|
this.dataAggregators[0].onData(this.entityDataSubscriptionOptions.subscriptionTimewindow.aggregation.type, |
|
|
|
generatedData, true, this.history, detectChanges); |
|
|
|
this.dataAggregators[0].onData(generatedData, true, this.history, detectChanges); |
|
|
|
} |
|
|
|
|
|
|
|
if (!this.history) { |
|
|
|
|