|
|
|
@ -35,19 +35,21 @@ import { |
|
|
|
TelemetrySubscriber |
|
|
|
} from '@shared/models/telemetry/telemetry.models'; |
|
|
|
import { UtilsService } from '@core/services/utils.service'; |
|
|
|
import { EntityDataListener } from '@core/api/entity-data.service'; |
|
|
|
import { EntityDataListener, EntityDataLoadResult } from '@core/api/entity-data.service'; |
|
|
|
import { deepClone, isDefinedAndNotNull, isObject, objectHashCode } from '@core/utils'; |
|
|
|
import { PageData } from '@shared/models/page/page-data'; |
|
|
|
import { DataAggregator } from '@core/api/data-aggregator'; |
|
|
|
import { NULL_UUID } from '@shared/models/id/has-uuid'; |
|
|
|
import { EntityType } from '@shared/models/entity-type.models'; |
|
|
|
import Timeout = NodeJS.Timeout; |
|
|
|
import { Observable, of, ReplaySubject, Subject } from 'rxjs'; |
|
|
|
|
|
|
|
export interface EntityDataSubscriptionOptions { |
|
|
|
datasourceType: DatasourceType; |
|
|
|
dataKeys: Array<SubscriptionDataKey>; |
|
|
|
type: widgetType; |
|
|
|
entityFilter?: EntityFilter; |
|
|
|
isLatestDataSubscription?: boolean; |
|
|
|
pageLink?: EntityDataPageLink; |
|
|
|
keyFilters?: Array<KeyFilter>; |
|
|
|
subscriptionTimewindow?: SubscriptionTimewindow; |
|
|
|
@ -59,21 +61,19 @@ declare type DataUpdatedCb = (data: DataSetHolder, dataIndex: number, dataKeyInd |
|
|
|
|
|
|
|
export class EntityDataSubscription { |
|
|
|
|
|
|
|
private listeners: Array<EntityDataListener> = []; |
|
|
|
private datasourceType: DatasourceType = this.entityDataSubscriptionOptions.datasourceType; |
|
|
|
|
|
|
|
private history = this.entityDataSubscriptionOptions.subscriptionTimewindow && |
|
|
|
isObject(this.entityDataSubscriptionOptions.subscriptionTimewindow.fixedWindow); |
|
|
|
|
|
|
|
private realtime = this.entityDataSubscriptionOptions.subscriptionTimewindow && |
|
|
|
isDefinedAndNotNull(this.entityDataSubscriptionOptions.subscriptionTimewindow.realtimeWindowMs); |
|
|
|
private history: boolean; |
|
|
|
private realtime: boolean; |
|
|
|
|
|
|
|
private subscriber: TelemetrySubscriber; |
|
|
|
private dataCommand: EntityDataCmd; |
|
|
|
private subsCommand: EntityDataCmd; |
|
|
|
|
|
|
|
private attrFields: Array<EntityKey>; |
|
|
|
private tsFields: Array<EntityKey>; |
|
|
|
private latestValues: Array<EntityKey>; |
|
|
|
|
|
|
|
private entityDataResolveSubject: Subject<EntityDataLoadResult>; |
|
|
|
private pageData: PageData<EntityData>; |
|
|
|
private subsTw: SubscriptionTimewindow; |
|
|
|
private dataAggregators: Array<DataAggregator>; |
|
|
|
@ -87,7 +87,11 @@ export class EntityDataSubscription { |
|
|
|
private tickElapsed = 0; |
|
|
|
private timer: Timeout; |
|
|
|
|
|
|
|
constructor(private entityDataSubscriptionOptions: EntityDataSubscriptionOptions, |
|
|
|
private dataResolved = false; |
|
|
|
private started = false; |
|
|
|
|
|
|
|
constructor(public entityDataSubscriptionOptions: EntityDataSubscriptionOptions, |
|
|
|
private listener: EntityDataListener, |
|
|
|
private telemetryService: TelemetryService, |
|
|
|
private utils: UtilsService) { |
|
|
|
this.initializeSubscription(); |
|
|
|
@ -126,50 +130,6 @@ export class EntityDataSubscription { |
|
|
|
} |
|
|
|
dataKey.key = key; |
|
|
|
} |
|
|
|
if (this.datasourceType === DatasourceType.function) { |
|
|
|
this.frequency = 1000; |
|
|
|
if (this.entityDataSubscriptionOptions.type === widgetType.timeseries) { |
|
|
|
this.frequency = Math.min(this.entityDataSubscriptionOptions.subscriptionTimewindow.aggregation.interval, 5000); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public addListener(listener: EntityDataListener) { |
|
|
|
this.listeners.push(listener); |
|
|
|
} |
|
|
|
|
|
|
|
public hasListeners(): boolean { |
|
|
|
return this.listeners.length > 0; |
|
|
|
} |
|
|
|
|
|
|
|
public removeListener(listener: EntityDataListener) { |
|
|
|
this.listeners.splice(this.listeners.indexOf(listener), 1); |
|
|
|
} |
|
|
|
|
|
|
|
public syncListener(listener: EntityDataListener) { |
|
|
|
if (this.pageData) { |
|
|
|
let key: string; |
|
|
|
let dataKey: SubscriptionDataKey; |
|
|
|
const data: Array<Array<DataSetHolder>> = []; |
|
|
|
for (let dataIndex = 0; dataIndex < this.pageData.data.length; dataIndex++) { |
|
|
|
data[dataIndex] = []; |
|
|
|
for (key of Object.keys(this.dataKeys)) { |
|
|
|
if (this.datasourceType === DatasourceType.entity || this.entityDataSubscriptionOptions.type === widgetType.timeseries) { |
|
|
|
const dataKeysList = this.dataKeys[key] as Array<SubscriptionDataKey>; |
|
|
|
for (let i = 0; i < dataKeysList.length; i++) { |
|
|
|
dataKey = dataKeysList[i]; |
|
|
|
const datasourceKey = `${key}_${i}`; |
|
|
|
data[dataIndex][dataKey.index] = this.datasourceData[dataIndex][datasourceKey]; |
|
|
|
} |
|
|
|
} else { |
|
|
|
dataKey = this.dataKeys[key] as SubscriptionDataKey; |
|
|
|
data[dataIndex][dataKey.index] = this.datasourceData[dataIndex][key]; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
listener.dataLoaded(this.pageData, data, listener.configDatasourceIndex); |
|
|
|
} |
|
|
|
this.listeners.push(listener); |
|
|
|
} |
|
|
|
|
|
|
|
public unsubscribe() { |
|
|
|
@ -192,19 +152,30 @@ export class EntityDataSubscription { |
|
|
|
this.pageData = null; |
|
|
|
} |
|
|
|
|
|
|
|
public start() { |
|
|
|
this.subsTw = this.entityDataSubscriptionOptions.subscriptionTimewindow; |
|
|
|
public subscribe(): Observable<EntityDataLoadResult> { |
|
|
|
if (!this.entityDataSubscriptionOptions.isLatestDataSubscription) { |
|
|
|
this.entityDataResolveSubject = new ReplaySubject(1); |
|
|
|
} else { |
|
|
|
this.started = true; |
|
|
|
this.dataResolved = true; |
|
|
|
} |
|
|
|
if (this.datasourceType === DatasourceType.entity) { |
|
|
|
const entityFields: Array<EntityKey> = |
|
|
|
this.entityDataSubscriptionOptions.dataKeys.filter(dataKey => dataKey.type === DataKeyType.entityField).map( |
|
|
|
dataKey => ({ type: EntityKeyType.ENTITY_FIELD, key: dataKey.name }) |
|
|
|
); |
|
|
|
dataKey => ({ type: EntityKeyType.ENTITY_FIELD, key: dataKey.name }) |
|
|
|
); |
|
|
|
if (!entityFields.find(key => key.key === 'name')) { |
|
|
|
entityFields.push({ |
|
|
|
type: EntityKeyType.ENTITY_FIELD, |
|
|
|
key: 'name' |
|
|
|
}); |
|
|
|
} |
|
|
|
if (!entityFields.find(key => key.key === 'label')) { |
|
|
|
entityFields.push({ |
|
|
|
type: EntityKeyType.ENTITY_FIELD, |
|
|
|
key: 'label' |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
this.attrFields = this.entityDataSubscriptionOptions.dataKeys.filter(dataKey => dataKey.type === DataKeyType.attribute).map( |
|
|
|
dataKey => ({ type: EntityKeyType.ATTRIBUTE, key: dataKey.name }) |
|
|
|
@ -217,9 +188,9 @@ export class EntityDataSubscription { |
|
|
|
this.latestValues = this.attrFields.concat(this.tsFields); |
|
|
|
|
|
|
|
this.subscriber = new TelemetrySubscriber(this.telemetryService); |
|
|
|
const command = new EntityDataCmd(); |
|
|
|
this.dataCommand = new EntityDataCmd(); |
|
|
|
|
|
|
|
command.query = { |
|
|
|
this.dataCommand.query = { |
|
|
|
entityFilter: this.entityDataSubscriptionOptions.entityFilter, |
|
|
|
pageLink: this.entityDataSubscriptionOptions.pageLink, |
|
|
|
keyFilters: this.entityDataSubscriptionOptions.keyFilters, |
|
|
|
@ -227,72 +198,17 @@ export class EntityDataSubscription { |
|
|
|
latestValues: this.latestValues |
|
|
|
}; |
|
|
|
|
|
|
|
if (this.entityDataSubscriptionOptions.type === widgetType.timeseries) { |
|
|
|
if (this.tsFields.length > 0) { |
|
|
|
if (this.history) { |
|
|
|
command.historyCmd = { |
|
|
|
keys: this.tsFields.map(key => key.key), |
|
|
|
startTs: this.subsTw.fixedWindow.startTimeMs, |
|
|
|
endTs: this.subsTw.fixedWindow.endTimeMs, |
|
|
|
interval: this.subsTw.aggregation.interval, |
|
|
|
limit: this.subsTw.aggregation.limit, |
|
|
|
agg: this.subsTw.aggregation.type |
|
|
|
if (this.entityDataSubscriptionOptions.isLatestDataSubscription) { |
|
|
|
if (this.entityDataSubscriptionOptions.type === widgetType.latest) { |
|
|
|
if (this.latestValues.length > 0) { |
|
|
|
this.dataCommand.latestCmd = { |
|
|
|
keys: this.latestValues |
|
|
|
}; |
|
|
|
if (this.subsTw.aggregation.stateData) { |
|
|
|
command.historyCmd.startTs -= YEAR; |
|
|
|
} |
|
|
|
} else { |
|
|
|
command.tsCmd = { |
|
|
|
keys: this.tsFields.map(key => key.key), |
|
|
|
startTs: this.subsTw.startTs, |
|
|
|
timeWindow: this.subsTw.aggregation.timeWindow, |
|
|
|
interval: this.subsTw.aggregation.interval, |
|
|
|
limit: this.subsTw.aggregation.limit, |
|
|
|
agg: this.subsTw.aggregation.type |
|
|
|
} |
|
|
|
if (this.subsTw.aggregation.stateData) { |
|
|
|
command.historyCmd = { |
|
|
|
keys: this.tsFields.map(key => key.key), |
|
|
|
startTs: this.subsTw.startTs - YEAR, |
|
|
|
endTs: this.subsTw.startTs, |
|
|
|
interval: this.subsTw.aggregation.interval, |
|
|
|
limit: this.subsTw.aggregation.limit, |
|
|
|
agg: this.subsTw.aggregation.type |
|
|
|
}; |
|
|
|
} |
|
|
|
this.subscriber.reconnect$.subscribe(() => { |
|
|
|
let newSubsTw: SubscriptionTimewindow = null; |
|
|
|
this.listeners.forEach((listener) => { |
|
|
|
if (!newSubsTw) { |
|
|
|
newSubsTw = listener.updateRealtimeSubscription(); |
|
|
|
} else { |
|
|
|
listener.setRealtimeSubscription(newSubsTw); |
|
|
|
} |
|
|
|
}); |
|
|
|
this.subsTw = newSubsTw; |
|
|
|
command.tsCmd.startTs = this.subsTw.startTs; |
|
|
|
command.tsCmd.timeWindow = this.subsTw.aggregation.timeWindow; |
|
|
|
command.tsCmd.interval = this.subsTw.aggregation.interval; |
|
|
|
command.tsCmd.limit = this.subsTw.aggregation.limit; |
|
|
|
command.tsCmd.agg = this.subsTw.aggregation.type; |
|
|
|
if (this.subsTw.aggregation.stateData) { |
|
|
|
command.historyCmd.startTs = this.subsTw.startTs - YEAR; |
|
|
|
command.historyCmd.endTs = this.subsTw.startTs; |
|
|
|
command.historyCmd.interval = this.subsTw.aggregation.interval; |
|
|
|
command.historyCmd.limit = this.subsTw.aggregation.limit; |
|
|
|
command.historyCmd.agg = this.subsTw.aggregation.type; |
|
|
|
} |
|
|
|
}); |
|
|
|
} |
|
|
|
} |
|
|
|
} else if (this.entityDataSubscriptionOptions.type === widgetType.latest) { |
|
|
|
if (this.latestValues.length > 0) { |
|
|
|
command.latestCmd = { |
|
|
|
keys: this.latestValues.map(key => key.key) |
|
|
|
}; |
|
|
|
} |
|
|
|
} |
|
|
|
this.subscriber.subscriptionCommands.push(command); |
|
|
|
|
|
|
|
this.subscriber.subscriptionCommands.push(this.dataCommand); |
|
|
|
|
|
|
|
this.subscriber.entityData$.subscribe( |
|
|
|
(entityDataUpdate) => { |
|
|
|
@ -304,6 +220,30 @@ export class EntityDataSubscription { |
|
|
|
} |
|
|
|
); |
|
|
|
|
|
|
|
this.subscriber.reconnect$.subscribe(() => { |
|
|
|
const newSubsTw: SubscriptionTimewindow = this.listener.updateRealtimeSubscription(); |
|
|
|
this.listener.setRealtimeSubscription(newSubsTw); |
|
|
|
this.subsTw = newSubsTw; |
|
|
|
if (this.started && !this.entityDataSubscriptionOptions.isLatestDataSubscription) { |
|
|
|
this.subsCommand.tsCmd.startTs = this.subsTw.startTs; |
|
|
|
this.subsCommand.tsCmd.timeWindow = this.subsTw.aggregation.timeWindow; |
|
|
|
this.subsCommand.tsCmd.interval = this.subsTw.aggregation.interval; |
|
|
|
this.subsCommand.tsCmd.limit = this.subsTw.aggregation.limit; |
|
|
|
this.subsCommand.tsCmd.agg = this.subsTw.aggregation.type; |
|
|
|
if (this.subsTw.aggregation.stateData) { |
|
|
|
this.subsCommand.historyCmd.startTs = this.subsTw.startTs - YEAR; |
|
|
|
this.subsCommand.historyCmd.endTs = this.subsTw.startTs; |
|
|
|
this.subsCommand.historyCmd.interval = this.subsTw.aggregation.interval; |
|
|
|
this.subsCommand.historyCmd.limit = this.subsTw.aggregation.limit; |
|
|
|
this.subsCommand.historyCmd.agg = this.subsTw.aggregation.type; |
|
|
|
} |
|
|
|
this.subsCommand.query = this.dataCommand.query; |
|
|
|
this.subscriber.subscriptionCommands = [this.subsCommand]; |
|
|
|
} else { |
|
|
|
this.subscriber.subscriptionCommands = [this.dataCommand]; |
|
|
|
} |
|
|
|
}); |
|
|
|
|
|
|
|
this.subscriber.subscribe(); |
|
|
|
} else if (this.datasourceType === DatasourceType.function) { |
|
|
|
const entityData: EntityData = { |
|
|
|
@ -325,29 +265,46 @@ export class EntityDataSubscription { |
|
|
|
totalPages: 1 |
|
|
|
}; |
|
|
|
this.onPageData(pageData); |
|
|
|
this.tickScheduledTime = this.utils.currentPerfTime(); |
|
|
|
if (this.history) { |
|
|
|
this.onTick(true); |
|
|
|
} else { |
|
|
|
this.timer = setTimeout(this.onTick.bind(this, true), 0); |
|
|
|
if (this.entityDataSubscriptionOptions.isLatestDataSubscription) { |
|
|
|
if (this.entityDataSubscriptionOptions.type === widgetType.latest) { |
|
|
|
this.frequency = 1000; |
|
|
|
this.timer = setTimeout(this.onTick.bind(this, true), 0); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
if (this.entityDataSubscriptionOptions.isLatestDataSubscription) { |
|
|
|
return of(null); |
|
|
|
} else { |
|
|
|
return this.entityDataResolveSubject.asObservable(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private onPageData(pageData: PageData<EntityData>) { |
|
|
|
public start() { |
|
|
|
if (this.entityDataSubscriptionOptions.isLatestDataSubscription) { |
|
|
|
return; |
|
|
|
} |
|
|
|
this.subsTw = this.entityDataSubscriptionOptions.subscriptionTimewindow; |
|
|
|
this.history = this.entityDataSubscriptionOptions.subscriptionTimewindow && |
|
|
|
isObject(this.entityDataSubscriptionOptions.subscriptionTimewindow.fixedWindow); |
|
|
|
this.realtime = this.entityDataSubscriptionOptions.subscriptionTimewindow && |
|
|
|
isDefinedAndNotNull(this.entityDataSubscriptionOptions.subscriptionTimewindow.realtimeWindowMs); |
|
|
|
|
|
|
|
if (this.timer) { |
|
|
|
clearTimeout(this.timer); |
|
|
|
this.timer = null; |
|
|
|
} |
|
|
|
|
|
|
|
if (this.dataAggregators) { |
|
|
|
this.dataAggregators.forEach((aggregator) => { |
|
|
|
aggregator.destroy(); |
|
|
|
}) |
|
|
|
this.dataAggregators = null; |
|
|
|
} |
|
|
|
this.datasourceData = []; |
|
|
|
this.dataAggregators = []; |
|
|
|
this.entityIdToDataIndex = {}; |
|
|
|
let tsKeyNames; |
|
|
|
this.resetData(); |
|
|
|
|
|
|
|
if (this.entityDataSubscriptionOptions.type === widgetType.timeseries) { |
|
|
|
let tsKeyNames = []; |
|
|
|
if (this.datasourceType === DatasourceType.function) { |
|
|
|
tsKeyNames = []; |
|
|
|
for (const key of Object.keys(this.dataKeys)) { |
|
|
|
const dataKeysList = this.dataKeys[key] as Array<SubscriptionDataKey>; |
|
|
|
dataKeysList.forEach((subscriptionDataKey) => { |
|
|
|
@ -357,20 +314,85 @@ export class EntityDataSubscription { |
|
|
|
} else { |
|
|
|
tsKeyNames = this.tsFields ? this.tsFields.map(field => field.key) : []; |
|
|
|
} |
|
|
|
} |
|
|
|
for (let dataIndex = 0; dataIndex < pageData.data.length; dataIndex++) { |
|
|
|
const entityData = pageData.data[dataIndex]; |
|
|
|
this.entityIdToDataIndex[entityData.entityId.id] = dataIndex; |
|
|
|
this.datasourceData[dataIndex] = {}; |
|
|
|
if (this.entityDataSubscriptionOptions.type === widgetType.timeseries) { |
|
|
|
for (let dataIndex = 0; dataIndex < this.pageData.data.length; dataIndex++) { |
|
|
|
if (this.datasourceType === DatasourceType.function) { |
|
|
|
this.dataAggregators[dataIndex] = this.createRealtimeDataAggregator(this.subsTw, tsKeyNames, |
|
|
|
DataKeyType.function, dataIndex, this.notifyListeners.bind(this)); |
|
|
|
DataKeyType.function, dataIndex, this.notifyListener.bind(this)); |
|
|
|
} else if (!this.history && tsKeyNames.length) { |
|
|
|
this.dataAggregators[dataIndex] = this.createRealtimeDataAggregator(this.subsTw, tsKeyNames, |
|
|
|
DataKeyType.timeseries, dataIndex, this.notifyListeners.bind(this)); |
|
|
|
DataKeyType.timeseries, dataIndex, this.notifyListener.bind(this)); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
if (this.datasourceType === DatasourceType.entity) { |
|
|
|
this.subsCommand = new EntityDataCmd(); |
|
|
|
this.subsCommand.cmdId = this.dataCommand.cmdId; |
|
|
|
if (this.entityDataSubscriptionOptions.type === widgetType.timeseries) { |
|
|
|
if (this.tsFields.length > 0) { |
|
|
|
if (this.history) { |
|
|
|
this.subsCommand.historyCmd = { |
|
|
|
keys: this.tsFields.map(key => key.key), |
|
|
|
startTs: this.subsTw.fixedWindow.startTimeMs, |
|
|
|
endTs: this.subsTw.fixedWindow.endTimeMs, |
|
|
|
interval: this.subsTw.aggregation.interval, |
|
|
|
limit: this.subsTw.aggregation.limit, |
|
|
|
agg: this.subsTw.aggregation.type |
|
|
|
}; |
|
|
|
if (this.subsTw.aggregation.stateData) { |
|
|
|
this.subsCommand.historyCmd.startTs -= YEAR; |
|
|
|
} |
|
|
|
} else { |
|
|
|
this.subsCommand.tsCmd = { |
|
|
|
keys: this.tsFields.map(key => key.key), |
|
|
|
startTs: this.subsTw.startTs, |
|
|
|
timeWindow: this.subsTw.aggregation.timeWindow, |
|
|
|
interval: this.subsTw.aggregation.interval, |
|
|
|
limit: this.subsTw.aggregation.limit, |
|
|
|
agg: this.subsTw.aggregation.type |
|
|
|
} |
|
|
|
if (this.subsTw.aggregation.stateData) { |
|
|
|
this.subsCommand.historyCmd = { |
|
|
|
keys: this.tsFields.map(key => key.key), |
|
|
|
startTs: this.subsTw.startTs - YEAR, |
|
|
|
endTs: this.subsTw.startTs, |
|
|
|
interval: this.subsTw.aggregation.interval, |
|
|
|
limit: this.subsTw.aggregation.limit, |
|
|
|
agg: this.subsTw.aggregation.type |
|
|
|
}; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} else if (this.entityDataSubscriptionOptions.type === widgetType.latest) { |
|
|
|
if (this.latestValues.length > 0) { |
|
|
|
this.subsCommand.latestCmd = { |
|
|
|
keys: this.latestValues |
|
|
|
}; |
|
|
|
} |
|
|
|
} |
|
|
|
this.subscriber.subscriptionCommands = [this.subsCommand]; |
|
|
|
this.subscriber.update(); |
|
|
|
} else if (this.datasourceType === DatasourceType.function) { |
|
|
|
this.frequency = 1000; |
|
|
|
if (this.entityDataSubscriptionOptions.type === widgetType.timeseries) { |
|
|
|
this.frequency = Math.min(this.entityDataSubscriptionOptions.subscriptionTimewindow.aggregation.interval, 5000); |
|
|
|
} |
|
|
|
this.tickScheduledTime = this.utils.currentPerfTime(); |
|
|
|
if (this.history) { |
|
|
|
this.onTick(true); |
|
|
|
} else { |
|
|
|
this.timer = setTimeout(this.onTick.bind(this, true), 0); |
|
|
|
} |
|
|
|
} |
|
|
|
this.started = true; |
|
|
|
} |
|
|
|
|
|
|
|
private resetData() { |
|
|
|
this.datasourceData = []; |
|
|
|
this.entityIdToDataIndex = {}; |
|
|
|
for (let dataIndex = 0; dataIndex < this.pageData.data.length; dataIndex++) { |
|
|
|
const entityData = this.pageData.data[dataIndex]; |
|
|
|
this.entityIdToDataIndex[entityData.entityId.id] = dataIndex; |
|
|
|
this.datasourceData[dataIndex] = {}; |
|
|
|
for (const key of Object.keys(this.dataKeys)) { |
|
|
|
const dataKey = this.dataKeys[key]; |
|
|
|
if (this.datasourceType === DatasourceType.entity || this.entityDataSubscriptionOptions.type === widgetType.timeseries) { |
|
|
|
@ -388,7 +410,23 @@ export class EntityDataSubscription { |
|
|
|
} |
|
|
|
} |
|
|
|
this.datasourceOrigData = deepClone(this.datasourceData); |
|
|
|
if (this.entityDataSubscriptionOptions.type === widgetType.timeseries) { |
|
|
|
for (const key of Object.keys(this.dataKeys)) { |
|
|
|
const dataKeyList = this.dataKeys[key] as Array<SubscriptionDataKey>; |
|
|
|
dataKeyList.forEach((dataKey) => { |
|
|
|
delete dataKey.lastUpdateTime; |
|
|
|
}); |
|
|
|
} |
|
|
|
} else if (this.entityDataSubscriptionOptions.type === widgetType.latest) { |
|
|
|
for (const key of Object.keys(this.dataKeys)) { |
|
|
|
delete (this.dataKeys[key] as SubscriptionDataKey).lastUpdateTime; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private onPageData(pageData: PageData<EntityData>) { |
|
|
|
this.pageData = pageData; |
|
|
|
this.resetData(); |
|
|
|
const data: Array<Array<DataSetHolder>> = []; |
|
|
|
for (let dataIndex = 0; dataIndex < pageData.data.length; dataIndex++) { |
|
|
|
const entityData = pageData.data[dataIndex]; |
|
|
|
@ -401,28 +439,33 @@ export class EntityDataSubscription { |
|
|
|
} |
|
|
|
); |
|
|
|
} |
|
|
|
|
|
|
|
this.pageData = pageData; |
|
|
|
|
|
|
|
this.listeners.forEach((listener) => { |
|
|
|
listener.dataLoaded(pageData, data, |
|
|
|
listener.configDatasourceIndex); |
|
|
|
}); |
|
|
|
if (!this.dataResolved) { |
|
|
|
this.dataResolved = true; |
|
|
|
this.entityDataResolveSubject.next( |
|
|
|
{ |
|
|
|
pageData, |
|
|
|
data, |
|
|
|
datasourceIndex: this.listener.configDatasourceIndex |
|
|
|
} |
|
|
|
); |
|
|
|
this.entityDataResolveSubject.complete(); |
|
|
|
} else { |
|
|
|
this.listener.dataLoaded(pageData, data, |
|
|
|
this.listener.configDatasourceIndex); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private onDataUpdate(update: Array<EntityData>) { |
|
|
|
for (const entityData of update) { |
|
|
|
const dataIndex = this.entityIdToDataIndex[entityData.entityId.id]; |
|
|
|
this.processEntityData(entityData, dataIndex, true, this.notifyListeners.bind(this)); |
|
|
|
this.processEntityData(entityData, dataIndex, true, this.notifyListener.bind(this)); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private notifyListeners(data: DataSetHolder, dataIndex: number, dataKeyIndex: number, detectChanges: boolean) { |
|
|
|
this.listeners.forEach((listener) => { |
|
|
|
listener.dataUpdated(data, |
|
|
|
listener.configDatasourceIndex, |
|
|
|
private notifyListener(data: DataSetHolder, dataIndex: number, dataKeyIndex: number, detectChanges: boolean) { |
|
|
|
this.listener.dataUpdated(data, |
|
|
|
this.listener.configDatasourceIndex, |
|
|
|
dataIndex, dataKeyIndex, detectChanges); |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
private processEntityData(entityData: EntityData, dataIndex: number, aggregate: boolean, |
|
|
|
@ -596,14 +639,10 @@ export class EntityDataSubscription { |
|
|
|
const value = dataKey.func(time, prevSeries[1]); |
|
|
|
const series: [number, any] = [time, value]; |
|
|
|
this.datasourceData[0][dataKey.key].data = [series]; |
|
|
|
this.listeners.forEach( |
|
|
|
(listener) => { |
|
|
|
listener.dataUpdated(this.datasourceData[0][dataKey.key], |
|
|
|
listener.configDatasourceIndex, |
|
|
|
0, |
|
|
|
dataKey.index, detectChanges); |
|
|
|
} |
|
|
|
); |
|
|
|
this.listener.dataUpdated(this.datasourceData[0][dataKey.key], |
|
|
|
this.listener.configDatasourceIndex, |
|
|
|
0, |
|
|
|
dataKey.index, detectChanges); |
|
|
|
} |
|
|
|
|
|
|
|
private onTick(detectChanges: boolean) { |
|
|
|
|