@ -17,22 +17,21 @@ package org.thingsboard.server.dao.alarm;
import com.google.common.base.Function ;
import com.google.common.util.concurrent.AsyncFunction ;
import com.google.common.util.concurrent.Futures ;
import com.google.common.util.concurrent.ListenableFuture ;
import lombok.extern.slf4j.Slf4j ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.stereotype.Service ;
import org.springframework.util.StringUtils ;
import org.thingsboard.server.common.data.alarm.Alarm ;
import org.thingsboard.server.common.data.alarm.AlarmId ;
import org.thingsboard.server.common.data.alarm.AlarmQuery ;
import org.thingsboard.server.common.data.alarm.AlarmStatus ;
import org.thingsboard.server.common.data.alarm.* ;
import org.thingsboard.server.common.data.id.EntityId ;
import org.thingsboard.server.common.data.page.TimePageData ;
import org.thingsboard.server.common.data.relation.EntityRelation ;
import org.thingsboard.server.common.data.relation.RelationTypeGroup ;
import org.thingsboard.server.dao.entity.AbstractEntityService ;
import org.thingsboard.server.dao.entity.BaseEntityService ;
import org.thingsboard.server.dao.entity.EntityService ;
import org.thingsboard.server.dao.exception.DataValidationException ;
import org.thingsboard.server.dao.model.* ;
import org.thingsboard.server.dao.relation.EntityRelationsQuery ;
@ -45,6 +44,7 @@ import org.thingsboard.server.dao.tenant.TenantDao;
import javax.annotation.Nullable ;
import javax.annotation.PostConstruct ;
import javax.annotation.PreDestroy ;
import java.util.ArrayList ;
import java.util.List ;
import java.util.concurrent.ExecutionException ;
import java.util.concurrent.ExecutorService ;
@ -59,7 +59,6 @@ import static org.thingsboard.server.dao.service.Validator.*;
public class BaseAlarmService extends AbstractEntityService implements AlarmService {
public static final String ALARM_RELATION_PREFIX = "ALARM_" ;
public static final String ALARM_RELATION = "ALARM_ANY" ;
@Autowired
private AlarmDao alarmDao ;
@ -70,6 +69,9 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
@Autowired
private RelationService relationService ;
@Autowired
private EntityService entityService ;
protected ExecutorService readResultsProcessingExecutor ;
@PostConstruct
@ -116,11 +118,9 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
query . setParameters ( new RelationsSearchParameters ( saved . getOriginator ( ) , EntitySearchDirection . TO , Integer . MAX_VALUE ) ) ;
List < EntityId > parentEntities = relationService . findByQuery ( query ) . get ( ) . stream ( ) . map ( r - > r . getFrom ( ) ) . collect ( Collectors . toList ( ) ) ;
for ( EntityId parentId : parentEntities ) {
createRelation ( new EntityRelation ( parentId , saved . getId ( ) , ALARM_RELATION , RelationTypeGroup . ALARM ) ) ;
createRelation ( new EntityRelation ( parentId , saved . getId ( ) , ALARM_RELATION_PREFIX + saved . getStatus ( ) . name ( ) , RelationTypeGroup . ALARM ) ) ;
createAlarmRelation ( parentId , saved . getId ( ) , saved . getStatus ( ) , true ) ;
}
createRelation ( new EntityRelation ( alarm . getOriginator ( ) , saved . getId ( ) , ALARM_RELATION , RelationTypeGroup . ALARM ) ) ;
createRelation ( new EntityRelation ( alarm . getOriginator ( ) , saved . getId ( ) , ALARM_RELATION_PREFIX + saved . getStatus ( ) . name ( ) , RelationTypeGroup . ALARM ) ) ;
createAlarmRelation ( alarm . getOriginator ( ) , saved . getId ( ) , saved . getStatus ( ) , true ) ;
return saved ;
}
@ -199,12 +199,27 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
}
@Override
public ListenableFuture < TimePageData < Alarm > > findAlarms ( AlarmQuery query ) {
ListenableFuture < List < Alarm > > alarms = alarmDao . findAlarms ( query ) ;
return Futures . transform ( alarms , new Function < List < Alarm > , TimePageData < Alarm > > ( ) {
public ListenableFuture < TimePageData < AlarmInfo > > findAlarms ( AlarmQuery query ) {
ListenableFuture < List < AlarmInfo > > alarms = alarmDao . findAlarms ( query ) ;
if ( query . getFetchOriginator ( ) ! = null & & query . getFetchOriginator ( ) . booleanValue ( ) ) {
alarms = Futures . transform ( alarms , ( AsyncFunction < List < AlarmInfo > , List < AlarmInfo > > ) input - > {
List < ListenableFuture < AlarmInfo > > alarmFutures = new ArrayList < > ( input . size ( ) ) ;
for ( AlarmInfo alarmInfo : input ) {
alarmFutures . add ( Futures . transform (
entityService . fetchEntityNameAsync ( alarmInfo . getOriginator ( ) ) , ( Function < String , AlarmInfo > )
originatorName - > {
alarmInfo . setOriginatorName ( originatorName ) ;
return alarmInfo ;
}
) ) ;
}
return Futures . successfulAsList ( alarmFutures ) ;
} ) ;
}
return Futures . transform ( alarms , new Function < List < AlarmInfo > , TimePageData < AlarmInfo > > ( ) {
@Nullable
@Override
public TimePageData < Alarm > apply ( @Nullable List < Alarm > alarms ) {
public TimePageData < AlarmInfo > apply ( @Nullable List < AlarmInfo > alarms ) {
return new TimePageData < > ( alarms , query . getPageLink ( ) ) ;
}
} ) ;
@ -245,17 +260,45 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
query . setParameters ( new RelationsSearchParameters ( alarm . getOriginator ( ) , EntitySearchDirection . TO , Integer . MAX_VALUE ) ) ;
List < EntityId > parentEntities = relationService . findByQuery ( query ) . get ( ) . stream ( ) . map ( r - > r . getFrom ( ) ) . collect ( Collectors . toList ( ) ) ;
for ( EntityId parentId : parentEntities ) {
deleteRelation ( new EntityRelation ( parentId , alarm . getId ( ) , ALARM_RELATION_PREFIX + oldStatus . name ( ) , RelationTypeGroup . ALARM ) ) ;
createRelation ( new EntityRelation ( parentId , alarm . getId ( ) , ALARM_RELATION_PREFIX + newStatus . name ( ) , RelationTypeGroup . ALARM ) ) ;
}
deleteRelation ( new EntityRelation ( alarm . getOriginator ( ) , alarm . getId ( ) , ALARM_RELATION_PREFIX + oldStatus . name ( ) , RelationTypeGroup . ALARM ) ) ;
createRelation ( new EntityRelation ( alarm . getOriginator ( ) , alarm . getId ( ) , ALARM_RELATION_PREFIX + newStatus . name ( ) , RelationTypeGroup . ALARM ) ) ;
updateAlarmRelation ( parentId , alarm . getId ( ) , oldStatus , newStatus ) ;
}
updateAlarmRelation ( alarm . getOriginator ( ) , alarm . getId ( ) , oldStatus , newStatus ) ;
} catch ( ExecutionException | InterruptedException e ) {
log . warn ( "[{}] Failed to update relations. Old status: [{}], New status: [{}]" , alarm . getId ( ) , oldStatus , newStatus ) ;
throw new RuntimeException ( e ) ;
}
}
private void createAlarmRelation ( EntityId entityId , EntityId alarmId , AlarmStatus status , boolean createAnyRelation ) {
try {
if ( createAnyRelation ) {
createRelation ( new EntityRelation ( entityId , alarmId , ALARM_RELATION_PREFIX + AlarmSearchStatus . ANY . name ( ) , RelationTypeGroup . ALARM ) ) ;
}
createRelation ( new EntityRelation ( entityId , alarmId , ALARM_RELATION_PREFIX + status . name ( ) , RelationTypeGroup . ALARM ) ) ;
createRelation ( new EntityRelation ( entityId , alarmId , ALARM_RELATION_PREFIX + status . getClearSearchStatus ( ) . name ( ) , RelationTypeGroup . ALARM ) ) ;
createRelation ( new EntityRelation ( entityId , alarmId , ALARM_RELATION_PREFIX + status . getAckSearchStatus ( ) . name ( ) , RelationTypeGroup . ALARM ) ) ;
} catch ( ExecutionException | InterruptedException e ) {
log . warn ( "[{}] Failed to create relation. Status: [{}]" , alarmId , status ) ;
throw new RuntimeException ( e ) ;
}
}
private void deleteAlarmRelation ( EntityId entityId , EntityId alarmId , AlarmStatus status ) {
try {
deleteRelation ( new EntityRelation ( entityId , alarmId , ALARM_RELATION_PREFIX + status . name ( ) , RelationTypeGroup . ALARM ) ) ;
deleteRelation ( new EntityRelation ( entityId , alarmId , ALARM_RELATION_PREFIX + status . getClearSearchStatus ( ) . name ( ) , RelationTypeGroup . ALARM ) ) ;
deleteRelation ( new EntityRelation ( entityId , alarmId , ALARM_RELATION_PREFIX + status . getAckSearchStatus ( ) . name ( ) , RelationTypeGroup . ALARM ) ) ;
} catch ( ExecutionException | InterruptedException e ) {
log . warn ( "[{}] Failed to delete relation. Status: [{}]" , alarmId , status ) ;
throw new RuntimeException ( e ) ;
}
}
private void updateAlarmRelation ( EntityId entityId , EntityId alarmId , AlarmStatus oldStatus , AlarmStatus newStatus ) {
deleteAlarmRelation ( entityId , alarmId , oldStatus ) ;
createAlarmRelation ( entityId , alarmId , newStatus , false ) ;
}
private < T > ListenableFuture < T > getAndUpdate ( AlarmId alarmId , Function < Alarm , T > function ) {
validateId ( alarmId , "Alarm id should be specified!" ) ;
ListenableFuture < Alarm > entity = alarmDao . findAlarmByIdAsync ( alarmId . getId ( ) ) ;