@ -16,6 +16,7 @@
package org.thingsboard.rule.engine.profile ;
import com.fasterxml.jackson.databind.JsonNode ;
import com.fasterxml.jackson.databind.node.ObjectNode ;
import lombok.Data ;
import lombok.extern.slf4j.Slf4j ;
import org.thingsboard.rule.engine.action.TbAlarmResult ;
@ -29,6 +30,7 @@ import org.thingsboard.server.common.data.alarm.AlarmStatus;
import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm ;
import org.thingsboard.server.common.data.id.EntityId ;
import org.thingsboard.server.common.data.query.EntityKeyType ;
import org.thingsboard.server.common.data.query.KeyFilter ;
import org.thingsboard.server.common.msg.TbMsg ;
import org.thingsboard.server.common.msg.TbMsgMetaData ;
import org.thingsboard.server.common.msg.queue.ServiceQueue ;
@ -53,6 +55,7 @@ class AlarmState {
private volatile boolean initialFetchDone ;
private volatile TbMsgMetaData lastMsgMetaData ;
private volatile String lastMsgQueueName ;
private volatile DataSnapshot dataSnapshot ;
AlarmState ( ProfileState deviceProfile , EntityId originator , DeviceProfileAlarm alarmDefinition , PersistedAlarmState alarmState ) {
this . deviceProfile = deviceProfile ;
@ -64,6 +67,7 @@ class AlarmState {
initCurrentAlarm ( ctx ) ;
lastMsgMetaData = msg . getMetaData ( ) ;
lastMsgQueueName = msg . getQueueName ( ) ;
this . dataSnapshot = data ;
return createOrClearAlarms ( ctx , data , update , AlarmRuleState : : eval ) ;
}
@ -74,7 +78,7 @@ class AlarmState {
public < T > boolean createOrClearAlarms ( TbContext ctx , T data , SnapshotUpdate update , BiFunction < AlarmRuleState , T , AlarmEvalResult > evalFunction ) {
boolean stateUpdate = false ;
AlarmSeverity resultSeverity = null ;
AlarmRuleState resultState = null ;
log . debug ( "[{}] processing update: {}" , alarmDefinition . getId ( ) , data ) ;
for ( AlarmRuleState state : createRulesSortedBySeverityDesc ) {
if ( ! validateUpdate ( update , state ) ) {
@ -84,18 +88,18 @@ class AlarmState {
AlarmEvalResult evalResult = evalFunction . apply ( state , data ) ;
stateUpdate | = state . checkUpdate ( ) ;
if ( AlarmEvalResult . TRUE . equals ( evalResult ) ) {
resultSeverity = state . getSeverity ( ) ;
resultState = state ;
break ;
} else if ( AlarmEvalResult . FALSE . equals ( evalResult ) ) {
state . clear ( ) ;
stateUpdate | = state . checkUpdate ( ) ;
stateUpdate = clearAlarmState ( stateUpdate , state ) ;
}
}
if ( resultSeverity ! = null ) {
TbAlarmResult result = calculateAlarmResult ( ctx , resultSeverity ) ;
if ( resultState ! = null ) {
TbAlarmResult result = calculateAlarmResult ( ctx , resultState ) ;
if ( result ! = null ) {
pushMsg ( ctx , result ) ;
}
stateUpdate = clearAlarmState ( stateUpdate , clearState ) ;
} else if ( currentAlarm ! = null & & clearState ! = null ) {
if ( ! validateUpdate ( update , clearState ) ) {
log . debug ( "[{}] Update is not valid for current clear state" , alarmDefinition . getId ( ) ) ;
@ -103,23 +107,26 @@ class AlarmState {
}
AlarmEvalResult evalResult = evalFunction . apply ( clearState , data ) ;
if ( AlarmEvalResult . TRUE . equals ( evalResult ) ) {
clearState . clear ( ) ;
stateUpdate | = clearState . checkUpdate ( ) ;
stateUpdate = clearAlarmState ( stateUpdate , clearState ) ;
for ( AlarmRuleState state : createRulesSortedBySeverityDesc ) {
state . clear ( ) ;
stateUpdate | = state . checkUpdate ( ) ;
stateUpdate = clearAlarmState ( stateUpdate , state ) ;
}
ctx . getAlarmService ( ) . clearAlarm ( ctx . getTenantId ( ) , currentAlarm . getId ( ) , JacksonUtil . OBJECT_MAPPER . createObjectNode ( ) , System . currentTimeMillis ( ) ) ;
pushMsg ( ctx , new TbAlarmResult ( false , false , true , currentAlarm ) ) ;
currentAlarm = null ;
} else if ( AlarmEvalResult . FALSE . equals ( evalResult ) ) {
clearState . clear ( ) ;
stateUpdate | = clearState . checkUpdate ( ) ;
stateUpdate = clearAlarmState ( stateUpdate , clearState ) ;
}
}
return stateUpdate ;
}
public boolean clearAlarmState ( boolean stateUpdate , AlarmRuleState state ) {
state . clear ( ) ;
stateUpdate | = state . checkUpdate ( ) ;
return stateUpdate ;
}
public boolean validateUpdate ( SnapshotUpdate update , AlarmRuleState state ) {
if ( update ! = null ) {
//Check that the update type and that keys match.
@ -187,7 +194,8 @@ class AlarmState {
}
}
private TbAlarmResult calculateAlarmResult ( TbContext ctx , AlarmSeverity severity ) {
private TbAlarmResult calculateAlarmResult ( TbContext ctx , AlarmRuleState ruleState ) {
AlarmSeverity severity = ruleState . getSeverity ( ) ;
if ( currentAlarm ! = null ) {
// TODO: In some extremely rare cases, we might miss the event of alarm clear (If one use in-mem queue and restarted the server) or (if one manipulated the rule chain).
// Maybe we should fetch alarm every time?
@ -213,7 +221,7 @@ class AlarmState {
currentAlarm . setSeverity ( severity ) ;
currentAlarm . setStartTs ( System . currentTimeMillis ( ) ) ;
currentAlarm . setEndTs ( currentAlarm . getStartTs ( ) ) ;
currentAlarm . setDetails ( JacksonUtil . OBJECT_MAPPER . createObjectNode ( ) ) ;
currentAlarm . setDetails ( createDetails ( ruleState ) ) ;
currentAlarm . setOriginator ( originator ) ;
currentAlarm . setTenantId ( ctx . getTenantId ( ) ) ;
currentAlarm . setPropagate ( alarmDefinition . isPropagate ( ) ) ;
@ -226,13 +234,50 @@ class AlarmState {
}
}
private JsonNode createDetails ( AlarmRuleState ruleState ) {
ObjectNode details = JacksonUtil . OBJECT_MAPPER . createObjectNode ( ) ;
String alarmDetails = ruleState . getAlarmRule ( ) . getAlarmDetails ( ) ;
if ( alarmDetails ! = null ) {
for ( KeyFilter keyFilter : ruleState . getAlarmRule ( ) . getCondition ( ) . getCondition ( ) ) {
EntityKeyValue entityKeyValue = dataSnapshot . getValue ( keyFilter . getKey ( ) ) ;
alarmDetails = alarmDetails . replaceAll ( String . format ( "\\$\\{%s}" , keyFilter . getKey ( ) . getKey ( ) ) , getValueAsString ( entityKeyValue ) ) ;
}
details . put ( "data" , alarmDetails ) ;
}
return details ;
}
private static String getValueAsString ( EntityKeyValue entityKeyValue ) {
Object result = null ;
switch ( entityKeyValue . getDataType ( ) ) {
case STRING :
result = entityKeyValue . getStrValue ( ) ;
break ;
case JSON :
result = entityKeyValue . getJsonValue ( ) ;
break ;
case LONG :
result = entityKeyValue . getLngValue ( ) ;
break ;
case DOUBLE :
result = entityKeyValue . getDblValue ( ) ;
break ;
case BOOLEAN :
result = entityKeyValue . getBoolValue ( ) ;
break ;
}
return String . valueOf ( result ) ;
}
public boolean processAlarmClear ( TbContext ctx , Alarm alarmNf ) {
boolean updated = false ;
if ( currentAlarm ! = null & & currentAlarm . getId ( ) . equals ( alarmNf . getId ( ) ) ) {
currentAlarm = null ;
for ( AlarmRuleState state : createRulesSortedBySeverityDesc ) {
state . clear ( ) ;
updated | = state . checkUpdate ( ) ;
updated = clearAlarmState ( updated , state ) ;
}
}
return updated ;