@ -89,6 +89,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceAct
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto ;
import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg ;
import org.thingsboard.server.service.rpc.RemoveRpcActorMsg ;
import org.thingsboard.server.service.rpc.RpcSubmitStrategy ;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg ;
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper ;
@ -106,6 +107,9 @@ import java.util.Optional;
import java.util.Set ;
import java.util.UUID ;
import java.util.concurrent.ExecutionException ;
import java.util.concurrent.ScheduledExecutorService ;
import java.util.concurrent.ScheduledFuture ;
import java.util.concurrent.TimeUnit ;
import java.util.function.Consumer ;
import java.util.stream.Collectors ;
@ -124,6 +128,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
private final Map < UUID , SessionInfo > rpcSubscriptions ;
private final Map < Integer , ToDeviceRpcRequestMetadata > toDeviceRpcPendingMap ;
private final boolean rpcSequential ;
private final RpcSubmitStrategy rpcSubmitStrategy ;
private final ScheduledExecutorService scheduler ;
private int rpcSeq = 0 ;
private String deviceName ;
@ -135,11 +141,13 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
super ( systemContext ) ;
this . tenantId = tenantId ;
this . deviceId = deviceId ;
this . rpcSequential = systemContext . isRpcSequential ( ) ;
this . rpcSubmitStrategy = RpcSubmitStrategy . parse ( systemContext . getRpcSubmitStrategy ( ) ) ;
this . rpcSequential = ! rpcSubmitStrategy . equals ( RpcSubmitStrategy . BURST ) ;
this . attributeSubscriptions = new HashMap < > ( ) ;
this . rpcSubscriptions = new HashMap < > ( ) ;
this . toDeviceRpcPendingMap = new LinkedHashMap < > ( ) ;
this . sessions = new LinkedHashMapRemoveEldest < > ( systemContext . getMaxConcurrentSessionsPerDevice ( ) , this : : notifyTransportAboutClosedSessionMaxSessionsLimit ) ;
this . scheduler = systemContext . getScheduler ( ) ;
if ( initAttributes ( ) ) {
restoreSessions ( ) ;
}
@ -225,24 +233,29 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if ( persisted ) {
ObjectNode response = JacksonUtil . newObjectNode ( ) ;
response . put ( "rpcId" , rpcId . toString ( ) ) ;
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor ( new FromDeviceRpcResponse ( msg . getMsg ( ) . getId ( ) , JacksonUtil . toString ( response ) , null ) ) ;
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor ( new FromDeviceRpcResponse ( rpcId , JacksonUtil . toString ( response ) , null ) ) ;
}
if ( ! persisted & & request . isOneway ( ) & & sent ) {
log . debug ( "[{}] RPC command response sent [{}][{}]!" , deviceId , rpcId , requestId ) ;
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor ( new FromDeviceRpcResponse ( msg . getMsg ( ) . getId ( ) , null , null ) ) ;
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor ( new FromDeviceRpcResponse ( rpcId , null , null ) ) ;
} else {
registerPendingRpcRequest ( context , msg , sent , rpcRequest , timeout ) ;
}
if ( sent ) {
log . debug ( "[{}][{}][{}] RPC request is sent!" , deviceId , rpcId , requestId ) ;
} else {
log . debug ( "[{}][{}][{}] RPC request is NOT sent!" , deviceId , rpcId , requestId ) ;
}
String rpcSent = sent ? "" : "NOT " ;
log . debug ( "[{}][{}][{}] RPC request is {}sent!" , deviceId , rpcId , requestId , rpcSent ) ;
}
private boolean isSendNewRpcAvailable ( ) {
return ! rpcSequential | | toDeviceRpcPendingMap . values ( ) . stream ( ) . filter ( md - > ! md . isDelivered ( ) ) . findAny ( ) . isEmpty ( ) ;
if ( rpcSequential ) {
if ( rpcSubmitStrategy . equals ( RpcSubmitStrategy . SEQUENTIAL_ON_ACK_FROM_DEVICE ) ) {
return toDeviceRpcPendingMap . values ( ) . stream ( ) . filter ( md - > ! md . isDelivered ( ) ) . findAny ( ) . isEmpty ( ) ;
}
if ( rpcSubmitStrategy . equals ( RpcSubmitStrategy . SEQUENTIAL_ON_RESPONSE_FROM_DEVICE ) ) {
return toDeviceRpcPendingMap . values ( ) . stream ( ) . filter ( ToDeviceRpcRequestMetadata : : isDelivered ) . findAny ( ) . isEmpty ( ) ;
}
}
return true ;
}
private void createRpc ( ToDeviceRpcRequest request , RpcStatus status ) {
@ -283,31 +296,35 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
void processRemoveRpc ( RemoveRpcActorMsg msg ) {
UUID request Id = msg . getRequestId ( ) ;
log . debug ( "[{}][{}] Received remove RPC request ..." , deviceId , request Id ) ;
UUID rpc Id = msg . getRequestId ( ) ;
log . debug ( "[{}][{}] Received remove RPC request ..." , deviceId , rpc Id ) ;
Map . Entry < Integer , ToDeviceRpcRequestMetadata > entry = null ;
for ( Map . Entry < Integer , ToDeviceRpcRequestMetadata > e : toDeviceRpcPendingMap . entrySet ( ) ) {
if ( e . getValue ( ) . getMsg ( ) . getMsg ( ) . getId ( ) . equals ( request Id ) ) {
if ( e . getValue ( ) . getMsg ( ) . getMsg ( ) . getId ( ) . equals ( rpc Id ) ) {
entry = e ;
break ;
}
}
if ( entry ! = null ) {
Integer key = entry . getKey ( ) ;
if ( entry . getValue ( ) . isDelivered ( ) ) {
toDeviceRpcPendingMap . remove ( key ) ;
} else {
Optional < Map . Entry < Integer , ToDeviceRpcRequestMetadata > > firstRpc = getFirstRpc ( ) ;
if ( firstRpc . isPresent ( ) & & key . equals ( firstRpc . get ( ) . getKey ( ) ) ) {
toDeviceRpcPendingMap . remove ( key ) ;
log . debug ( "[{}][{}][{}] Removed pending RPC! Going to send next pending request ..." , deviceId , requestId , key ) ;
sendNextPendingRequest ( ) ;
} else {
toDeviceRpcPendingMap . remove ( key ) ;
}
if ( entry = = null ) {
return ;
}
Integer requestId = entry . getKey ( ) ;
if ( entry . getValue ( ) . isDelivered ( ) ) {
var md = toDeviceRpcPendingMap . remove ( requestId ) ;
if ( rpcSubmitStrategy . equals ( RpcSubmitStrategy . SEQUENTIAL_ON_RESPONSE_FROM_DEVICE ) ) {
clearAwaitRpcResponseScheduler ( md ) ;
sendNextPendingRequest ( rpcId , requestId , "Removed pending RPC!" ) ;
}
return ;
}
Optional < Map . Entry < Integer , ToDeviceRpcRequestMetadata > > firstRpc = getFirstRpc ( ) ;
if ( firstRpc . isPresent ( ) & & requestId . equals ( firstRpc . get ( ) . getKey ( ) ) ) {
toDeviceRpcPendingMap . remove ( requestId ) ;
sendNextPendingRequest ( rpcId , requestId , "Removed pending RPC!" ) ;
return ;
}
toDeviceRpcPendingMap . remove ( requestId ) ;
}
private void registerPendingRpcRequest ( TbActorCtx context , ToDeviceRpcRequestActorMsg msg , boolean sent , ToDeviceRpcRequestMsg rpcRequest , long timeout ) {
@ -321,20 +338,25 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
void processServerSideRpcTimeout ( DeviceActorServerSideRpcTimeoutMsg msg ) {
Integer requestId = msg . getId ( ) ;
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap . remove ( requestId ) ;
if ( requestMd ! = null ) {
ToDeviceRpcRequest toDeviceRpcRequest = requestMd . getMsg ( ) . getMsg ( ) ;
UUID rpcId = toDeviceRpcRequest . getId ( ) ;
log . debug ( "[{}][{}][{}] RPC request timeout detected!" , deviceId , rpcId , requestId ) ;
if ( toDeviceRpcRequest . isPersisted ( ) ) {
systemContext . getTbRpcService ( ) . save ( tenantId , new RpcId ( rpcId ) , RpcStatus . EXPIRED , null ) ;
}
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor ( new FromDeviceRpcResponse ( rpcId ,
null , requestMd . isSent ( ) ? RpcError . TIMEOUT : RpcError . NO_ACTIVE_CONNECTION ) ) ;
if ( ! requestMd . isDelivered ( ) ) {
log . debug ( "[{}][{}][{}] Pending RPC timeout detected! Going to send next pending request ..." , deviceId , rpcId , requestId ) ;
sendNextPendingRequest ( ) ;
}
var requestMd = toDeviceRpcPendingMap . remove ( requestId ) ;
if ( requestMd = = null ) {
return ;
}
var toDeviceRpcRequest = requestMd . getMsg ( ) . getMsg ( ) ;
UUID rpcId = toDeviceRpcRequest . getId ( ) ;
log . debug ( "[{}][{}][{}] RPC request timeout detected!" , deviceId , rpcId , requestId ) ;
if ( toDeviceRpcRequest . isPersisted ( ) ) {
systemContext . getTbRpcService ( ) . save ( tenantId , new RpcId ( rpcId ) , RpcStatus . EXPIRED , null ) ;
}
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor ( new FromDeviceRpcResponse ( rpcId ,
null , requestMd . isSent ( ) ? RpcError . TIMEOUT : RpcError . NO_ACTIVE_CONNECTION ) ) ;
if ( ! requestMd . isDelivered ( ) ) {
sendNextPendingRequest ( rpcId , requestId , "Pending RPC timeout detected!" ) ;
return ;
}
if ( rpcSubmitStrategy . equals ( RpcSubmitStrategy . SEQUENTIAL_ON_RESPONSE_FROM_DEVICE ) ) {
clearAwaitRpcResponseScheduler ( requestMd ) ;
sendNextPendingRequest ( rpcId , requestId , "Pending RPC timeout detected!" ) ;
}
}
@ -363,10 +385,25 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
private Optional < Map . Entry < Integer , ToDeviceRpcRequestMetadata > > getFirstRpc ( ) {
if ( rpcSubmitStrategy . equals ( RpcSubmitStrategy . SEQUENTIAL_ON_RESPONSE_FROM_DEVICE ) ) {
return toDeviceRpcPendingMap . entrySet ( ) . stream ( )
. findFirst ( ) . filter ( entry - > {
var md = entry . getValue ( ) ;
if ( md . isDelivered ( ) ) {
if ( md . getAwaitRpcResponseFuture ( ) = = null | | md . getAwaitRpcResponseFuture ( ) . isCancelled ( ) ) {
var toDeviceRpcRequest = md . getMsg ( ) . getMsg ( ) ;
scheduleAwaitRpcResponseFuture ( toDeviceRpcRequest . getId ( ) , entry . getKey ( ) ) ;
}
return false ;
}
return true ;
} ) ;
}
return toDeviceRpcPendingMap . entrySet ( ) . stream ( ) . filter ( e - > ! e . getValue ( ) . isDelivered ( ) ) . findFirst ( ) ;
}
private void sendNextPendingRequest ( ) {
private void sendNextPendingRequest ( UUID rpcId , int requestId , String logMessage ) {
log . debug ( "[{}][{}][{}] {} Going to send next pending request ..." , deviceId , rpcId , requestId , logMessage ) ;
if ( rpcSequential ) {
rpcSubscriptions . forEach ( ( id , s ) - > sendPendingRequests ( id , s . getNodeId ( ) ) ) ;
}
@ -588,34 +625,38 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
log . debug ( "[{}][{}] Processing RPC command response: {}" , deviceId , sessionId , responseMsg ) ;
int requestId = responseMsg . getRequestId ( ) ;
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap . remove ( requestId ) ;
boolean success = requestMd ! = null ;
if ( success ) {
ToDeviceRpcRequest toDeviceRequestMsg = requestMd . getMsg ( ) . getMsg ( ) ;
boolean delivered = requestMd . isDelivered ( ) ;
boolean hasError = StringUtils . isNotEmpty ( responseMsg . getError ( ) ) ;
try {
String payload = hasError ? responseMsg . getError ( ) : responseMsg . getPayload ( ) ;
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor (
new FromDeviceRpcResponse ( toDeviceRequestMsg . getId ( ) , payload , null ) ) ;
if ( toDeviceRequestMsg . isPersisted ( ) ) {
RpcStatus status = hasError ? RpcStatus . FAILED : RpcStatus . SUCCESSFUL ;
JsonNode response ;
try {
response = JacksonUtil . toJsonNode ( payload ) ;
} catch ( IllegalArgumentException e ) {
response = JacksonUtil . newObjectNode ( ) . put ( "error" , payload ) ;
}
systemContext . getTbRpcService ( ) . save ( tenantId , new RpcId ( toDeviceRequestMsg . getId ( ) ) , status , response ) ;
}
} finally {
if ( ! delivered ) {
String errorResponse = hasError ? "error" : "" ;
log . debug ( "[{}][{}][{}] Received {} response for undelivered RPC! Going to send next pending request ..." , deviceId , sessionId , requestId , errorResponse ) ;
sendNextPendingRequest ( ) ;
if ( requestMd = = null ) {
log . debug ( "[{}][{}][{}] RPC command response is stale!" , deviceId , sessionId , requestId ) ;
return ;
}
ToDeviceRpcRequest toDeviceRequestMsg = requestMd . getMsg ( ) . getMsg ( ) ;
UUID rpcId = toDeviceRequestMsg . getId ( ) ;
boolean delivered = requestMd . isDelivered ( ) ;
boolean hasError = StringUtils . isNotEmpty ( responseMsg . getError ( ) ) ;
try {
String payload = hasError ? responseMsg . getError ( ) : responseMsg . getPayload ( ) ;
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor (
new FromDeviceRpcResponse ( rpcId , payload , null ) ) ;
if ( toDeviceRequestMsg . isPersisted ( ) ) {
RpcStatus status = hasError ? RpcStatus . FAILED : RpcStatus . SUCCESSFUL ;
JsonNode response ;
try {
response = JacksonUtil . toJsonNode ( payload ) ;
} catch ( IllegalArgumentException e ) {
response = JacksonUtil . newObjectNode ( ) . put ( "error" , payload ) ;
}
systemContext . getTbRpcService ( ) . save ( tenantId , new RpcId ( rpcId ) , status , response ) ;
}
} finally {
if ( rpcSubmitStrategy . equals ( RpcSubmitStrategy . SEQUENTIAL_ON_RESPONSE_FROM_DEVICE ) ) {
clearAwaitRpcResponseScheduler ( requestMd ) ;
String errorResponse = hasError ? "error " : "" ;
String rpcState = delivered ? "" : "undelivered " ;
sendNextPendingRequest ( rpcId , requestId , String . format ( "Received %sresponse for %sRPC!" , errorResponse , rpcState ) ) ;
} else if ( ! delivered ) {
String errorResponse = hasError ? "error " : "" ;
sendNextPendingRequest ( rpcId , requestId , String . format ( "Received %sresponse for undelivered RPC!" , errorResponse ) ) ;
}
} else {
log . debug ( "[{}][{}][{}] RPC command response is stale!" , deviceId , sessionId , requestId ) ;
}
}
@ -626,39 +667,50 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
int requestId = responseMsg . getRequestId ( ) ;
log . debug ( "[{}][{}][{}][{}] Processing RPC command response status: [{}]" , deviceId , sessionId , rpcId , requestId , status ) ;
ToDeviceRpcRequestMetadata md = toDeviceRpcPendingMap . get ( requestId ) ;
if ( md ! = null ) {
JsonNode response = null ;
if ( status . equals ( RpcStatus . DELIVERED ) ) {
if ( md . getMsg ( ) . getMsg ( ) . isOneway ( ) ) {
toDeviceRpcPendingMap . remove ( requestId ) ;
if ( rpcSequential ) {
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor ( new FromDeviceRpcResponse ( rpcId , null , null ) ) ;
}
} else {
md . setDelivered ( true ) ;
if ( md = = null ) {
log . warn ( "[{}][{}][{}][{}] RPC has already been removed from pending map." , deviceId , sessionId , rpcId , requestId ) ;
return ;
}
var toDeviceRpcRequest = md . getMsg ( ) . getMsg ( ) ;
boolean persisted = toDeviceRpcRequest . isPersisted ( ) ;
boolean oneWayRpc = toDeviceRpcRequest . isOneway ( ) ;
JsonNode response = null ;
if ( status . equals ( RpcStatus . DELIVERED ) ) {
if ( oneWayRpc ) {
toDeviceRpcPendingMap . remove ( requestId ) ;
if ( rpcSequential ) {
var fromDeviceRpcResponse = new FromDeviceRpcResponse ( rpcId , null , null ) ;
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor ( fromDeviceRpcResponse ) ;
}
} else if ( status . equals ( RpcStatus . TIMEOUT ) ) {
Integer maxRpcRetries = md . getMsg ( ) . getMsg ( ) . getRetries ( ) ;
maxRpcRetries = maxRpcRetries = = null ? systemContext . getMaxRpcRetries ( ) : Math . min ( maxRpcRetries , systemContext . getMaxRpcRetries ( ) ) ;
if ( maxRpcRetries < = md . getRetries ( ) ) {
toDeviceRpcPendingMap . remove ( requestId ) ;
status = RpcStatus . FAILED ;
response = JacksonUtil . newObjectNode ( ) . put ( "error" , "There was a Timeout and all retry attempts have been exhausted. Retry attempts set: " + maxRpcRetries ) ;
} else {
md . setRetries ( md . getRetries ( ) + 1 ) ;
} else {
md . setDelivered ( true ) ;
if ( rpcSubmitStrategy . equals ( RpcSubmitStrategy . SEQUENTIAL_ON_RESPONSE_FROM_DEVICE ) ) {
md . setAwaitRpcResponseFuture ( scheduleAwaitRpcResponseFuture ( rpcId , requestId ) ) ;
}
}
if ( md . getMsg ( ) . getMsg ( ) . isPersisted ( ) ) {
systemContext . getTbRpcService ( ) . save ( tenantId , new RpcId ( rpcId ) , status , response ) ;
}
if ( status ! = RpcStatus . SENT ) {
log . debug ( "[{}][{}][{}][{}] RPC was {}! Going to send next pending request ..." , deviceId , sessionId , rpcId , requestId , status . name ( ) . toLowerCase ( ) ) ;
sendNextPendingRequest ( ) ;
} else if ( status . equals ( RpcStatus . TIMEOUT ) ) {
Integer maxRpcRetries = toDeviceRpcRequest . getRetries ( ) ;
maxRpcRetries = maxRpcRetries = = null ?
systemContext . getMaxRpcRetries ( ) : Math . min ( maxRpcRetries , systemContext . getMaxRpcRetries ( ) ) ;
if ( maxRpcRetries < = md . getRetries ( ) ) {
toDeviceRpcPendingMap . remove ( requestId ) ;
status = RpcStatus . FAILED ;
response = JacksonUtil . newObjectNode ( ) . put ( "error" , "There was a Timeout and all retry " +
"attempts have been exhausted. Retry attempts set: " + maxRpcRetries ) ;
} else {
md . setRetries ( md . getRetries ( ) + 1 ) ;
}
} else {
log . warn ( "[{}][{}][{}][{}] RPC has already been removed from pending map." , deviceId , sessionId , rpcId , requestId ) ;
}
if ( persisted ) {
systemContext . getTbRpcService ( ) . save ( tenantId , new RpcId ( rpcId ) , status , response ) ;
}
if ( rpcSubmitStrategy . equals ( RpcSubmitStrategy . SEQUENTIAL_ON_RESPONSE_FROM_DEVICE )
& & status . equals ( RpcStatus . DELIVERED ) & & ! oneWayRpc ) {
return ;
}
if ( ! status . equals ( RpcStatus . SENT ) ) {
sendNextPendingRequest ( rpcId , requestId , String . format ( "RPC was %s!" , status . name ( ) . toLowerCase ( ) ) ) ;
}
}
@ -667,16 +719,16 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if ( subscribeCmd . getUnsubscribe ( ) ) {
log . debug ( "[{}] Canceling attributes subscription for session: [{}]" , deviceId , sessionId ) ;
attributeSubscriptions . remove ( sessionId ) ;
} else {
SessionInfoMetaData sessionMD = sessions . get ( sessionId ) ;
if ( sessionMD = = null ) {
sessionMD = new SessionInfoMetaData ( new SessionInfo ( subscribeCmd . getSessionType ( ) , sessionInfo . getNodeId ( ) ) ) ;
}
sessionMD . setSubscribedToAttributes ( true ) ;
log . debug ( "[{}] Registering attributes subscription for session: [{}]" , deviceId , sessionId ) ;
attributeSubscriptions . put ( sessionId , sessionMD . getSessionInfo ( ) ) ;
dumpSessions ( ) ;
return ;
}
SessionInfoMetaData sessionMD = sessions . get ( sessionId ) ;
if ( sessionMD = = null ) {
sessionMD = new SessionInfoMetaData ( new SessionInfo ( subscribeCmd . getSessionType ( ) , sessionInfo . getNodeId ( ) ) ) ;
}
sessionMD . setSubscribedToAttributes ( true ) ;
log . debug ( "[{}] Registering attributes subscription for session: [{}]" , deviceId , sessionId ) ;
attributeSubscriptions . put ( sessionId , sessionMD . getSessionInfo ( ) ) ;
dumpSessions ( ) ;
}
private UUID getSessionId ( SessionInfoProto sessionInfo ) {
@ -688,17 +740,18 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if ( subscribeCmd . getUnsubscribe ( ) ) {
log . debug ( "[{}] Canceling RPC subscription for session: [{}]" , deviceId , sessionId ) ;
rpcSubscriptions . remove ( sessionId ) ;
} else {
SessionInfoMetaData sessionMD = sessions . get ( sessionId ) ;
if ( sessionMD = = null ) {
sessionMD = new SessionInfoMetaData ( new SessionInfo ( subscribeCmd . getSessionType ( ) , sessionInfo . getNodeId ( ) ) ) ;
}
sessionMD . setSubscribedToRPC ( true ) ;
rpcSubscriptions . put ( sessionId , sessionMD . getSessionInfo ( ) ) ;
log . debug ( "[{}] Registered RPC subscription for session: [{}] Going to check for pending requests ..." , deviceId , sessionId ) ;
sendPendingRequests ( sessionId , sessionInfo . getNodeId ( ) ) ;
dumpSessions ( ) ;
clearAwaitRpcResponseSchedulers ( ) ;
return ;
}
SessionInfoMetaData sessionMD = sessions . get ( sessionId ) ;
if ( sessionMD = = null ) {
sessionMD = new SessionInfoMetaData ( new SessionInfo ( subscribeCmd . getSessionType ( ) , sessionInfo . getNodeId ( ) ) ) ;
}
sessionMD . setSubscribedToRPC ( true ) ;
rpcSubscriptions . put ( sessionId , sessionMD . getSessionInfo ( ) ) ;
log . debug ( "[{}] Registered RPC subscription for session: [{}] Going to check for pending requests ..." , deviceId , sessionId ) ;
sendPendingRequests ( sessionId , sessionInfo . getNodeId ( ) ) ;
dumpSessions ( ) ;
}
private void processSessionStateMsgs ( SessionInfoProto sessionInfo , SessionEventMsg msg ) {
@ -722,6 +775,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
sessions . remove ( sessionId ) ;
attributeSubscriptions . remove ( sessionId ) ;
rpcSubscriptions . remove ( sessionId ) ;
clearAwaitRpcResponseSchedulers ( ) ;
if ( sessions . isEmpty ( ) ) {
reportSessionClose ( ) ;
}
@ -729,6 +783,35 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
private ScheduledFuture < ? > scheduleAwaitRpcResponseFuture ( UUID rpcId , int requestId ) {
return scheduler . schedule ( ( ) - > {
var md = toDeviceRpcPendingMap . remove ( requestId ) ;
if ( md = = null ) {
return ;
}
sendNextPendingRequest ( rpcId , requestId , "RPC was removed from pending map due to await timeout on response from device!" ) ;
var toDeviceRpcRequest = md . getMsg ( ) . getMsg ( ) ;
if ( toDeviceRpcRequest . isPersisted ( ) ) {
var responseAwaitTimeout = JacksonUtil . newObjectNode ( ) . put ( "error" , "There was a timeout awaiting for RPC response from device." ) ;
systemContext . getTbRpcService ( ) . save ( tenantId , new RpcId ( rpcId ) , RpcStatus . FAILED , responseAwaitTimeout ) ;
}
} , systemContext . getRpcResponseTimeout ( ) , TimeUnit . MILLISECONDS ) ;
}
private void clearAwaitRpcResponseSchedulers ( ) {
if ( rpcSubmitStrategy . equals ( RpcSubmitStrategy . SEQUENTIAL_ON_RESPONSE_FROM_DEVICE ) ) {
toDeviceRpcPendingMap . forEach ( ( integer , md ) - > clearAwaitRpcResponseScheduler ( md ) ) ;
}
}
private void clearAwaitRpcResponseScheduler ( ToDeviceRpcRequestMetadata md ) {
var awaitRpcResponseFuture = md . getAwaitRpcResponseFuture ( ) ;
if ( awaitRpcResponseFuture = = null ) {
return ;
}
awaitRpcResponseFuture . cancel ( true ) ;
}
private void handleSessionActivity ( SessionInfoProto sessionInfoProto , SubscriptionInfoProto subscriptionInfo ) {
UUID sessionId = getSessionId ( sessionInfoProto ) ;
Objects . requireNonNull ( sessionId ) ;