@ -183,7 +183,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
void processRpcRequest ( TbActorCtx context , ToDeviceRpcRequestActorMsg msg ) {
ToDeviceRpcRequest request = msg . getMsg ( ) ;
UUID rpcId = request . getId ( ) ;
log . debug ( "[{}][{}] Received rpc request to process ..." , deviceId , rpcId ) ;
log . debug ( "[{}][{}] Received RPC request to process ..." , deviceId , rpcId ) ;
ToDeviceRpcRequestMsg rpcRequest = creteToDeviceRpcRequestMsg ( request ) ;
long timeout = request . getExpirationTime ( ) - System . currentTimeMillis ( ) ;
@ -202,18 +202,18 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
boolean sent = false ;
int requestId = rpcRequest . getRequestId ( ) ;
if ( systemContext . isEdgesEnabled ( ) & & edgeId ! = null ) {
log . debug ( "[{}][{}] device is related to edge: [{}]. Saving rpc request: [{}][{}] to edge queue" , tenantId , deviceId , edgeId . getId ( ) , rpcId , requestId ) ;
log . debug ( "[{}][{}] device is related to edge: [{}]. Saving RPC request: [{}][{}] to edge queue" , tenantId , deviceId , edgeId . getId ( ) , rpcId , requestId ) ;
try {
saveRpcRequestToEdgeQueue ( request , rpcRequest . getRequestId ( ) ) . get ( ) ;
sent = true ;
} catch ( InterruptedException | ExecutionException e ) {
log . error ( "[{}][{}][{}] Failed to save rpc request to edge queue {}" , tenantId , deviceId , edgeId . getId ( ) , request , e ) ;
log . error ( "[{}][{}][{}] Failed to save RPC request to edge queue {}" , tenantId , deviceId , edgeId . getId ( ) , request , e ) ;
}
} else if ( isSendNewRpcAvailable ( ) ) {
sent = rpcSubscriptions . size ( ) > 0 ;
Set < UUID > syncSessionSet = new HashSet < > ( ) ;
rpcSubscriptions . forEach ( ( sessionId , sessionInfo ) - > {
log . debug ( "[{}][{}][{}][{}] send rpc request to transport ..." , deviceId , sessionId , rpcId , requestId ) ;
log . debug ( "[{}][{}][{}][{}] send RPC request to transport ..." , deviceId , sessionId , rpcId , requestId ) ;
sendToTransport ( rpcRequest , sessionId , sessionInfo . getNodeId ( ) ) ;
if ( SessionType . SYNC = = sessionInfo . getType ( ) ) {
syncSessionSet . add ( sessionId ) ;
@ -230,15 +230,15 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
if ( ! persisted & & request . isOneway ( ) & & sent ) {
log . debug ( "[{}] Rpc command response sent [{}][{}]!" , deviceId , rpcId , requestId ) ;
log . debug ( "[{}] RPC command response sent [{}][{}]!" , deviceId , rpcId , requestId ) ;
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor ( new FromDeviceRpcResponse ( msg . getMsg ( ) . getId ( ) , null , null ) ) ;
} else {
registerPendingRpcRequest ( context , msg , sent , rpcRequest , timeout ) ;
}
if ( sent ) {
log . debug ( "[{}][{}][{}] Rpc request is sent!" , deviceId , rpcId , requestId ) ;
log . debug ( "[{}][{}][{}] RPC request is sent!" , deviceId , rpcId , requestId ) ;
} else {
log . debug ( "[{}][{}][{}] Rpc request is NOT sent!" , deviceId , rpcId , requestId ) ;
log . debug ( "[{}][{}][{}] RPC request is NOT sent!" , deviceId , rpcId , requestId ) ;
}
}
@ -277,19 +277,19 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
void processRpcResponsesFromEdge ( TbActorCtx context , FromDeviceRpcResponseActorMsg responseMsg ) {
log . debug ( "[{}] Processing rpc command response from edge session" , deviceId ) ;
log . debug ( "[{}] Processing RPC command response from edge session" , deviceId ) ;
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap . remove ( responseMsg . getRequestId ( ) ) ;
boolean success = requestMd ! = null ;
if ( success ) {
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor ( responseMsg . getMsg ( ) ) ;
} else {
log . debug ( "[{}] Rpc command response [{}] is stale!" , deviceId , responseMsg . getRequestId ( ) ) ;
log . debug ( "[{}] RPC command response [{}] is stale!" , deviceId , responseMsg . getRequestId ( ) ) ;
}
}
void processRemoveRpc ( TbActorCtx context , RemoveRpcActorMsg msg ) {
UUID requestId = msg . getRequestId ( ) ;
log . debug ( "[{}][{}] Received remove rpc request ..." , deviceId , requestId ) ;
log . debug ( "[{}][{}] Received remove RPC request ..." , deviceId , requestId ) ;
Map . Entry < Integer , ToDeviceRpcRequestMetadata > entry = null ;
for ( Map . Entry < Integer , ToDeviceRpcRequestMetadata > e : toDeviceRpcPendingMap . entrySet ( ) ) {
if ( e . getValue ( ) . getMsg ( ) . getMsg ( ) . getId ( ) . equals ( msg . getRequestId ( ) ) ) {
@ -306,7 +306,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
Optional < Map . Entry < Integer , ToDeviceRpcRequestMetadata > > firstRpc = getFirstRpc ( ) ;
if ( firstRpc . isPresent ( ) & & key . equals ( firstRpc . get ( ) . getKey ( ) ) ) {
toDeviceRpcPendingMap . remove ( key ) ;
log . debug ( "[{}][{}][{}] Removed pending rpc ! Going to send next pending request ..." , deviceId , requestId , key ) ;
log . debug ( "[{}][{}][{}] Removed pending RPC ! Going to send next pending request ..." , deviceId , requestId , key ) ;
sendNextPendingRequest ( context ) ;
} else {
toDeviceRpcPendingMap . remove ( key ) ;
@ -316,7 +316,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
private void registerPendingRpcRequest ( TbActorCtx context , ToDeviceRpcRequestActorMsg msg , boolean sent , ToDeviceRpcRequestMsg rpcRequest , long timeout ) {
log . debug ( "[{}][{}][{}] Registering pending rpc request..." , deviceId , getRpcIdFromRequest ( rpcRequest ) , rpcRequest . getRequestId ( ) ) ;
log . debug ( "[{}][{}][{}] Registering pending RPC request..." , deviceId , getRpcIdFromRequest ( rpcRequest ) , rpcRequest . getRequestId ( ) ) ;
toDeviceRpcPendingMap . put ( rpcRequest . getRequestId ( ) , new ToDeviceRpcRequestMetadata ( msg , sent ) ) ;
DeviceActorServerSideRpcTimeoutMsg timeoutMsg = new DeviceActorServerSideRpcTimeoutMsg ( rpcRequest . getRequestId ( ) , timeout ) ;
scheduleMsgWithDelay ( context , timeoutMsg , timeoutMsg . getTimeout ( ) ) ;
@ -327,14 +327,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap . remove ( requestId ) ;
if ( requestMd ! = null ) {
UUID rpcId = requestMd . getMsg ( ) . getMsg ( ) . getId ( ) ;
log . debug ( "[{}][{}][{}] Rpc request timeout detected!" , deviceId , rpcId , requestId ) ;
log . debug ( "[{}][{}][{}] RPC request timeout detected!" , deviceId , rpcId , requestId ) ;
if ( requestMd . getMsg ( ) . getMsg ( ) . isPersisted ( ) ) {
systemContext . getTbRpcService ( ) . save ( tenantId , new RpcId ( rpcId ) , RpcStatus . EXPIRED , null ) ;
}
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor ( new FromDeviceRpcResponse ( rpcId ,
null , requestMd . isSent ( ) ? RpcError . TIMEOUT : RpcError . NO_ACTIVE_CONNECTION ) ) ;
if ( ! requestMd . isDelivered ( ) ) {
log . debug ( "[{}][{}][{}] Pending rpc timeout detected! Going to send next pending request ..." , deviceId , rpcId , requestId ) ;
log . debug ( "[{}][{}][{}] Pending RPC timeout detected! Going to send next pending request ..." , deviceId , rpcId , requestId ) ;
sendNextPendingRequest ( context ) ;
}
}
@ -343,13 +343,13 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
private void sendPendingRequests ( TbActorCtx context , UUID sessionId , String nodeId ) {
SessionType sessionType = getSessionType ( sessionId ) ;
if ( ! toDeviceRpcPendingMap . isEmpty ( ) ) {
log . debug ( "[{}][{}] Pushing {} pending rpc messages to new async session!" , deviceId , sessionId , toDeviceRpcPendingMap . size ( ) ) ;
log . debug ( "[{}][{}] Pushing {} pending RPC messages to new async session!" , deviceId , sessionId , toDeviceRpcPendingMap . size ( ) ) ;
if ( sessionType = = SessionType . SYNC ) {
log . debug ( "[{}] Cleanup sync rpc session [{}]" , deviceId , sessionId ) ;
log . debug ( "[{}] Cleanup sync RPC session [{}]" , deviceId , sessionId ) ;
rpcSubscriptions . remove ( sessionId ) ;
}
} else {
log . debug ( "[{}] No pending rpc messages for new async session [{}]" , deviceId , sessionId ) ;
log . debug ( "[{}] No pending RPC messages for new async session [{}]" , deviceId , sessionId ) ;
}
Set < Integer > sentOneWayIds = new HashSet < > ( ) ;
@ -393,7 +393,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
. setOneway ( request . isOneway ( ) )
. setPersisted ( request . isPersisted ( ) )
. build ( ) ;
log . debug ( "[{}][{}][{}][{}] Send pending rpc request to transport ..." , deviceId , sessionId , getRpcIdFromRequest ( rpcRequest ) , requestId ) ;
log . debug ( "[{}][{}][{}][{}] Send pending RPC request to transport ..." , deviceId , sessionId , getRpcIdFromRequest ( rpcRequest ) , requestId ) ;
sendToTransport ( rpcRequest , sessionId , nodeId ) ;
} ;
}
@ -586,10 +586,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
private void processRpcResponses ( TbActorCtx context , SessionInfoProto sessionInfo , ToDeviceRpcResponseMsg responseMsg ) {
UUID sessionId = getSessionId ( sessionInfo ) ;
log . debug ( "[{}][{}] Processing rpc command response: {}" , deviceId , sessionId , responseMsg ) ;
log . debug ( "[{}][{}] Processing RPC command response: {}" , deviceId , sessionId , responseMsg ) ;
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap . remove ( responseMsg . getRequestId ( ) ) ;
boolean success = requestMd ! = null ;
if ( success ) {
ToDeviceRpcRequest toDeviceRequestMsg = requestMd . getMsg ( ) . getMsg ( ) ;
boolean delivered = requestMd . isDelivered ( ) ;
boolean hasError = StringUtils . isNotEmpty ( responseMsg . getError ( ) ) ;
try {
@ -599,12 +600,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
} else if ( delivered ) {
payload = responseMsg . getPayload ( ) ;
} else {
payload = "Received response for undelivered rpc : " + responseMsg . getPayload ( ) ;
payload = "Received response for undelivered RPC : " + responseMsg . getPayload ( ) ;
}
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor (
new FromDeviceRpcResponse ( requestMd . getMsg ( ) . getMsg ( ) . getId ( ) ,
new FromDeviceRpcResponse ( toDeviceRequestMsg . getId ( ) ,
payload , null ) ) ;
if ( requestMd . getMsg ( ) . getMsg ( ) . isPersisted ( ) ) {
if ( toDeviceRequestMsg . isPersisted ( ) ) {
RpcStatus status = hasError | | ! delivered ? RpcStatus . FAILED : RpcStatus . SUCCESSFUL ;
JsonNode response ;
try {
@ -612,17 +613,17 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
} catch ( IllegalArgumentException e ) {
response = JacksonUtil . newObjectNode ( ) . put ( "error" , payload ) ;
}
systemContext . getTbRpcService ( ) . save ( tenantId , new RpcId ( requestMd . getMsg ( ) . getMsg ( ) . getId ( ) ) , status , response ) ;
systemContext . getTbRpcService ( ) . save ( tenantId , new RpcId ( toDeviceRequestMsg . getId ( ) ) , status , response ) ;
}
} finally {
if ( ! delivered ) {
String errorResponse = hasError ? "error" : "" ;
log . debug ( "[{}][{}][{}] Received {} response for undelivered rpc ! Going to send next pending request ..." , deviceId , sessionId , responseMsg . getRequestId ( ) , errorResponse ) ;
log . debug ( "[{}][{}][{}] Received {} response for undelivered RPC ! Going to send next pending request ..." , deviceId , sessionId , responseMsg . getRequestId ( ) , errorResponse ) ;
sendNextPendingRequest ( context ) ;
}
}
} else {
log . debug ( "[{}][{}][{}] Rpc command response is stale!" , deviceId , sessionId , responseMsg . getRequestId ( ) ) ;
log . debug ( "[{}][{}][{}] RPC command response is stale!" , deviceId , sessionId , responseMsg . getRequestId ( ) ) ;
}
}
@ -631,7 +632,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
RpcStatus status = RpcStatus . valueOf ( responseMsg . getStatus ( ) ) ;
UUID sessionId = getSessionId ( sessionInfo ) ;
int requestId = responseMsg . getRequestId ( ) ;
log . debug ( "[{}][{}][{}][{}] Processing rpc command response status: [{}]" , deviceId , sessionId , rpcId , requestId , status ) ;
log . debug ( "[{}][{}][{}][{}] Processing RPC command response status: [{}]" , deviceId , sessionId , rpcId , requestId , status ) ;
ToDeviceRpcRequestMetadata md = toDeviceRpcPendingMap . get ( requestId ) ;
if ( md ! = null ) {
@ -661,11 +662,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
systemContext . getTbRpcService ( ) . save ( tenantId , new RpcId ( rpcId ) , status , response ) ;
}
if ( status ! = RpcStatus . SENT ) {
log . debug ( "[{}][{}][{}][{}] Rpc was {}! Going to send next pending request ..." , deviceId , sessionId , rpcId , requestId , status . name ( ) . toLowerCase ( ) ) ;
log . debug ( "[{}][{}][{}][{}] RPC was {}! Going to send next pending request ..." , deviceId , sessionId , rpcId , requestId , status . name ( ) . toLowerCase ( ) ) ;
sendNextPendingRequest ( context ) ;
}
} else {
log . warn ( "[{}][{}][{}][{}] Rpc has already been removed from pending map." , deviceId , sessionId , rpcId , requestId ) ;
log . warn ( "[{}][{}][{}][{}] RPC has already been removed from pending map." , deviceId , sessionId , rpcId , requestId ) ;
}
}
@ -693,7 +694,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
private void processSubscriptionCommands ( TbActorCtx context , SessionInfoProto sessionInfo , SubscribeToRPCMsg subscribeCmd ) {
UUID sessionId = getSessionId ( sessionInfo ) ;
if ( subscribeCmd . getUnsubscribe ( ) ) {
log . debug ( "[{}] Canceling rpc subscription for session: [{}]" , deviceId , sessionId ) ;
log . debug ( "[{}] Canceling RPC subscription for session: [{}]" , deviceId , sessionId ) ;
rpcSubscriptions . remove ( sessionId ) ;
} else {
SessionInfoMetaData sessionMD = sessions . get ( sessionId ) ;
@ -702,7 +703,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
sessionMD . setSubscribedToRPC ( true ) ;
rpcSubscriptions . put ( sessionId , sessionMD . getSessionInfo ( ) ) ;
log . debug ( "[{}] Registered rpc subscription for session: [{}] Going to check for pending requests ..." , deviceId , sessionId ) ;
log . debug ( "[{}] Registered RPC subscription for session: [{}] Going to check for pending requests ..." , deviceId , sessionId ) ;
sendPendingRequests ( context , sessionId , sessionInfo . getNodeId ( ) ) ;
dumpSessions ( ) ;
}
@ -945,14 +946,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
log . debug ( "[{}] Restored session: {}" , deviceId , sessionMD ) ;
}
log . debug ( "[{}] Restored sessions: {}, rpc subscriptions: {}, attribute subscriptions: {}" , deviceId , sessions . size ( ) , rpcSubscriptions . size ( ) , attributeSubscriptions . size ( ) ) ;
log . debug ( "[{}] Restored sessions: {}, RPC subscriptions: {}, attribute subscriptions: {}" , deviceId , sessions . size ( ) , rpcSubscriptions . size ( ) , attributeSubscriptions . size ( ) ) ;
}
private void dumpSessions ( ) {
if ( systemContext . isLocalCacheType ( ) ) {
return ;
}
log . debug ( "[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache" , deviceId , sessions . size ( ) , rpcSubscriptions . size ( ) , attributeSubscriptions . size ( ) ) ;
log . debug ( "[{}] Dumping sessions: {}, RPC subscriptions: {}, attribute subscriptions: {} to cache" , deviceId , sessions . size ( ) , rpcSubscriptions . size ( ) , attributeSubscriptions . size ( ) ) ;
List < SessionSubscriptionInfoProto > sessionsList = new ArrayList < > ( sessions . size ( ) ) ;
sessions . forEach ( ( uuid , sessionMD ) - > {
if ( sessionMD . getSessionInfo ( ) . getType ( ) = = SessionType . SYNC ) {