@ -24,6 +24,7 @@ import io.grpc.stub.StreamObserver;
import lombok.Data ;
import lombok.extern.slf4j.Slf4j ;
import org.checkerframework.checker.nullness.qual.Nullable ;
import org.springframework.data.util.Pair ;
import org.thingsboard.server.common.data.DataConstants ;
import org.thingsboard.server.common.data.EdgeUtils ;
import org.thingsboard.server.common.data.edge.Edge ;
@ -35,6 +36,8 @@ import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry ;
import org.thingsboard.server.common.data.page.PageData ;
import org.thingsboard.server.common.data.page.PageLink ;
import org.thingsboard.server.common.data.page.SortOrder ;
import org.thingsboard.server.common.data.page.TimePageLink ;
import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg ;
import org.thingsboard.server.gen.edge.v1.AttributesRequestMsg ;
import org.thingsboard.server.gen.edge.v1.ConnectRequestMsg ;
@ -68,17 +71,15 @@ import org.thingsboard.server.service.edge.rpc.fetch.GeneralEdgeEventFetcher;
import java.io.Closeable ;
import java.util.ArrayList ;
import java.util.Arrays ;
import java.util.Collections ;
import java.util.List ;
import java.util.Objects ;
import java.util.Optional ;
import java.util.UUID ;
import java.util.concurrent.ScheduledExecutorService ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.locks.ReentrantLock ;
import java.util.function.BiConsumer ;
import java.util.function.Consumer ;
import java.util.stream.Collectors ;
@Slf4j
@Data
@ -89,6 +90,7 @@ public final class EdgeGrpcSession implements Closeable {
private static final int MAX_DOWNLINK_ATTEMPTS = 10 ; // max number of attemps to send downlink message if edge connected
private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs" ;
private static final String QUEUE_START_SEQ_ID_ATTR_KEY = "queueStartSeqId" ;
private final UUID sessionId ;
private final BiConsumer < EdgeId , EdgeGrpcSession > sessionOpenListener ;
@ -103,6 +105,12 @@ public final class EdgeGrpcSession implements Closeable {
private boolean connected ;
private boolean syncCompleted ;
private Long newStartTs ;
private Long previousStartTs ;
private Long newStartSeqId ;
private Long previousStartSeqId ;
private Long seqIdEnd ;
private EdgeVersion edgeVersion ;
private int maxInboundMessageSize ;
@ -204,10 +212,10 @@ public final class EdgeGrpcSession implements Closeable {
EdgeEventFetcher next = cursor . getNext ( ) ;
log . info ( "[{}][{}] starting sync process, cursor current idx = {}, class = {}" ,
edge . getTenantId ( ) , edge . getId ( ) , cursor . getCurrentIdx ( ) , next . getClass ( ) . getSimpleName ( ) ) ;
ListenableFuture < UUID > uuidListenableF uture = startProcessingEdgeEvents ( next ) ;
Futures . addCallback ( uuidListenableF uture, new FutureCallback < > ( ) {
ListenableFuture < Pair < Long , Long > > f uture = startProcessingEdgeEvents ( next ) ;
Futures . addCallback ( f uture, new FutureCallback < > ( ) {
@Override
public void onSuccess ( @Nullable UUID result ) {
public void onSuccess ( @Nullable Pair < Long , Long > result ) {
doSync ( cursor ) ;
}
@ -307,36 +315,51 @@ public final class EdgeGrpcSession implements Closeable {
sendDownlinkMsg ( edgeConfigMsg ) ;
}
ListenableFuture < Void > processEdgeEvents ( ) throws Exception {
SettableFuture < Void > result = SettableFuture . create ( ) ;
ListenableFuture < Boolean > processEdgeEvents ( ) throws Exception {
SettableFuture < Boolean > result = SettableFuture . create ( ) ;
log . trace ( "[{}] starting processing edge events" , this . sessionId ) ;
if ( isConnected ( ) & & isSyncCompleted ( ) ) {
Long queueStartTs = getQueueStartTs ( ) . get ( ) ;
Pair < Long , Long > startTsAndSeqId = getQueueStartTsAndSeqId ( ) . get ( ) ;
this . previousStartTs = startTsAndSeqId . getFirst ( ) ;
this . previousStartSeqId = startTsAndSeqId . getSecond ( ) ;
GeneralEdgeEventFetcher fetcher = new GeneralEdgeEventFetcher (
queueStartTs ,
this . previousStartTs ,
this . previousStartSeqId ,
this . seqIdEnd ,
false ,
Integer . toUnsignedLong ( ctx . getEdgeEventStorageSettings ( ) . getMaxReadRecordsCount ( ) ) ,
ctx . getEdgeEventService ( ) ) ;
ListenableFuture < UUID > ifOffsetFuture = startProcessingEdgeEvents ( fetcher ) ;
Futures . addCallback ( ifOffsetFuture , new FutureCallback < > ( ) {
Futures . addCallback ( startProcessingEdgeEvents ( fetcher ) , new FutureCallback < > ( ) {
@Override
public void onSuccess ( @Nullable UUID ifOffset ) {
if ( ifOffset ! = null ) {
Long newStartTs = Uuids . unixTimestamp ( ifOffset ) ;
ListenableFuture < List < String > > updateFuture = updateQueueStartTs ( newStartTs ) ;
public void onSuccess ( @Nullable Pair < Long , Long > newStartTsAndSeqId ) {
if ( newStartTsAndSeqId ! = null ) {
ListenableFuture < List < String > > updateFuture = updateQueueStartTsAndSeqId ( newStartTsAndSeqId ) ;
Futures . addCallback ( updateFuture , new FutureCallback < > ( ) {
@Override
public void onSuccess ( @Nullable List < String > list ) {
log . debug ( "[{}] queue offset was updated [{}][{}]" , sessionId , ifOffset , newStartTs ) ;
result . set ( null ) ;
log . debug ( "[{}] queue offset was updated [{}]" , sessionId , newStartTsAndSeqId ) ;
if ( fetcher . isSeqIdNewCycleStarted ( ) ) {
seqIdEnd = fetcher . getSeqIdEnd ( ) ;
boolean newEventsAvailable = isNewEdgeEventsAvailable ( ) ;
result . set ( newEventsAvailable ) ;
} else {
seqIdEnd = null ;
boolean newEventsAvailable = isSeqIdStartedNewCycle ( ) ;
if ( ! newEventsAvailable ) {
newEventsAvailable = isNewEdgeEventsAvailable ( ) ;
}
result . set ( newEventsAvailable ) ;
}
}
@Override
public void onFailure ( Throwable t ) {
log . error ( "[{}] Failed to update queue offset [{}]" , sessionId , ifOffset , t ) ;
log . error ( "[{}] Failed to update queue offset [{}]" , sessionId , newStartTsAndSeqId , t ) ;
result . setException ( t ) ;
}
} , ctx . getGrpcCallbackExecutorService ( ) ) ;
} else {
log . trace ( "[{}] ifOffset is null. Skipping iteration without db update" , sessionId ) ;
log . trace ( "[{}] newStartTsAndSeqId is null. Skipping iteration without db update" , sessionId ) ;
result . set ( null ) ;
}
}
@ -354,14 +377,14 @@ public final class EdgeGrpcSession implements Closeable {
return result ;
}
private ListenableFuture < UUID > startProcessingEdgeEvents ( EdgeEventFetcher fetcher ) {
SettableFuture < UUID > result = SettableFuture . create ( ) ;
private ListenableFuture < Pair < Long , Long > > startProcessingEdgeEvents ( EdgeEventFetcher fetcher ) {
SettableFuture < Pair < Long , Long > > result = SettableFuture . create ( ) ;
PageLink pageLink = fetcher . getPageLink ( ctx . getEdgeEventStorageSettings ( ) . getMaxReadRecordsCount ( ) ) ;
processEdgeEvents ( fetcher , pageLink , result ) ;
return result ;
}
private void processEdgeEvents ( EdgeEventFetcher fetcher , PageLink pageLink , SettableFuture < UUID > result ) {
private void processEdgeEvents ( EdgeEventFetcher fetcher , PageLink pageLink , SettableFuture < Pair < Long , Long > > result ) {
try {
PageData < EdgeEvent > pageData = fetcher . fetchEdgeEvents ( edge . getTenantId ( ) , edge , pageLink ) ;
if ( isConnected ( ) & & ! pageData . getData ( ) . isEmpty ( ) ) {
@ -377,8 +400,15 @@ public final class EdgeGrpcSession implements Closeable {
if ( isConnected ( ) & & pageData . hasNext ( ) ) {
processEdgeEvents ( fetcher , pageLink . nextPageLink ( ) , result ) ;
} else {
UUID ifOffset = pageData . getData ( ) . get ( pageData . getData ( ) . size ( ) - 1 ) . getUuidId ( ) ;
result . set ( ifOffset ) ;
EdgeEvent latestEdgeEvent = pageData . getData ( ) . get ( pageData . getData ( ) . size ( ) - 1 ) ;
UUID idOffset = latestEdgeEvent . getUuidId ( ) ;
if ( idOffset ! = null ) {
Long newStartTs = Uuids . unixTimestamp ( idOffset ) ;
long newStartSeqId = latestEdgeEvent . getSeqId ( ) ;
result . set ( Pair . of ( newStartTs , newStartSeqId ) ) ;
} else {
result . set ( null ) ;
}
}
}
}
@ -461,69 +491,113 @@ public final class EdgeGrpcSession implements Closeable {
}
}
private DownlinkMsg convertToDownlinkMsg ( EdgeEvent edgeEvent ) {
log . trace ( "[{}][{}] converting edge event to downlink msg [{}]" , edge . getTenantId ( ) , this . sessionId , edgeEvent ) ;
DownlinkMsg downlinkMsg = null ;
try {
switch ( edgeEvent . getAction ( ) ) {
case UPDATED :
case ADDED :
case DELETED :
case ASSIGNED_TO_EDGE :
case UNASSIGNED_FROM_EDGE :
case ALARM_ACK :
case ALARM_CLEAR :
case CREDENTIALS_UPDATED :
case RELATION_ADD_OR_UPDATE :
case RELATION_DELETED :
case ASSIGNED_TO_CUSTOMER :
case UNASSIGNED_FROM_CUSTOMER :
case CREDENTIALS_REQUEST :
case RPC_CALL :
downlinkMsg = convertEntityEventToDownlink ( edgeEvent ) ;
log . trace ( "[{}][{}] entity message processed [{}]" , edgeEvent . getTenantId ( ) , this . sessionId , downlinkMsg ) ;
break ;
case ATTRIBUTES_UPDATED :
case POST_ATTRIBUTES :
case ATTRIBUTES_DELETED :
case TIMESERIES_UPDATED :
downlinkMsg = ctx . getTelemetryProcessor ( ) . convertTelemetryEventToDownlink ( edgeEvent ) ;
break ;
default :
log . warn ( "[{}][{}] Unsupported action type [{}]" , edge . getTenantId ( ) , this . sessionId , edgeEvent . getAction ( ) ) ;
private List < DownlinkMsg > convertToDownlinkMsgsPack ( List < EdgeEvent > edgeEvents ) {
List < DownlinkMsg > result = new ArrayList < > ( ) ;
for ( EdgeEvent edgeEvent : edgeEvents ) {
log . trace ( "[{}][{}] converting edge event to downlink msg [{}]" , edge . getTenantId ( ) , this . sessionId , edgeEvent ) ;
DownlinkMsg downlinkMsg = null ;
try {
switch ( edgeEvent . getAction ( ) ) {
case UPDATED :
case ADDED :
case DELETED :
case ASSIGNED_TO_EDGE :
case UNASSIGNED_FROM_EDGE :
case ALARM_ACK :
case ALARM_CLEAR :
case CREDENTIALS_UPDATED :
case RELATION_ADD_OR_UPDATE :
case RELATION_DELETED :
case CREDENTIALS_REQUEST :
case RPC_CALL :
case ASSIGNED_TO_CUSTOMER :
case UNASSIGNED_FROM_CUSTOMER :
downlinkMsg = convertEntityEventToDownlink ( edgeEvent ) ;
log . trace ( "[{}][{}] entity message processed [{}]" , edgeEvent . getTenantId ( ) , this . sessionId , downlinkMsg ) ;
break ;
case ATTRIBUTES_UPDATED :
case POST_ATTRIBUTES :
case ATTRIBUTES_DELETED :
case TIMESERIES_UPDATED :
downlinkMsg = ctx . getTelemetryProcessor ( ) . convertTelemetryEventToDownlink ( edgeEvent ) ;
break ;
default :
log . warn ( "[{}][{}] Unsupported action type [{}]" , edge . getTenantId ( ) , this . sessionId , edgeEvent . getAction ( ) ) ;
}
} catch ( Exception e ) {
log . error ( "[{}][{}] Exception during converting edge event to downlink msg" , edge . getTenantId ( ) , this . sessionId , e ) ;
}
if ( downlinkMsg ! = null ) {
result . add ( downlinkMsg ) ;
}
}
return result ;
}
private ListenableFuture < Pair < Long , Long > > getQueueStartTsAndSeqId ( ) {
ListenableFuture < List < AttributeKvEntry > > future =
ctx . getAttributesService ( ) . find ( edge . getTenantId ( ) , edge . getId ( ) , DataConstants . SERVER_SCOPE , Arrays . asList ( QUEUE_START_TS_ATTR_KEY , QUEUE_START_SEQ_ID_ATTR_KEY ) ) ;
return Futures . transform ( future , attributeKvEntries - > {
long startTs = 0L ;
long startSeqId = 0L ;
for ( AttributeKvEntry attributeKvEntry : attributeKvEntries ) {
if ( QUEUE_START_TS_ATTR_KEY . equals ( attributeKvEntry . getKey ( ) ) ) {
startTs = attributeKvEntry . getLongValue ( ) . isPresent ( ) ? attributeKvEntry . getLongValue ( ) . get ( ) : 0L ;
}
if ( QUEUE_START_SEQ_ID_ATTR_KEY . equals ( attributeKvEntry . getKey ( ) ) ) {
startSeqId = attributeKvEntry . getLongValue ( ) . isPresent ( ) ? attributeKvEntry . getLongValue ( ) . get ( ) : 0L ;
}
}
if ( startSeqId = = 0L ) {
startSeqId = findStartSeqIdFromOldestEventIfAny ( ) ;
}
return Pair . of ( startTs , startSeqId ) ;
} , ctx . getGrpcCallbackExecutorService ( ) ) ;
}
private boolean isSeqIdStartedNewCycle ( ) {
try {
TimePageLink pageLink = new TimePageLink ( ctx . getEdgeEventStorageSettings ( ) . getMaxReadRecordsCount ( ) , 0 , null , null , this . newStartTs , System . currentTimeMillis ( ) ) ;
PageData < EdgeEvent > edgeEvents = ctx . getEdgeEventService ( ) . findEdgeEvents ( edge . getTenantId ( ) , edge . getId ( ) , 0L , this . previousStartSeqId = = 0 ? null : this . previousStartSeqId - 1 , pageLink ) ;
return ! edgeEvents . getData ( ) . isEmpty ( ) ;
} catch ( Exception e ) {
log . error ( "[{}][{}] Exception during converting edge event to downlink msg" , edge . getTenantId ( ) , this . sessionId , e ) ;
log . error ( "[{}][{}][{}] Failed to execute isSeqIdStartedNewCycle " , edge . getTenantId ( ) , edge . getId ( ) , sessionId , e ) ;
}
return downlinkMsg ;
return false ;
}
private List < DownlinkMsg > convertToDownlinkMsgsPack ( List < EdgeEvent > edgeEvents ) {
return edgeEvents
. stream ( )
. map ( this : : convertToDownlinkMsg )
. filter ( Objects : : nonNull )
. collect ( Collectors . toList ( ) ) ;
private boolean isNewEdgeEventsAvailable ( ) {
try {
TimePageLink pageLink = new TimePageLink ( ctx . getEdgeEventStorageSettings ( ) . getMaxReadRecordsCount ( ) , 0 , null , null , this . newStartTs , System . currentTimeMillis ( ) ) ;
PageData < EdgeEvent > edgeEvents = ctx . getEdgeEventService ( ) . findEdgeEvents ( edge . getTenantId ( ) , edge . getId ( ) , this . newStartSeqId , null , pageLink ) ;
return ! edgeEvents . getData ( ) . isEmpty ( ) ;
} catch ( Exception e ) {
log . error ( "[{}][{}][{}] Failed to execute isNewEdgeEventsAvailable" , edge . getTenantId ( ) , edge . getId ( ) , sessionId , e ) ;
}
return false ;
}
private ListenableFuture < Long > getQueueStartTs ( ) {
ListenableFuture < Optional < AttributeKvEntry > > future =
ctx . getAttributesService ( ) . find ( edge . getTenantId ( ) , edge . getId ( ) , DataConstants . SERVER_SCOPE , QUEUE_START_TS_ATTR_KEY ) ;
return Futures . transform ( future , attributeKvEntryOpt - > {
if ( attributeKvEntryOpt ! = null & & attributeKvEntryOpt . isPresent ( ) ) {
AttributeKvEntry attributeKvEntry = attributeKvEntryOpt . get ( ) ;
return attributeKvEntry . getLongValue ( ) . isPresent ( ) ? attributeKvEntry . getLongValue ( ) . get ( ) : 0L ;
} else {
return 0L ;
private long findStartSeqIdFromOldestEventIfAny ( ) {
long startSeqId = 0L ;
try {
TimePageLink pageLink = new TimePageLink ( 1 , 0 , null , new SortOrder ( "createdTime" ) , null , null ) ;
PageData < EdgeEvent > edgeEvents = ctx . getEdgeEventService ( ) . findEdgeEvents ( edge . getTenantId ( ) , edge . getId ( ) , null , null , pageLink ) ;
if ( ! edgeEvents . getData ( ) . isEmpty ( ) ) {
startSeqId = edgeEvents . getData ( ) . get ( 0 ) . getSeqId ( ) - 1 ;
}
} , ctx . getGrpcCallbackExecutorService ( ) ) ;
} catch ( Exception e ) {
log . error ( "[{}][{}][{}] Failed to execute findStartSeqIdFromOldestEventIfAny" , edge . getTenantId ( ) , edge . getId ( ) , sessionId , e ) ;
}
return startSeqId ;
}
private ListenableFuture < List < String > > updateQueueStartTs ( Long newStartTs ) {
log . trace ( "[{}] updating QueueStartTs [{}][{}]" , this . sessionId , edge . getId ( ) , newStartTs ) ;
List < AttributeKvEntry > attributes = Collections . singletonList (
new BaseAttributeKvEntry (
new LongDataEntry ( QUEUE_START_TS_ATTR_KEY , newStartTs ) , System . currentTimeMillis ( ) ) ) ;
private ListenableFuture < List < String > > updateQueueStartTsAndSeqId ( Pair < Long , Long > pair ) {
this . newStartTs = pair . getFirst ( ) ;
this . newStartSeqId = pair . getSecond ( ) ;
log . trace ( "[{}] updateQueueStartTsAndSeqId [{}][{}][{}]" , this . sessionId , edge . getId ( ) , this . newStartTs , this . newStartSeqId ) ;
List < AttributeKvEntry > attributes = Arrays . asList (
new BaseAttributeKvEntry ( new LongDataEntry ( QUEUE_START_TS_ATTR_KEY , this . newStartTs ) , System . currentTimeMillis ( ) ) ,
new BaseAttributeKvEntry ( new LongDataEntry ( QUEUE_START_SEQ_ID_ATTR_KEY , this . newStartSeqId ) , System . currentTimeMillis ( ) ) ) ;
return ctx . getAttributesService ( ) . save ( edge . getTenantId ( ) , edge . getId ( ) , DataConstants . SERVER_SCOPE , attributes ) ;
}
@ -693,8 +767,11 @@ public final class EdgeGrpcSession implements Closeable {
}
private void interruptPreviousSendDownlinkMsgsTask ( ) {
log . debug ( "[{}][{}][{}] Previous send downlink future was not properly completed, stopping it now!" , edge . getTenantId ( ) , edge . getId ( ) , this . sessionId ) ;
stopCurrentSendDownlinkMsgsTask ( true ) ;
if ( sessionState . getSendDownlinkMsgsFuture ( ) ! = null & & ! sessionState . getSendDownlinkMsgsFuture ( ) . isDone ( )
| | sessionState . getScheduledSendDownlinkTask ( ) ! = null & & ! sessionState . getScheduledSendDownlinkTask ( ) . isCancelled ( ) ) {
log . debug ( "[{}][{}][{}] Previous send downlink future was not properly completed, stopping it now!" , edge . getTenantId ( ) , edge . getId ( ) , this . sessionId ) ;
stopCurrentSendDownlinkMsgsTask ( true ) ;
}
}
private void interruptGeneralProcessingOnSync ( ) {