@ -83,7 +83,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProt
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg ;
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg ;
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseStatusMsg ;
import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg ;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg ;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportUpdateCredentialsProto ;
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg ;
@ -182,13 +181,15 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
void processRpcRequest ( TbActorCtx context , ToDeviceRpcRequestActorMsg msg ) {
ToDeviceRpcRequest request = msg . getMsg ( ) ;
UUID rpcId = request . getId ( ) ;
log . debug ( "[{}][{}] Received RPC request to process ..." , deviceId , rpcId ) ;
ToDeviceRpcRequestMsg rpcRequest = creteToDeviceRpcRequestMsg ( request ) ;
long timeout = request . getExpirationTime ( ) - System . currentTimeMillis ( ) ;
boolean persisted = request . isPersisted ( ) ;
if ( timeout < = 0 ) {
log . debug ( "[{}][{}] Ignoring message due to exp time reached, {}" , deviceId , request . get Id ( ) , request . getExpirationTime ( ) ) ;
log . debug ( "[{}][{}] Ignoring message due to exp time reached, {}" , deviceId , rpc Id , request . getExpirationTime ( ) ) ;
if ( persisted ) {
createRpc ( request , RpcStatus . EXPIRED ) ;
}
@ -198,21 +199,23 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
boolean sent = false ;
int requestId = rpcRequest . getRequestId ( ) ;
if ( systemContext . isEdgesEnabled ( ) & & edgeId ! = null ) {
log . debug ( "[{}][{}] device is related to edge [{}]. Saving RPC request to edge queue" , tenantId , deviceId , edgeId . getId ( ) ) ;
log . debug ( "[{}][{}] device is related to edge: [{}]. Saving RPC request: [{}][{}] to edge queue" , tenantId , deviceId , edgeId . getId ( ) , rpcId , requestId ) ;
try {
saveRpcRequestToEdgeQueue ( request , rpcRequest . getR equestId ( ) ) . get ( ) ;
saveRpcRequestToEdgeQueue ( request , requestId ) . get ( ) ;
sent = true ;
} catch ( InterruptedException | ExecutionException e ) {
log . error ( "[{}][{}][{}] Failed to save rpc request to edge queue {}" , tenantId , deviceId , edgeId . getId ( ) , request , e ) ;
log . error ( "[{}][{}][{}] Failed to save RPC request to edge queue {}" , tenantId , deviceId , edgeId . getId ( ) , request , e ) ;
}
} else if ( isSendNewRpcAvailable ( ) ) {
sent = rpcSubscriptions . size ( ) > 0 ;
Set < UUID > syncSessionSet = new HashSet < > ( ) ;
rpcSubscriptions . forEach ( ( key , value ) - > {
sendToTransport ( rpcRequest , key , value . getNodeId ( ) ) ;
if ( SessionType . SYNC = = value . getType ( ) ) {
syncSessionSet . add ( key ) ;
rpcSubscriptions . forEach ( ( sessionId , sessionInfo ) - > {
log . debug ( "[{}][{}][{}][{}] send RPC request to transport ..." , deviceId , sessionId , rpcId , requestId ) ;
sendToTransport ( rpcRequest , sessionId , sessionInfo . getNodeId ( ) ) ;
if ( SessionType . SYNC = = sessionInfo . getType ( ) ) {
syncSessionSet . add ( sessionId ) ;
}
} ) ;
log . trace ( "Rpc syncSessionSet [{}] subscription after sent [{}]" , syncSessionSet , rpcSubscriptions ) ;
@ -221,20 +224,20 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if ( persisted ) {
ObjectNode response = JacksonUtil . newObjectNode ( ) ;
response . put ( "rpcId" , request . get Id ( ) . toString ( ) ) ;
response . put ( "rpcId" , rpc Id . toString ( ) ) ;
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor ( new FromDeviceRpcResponse ( msg . getMsg ( ) . getId ( ) , JacksonUtil . toString ( response ) , null ) ) ;
}
if ( ! persisted & & request . isOneway ( ) & & sent ) {
log . debug ( "[{}] Rpc command response sent [{}]!" , deviceId , request . getId ( ) ) ;
log . debug ( "[{}] RPC command response sent [{}] [{}]!" , deviceId , rpcId , requestId ) ;
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor ( new FromDeviceRpcResponse ( msg . getMsg ( ) . getId ( ) , null , null ) ) ;
} else {
registerPendingRpcRequest ( context , msg , sent , rpcRequest , timeout ) ;
}
if ( sent ) {
log . debug ( "[{}] RPC request {} is sent!" , deviceId , request . getId ( ) ) ;
log . debug ( "[{}][{}][{}] RPC request is sent!" , deviceId , rpcId , requestId ) ;
} else {
log . debug ( "[{}] RPC request {} is NOT sent!" , deviceId , request . getId ( ) ) ;
log . debug ( "[{}][{}][{}] RPC request is NOT sent!" , deviceId , rpcId , requestId ) ;
}
}
@ -242,7 +245,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
return ! rpcSequential | | toDeviceRpcPendingMap . values ( ) . stream ( ) . filter ( md - > ! md . isDelivered ( ) ) . findAny ( ) . isEmpty ( ) ;
}
private Rpc createRpc ( ToDeviceRpcRequest request , RpcStatus status ) {
private void createRpc ( ToDeviceRpcRequest request , RpcStatus status ) {
Rpc rpc = new Rpc ( new RpcId ( request . getId ( ) ) ) ;
rpc . setCreatedTime ( System . currentTimeMillis ( ) ) ;
rpc . setTenantId ( tenantId ) ;
@ -251,7 +254,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
rpc . setRequest ( JacksonUtil . valueToTree ( request ) ) ;
rpc . setStatus ( status ) ;
rpc . setAdditionalInfo ( JacksonUtil . toJsonNode ( request . getAdditionalInfo ( ) ) ) ;
return systemContext . getTbRpcService ( ) . save ( tenantId , rpc ) ;
systemContext . getTbRpcService ( ) . save ( tenantId , rpc ) ;
}
private ToDeviceRpcRequestMsg creteToDeviceRpcRequestMsg ( ToDeviceRpcRequest request ) {
@ -268,82 +271,92 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
. build ( ) ;
}
void processRpcResponsesFromEdge ( TbActorCtx context , FromDeviceRpcResponseActorMsg responseMsg ) {
log . debug ( "[{}] Processing rpc command response from edge session" , deviceId ) ;
void processRpcResponsesFromEdge ( FromDeviceRpcResponseActorMsg responseMsg ) {
log . debug ( "[{}] Processing RPC command response from edge session" , deviceId ) ;
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap . remove ( responseMsg . getRequestId ( ) ) ;
boolean success = requestMd ! = null ;
if ( success ) {
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor ( responseMsg . getMsg ( ) ) ;
} else {
log . debug ( "[{}] Rpc command response [{}] is stale!" , deviceId , responseMsg . getRequestId ( ) ) ;
log . debug ( "[{}] RPC command response [{}] is stale!" , deviceId , responseMsg . getRequestId ( ) ) ;
}
}
void processRemoveRpc ( TbActorCtx context , RemoveRpcActorMsg msg ) {
log . debug ( "[{}] Processing remove rpc command" , msg . getRequestId ( ) ) ;
void processRemoveRpc ( RemoveRpcActorMsg msg ) {
UUID requestId = msg . getRequestId ( ) ;
log . debug ( "[{}][{}] Received remove RPC request ..." , deviceId , requestId ) ;
Map . Entry < Integer , ToDeviceRpcRequestMetadata > entry = null ;
for ( Map . Entry < Integer , ToDeviceRpcRequestMetadata > e : toDeviceRpcPendingMap . entrySet ( ) ) {
if ( e . getValue ( ) . getMsg ( ) . getMsg ( ) . getId ( ) . equals ( msg . getRequestId ( ) ) ) {
if ( e . getValue ( ) . getMsg ( ) . getMsg ( ) . getId ( ) . equals ( requestId ) ) {
entry = e ;
break ;
}
}
if ( entry ! = null ) {
Integer key = entry . getKey ( ) ;
if ( entry . getValue ( ) . isDelivered ( ) ) {
toDeviceRpcPendingMap . remove ( entry . getKey ( ) ) ;
toDeviceRpcPendingMap . remove ( key ) ;
} else {
Optional < Map . Entry < Integer , ToDeviceRpcRequestMetadata > > firstRpc = getFirstRpc ( ) ;
if ( firstRpc . isPresent ( ) & & entry . getKey ( ) . equals ( firstRpc . get ( ) . getKey ( ) ) ) {
toDeviceRpcPendingMap . remove ( entry . getKey ( ) ) ;
sendNextPendingRequest ( context ) ;
if ( firstRpc . isPresent ( ) & & key . equals ( firstRpc . get ( ) . getKey ( ) ) ) {
toDeviceRpcPendingMap . remove ( key ) ;
log . debug ( "[{}][{}][{}] Removed pending RPC! Going to send next pending request ..." , deviceId , requestId , key ) ;
sendNextPendingRequest ( ) ;
} else {
toDeviceRpcPendingMap . remove ( entry . getKey ( ) ) ;
toDeviceRpcPendingMap . remove ( key ) ;
}
}
}
}
private void registerPendingRpcRequest ( TbActorCtx context , ToDeviceRpcRequestActorMsg msg , boolean sent , ToDeviceRpcRequestMsg rpcRequest , long timeout ) {
toDeviceRpcPendingMap . put ( rpcRequest . getRequestId ( ) , new ToDeviceRpcRequestMetadata ( msg , sent ) ) ;
DeviceActorServerSideRpcTimeoutMsg timeoutMsg = new DeviceActorServerSideRpcTimeoutMsg ( rpcRequest . getRequestId ( ) , timeout ) ;
int requestId = rpcRequest . getRequestId ( ) ;
UUID rpcId = new UUID ( rpcRequest . getRequestIdMSB ( ) , rpcRequest . getRequestIdLSB ( ) ) ;
log . debug ( "[{}][{}][{}] Registering pending RPC request..." , deviceId , rpcId , requestId ) ;
toDeviceRpcPendingMap . put ( requestId , new ToDeviceRpcRequestMetadata ( msg , sent ) ) ;
DeviceActorServerSideRpcTimeoutMsg timeoutMsg = new DeviceActorServerSideRpcTimeoutMsg ( requestId , timeout ) ;
scheduleMsgWithDelay ( context , timeoutMsg , timeoutMsg . getTimeout ( ) ) ;
}
void processServerSideRpcTimeout ( TbActorCtx context , DeviceActorServerSideRpcTimeoutMsg msg ) {
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap . remove ( msg . getId ( ) ) ;
void processServerSideRpcTimeout ( DeviceActorServerSideRpcTimeoutMsg msg ) {
Integer requestId = msg . getId ( ) ;
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap . remove ( requestId ) ;
if ( requestMd ! = null ) {
log . debug ( "[{}] RPC request [{}] timeout detected!" , deviceId , msg . getId ( ) ) ;
if ( requestMd . getMsg ( ) . getMsg ( ) . isPersisted ( ) ) {
systemContext . getTbRpcService ( ) . save ( tenantId , new RpcId ( requestMd . getMsg ( ) . getMsg ( ) . getId ( ) ) , RpcStatus . EXPIRED , null ) ;
ToDeviceRpcRequest toDeviceRpcRequest = requestMd . getMsg ( ) . getMsg ( ) ;
UUID rpcId = toDeviceRpcRequest . getId ( ) ;
log . debug ( "[{}][{}][{}] RPC request timeout detected!" , deviceId , rpcId , requestId ) ;
if ( toDeviceRpcRequest . isPersisted ( ) ) {
systemContext . getTbRpcService ( ) . save ( tenantId , new RpcId ( rpcId ) , RpcStatus . EXPIRED , null ) ;
}
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor ( new FromDeviceRpcResponse ( requestMd . getMsg ( ) . getMsg ( ) . getId ( ) ,
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor ( new FromDeviceRpcResponse ( rpc Id ,
null , requestMd . isSent ( ) ? RpcError . TIMEOUT : RpcError . NO_ACTIVE_CONNECTION ) ) ;
if ( ! requestMd . isDelivered ( ) ) {
sendNextPendingRequest ( context ) ;
log . debug ( "[{}][{}][{}] Pending RPC timeout detected! Going to send next pending request ..." , deviceId , rpcId , requestId ) ;
sendNextPendingRequest ( ) ;
}
}
}
private void sendPendingRequests ( TbActorCtx context , UUID sessionId , String nodeId ) {
private void sendPendingRequests ( UUID sessionId , String nodeId ) {
SessionType sessionType = getSessionType ( sessionId ) ;
if ( ! toDeviceRpcPendingMap . isEmpty ( ) ) {
log . debug ( "[{}] Pushing {} pending RPC messages to new async session [{}]" , deviceId , toDeviceRpcPendingMap . size ( ) , sessionId ) ;
log . debug ( "[{}] Pushing {} pending RPC messages to session: [{}]" , deviceId , session Id , toDeviceRpcPendingMap . size ( ) ) ;
if ( sessionType = = SessionType . SYNC ) {
log . debug ( "[{}] Cleanup sync rpc session [{}]" , deviceId , sessionId ) ;
log . debug ( "[{}] Cleanup sync RPC session [{}]" , deviceId , sessionId ) ;
rpcSubscriptions . remove ( sessionId ) ;
}
} else {
log . debug ( "[{}] No pending RPC messages for new async session [{}]" , deviceId , sessionId ) ;
log . debug ( "[{}] No pending RPC messages for session: [{}]" , deviceId , sessionId ) ;
}
Set < Integer > sentOneWayIds = new HashSet < > ( ) ;
if ( rpcSequential ) {
getFirstRpc ( ) . ifPresent ( processPendingRpc ( context , sessionId , nodeId , sentOneWayIds ) ) ;
getFirstRpc ( ) . ifPresent ( processPendingRpc ( sessionId , nodeId , sentOneWayIds ) ) ;
} else if ( sessionType = = SessionType . ASYNC ) {
toDeviceRpcPendingMap . entrySet ( ) . forEach ( processPendingRpc ( context , sessionId , nodeId , sentOneWayIds ) ) ;
toDeviceRpcPendingMap . entrySet ( ) . forEach ( processPendingRpc ( sessionId , nodeId , sentOneWayIds ) ) ;
} else {
toDeviceRpcPendingMap . entrySet ( ) . stream ( ) . findFirst ( ) . ifPresent ( processPendingRpc ( context , sessionId , nodeId , sentOneWayIds ) ) ;
toDeviceRpcPendingMap . entrySet ( ) . stream ( ) . findFirst ( ) . ifPresent ( processPendingRpc ( sessionId , nodeId , sentOneWayIds ) ) ;
}
sentOneWayIds . stream ( ) . filter ( id - > ! toDeviceRpcPendingMap . get ( id ) . getMsg ( ) . getMsg ( ) . isPersisted ( ) ) . forEach ( toDeviceRpcPendingMap : : remove ) ;
@ -353,35 +366,38 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
return toDeviceRpcPendingMap . entrySet ( ) . stream ( ) . filter ( e - > ! e . getValue ( ) . isDelivered ( ) ) . findFirst ( ) ;
}
private void sendNextPendingRequest ( TbActorCtx context ) {
private void sendNextPendingRequest ( ) {
if ( rpcSequential ) {
rpcSubscriptions . forEach ( ( id , s ) - > sendPendingRequests ( context , id , s . getNodeId ( ) ) ) ;
rpcSubscriptions . forEach ( ( id , s ) - > sendPendingRequests ( id , s . getNodeId ( ) ) ) ;
}
}
private Consumer < Map . Entry < Integer , ToDeviceRpcRequestMetadata > > processPendingRpc ( TbActorCtx context , UUID sessionId , String nodeId , Set < Integer > sentOneWayIds ) {
private Consumer < Map . Entry < Integer , ToDeviceRpcRequestMetadata > > processPendingRpc ( UUID sessionId , String nodeId , Set < Integer > sentOneWayIds ) {
return entry - > {
ToDeviceRpcRequest request = entry . getValue ( ) . getMsg ( ) . getMsg ( ) ;
ToDeviceRpcRequestBody body = request . getBody ( ) ;
Integer requestId = entry . getKey ( ) ;
UUID rpcId = request . getId ( ) ;
if ( request . isOneway ( ) & & ! rpcSequential ) {
sentOneWayIds . add ( entry . getKey ( ) ) ;
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor ( new FromDeviceRpcResponse ( request . get Id ( ) , null , null ) ) ;
sentOneWayIds . add ( requestId ) ;
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor ( new FromDeviceRpcResponse ( rpc Id , null , null ) ) ;
}
ToDeviceRpcRequestMsg rpcRequest = ToDeviceRpcRequestMsg . newBuilder ( )
. setRequestId ( entry . getKey ( ) )
. setRequestId ( requestId )
. setMethodName ( body . getMethod ( ) )
. setParams ( body . getParams ( ) )
. setExpirationTime ( request . getExpirationTime ( ) )
. setRequestIdMSB ( request . get Id ( ) . getMostSignificantBits ( ) )
. setRequestIdLSB ( request . get Id ( ) . getLeastSignificantBits ( ) )
. setRequestIdMSB ( rpc Id . getMostSignificantBits ( ) )
. setRequestIdLSB ( rpc Id . getLeastSignificantBits ( ) )
. setOneway ( request . isOneway ( ) )
. setPersisted ( request . isPersisted ( ) )
. build ( ) ;
log . debug ( "[{}][{}][{}][{}] Send pending RPC request to transport ..." , deviceId , sessionId , rpcId , requestId ) ;
sendToTransport ( rpcRequest , sessionId , nodeId ) ;
} ;
}
void process ( TbActorCtx context , T ransportToDeviceActorMsgWrapper wrapper ) {
void process ( TransportToDeviceActorMsgWrapper wrapper ) {
TransportToDeviceActorMsg msg = wrapper . getMsg ( ) ;
TbCallback callback = wrapper . getCallback ( ) ;
var sessionInfo = msg . getSessionInfo ( ) ;
@ -390,36 +406,36 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
processSessionStateMsgs ( sessionInfo , msg . getSessionEvent ( ) ) ;
}
if ( msg . hasSubscribeToAttributes ( ) ) {
processSubscriptionCommands ( context , sessionInfo , msg . getSubscribeToAttributes ( ) ) ;
processSubscriptionCommands ( sessionInfo , msg . getSubscribeToAttributes ( ) ) ;
}
if ( msg . hasSubscribeToRPC ( ) ) {
processSubscriptionCommands ( context , sessionInfo , msg . getSubscribeToRPC ( ) ) ;
processSubscriptionCommands ( sessionInfo , msg . getSubscribeToRPC ( ) ) ;
}
if ( msg . hasSendPendingRPC ( ) ) {
sendPendingRequests ( context , getSessionId ( sessionInfo ) , sessionInfo . getNodeId ( ) ) ;
sendPendingRequests ( getSessionId ( sessionInfo ) , sessionInfo . getNodeId ( ) ) ;
}
if ( msg . hasGetAttributes ( ) ) {
handleGetAttributesRequest ( context , sessionInfo , msg . getGetAttributes ( ) ) ;
handleGetAttributesRequest ( sessionInfo , msg . getGetAttributes ( ) ) ;
}
if ( msg . hasToDeviceRPCCallResponse ( ) ) {
processRpcResponses ( context , sessionInfo , msg . getToDeviceRPCCallResponse ( ) ) ;
processRpcResponses ( sessionInfo , msg . getToDeviceRPCCallResponse ( ) ) ;
}
if ( msg . hasSubscriptionInfo ( ) ) {
handleSessionActivity ( context , sessionInfo , msg . getSubscriptionInfo ( ) ) ;
handleSessionActivity ( sessionInfo , msg . getSubscriptionInfo ( ) ) ;
}
if ( msg . hasClaimDevice ( ) ) {
handleClaimDeviceMsg ( context , sessionInfo , msg . getClaimDevice ( ) ) ;
handleClaimDeviceMsg ( msg . getClaimDevice ( ) ) ;
}
if ( msg . hasRpcResponseStatusMsg ( ) ) {
processRpcResponseStatus ( context , sessionInfo , msg . getRpcResponseStatusMsg ( ) ) ;
processRpcResponseStatus ( sessionInfo , msg . getRpcResponseStatusMsg ( ) ) ;
}
if ( msg . hasUplinkNotificationMsg ( ) ) {
processUplinkNotificationMsg ( context , sessionInfo , msg . getUplinkNotificationMsg ( ) ) ;
processUplinkNotificationMsg ( sessionInfo , msg . getUplinkNotificationMsg ( ) ) ;
}
callback . onSuccess ( ) ;
}
private void processUplinkNotificationMsg ( TbActorCtx context , SessionInfoProto sessionInfo , TransportProtos . UplinkNotificationMsg uplinkNotificationMsg ) {
private void processUplinkNotificationMsg ( SessionInfoProto sessionInfo , TransportProtos . UplinkNotificationMsg uplinkNotificationMsg ) {
String nodeId = sessionInfo . getNodeId ( ) ;
sessions . entrySet ( ) . stream ( )
. filter ( kv - > kv . getValue ( ) . getSessionInfo ( ) . getNodeId ( ) . equals ( nodeId ) & & ( kv . getValue ( ) . isSubscribedToAttributes ( ) | | kv . getValue ( ) . isSubscribedToRPC ( ) ) )
@ -433,7 +449,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
} ) ;
}
private void handleClaimDeviceMsg ( TbActorCtx context , SessionInfoProto sessionInfo , ClaimDeviceMsg msg ) {
private void handleClaimDeviceMsg ( ClaimDeviceMsg msg ) {
DeviceId deviceId = new DeviceId ( new UUID ( msg . getDeviceIdMSB ( ) , msg . getDeviceIdLSB ( ) ) ) ;
systemContext . getClaimDevicesService ( ) . registerClaimingInfo ( tenantId , deviceId , msg . getSecretKey ( ) , msg . getDurationMs ( ) ) ;
}
@ -446,7 +462,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
systemContext . getDeviceStateService ( ) . onDeviceDisconnect ( tenantId , deviceId ) ;
}
private void handleGetAttributesRequest ( TbActorCtx context , SessionInfoProto sessionInfo , GetAttributeRequestMsg request ) {
private void handleGetAttributesRequest ( SessionInfoProto sessionInfo , GetAttributeRequestMsg request ) {
int requestId = request . getRequestId ( ) ;
if ( request . getOnlyShared ( ) ) {
Futures . addCallback ( findAllAttributesByScope ( DataConstants . SHARED_SCOPE ) , new FutureCallback < > ( ) {
@ -530,7 +546,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
return sessions . containsKey ( sessionId ) ? SessionType . ASYNC : SessionType . SYNC ;
}
void processAttributesUpdate ( TbActorCtx context , DeviceAttributesEventNotificationMsg msg ) {
void processAttributesUpdate ( DeviceAttributesEventNotificationMsg msg ) {
if ( attributeSubscriptions . size ( ) > 0 ) {
boolean hasNotificationData = false ;
AttributeUpdateNotificationMsg . Builder notification = AttributeUpdateNotificationMsg . newBuilder ( ) ;
@ -567,19 +583,21 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
private void processRpcResponses ( TbActorCtx context , SessionInfoProto sessionInfo , ToDeviceRpcResponseMsg responseMsg ) {
private void processRpcResponses ( SessionInfoProto sessionInfo , ToDeviceRpcResponseMsg responseMsg ) {
UUID sessionId = getSessionId ( sessionInfo ) ;
log . debug ( "[{}] Processing rpc command response [{}]" , deviceId , sessionId ) ;
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap . remove ( responseMsg . getRequestId ( ) ) ;
log . debug ( "[{}][{}] Processing RPC command response: {}" , deviceId , sessionId , responseMsg ) ;
int requestId = responseMsg . getRequestId ( ) ;
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap . remove ( requestId ) ;
boolean success = requestMd ! = null ;
if ( success ) {
ToDeviceRpcRequest toDeviceRequestMsg = requestMd . getMsg ( ) . getMsg ( ) ;
boolean delivered = requestMd . isDelivered ( ) ;
boolean hasError = StringUtils . isNotEmpty ( responseMsg . getError ( ) ) ;
try {
String payload = hasError ? responseMsg . getError ( ) : responseMsg . getPayload ( ) ;
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor (
new FromDeviceRpcResponse ( requestMd . getMsg ( ) . getMsg ( ) . getId ( ) ,
payload , null ) ) ;
if ( requestMd . getMsg ( ) . getMsg ( ) . isPersisted ( ) ) {
new FromDeviceRpcResponse ( toDeviceRequestMsg . getId ( ) , payload , null ) ) ;
if ( toDeviceRequestMsg . isPersisted ( ) ) {
RpcStatus status = hasError ? RpcStatus . FAILED : RpcStatus . SUCCESSFUL ;
JsonNode response ;
try {
@ -587,28 +605,33 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
} catch ( IllegalArgumentException e ) {
response = JacksonUtil . newObjectNode ( ) . put ( "error" , payload ) ;
}
systemContext . getTbRpcService ( ) . save ( tenantId , new RpcId ( requestMd . getMsg ( ) . getMsg ( ) . getId ( ) ) , status , response ) ;
systemContext . getTbRpcService ( ) . save ( tenantId , new RpcId ( toDeviceRequestMsg . getId ( ) ) , status , response ) ;
}
} finally {
if ( hasError & & ! requestMd . isDelivered ( ) ) {
sendNextPendingRequest ( context ) ;
if ( ! delivered ) {
String errorResponse = hasError ? "error" : "" ;
log . debug ( "[{}][{}][{}] Received {} response for undelivered RPC! Going to send next pending request ..." , deviceId , sessionId , requestId , errorResponse ) ;
sendNextPendingRequest ( ) ;
}
}
} else {
log . debug ( "[{}] Rpc command response [{}] is stale!" , deviceId , responseMsg . getRequestId ( ) ) ;
log . debug ( "[{}][{}][{}] RPC command response is stale!" , deviceId , sessionId , requestId ) ;
}
}
private void processRpcResponseStatus ( TbActorCtx context , SessionInfoProto sessionInfo , ToDeviceRpcResponseStatusMsg responseMsg ) {
private void processRpcResponseStatus ( SessionInfoProto sessionInfo , ToDeviceRpcResponseStatusMsg responseMsg ) {
UUID rpcId = new UUID ( responseMsg . getRequestIdMSB ( ) , responseMsg . getRequestIdLSB ( ) ) ;
RpcStatus status = RpcStatus . valueOf ( responseMsg . getStatus ( ) ) ;
ToDeviceRpcRequestMetadata md = toDeviceRpcPendingMap . get ( responseMsg . getRequestId ( ) ) ;
UUID sessionId = getSessionId ( sessionInfo ) ;
int requestId = responseMsg . getRequestId ( ) ;
log . debug ( "[{}][{}][{}][{}] Processing RPC command response status: [{}]" , deviceId , sessionId , rpcId , requestId , status ) ;
ToDeviceRpcRequestMetadata md = toDeviceRpcPendingMap . get ( requestId ) ;
if ( md ! = null ) {
JsonNode response = null ;
if ( status . equals ( RpcStatus . DELIVERED ) ) {
if ( md . getMsg ( ) . getMsg ( ) . isOneway ( ) ) {
toDeviceRpcPendingMap . remove ( responseMsg . getRe questId ( ) ) ;
toDeviceRpcPendingMap . remove ( requestId ) ;
if ( rpcSequential ) {
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor ( new FromDeviceRpcResponse ( rpcId , null , null ) ) ;
}
@ -619,7 +642,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
Integer maxRpcRetries = md . getMsg ( ) . getMsg ( ) . getRetries ( ) ;
maxRpcRetries = maxRpcRetries = = null ? systemContext . getMaxRpcRetries ( ) : Math . min ( maxRpcRetries , systemContext . getMaxRpcRetries ( ) ) ;
if ( maxRpcRetries < = md . getRetries ( ) ) {
toDeviceRpcPendingMap . remove ( responseMsg . getRe questId ( ) ) ;
toDeviceRpcPendingMap . remove ( requestId ) ;
status = RpcStatus . FAILED ;
response = JacksonUtil . newObjectNode ( ) . put ( "error" , "There was a Timeout and all retry attempts have been exhausted. Retry attempts set: " + maxRpcRetries ) ;
} else {
@ -631,17 +654,18 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
systemContext . getTbRpcService ( ) . save ( tenantId , new RpcId ( rpcId ) , status , response ) ;
}
if ( status ! = RpcStatus . SENT ) {
sendNextPendingRequest ( context ) ;
log . debug ( "[{}][{}][{}][{}] RPC was {}! Going to send next pending request ..." , deviceId , sessionId , rpcId , requestId , status . name ( ) . toLowerCase ( ) ) ;
sendNextPendingRequest ( ) ;
}
} else {
log . info ( "[{}][{}] Rpc has already removed from pending map.", deviceId , rpcId ) ;
log . warn ( "[{}][{}][{}][{}] RPC has already been removed from pending map.", deviceId , sessionId , rpcId , request Id ) ;
}
}
private void processSubscriptionCommands ( TbActorCtx context , SessionInfoProto sessionInfo , SubscribeToAttributeUpdatesMsg subscribeCmd ) {
private void processSubscriptionCommands ( SessionInfoProto sessionInfo , SubscribeToAttributeUpdatesMsg subscribeCmd ) {
UUID sessionId = getSessionId ( sessionInfo ) ;
if ( subscribeCmd . getUnsubscribe ( ) ) {
log . debug ( "[{}] Canceling attributes subscription for session [{}]" , deviceId , sessionId ) ;
log . debug ( "[{}] Canceling attributes subscription for session: [{}]" , deviceId , sessionId ) ;
attributeSubscriptions . remove ( sessionId ) ;
} else {
SessionInfoMetaData sessionMD = sessions . get ( sessionId ) ;
@ -649,7 +673,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
sessionMD = new SessionInfoMetaData ( new SessionInfo ( subscribeCmd . getSessionType ( ) , sessionInfo . getNodeId ( ) ) ) ;
}
sessionMD . setSubscribedToAttributes ( true ) ;
log . debug ( "[{}] Registering attributes subscription for session [{}]" , deviceId , sessionId ) ;
log . debug ( "[{}] Registering attributes subscription for session: [{}]" , deviceId , sessionId ) ;
attributeSubscriptions . put ( sessionId , sessionMD . getSessionInfo ( ) ) ;
dumpSessions ( ) ;
}
@ -659,10 +683,10 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
return new UUID ( sessionInfo . getSessionIdMSB ( ) , sessionInfo . getSessionIdLSB ( ) ) ;
}
private void processSubscriptionCommands ( TbActorCtx context , SessionInfoProto sessionInfo , SubscribeToRPCMsg subscribeCmd ) {
private void processSubscriptionCommands ( SessionInfoProto sessionInfo , SubscribeToRPCMsg subscribeCmd ) {
UUID sessionId = getSessionId ( sessionInfo ) ;
if ( subscribeCmd . getUnsubscribe ( ) ) {
log . debug ( "[{}] Canceling rpc subscription for session [{}]" , deviceId , sessionId ) ;
log . debug ( "[{}] Canceling RPC subscription for session: [{}]" , deviceId , sessionId ) ;
rpcSubscriptions . remove ( sessionId ) ;
} else {
SessionInfoMetaData sessionMD = sessions . get ( sessionId ) ;
@ -670,9 +694,9 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
sessionMD = new SessionInfoMetaData ( new SessionInfo ( subscribeCmd . getSessionType ( ) , sessionInfo . getNodeId ( ) ) ) ;
}
sessionMD . setSubscribedToRPC ( true ) ;
log . debug ( "[{}] Registering rpc subscription for session [{}]" , deviceId , sessionId ) ;
rpcSubscriptions . put ( sessionId , sessionMD . getSessionInfo ( ) ) ;
sendPendingRequests ( context , sessionId , sessionInfo . getNodeId ( ) ) ;
log . debug ( "[{}] Registered RPC subscription for session: [{}] Going to check for pending requests ..." , deviceId , sessionId ) ;
sendPendingRequests ( sessionId , sessionInfo . getNodeId ( ) ) ;
dumpSessions ( ) ;
}
}
@ -682,10 +706,10 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
Objects . requireNonNull ( sessionId ) ;
if ( msg . getEvent ( ) = = SessionEvent . OPEN ) {
if ( sessions . containsKey ( sessionId ) ) {
log . debug ( "[{}] Received duplicate session open event [{}] " , deviceId , sessionId ) ;
log . debug ( "[{}][{}] Received duplicate session open event. " , deviceId , sessionId ) ;
return ;
}
log . debug ( "[{}] Processing new session [{}]. Current sessions size {}" , deviceId , sessionId , sessions . size ( ) ) ;
log . debug ( "[{}] Processing new session: [{}] Current sessions size: {}" , deviceId , sessionId , sessions . size ( ) ) ;
sessions . put ( sessionId , new SessionInfoMetaData ( new SessionInfo ( SessionType . ASYNC , sessionInfo . getNodeId ( ) ) ) ) ;
if ( sessions . size ( ) = = 1 ) {
@ -694,7 +718,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
systemContext . getDeviceStateService ( ) . onDeviceActivity ( tenantId , deviceId , System . currentTimeMillis ( ) ) ;
dumpSessions ( ) ;
} else if ( msg . getEvent ( ) = = SessionEvent . CLOSED ) {
log . debug ( "[{}] Canceling subscriptions for closed session [{}] " , deviceId , sessionId ) ;
log . debug ( "[{}][{}] Canceling subscriptions for closed session. " , deviceId , sessionId ) ;
sessions . remove ( sessionId ) ;
attributeSubscriptions . remove ( sessionId ) ;
rpcSubscriptions . remove ( sessionId ) ;
@ -705,7 +729,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
private void handleSessionActivity ( TbActorCtx context , SessionInfoProto sessionInfoProto , SubscriptionInfoProto subscriptionInfo ) {
private void handleSessionActivity ( SessionInfoProto sessionInfoProto , SubscriptionInfoProto subscriptionInfo ) {
UUID sessionId = getSessionId ( sessionInfoProto ) ;
Objects . requireNonNull ( sessionId ) ;
@ -742,7 +766,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
private void notifyTransportAboutClosedSessionMaxSessionsLimit ( UUID sessionId , SessionInfoMetaData sessionMd ) {
log . debug ( "remove eldest session (max concurrent sessions limit reached per device) sessionId [{}] sessionMd [{}]" , sessionId , sessionMd ) ;
log . debug ( "remove eldest session (max concurrent sessions limit reached per device) sessionId: [{}] sessionMd: [{}]" , sessionId , sessionMd ) ;
notifyTransportAboutClosedSession ( sessionId , sessionMd , "max concurrent sessions limit reached per device!" ) ;
}
@ -806,14 +830,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
systemContext . getTbCoreToTransportService ( ) . process ( nodeId , msg ) ;
}
private void sendToTransport ( ToServerRpcResponseMsg rpcMsg , UUID sessionId , String nodeId ) {
ToTransportMsg msg = ToTransportMsg . newBuilder ( )
. setSessionIdMSB ( sessionId . getMostSignificantBits ( ) )
. setSessionIdLSB ( sessionId . getLeastSignificantBits ( ) )
. setToServerResponse ( rpcMsg ) . build ( ) ;
systemContext . getTbCoreToTransportService ( ) . process ( nodeId , msg ) ;
}
private ListenableFuture < Void > saveRpcRequestToEdgeQueue ( ToDeviceRpcRequest msg , Integer requestId ) {
ObjectNode body = JacksonUtil . newObjectNode ( ) ;
body . put ( "requestId" , requestId ) ;
@ -914,14 +930,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
log . debug ( "[{}] Restored session: {}" , deviceId , sessionMD ) ;
}
log . debug ( "[{}] Restored sessions: {}, rpc subscriptions: {}, attribute subscriptions: {}" , deviceId , sessions . size ( ) , rpcSubscriptions . size ( ) , attributeSubscriptions . size ( ) ) ;
log . debug ( "[{}] Restored sessions: {}, RPC subscriptions: {}, attribute subscriptions: {}" , deviceId , sessions . size ( ) , rpcSubscriptions . size ( ) , attributeSubscriptions . size ( ) ) ;
}
private void dumpSessions ( ) {
if ( systemContext . isLocalCacheType ( ) ) {
return ;
}
log . debug ( "[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache" , deviceId , sessions . size ( ) , rpcSubscriptions . size ( ) , attributeSubscriptions . size ( ) ) ;
log . debug ( "[{}] Dumping sessions: {}, RPC subscriptions: {}, attribute subscriptions: {} to cache" , deviceId , sessions . size ( ) , rpcSubscriptions . size ( ) , attributeSubscriptions . size ( ) ) ;
List < SessionSubscriptionInfoProto > sessionsList = new ArrayList < > ( sessions . size ( ) ) ;
sessions . forEach ( ( uuid , sessionMD ) - > {
if ( sessionMD . getSessionInfo ( ) . getType ( ) = = SessionType . SYNC ) {