@ -14,9 +14,16 @@
/// limitations under the License.
///
import { DataSet , DataSetHolder , DatasourceType , widgetType } from '@shared/models/widget.models' ;
import { AggregationType , getCurrentTime , SubscriptionTimewindow } from '@shared/models/time/time.models' ;
import { ComparisonResultType , DataSet , DataSetHolder , DatasourceType , widgetType } from '@shared/models/widget.models' ;
import {
AggregationType ,
ComparisonDuration ,
createTimewindowForComparison ,
getCurrentTime ,
SubscriptionTimewindow
} from '@shared/models/time/time.models' ;
import {
ComparisonTsValue ,
EntityData ,
EntityDataPageLink ,
EntityFilter ,
@ -28,11 +35,12 @@ import {
TsValue
} from '@shared/models/query/query.models' ;
import {
AggKey ,
DataKeyType ,
EntityCountCmd ,
EntityDataCmd ,
IndexedSubscriptionData ,
SubscriptionData ,
SubscriptionDataHolder ,
TelemetryService ,
TelemetrySubscriber
} from '@shared/models/telemetry/telemetry.models' ;
@ -55,6 +63,11 @@ declare type DataUpdatedCb = (data: DataSetHolder, dataIndex: number,
export interface SubscriptionDataKey {
name : string ;
type : DataKeyType ;
aggregationType? : AggregationType ;
comparisonEnabled? : boolean ;
timeForComparison? : ComparisonDuration ;
comparisonCustomIntervalValue? : number ;
comparisonResultType? : ComparisonResultType ;
funcBody : string ;
func? : DataKeyFunction ;
postFuncBody : string ;
@ -82,9 +95,16 @@ export interface EntityDataSubscriptionOptions {
export class EntityDataSubscription {
constructor ( private listener : EntityDataListener ,
private telemetryService : TelemetryService ,
private utils : UtilsService ) {
this . initializeSubscription ( ) ;
}
private entityDataSubscriptionOptions = this . listener . subscriptionOptions ;
private datasourceType : DatasourceType = this . entityDataSubscriptionOptions . datasourceType ;
private history : boolean ;
private isFloatingTimewindow : boolean ;
private realtime : boolean ;
private subscriber : TelemetrySubscriber ;
@ -95,13 +115,18 @@ export class EntityDataSubscription {
private attrFields : Array < EntityKey > ;
private tsFields : Array < EntityKey > ;
private latestValues : Array < EntityKey > ;
private aggTsValues : Array < AggKey > ;
private aggTsComparisonValues : Array < AggKey > ;
private entityDataResolveSubject : Subject < EntityDataLoadResult > ;
private pageData : PageData < EntityData > ;
private data : Array < Array < DataSetHolder > > ;
private subsTw : SubscriptionTimewindow ;
private latestTsOffset : number ;
private dataAggregators : Array < DataAggregator > ;
private tsLatestDataAggregators : Array < DataAggregator > ;
private dataKeys : { [ key : string ] : Array < SubscriptionDataKey > | SubscriptionDataKey } = { } ;
private dataKeysList : SubscriptionDataKey [ ] = [ ] ;
private datasourceData : { [ index : number ] : { [ key : string ] : DataSetHolder } } ;
private datasourceOrigData : { [ index : number ] : { [ key : string ] : DataSetHolder } } ;
private entityIdToDataIndex : { [ id : string ] : number } ;
@ -116,15 +141,44 @@ export class EntityDataSubscription {
private dataResolved = false ;
private started = false ;
constructor ( private listener : EntityDataListener ,
private telemetryService : TelemetryService ,
private utils : UtilsService ) {
this . initializeSubscription ( ) ;
private static convertValue ( val : string ) : any {
if ( val && isNumeric ( val ) && Number ( val ) . toString ( ) === val ) {
return Number ( val ) ;
}
return val ;
}
private static calculateComparisonValue ( key : SubscriptionDataKey , comparisonTsValue : ComparisonTsValue ) : [ number , any , number ? ] [ ] {
let timestamp : number ;
let value : any ;
switch ( key . comparisonResultType ) {
case ComparisonResultType . PREVIOUS_VALUE :
timestamp = comparisonTsValue . previous . ts ;
value = comparisonTsValue . previous . value ;
break ;
case ComparisonResultType . DELTA_ABSOLUTE :
case ComparisonResultType . DELTA_PERCENT :
timestamp = comparisonTsValue . previous . ts ;
const currentVal = EntityDataSubscription . convertValue ( comparisonTsValue . current . value ) ;
const prevVal = EntityDataSubscription . convertValue ( comparisonTsValue . previous . value ) ;
if ( isNumeric ( currentVal ) && isNumeric ( prevVal ) ) {
if ( key . comparisonResultType === ComparisonResultType . DELTA_ABSOLUTE ) {
value = currentVal - prevVal ;
} else {
value = ( currentVal - prevVal ) / prevVal * 100 ;
}
} else {
value = '' ;
}
break ;
}
return [ [ timestamp , value ] ] ;
}
private initializeSubscription() {
for ( let i = 0 ; i < this . entityDataSubscriptionOptions . dataKeys . length ; i ++ ) {
const dataKey = deepClone ( this . entityDataSubscriptionOptions . dataKeys [ i ] ) ;
this . dataKeysList . push ( dataKey ) ;
dataKey . index = i ;
if ( this . datasourceType === DatasourceType . function ) {
if ( ! dataKey . func ) {
@ -142,7 +196,8 @@ export class EntityDataSubscription {
if ( this . datasourceType === DatasourceType . function ) {
key = ` ${ dataKey . name } _ ${ dataKey . index } _ ${ dataKey . type } ${ dataKey . latest ? '_latest' : '' } ` ;
} else {
key = ` ${ dataKey . name } _ ${ dataKey . type } ${ dataKey . latest ? '_latest' : '' } ` ;
const keyIndexSuffix = dataKey . aggregationType && dataKey . aggregationType !== AggregationType . NONE ? ` _ ${ dataKey . index } ` : '' ;
key = ` ${ dataKey . name } _ ${ dataKey . type } ${ keyIndexSuffix } ${ dataKey . latest ? '_latest' : '' } ` ;
}
let dataKeysList = this . dataKeys [ key ] as Array < SubscriptionDataKey > ;
if ( ! dataKeysList ) {
@ -180,6 +235,12 @@ export class EntityDataSubscription {
} ) ;
this . dataAggregators = null ;
}
if ( this . tsLatestDataAggregators ) {
this . tsLatestDataAggregators . forEach ( ( aggregator ) = > {
aggregator . destroy ( ) ;
} ) ;
this . tsLatestDataAggregators = null ;
}
this . pageData = null ;
}
@ -188,16 +249,11 @@ export class EntityDataSubscription {
if ( this . entityDataSubscriptionOptions . isPaginatedDataSubscription ) {
this . started = true ;
this . dataResolved = true ;
this . subsTw = this . entityDataSubscriptionOptions . subscriptionTimewindow ;
this . latestTsOffset = this . entityDataSubscriptionOptions . latestTsOffset ;
this . history = this . entityDataSubscriptionOptions . subscriptionTimewindow &&
isObject ( this . entityDataSubscriptionOptions . subscriptionTimewindow . fixedWindow ) ;
this . realtime = this . entityDataSubscriptionOptions . subscriptionTimewindow &&
isDefinedAndNotNull ( this . entityDataSubscriptionOptions . subscriptionTimewindow . realtimeWindowMs ) ;
this . prepareSubscriptionTimewindow ( ) ;
}
if ( this . datasourceType === DatasourceType . entity ) {
const entityFields : Array < EntityKey > =
this . entityDataSubscriptionOptions . dataKeys . filter ( dataKey = > dataKey . type === DataKeyType . entityField ) . map (
this . dataKeysList . filter ( dataKey = > dataKey . type === DataKeyType . entityField ) . map (
dataKey = > ( { type : EntityKeyType . ENTITY_FIELD , key : dataKey.name } )
) ;
if ( ! entityFields . find ( key = > key . key === 'name' ) ) {
@ -219,18 +275,20 @@ export class EntityDataSubscription {
} ) ;
}
this . attrFields = this . entityDataSubscriptionOptions . dataKeys . filter ( dataKey = > dataKey . type === DataKeyType . attribute ) . map (
this . attrFields = this . dataKeysList . filter ( dataKey = > dataKey . type === DataKeyType . attribute ) . map (
dataKey = > ( { type : EntityKeyType . ATTRIBUTE , key : dataKey.name } )
) ;
this . tsFields = this . entityDataSubscriptionOptions . dataKeys .
filter ( dataKey = > dataKey . type === DataKeyType . timeseries && ! dataKey . latest ) . map (
this . tsFields = this . dataKeysList .
filter ( dataKey = > dataKey . type === DataKeyType . timeseries &&
( ! dataKey . aggregationType || dataKey . aggregationType === AggregationType . NONE ) && ! dataKey . latest ) . map (
dataKey = > ( { type : EntityKeyType . TIME_SERIES , key : dataKey.name } )
) ;
if ( this . entityDataSubscriptionOptions . type === widgetType . timeseries ) {
const latestTsFields = this . entityDataSubscriptionOptions . dataKeys .
filter ( dataKey = > dataKey . type === DataKeyType . timeseries && dataKey . latest ) . map (
const latestTsFields = this . dataKeysList .
filter ( dataKey = > dataKey . type === DataKeyType . timeseries && dataKey . latest &&
( ! dataKey . aggregationType || dataKey . aggregationType === AggregationType . NONE ) ) . map (
dataKey = > ( { type : EntityKeyType . TIME_SERIES , key : dataKey.name } )
) ;
this . latestValues = this . attrFields . concat ( latestTsFields ) ;
@ -238,6 +296,19 @@ export class EntityDataSubscription {
this . latestValues = this . attrFields . concat ( this . tsFields ) ;
}
this . aggTsValues = this . dataKeysList .
filter ( dataKey = > dataKey . type === DataKeyType . timeseries &&
dataKey . aggregationType && dataKey . aggregationType !== AggregationType . NONE && ! dataKey . comparisonEnabled ) . map (
dataKey = > ( { id : dataKey.index , key : dataKey.name , agg : dataKey.aggregationType } )
) ;
this . aggTsComparisonValues = this . dataKeysList .
filter ( dataKey = > dataKey . type === DataKeyType . timeseries &&
dataKey . aggregationType && dataKey . aggregationType !== AggregationType . NONE && dataKey . comparisonEnabled ) . map (
dataKey = > ( { id : dataKey.index , key : dataKey.name , agg : dataKey.aggregationType ,
previousValueOnly : dataKey.comparisonResultType === ComparisonResultType . PREVIOUS_VALUE } )
) ;
this . subscriber = new TelemetrySubscriber ( this . telemetryService ) ;
this . dataCommand = new EntityDataCmd ( ) ;
@ -282,19 +353,28 @@ export class EntityDataSubscription {
this . subscriber . reconnect $ . subscribe ( ( ) = > {
if ( this . started ) {
const targetCommand = this . entityDataSubscriptionOptions . isPaginatedDataSubscription ? this . dataCommand : this.subsCommand ;
if ( this . entityDataSubscriptionOptions . type === widgetType . timeseries &&
! this . history && this . tsFields . length ) {
if ( ! this . history && ( this . entityDataSubscriptionOptions . type === widgetType . timeseries && this . tsFields . length ||
this . aggTsValues . length > 0 && ! this . isFloatingTimewindow ) ) {
const newSubsTw = this . listener . updateRealtimeSubscription ( ) ;
this . subsTw = newSubsTw ;
targetCommand . tsCmd . startTs = this . subsTw . startTs ;
targetCommand . tsCmd . timeWindow = this . subsTw . aggregation . timeWindow ;
targetCommand . tsCmd . interval = this . subsTw . aggregation . interval ;
targetCommand . tsCmd . limit = this . subsTw . aggregation . limit ;
targetCommand . tsCmd . agg = this . subsTw . aggregation . type ;
targetCommand . tsCmd . fetchLatestPreviousPoint = this . subsTw . aggregation . stateData ;
this . dataAggregators . forEach ( ( dataAggregator ) = > {
dataAggregator . reset ( newSubsTw ) ;
} ) ;
if ( this . entityDataSubscriptionOptions . type === widgetType . timeseries && this . tsFields . length ) {
targetCommand . tsCmd . startTs = this . subsTw . startTs ;
targetCommand . tsCmd . timeWindow = this . subsTw . aggregation . timeWindow ;
targetCommand . tsCmd . interval = this . subsTw . aggregation . interval ;
targetCommand . tsCmd . limit = this . subsTw . aggregation . limit ;
targetCommand . tsCmd . agg = this . subsTw . aggregation . type ;
targetCommand . tsCmd . fetchLatestPreviousPoint = this . subsTw . aggregation . stateData ;
this . dataAggregators . forEach ( ( dataAggregator ) = > {
dataAggregator . reset ( newSubsTw ) ;
} ) ;
}
if ( this . aggTsValues . length > 0 && ! this . isFloatingTimewindow ) {
targetCommand . aggTsCmd . startTs = this . subsTw . startTs ;
targetCommand . aggTsCmd . timeWindow = this . subsTw . aggregation . timeWindow ;
this . tsLatestDataAggregators . forEach ( ( dataAggregator ) = > {
dataAggregator . reset ( newSubsTw ) ;
} ) ;
}
}
if ( this . entityDataSubscriptionOptions . type === widgetType . timeseries ) {
this . subscriber . setTsOffset ( this . subsTw . tsOffset ) ;
@ -359,7 +439,7 @@ export class EntityDataSubscription {
entityType : null
} ;
const countKey = this . entityDataSubscriptionOptions . dataKeys [ 0 ] ;
const countKey = this . dataKeysList [ 0 ] ;
let dataReceived = false ;
@ -422,14 +502,9 @@ export class EntityDataSubscription {
if ( this . entityDataSubscriptionOptions . isPaginatedDataSubscription ) {
return ;
}
this . subsTw = this . entityDataSubscriptionOptions . subscriptionTimewindow ;
this . latestTsOffset = this . entityDataSubscriptionOptions . latestTsOffset ;
this . history = this . entityDataSubscriptionOptions . subscriptionTimewindow &&
isObject ( this . entityDataSubscriptionOptions . subscriptionTimewindow . fixedWindow ) ;
this . realtime = this . entityDataSubscriptionOptions . subscriptionTimewindow &&
isDefinedAndNotNull ( this . entityDataSubscriptionOptions . subscriptionTimewindow . realtimeWindowMs ) ;
this . prepareSubscriptionTimewindow ( ) ;
this . prepareData ( ) ;
this . prepareData ( true ) ;
if ( this . datasourceType === DatasourceType . entity ) {
this . subsCommand = new EntityDataCmd ( ) ;
@ -461,7 +536,19 @@ export class EntityDataSubscription {
this . started = true ;
}
private prepareSubscriptionTimewindow() {
this . subsTw = this . entityDataSubscriptionOptions . subscriptionTimewindow ;
this . latestTsOffset = this . entityDataSubscriptionOptions . latestTsOffset ;
this . history = this . entityDataSubscriptionOptions . subscriptionTimewindow &&
isObject ( this . entityDataSubscriptionOptions . subscriptionTimewindow . fixedWindow ) ;
this . realtime = this . entityDataSubscriptionOptions . subscriptionTimewindow &&
isDefinedAndNotNull ( this . entityDataSubscriptionOptions . subscriptionTimewindow . realtimeWindowMs ) ;
this . isFloatingTimewindow = this . entityDataSubscriptionOptions . subscriptionTimewindow &&
! this . entityDataSubscriptionOptions . subscriptionTimewindow . quickInterval && ! this . history ;
}
private prepareSubscriptionCommands ( cmd : EntityDataCmd ) {
let latestValuesKeys : EntityKey [ ] = [ ] ;
if ( this . entityDataSubscriptionOptions . type === widgetType . timeseries ) {
if ( this . tsFields . length > 0 ) {
if ( this . history ) {
@ -486,17 +573,40 @@ export class EntityDataSubscription {
} ;
}
}
if ( this . latestValues . length > 0 ) {
cmd . latestCmd = {
keys : this.latestValues
} ;
}
latestValuesKeys = this . latestValues ;
} else if ( this . entityDataSubscriptionOptions . type === widgetType . latest ) {
if ( this . latestValues . length > 0 ) {
cmd . latestCmd = {
keys : this.latestValues
} ;
latestValuesKeys = this . latestValues ;
}
if ( this . history && ( this . aggTsValues . length > 0 || this . aggTsComparisonValues . length > 0 ) ) {
for ( const aggTsComparison of this . aggTsComparisonValues ) {
const subscriptionDataKey = this . dataKeyByIndex ( aggTsComparison . id ) ;
const timewindowForComparison =
createTimewindowForComparison ( this . subsTw , subscriptionDataKey . timeForComparison ,
subscriptionDataKey . comparisonCustomIntervalValue ) ;
aggTsComparison . previousStartTs = timewindowForComparison . fixedWindow . startTimeMs ;
aggTsComparison . previousEndTs = timewindowForComparison . fixedWindow . endTimeMs ;
}
cmd . aggHistoryCmd = {
keys : [ . . . this . aggTsValues , . . . this . aggTsComparisonValues ] ,
startTs : this.subsTw.fixedWindow.startTimeMs ,
endTs : this.subsTw.fixedWindow.endTimeMs
} ;
} else if ( ! this . isFloatingTimewindow && this . aggTsValues . length > 0 ) {
cmd . aggTsCmd = {
keys : this.aggTsValues ,
startTs : this.subsTw.startTs ,
timeWindow : this.subsTw.aggregation.timeWindow
} ;
if ( latestValuesKeys . length > 0 ) {
const tsKeys = this . aggTsValues . map ( key = > key . key ) ;
latestValuesKeys = latestValuesKeys . filter ( latestKey = > latestKey . type !== EntityKeyType . TIME_SERIES
|| ! tsKeys . includes ( latestKey . key ) ) ;
}
}
if ( latestValuesKeys . length > 0 ) {
cmd . latestCmd = {
keys : latestValuesKeys
} ;
}
}
@ -510,7 +620,7 @@ export class EntityDataSubscription {
this . generateData ( true ) ;
}
private prepareData() {
private prepareData ( isUpdate : boolean ) {
if ( this . timeseriesTimer ) {
clearTimeout ( this . timeseriesTimer ) ;
this . timeseriesTimer = null ;
@ -526,37 +636,73 @@ export class EntityDataSubscription {
} ) ;
}
this . dataAggregators = [ ] ;
if ( this . tsLatestDataAggregators ) {
this . tsLatestDataAggregators . forEach ( ( aggregator ) = > {
aggregator . destroy ( ) ;
} ) ;
}
this . tsLatestDataAggregators = [ ] ;
this . resetData ( ) ;
if ( this . entityDataSubscriptionOptions . type === widgetType . timeseries ) {
let tsKeyNames = [ ] ;
let tsKeyIds : number [ ] ;
if ( this . datasourceType === DatasourceType . function ) {
for ( const key of Object . keys ( this . dataKeys ) ) {
const dataKeysList = this . dataKeys [ key ] as Array < SubscriptionDataKey > ;
dataKeysList . forEach ( ( subscriptionDataKey ) = > {
if ( ! subscriptionDataKey . latest ) {
tsKeyNames . push ( ` ${ subscriptionDataKey . name } _ ${ subscriptionDataKey . index } ` ) ;
}
} ) ;
}
tsKeyIds = this . dataKeysList . filter ( key = > ! key . latest ) . map ( key = > key . index ) ;
} else {
tsKeyNames = this . tsFields ? this . tsFields . map ( field = > field . key ) : [ ] ;
tsKeyIds = this . dataKeysList .
filter ( dataKey = > dataKey . type === DataKeyType . timeseries &&
( ! dataKey . aggregationType || dataKey . aggregationType === AggregationType . NONE ) && ! dataKey . latest ) . map (
dataKey = > dataKey . index
) ;
}
for ( let dataIndex = 0 ; dataIndex < this . pageData . data . length ; dataIndex ++ ) {
if ( tsKeyNames . length ) {
if ( this . datasourceType === DatasourceType . function ) {
this . dataAggregators [ dataIndex ] = this . createRealtimeDataAggregator ( this . subsTw , tsKeyNames ,
DataKeyType . function , dataIndex , this . notifyListener . bind ( this ) ) ;
} else {
this . dataAggregators [ dataIndex ] = this . createRealtimeDataAggregator ( this . subsTw , tsKeyNames ,
DataKeyType . timeseries , dataIndex , this . notifyListener . bind ( this ) ) ;
}
const aggKeys : AggKey [ ] = tsKeyIds . map ( key = > ( { id : key , key : key + '' , agg : this.subsTw.aggregation.type } ) ) ;
if ( aggKeys . length ) {
for ( let dataIndex = 0 ; dataIndex < this . pageData . data . length ; dataIndex ++ ) {
this . dataAggregators [ dataIndex ] = this . createRealtimeDataAggregator ( this . subsTw , aggKeys ,
false , dataIndex , this . notifyListener . bind ( this ) ) ;
}
}
}
if ( this . aggTsValues && this . aggTsValues . length ) {
if ( ! this . isFloatingTimewindow ) {
const aggLatestTimewindow = deepClone ( this . subsTw ) ;
aggLatestTimewindow . aggregation . stateData = false ;
aggLatestTimewindow . aggregation . interval = aggLatestTimewindow . aggregation . timeWindow ;
for ( let dataIndex = 0 ; dataIndex < this . pageData . data . length ; dataIndex ++ ) {
this . tsLatestDataAggregators [ dataIndex ] = this . createRealtimeDataAggregator ( aggLatestTimewindow , this . aggTsValues ,
true , dataIndex , this . notifyListener . bind ( this ) ) ;
}
} else {
this . reportNotSupported ( this . aggTsValues , isUpdate ) ;
}
}
if ( ! this . history && this . aggTsComparisonValues && this . aggTsComparisonValues . length ) {
this . reportNotSupported ( this . aggTsComparisonValues , isUpdate ) ;
}
}
private reportNotSupported ( keys : AggKey [ ] , isUpdate : boolean ) {
const indexedData : IndexedSubscriptionData = [ ] ;
for ( const key of keys ) {
indexedData [ key . id ] = [ [ 0 , 'Not supported!' ] ] ;
}
for ( let dataIndex = 0 ; dataIndex < this . pageData . data . length ; dataIndex ++ ) {
this . onIndexedData ( indexedData , dataIndex , true ,
this . entityDataSubscriptionOptions . type === widgetType . timeseries ,
( data , dataIndex1 , dataKeyIndex , detectChanges , isLatest ) = > {
if ( ! this . data [ dataIndex1 ] ) {
this . data [ dataIndex1 ] = [ ] ;
}
this . data [ dataIndex1 ] [ dataKeyIndex ] = data ;
if ( isUpdate ) {
this . notifyListener ( data , dataIndex1 , dataKeyIndex , detectChanges , isLatest ) ;
}
} ) ;
}
}
private resetData() {
this . data = [ ] ;
this . datasourceData = [ ] ;
this . entityIdToDataIndex = { } ;
for ( let dataIndex = 0 ; dataIndex < this . pageData . data . length ; dataIndex ++ ) {
@ -609,19 +755,18 @@ export class EntityDataSubscription {
this . pageData = pageData ;
if ( this . entityDataSubscriptionOptions . isPaginatedDataSubscription ) {
this . prepareData ( ) ;
this . prepareData ( false ) ;
} else if ( isInitialData ) {
this . resetData ( ) ;
}
const data : Array < Array < DataSetHolder > > = [ ] ;
for ( let dataIndex = 0 ; dataIndex < pageData . data . length ; dataIndex ++ ) {
const entityData = pageData . data [ dataIndex ] ;
this . processEntityData ( entityData , dataIndex , false ,
( data1 , dataIndex1 , dataKeyIndex ) = > {
if ( ! data [ dataIndex1 ] ) {
data [ dataIndex1 ] = [ ] ;
if ( ! this . data [ dataIndex1 ] ) {
this . data [ dataIndex1 ] = [ ] ;
}
data [ dataIndex1 ] [ dataKeyIndex ] = data1 ;
this . data [ dataIndex1 ] [ dataKeyIndex ] = data1 ;
}
) ;
}
@ -630,7 +775,7 @@ export class EntityDataSubscription {
this . entityDataResolveSubject . next (
{
pageData ,
data ,
data : this.data ,
datasourceIndex : this.listener.configDatasourceIndex ,
pageLink : this.entityDataSubscriptionOptions.pageLink
}
@ -638,7 +783,7 @@ export class EntityDataSubscription {
this . entityDataResolveSubject . complete ( ) ;
} else {
if ( isInitialData || this . entityDataSubscriptionOptions . isPaginatedDataSubscription ) {
this . listener . dataLoaded ( pageData , data ,
this . listener . dataLoaded ( pageData , this . data ,
this . listener . configDatasourceIndex , this . entityDataSubscriptionOptions . pageLink ) ;
}
if ( this . entityDataSubscriptionOptions . isPaginatedDataSubscription && isInitialData ) {
@ -648,7 +793,7 @@ export class EntityDataSubscription {
this . entityDataResolveSubject . next (
{
pageData ,
data ,
data : this.data ,
datasourceIndex : this.listener.configDatasourceIndex ,
pageLink : this.entityDataSubscriptionOptions.pageLink
}
@ -675,27 +820,85 @@ export class EntityDataSubscription {
private processEntityData ( entityData : EntityData , dataIndex : number , isUpdate : boolean ,
dataUpdatedCb : DataUpdatedCb ) {
if ( ( this . entityDataSubscriptionOptions . type === widgetType . latest ||
this . entityDataSubscriptionOptions . type === widgetType . timeseries ) && entityData . latest ) {
for ( const type of Object . keys ( entityData . latest ) ) {
const subscriptionData = this . toSubscriptionData ( entityData . latest [ type ] , false ) ;
const dataKeyType = entityKeyTypeToDataKeyType ( EntityKeyType [ type ] ) ;
this . onData ( subscriptionData , dataKeyType , dataIndex , true ,
this . entityDataSubscriptionOptions . type === widgetType . timeseries , dataUpdatedCb ) ;
if ( this . entityDataSubscriptionOptions . type === widgetType . latest ||
this . entityDataSubscriptionOptions . type === widgetType . timeseries ) {
if ( entityData . aggLatest ) {
const aggData : IndexedSubscriptionData = [ ] ;
for ( const idStr of Object . keys ( entityData . aggLatest ) ) {
const id = Number ( idStr ) ;
const dataKey = this . dataKeyByIndex ( id ) ;
const aggLatestData = entityData . aggLatest [ id ] ;
if ( dataKey . comparisonEnabled ) {
const keyData = EntityDataSubscription . calculateComparisonValue ( dataKey , aggLatestData ) ;
this . onKeyData ( keyData , dataKey . name , id , dataKey . type , dataIndex , true ,
this . entityDataSubscriptionOptions . type === widgetType . timeseries , true , dataUpdatedCb ) ;
} else {
aggData [ id ] = [ [ aggLatestData . current . ts , aggLatestData . current . value , aggLatestData . current . count ] ] ;
}
}
if ( Object . keys ( aggData ) . length > 0 && this . tsLatestDataAggregators && this . tsLatestDataAggregators [ dataIndex ] ) {
const dataAggregator = this . tsLatestDataAggregators [ dataIndex ] ;
let prevDataCb ;
if ( ! isUpdate ) {
prevDataCb = dataAggregator . updateOnDataCb ( ( data , detectChanges ) = > {
this . onIndexedData ( data , dataIndex , detectChanges ,
this . entityDataSubscriptionOptions . type === widgetType . timeseries , dataUpdatedCb ) ;
} ) ;
}
dataAggregator . onData ( aggData , false , this . history , true ) ;
if ( prevDataCb ) {
dataAggregator . updateOnDataCb ( prevDataCb ) ;
}
}
}
if ( entityData . latest ) {
for ( const type of Object . keys ( entityData . latest ) ) {
const subscriptionData = this . toSubscriptionData ( entityData . latest [ type ] , false ) ;
const dataKeyType = entityKeyTypeToDataKeyType ( EntityKeyType [ type ] ) ;
if ( isUpdate && EntityKeyType [ type ] === EntityKeyType . TIME_SERIES ) {
const keys : string [ ] = Object . keys ( subscriptionData ) ;
const latestTsKeys = this . latestValues . filter ( key = > key . type === EntityKeyType . TIME_SERIES && keys . includes ( key . key ) ) ;
if ( latestTsKeys . length ) {
const latestTsSubsciptionData : SubscriptionData = { } ;
for ( const latestTsKey of latestTsKeys ) {
latestTsSubsciptionData [ latestTsKey . key ] = subscriptionData [ latestTsKey . key ] ;
}
this . onData ( latestTsSubsciptionData , dataKeyType , dataIndex , true ,
this . entityDataSubscriptionOptions . type === widgetType . timeseries , dataUpdatedCb ) ;
}
const aggTsKeys = this . aggTsValues . filter ( key = > keys . includes ( key . key ) ) ;
if ( ! this . history && aggTsKeys . length && this . tsLatestDataAggregators && this . tsLatestDataAggregators [ dataIndex ] ) {
const dataAggregator = this . tsLatestDataAggregators [ dataIndex ] ;
const indexedData : IndexedSubscriptionData = [ ] ;
for ( const aggKey of aggTsKeys ) {
indexedData [ aggKey . id ] = subscriptionData [ aggKey . key ] ;
}
dataAggregator . onData ( indexedData , true , false , true ) ;
}
} else {
this . onData ( subscriptionData , dataKeyType , dataIndex , true ,
this . entityDataSubscriptionOptions . type === widgetType . timeseries , dataUpdatedCb ) ;
}
}
}
}
if ( this . entityDataSubscriptionOptions . type === widgetType . timeseries && entityData . timeseries ) {
const subscriptionData = this . toSubscriptionData ( entityData . timeseries , true ) ;
if ( this . dataAggregators && this . dataAggregators [ dataIndex ] ) {
const dataAggregator = this . dataAggregators [ dataIndex ] ;
const keyNames = Object . keys ( subscriptionData ) ;
const dataKeys = this . timeseriesDataKeysByKeyNames ( keyNames ) ;
const indexedData : IndexedSubscriptionData = [ ] ;
for ( const dataKey of dataKeys ) {
indexedData [ dataKey . index ] = subscriptionData [ dataKey . name ] ;
}
let prevDataCb ;
if ( ! isUpdate ) {
prevDataCb = dataAggregator . updateOnDataCb ( ( data , detectChanges ) = > {
this . onData ( data , this . datasourceType === DatasourceType . function ?
DataKeyType.function : DataKeyType.timeseries , dataIndex , detectChanges , false , dataUpdatedCb ) ;
this . onIndexedData ( data , dataIndex , detectChanges , false , dataUpdatedCb ) ;
} ) ;
}
dataAggregator . onData ( { data : subscriptionData } , false , this . history , true ) ;
dataAggregator . onData ( indexed Data , false , this . history , true ) ;
if ( prevDataCb ) {
dataAggregator . updateOnDataCb ( prevDataCb ) ;
}
@ -707,92 +910,109 @@ export class EntityDataSubscription {
private onData ( sourceData : SubscriptionData , type : DataKeyType , dataIndex : number , detectChanges : boolean ,
isTsLatest : boolean , dataUpdatedCb : DataUpdatedCb ) {
for ( const keyName of Object . keys ( sourceData ) ) {
const keyData = sourceData [ keyName ] ;
const key = ` ${ keyName } _ ${ type } ${ isTsLatest ? '_latest' : '' } ` ;
const dataKeyList = this . dataKeys [ key ] as Array < SubscriptionDataKey > ;
for ( let keyIndex = 0 ; dataKeyList && keyIndex < dataKeyList . length ; keyIndex ++ ) {
const datasourceKey = ` ${ key } _ ${ keyIndex } ` ;
if ( this . datasourceData [ dataIndex ] [ datasourceKey ] . data ) {
const dataKey = dataKeyList [ keyIndex ] ;
const data : DataSet = [ ] ;
let prevSeries : [ number , any ] ;
let prevOrigSeries : [ number , any ] ;
let datasourceKeyData : DataSet ;
let datasourceOrigKeyData : DataSet ;
let update = false ;
if ( this . realtime && ! isTsLatest ) {
datasourceKeyData = [ ] ;
datasourceOrigKeyData = [ ] ;
} else {
datasourceKeyData = this . datasourceData [ dataIndex ] [ datasourceKey ] . data ;
datasourceOrigKeyData = this . datasourceOrigData [ dataIndex ] [ datasourceKey ] . data ;
}
if ( datasourceKeyData . length > 0 ) {
prevSeries = datasourceKeyData [ datasourceKeyData . length - 1 ] ;
prevOrigSeries = datasourceOrigKeyData [ datasourceOrigKeyData . length - 1 ] ;
} else {
prevSeries = [ 0 , 0 ] ;
prevOrigSeries = [ 0 , 0 ] ;
}
this . datasourceOrigData [ dataIndex ] [ datasourceKey ] . data = [ ] ;
if ( this . entityDataSubscriptionOptions . type === widgetType . timeseries && ! isTsLatest ) {
keyData . forEach ( ( keySeries ) = > {
let series = keySeries ;
const time = series [ 0 ] ;
this . datasourceOrigData [ dataIndex ] [ datasourceKey ] . data . push ( series ) ;
let value = this . convertValue ( series [ 1 ] ) ;
if ( dataKey . postFunc ) {
value = dataKey . postFunc ( time , value , prevSeries [ 1 ] , prevOrigSeries [ 0 ] , prevOrigSeries [ 1 ] ) ;
}
prevOrigSeries = series ;
series = [ time , value ] ;
data . push ( series ) ;
prevSeries = series ;
} ) ;
update = true ;
} else if ( this . entityDataSubscriptionOptions . type === widgetType . latest || isTsLatest ) {
if ( keyData . length > 0 ) {
let series = keyData [ 0 ] ;
const time = series [ 0 ] ;
this . datasourceOrigData [ dataIndex ] [ datasourceKey ] . data . push ( series ) ;
let value = this . convertValue ( series [ 1 ] ) ;
if ( dataKey . postFunc ) {
value = dataKey . postFunc ( time , value , prevSeries [ 1 ] , prevOrigSeries [ 0 ] , prevOrigSeries [ 1 ] ) ;
}
series = [ time , value ] ;
data . push ( series ) ;
}
update = true ;
}
if ( update ) {
this . datasourceData [ dataIndex ] [ datasourceKey ] . data = data ;
dataUpdatedCb ( this . datasourceData [ dataIndex ] [ datasourceKey ] , dataIndex , dataKey . index , detectChanges , isTsLatest ) ;
}
}
for ( const key of Object . keys ( sourceData ) ) {
const keyData = sourceData [ key ] ;
this . onKeyData ( keyData , key , 0 , type ,
dataIndex , detectChanges , isTsLatest , false , dataUpdatedCb ) ;
}
}
private onIndexedData ( sourceData : IndexedSubscriptionData , dataIndex : number , detectChanges : boolean ,
isTsLatest : boolean , dataUpdatedCb : DataUpdatedCb ) {
for ( const indexStr of Object . keys ( sourceData ) ) {
const id = Number ( indexStr ) ;
const dataKey = this . dataKeyByIndex ( id ) ;
const isAggLatest = dataKey . aggregationType && dataKey . aggregationType !== AggregationType . NONE ;
const keyData = sourceData [ id ] ;
let keyName = dataKey . name ;
if ( dataKey . type === DataKeyType . function ) {
keyName += ` _ ${ dataKey . index } ` ;
}
this . onKeyData ( keyData , keyName , id , dataKey . type ,
dataIndex , detectChanges , isTsLatest , isAggLatest , dataUpdatedCb ) ;
}
}
private convertValue ( val : string ) : any {
if ( val && isNumeric ( val ) && Number ( val ) . toString ( ) === val ) {
return Number ( val ) ;
private onKeyData ( keyData : [ number , any , number ? ] [ ] , keyName : string , id : number , type : DataKeyType ,
dataIndex : number , detectChanges : boolean ,
isTsLatest : boolean , isAggLatest : boolean , dataUpdatedCb : DataUpdatedCb ) {
const keyIdSuffix = isAggLatest ? ` _ ${ id } ` : '' ;
const key = ` ${ keyName } _ ${ type } ${ keyIdSuffix } ${ isTsLatest ? '_latest' : '' } ` ;
const dataKeyList = this . dataKeys [ key ] as Array < SubscriptionDataKey > ;
for ( let keyIndex = 0 ; dataKeyList && keyIndex < dataKeyList . length ; keyIndex ++ ) {
const datasourceKey = ` ${ key } _ ${ keyIndex } ` ;
if ( this . datasourceData [ dataIndex ] [ datasourceKey ] . data ) {
const dataKey = dataKeyList [ keyIndex ] ;
const data : DataSet = [ ] ;
let prevSeries : [ number , any ] ;
let prevOrigSeries : [ number , any ] ;
let datasourceKeyData : DataSet ;
let datasourceOrigKeyData : DataSet ;
let update = false ;
if ( this . realtime && ! isTsLatest ) {
datasourceKeyData = [ ] ;
datasourceOrigKeyData = [ ] ;
} else {
datasourceKeyData = this . datasourceData [ dataIndex ] [ datasourceKey ] . data ;
datasourceOrigKeyData = this . datasourceOrigData [ dataIndex ] [ datasourceKey ] . data ;
}
if ( datasourceKeyData . length > 0 ) {
prevSeries = datasourceKeyData [ datasourceKeyData . length - 1 ] ;
prevOrigSeries = datasourceOrigKeyData [ datasourceOrigKeyData . length - 1 ] ;
} else {
prevSeries = [ 0 , 0 ] ;
prevOrigSeries = [ 0 , 0 ] ;
}
this . datasourceOrigData [ dataIndex ] [ datasourceKey ] . data = [ ] ;
if ( this . entityDataSubscriptionOptions . type === widgetType . timeseries && ! isTsLatest ) {
keyData . forEach ( ( keySeries ) = > {
let series = keySeries ;
const time = series [ 0 ] ;
this . datasourceOrigData [ dataIndex ] [ datasourceKey ] . data . push ( [ series [ 0 ] , series [ 1 ] ] ) ;
let value = EntityDataSubscription . convertValue ( series [ 1 ] ) ;
if ( dataKey . postFunc ) {
value = dataKey . postFunc ( time , value , prevSeries [ 1 ] , prevOrigSeries [ 0 ] , prevOrigSeries [ 1 ] ) ;
}
prevOrigSeries = [ series [ 0 ] , series [ 1 ] ] ;
series = [ series [ 0 ] , value ] ;
data . push ( [ series [ 0 ] , series [ 1 ] ] ) ;
prevSeries = [ series [ 0 ] , series [ 1 ] ] ;
} ) ;
update = true ;
} else if ( this . entityDataSubscriptionOptions . type === widgetType . latest || isTsLatest ) {
if ( keyData . length > 0 ) {
let series = keyData [ 0 ] ;
const time = series [ 0 ] ;
this . datasourceOrigData [ dataIndex ] [ datasourceKey ] . data . push ( [ series [ 0 ] , series [ 1 ] ] ) ;
let value = EntityDataSubscription . convertValue ( series [ 1 ] ) ;
if ( dataKey . postFunc ) {
value = dataKey . postFunc ( time , value , prevSeries [ 1 ] , prevOrigSeries [ 0 ] , prevOrigSeries [ 1 ] ) ;
}
series = [ time , value ] ;
data . push ( [ series [ 0 ] , series [ 1 ] ] ) ;
}
update = true ;
}
if ( update ) {
this . datasourceData [ dataIndex ] [ datasourceKey ] . data = data ;
dataUpdatedCb ( this . datasourceData [ dataIndex ] [ datasourceKey ] , dataIndex , dataKey . index , detectChanges , isTsLatest ) ;
}
}
}
return val ;
}
private toSubscriptionData ( sourceData : { [ key : string ] : TsValue | TsValue [ ] } , isTs : boolean ) : SubscriptionData {
const subsData : SubscriptionData = { } ;
for ( const keyName of Object . keys ( sourceData ) ) {
const values = sourceData [ keyName ] ;
const dataSet : [ number , any ] [ ] = [ ] ;
const dataSet : [ number , any , number ? ] [ ] = [ ] ;
if ( isTs ) {
( values as TsValue [ ] ) . forEach ( ( keySeries ) = > {
dataSet . push ( [ keySeries . ts , keySeries . value ] ) ;
dataSet . push ( [ keySeries . ts , keySeries . value , keySeries . count ] ) ;
} ) ;
} else {
const tsValue = values as TsValue ;
dataSet . push ( [ tsValue . ts , tsValue . value ] ) ;
dataSet . push ( [ tsValue . ts , tsValue . value , tsValue . count ] ) ;
}
subsData [ keyName ] = dataSet ;
}
@ -800,21 +1020,37 @@ export class EntityDataSubscription {
}
private createRealtimeDataAggregator ( subsTw : SubscriptionTimewindow ,
tsKeyName s : Array < string > ,
dataKeyType : DataKeyType ,
tsKeys : Array < AggKey > ,
isLatestDataAgg : boolean ,
dataIndex : number ,
dataUpdatedCb : DataUpdatedCb ) : DataAggregator {
return new DataAggregator (
( data , detectChanges ) = > {
this . onData ( data , dataKeyType , dataIndex , detectChanges , false , dataUpdatedCb ) ;
this . onIndexedData ( data , dataIndex , detectChanges ,
isLatestDataAgg && ( this . entityDataSubscriptionOptions . type === widgetType . timeseries ) , dataUpdatedCb ) ;
} ,
tsKeyNames ,
tsKeys ,
isLatestDataAgg ,
subsTw ,
this . utils ,
this . entityDataSubscriptionOptions . ignoreDataUpdateOnIntervalTick
this . entityDataSubscriptionOptions . ignoreDataUpdateOnIntervalTick || isLatestDataAgg
) ;
}
private dataKeyByIndex ( index : number ) : SubscriptionDataKey {
return this . dataKeysList . find ( key = > key . index === index ) ;
}
private timeseriesDataKeysByKeyNames ( keyNames : string [ ] ) : SubscriptionDataKey [ ] {
const result : SubscriptionDataKey [ ] = [ ] ;
for ( const keyName of keyNames ) {
const key = ` ${ keyName } _ ${ DataKeyType . timeseries } ` ;
const dataKeyList = this . dataKeys [ key ] as Array < SubscriptionDataKey > ;
result . push ( . . . dataKeyList ) ;
}
return result ;
}
private generateSeries ( dataKey : SubscriptionDataKey , startTime : number , endTime : number ) : [ number , any ] [ ] {
const data : [ number , any ] [ ] = [ ] ;
let prevSeries : [ number , any ] ;
@ -893,9 +1129,7 @@ export class EntityDataSubscription {
let startTime : number ;
let endTime : number ;
let delta : number ;
const generatedData : SubscriptionDataHolder = {
data : { }
} ;
const generatedData : IndexedSubscriptionData = [ ] ;
if ( ! this . history ) {
delta = Math . floor ( this . tickElapsed / this . frequency ) ;
}
@ -928,7 +1162,7 @@ export class EntityDataSubscription {
endTime = Math . min ( currentTime , endTime ) ;
}
}
generatedData . data [ ` ${ dataKey . name } _ ${ dataKey . index } ` ] = this . generateSeries ( dataKey , startTime , endTime ) ;
generatedData [ dataKey . index ] = this . generateSeries ( dataKey , startTime , endTime ) ;
}
if ( this . dataAggregators && this . dataAggregators . length ) {
this . dataAggregators [ 0 ] . onData ( generatedData , true , this . history , detectChanges ) ;