@ -26,7 +26,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils ;
import org.thingsboard.common.util.JacksonUtil ;
import org.thingsboard.common.util.LinkedHashMapRemoveEldest ;
import org.thingsboard.server.common.data.rpc.RpcError ;
import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg ;
import org.thingsboard.rule.engine.api.msg.DeviceCredentialsUpdateNotificationMsg ;
import org.thingsboard.rule.engine.api.msg.DeviceEdgeUpdateMsg ;
@ -49,9 +48,11 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry ;
import org.thingsboard.server.common.data.page.PageData ;
import org.thingsboard.server.common.data.page.PageLink ;
import org.thingsboard.server.common.data.page.SortOrder ;
import org.thingsboard.server.common.data.relation.EntityRelation ;
import org.thingsboard.server.common.data.relation.RelationTypeGroup ;
import org.thingsboard.server.common.data.rpc.Rpc ;
import org.thingsboard.server.common.data.rpc.RpcError ;
import org.thingsboard.server.common.data.rpc.RpcStatus ;
import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody ;
import org.thingsboard.server.common.data.security.DeviceCredentials ;
@ -59,6 +60,7 @@ import org.thingsboard.server.common.data.security.DeviceCredentialsType;
import org.thingsboard.server.common.msg.TbActorMsg ;
import org.thingsboard.server.common.msg.TbMsgMetaData ;
import org.thingsboard.server.common.msg.queue.TbCallback ;
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse ;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest ;
import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg ;
import org.thingsboard.server.gen.transport.TransportProtos ;
@ -78,15 +80,14 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionType;
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg ;
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg ;
import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProto ;
import org.thingsboard.server.gen.transport.TransportProtos.ToDevicePersistedRpcResponseMsg ;
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg ;
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg ;
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseStatusMsg ;
import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg ;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg ;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportUpdateCredentialsProto ;
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg ;
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto ;
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse ;
import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg ;
import org.thingsboard.server.service.rpc.RemoveRpcActorMsg ;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg ;
@ -98,9 +99,11 @@ import java.util.Arrays;
import java.util.Collections ;
import java.util.HashMap ;
import java.util.HashSet ;
import java.util.LinkedHashMap ;
import java.util.List ;
import java.util.Map ;
import java.util.Objects ;
import java.util.Optional ;
import java.util.Set ;
import java.util.UUID ;
import java.util.function.Consumer ;
@ -119,6 +122,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
private final Map < UUID , SessionInfo > attributeSubscriptions ;
private final Map < UUID , SessionInfo > rpcSubscriptions ;
private final Map < Integer , ToDeviceRpcRequestMetadata > toDeviceRpcPendingMap ;
private final boolean rpcSequential ;
private int rpcSeq = 0 ;
private String deviceName ;
@ -130,9 +134,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
super ( systemContext ) ;
this . tenantId = tenantId ;
this . deviceId = deviceId ;
this . rpcSequential = systemContext . isRpcSequential ( ) ;
this . attributeSubscriptions = new HashMap < > ( ) ;
this . rpcSubscriptions = new HashMap < > ( ) ;
this . toDeviceRpcPendingMap = new HashMap < > ( ) ;
this . toDeviceRpcPendingMap = new Linked HashMap< > ( ) ;
this . sessions = new LinkedHashMapRemoveEldest < > ( systemContext . getMaxConcurrentSessionsPerDevice ( ) , this : : notifyTransportAboutClosedSessionMaxSessionsLimit ) ;
if ( initAttributes ( ) ) {
restoreSessions ( ) ;
@ -183,19 +188,19 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
if ( timeout < = 0 ) {
log . debug ( "[{}][{}] Ignoring message due to exp time reached, {}" , deviceId , request . getId ( ) , request . getExpirationTime ( ) ) ;
if ( persisted ) {
createRpc ( request , RpcStatus . TIMEOUT ) ;
createRpc ( request , RpcStatus . EXPIRED ) ;
}
return ;
} else if ( persisted ) {
createRpc ( request , RpcStatus . QUEUED ) ;
}
boolean sent ;
boolean sent = false ;
if ( systemContext . isEdgesEnabled ( ) & & edgeId ! = null ) {
log . debug ( "[{}][{}] device is related to edge [{}]. Saving RPC request to edge queue" , tenantId , deviceId , edgeId . getId ( ) ) ;
saveRpcRequestToEdgeQueue ( request , rpcRequest . getRequestId ( ) ) ;
sent = true ;
} else {
} else if ( isSendNewRpcAvailable ( ) ) {
sent = rpcSubscriptions . size ( ) > 0 ;
Set < UUID > syncSessionSet = new HashSet < > ( ) ;
rpcSubscriptions . forEach ( ( key , value ) - > {
@ -227,6 +232,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
}
}
private boolean isSendNewRpcAvailable ( ) {
return ! rpcSequential | | toDeviceRpcPendingMap . values ( ) . stream ( ) . filter ( md - > ! md . isDelivered ( ) ) . findAny ( ) . isEmpty ( ) ;
}
private Rpc createRpc ( ToDeviceRpcRequest request , RpcStatus status ) {
Rpc rpc = new Rpc ( new RpcId ( request . getId ( ) ) ) ;
rpc . setCreatedTime ( System . currentTimeMillis ( ) ) ;
@ -266,16 +275,26 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
void processRemoveRpc ( TbActorCtx context , RemoveRpcActorMsg msg ) {
log . debug ( "[{}] Processing remove rpc command" , msg . getRequestId ( ) ) ;
Integer requestId = null ;
for ( Map . Entry < Integer , ToDeviceRpcRequestMetadata > entry : toDeviceRpcPendingMap . entrySet ( ) ) {
if ( entry . getValue ( ) . getMsg ( ) . getMsg ( ) . getId ( ) . equals ( msg . getRequestId ( ) ) ) {
requestId = entry . getKey ( ) ;
Map . Entry < Integer , ToDeviceRpcRequestMetadata > entry = null ;
for ( Map . Entry < Integer , ToDeviceRpcRequestMetadata > e : toDeviceRpcPendingMap . entrySet ( ) ) {
if ( e . getValue ( ) . getMsg ( ) . getMsg ( ) . getId ( ) . equals ( msg . getRequestId ( ) ) ) {
entry = e ;
break ;
}
}
if ( requestId ! = null ) {
toDeviceRpcPendingMap . remove ( requestId ) ;
if ( entry ! = null ) {
if ( entry . getValue ( ) . isDelivered ( ) ) {
toDeviceRpcPendingMap . remove ( entry . getKey ( ) ) ;
} else {
Optional < Map . Entry < Integer , ToDeviceRpcRequestMetadata > > firstRpc = getFirstRpc ( ) ;
if ( firstRpc . isPresent ( ) & & entry . getKey ( ) . equals ( firstRpc . get ( ) . getKey ( ) ) ) {
toDeviceRpcPendingMap . remove ( entry . getKey ( ) ) ;
sendNextPendingRequest ( context ) ;
} else {
toDeviceRpcPendingMap . remove ( entry . getKey ( ) ) ;
}
}
}
}
@ -290,14 +309,17 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
if ( requestMd ! = null ) {
log . debug ( "[{}] RPC request [{}] timeout detected!" , deviceId , msg . getId ( ) ) ;
if ( requestMd . getMsg ( ) . getMsg ( ) . isPersisted ( ) ) {
systemContext . getTbRpcService ( ) . save ( tenantId , new RpcId ( requestMd . getMsg ( ) . getMsg ( ) . getId ( ) ) , RpcStatus . TIMEOUT , null ) ;
systemContext . getTbRpcService ( ) . save ( tenantId , new RpcId ( requestMd . getMsg ( ) . getMsg ( ) . getId ( ) ) , RpcStatus . EXPIRED , null ) ;
}
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor ( new FromDeviceRpcResponse ( requestMd . getMsg ( ) . getMsg ( ) . getId ( ) ,
null , requestMd . isSent ( ) ? RpcError . TIMEOUT : RpcError . NO_ACTIVE_CONNECTION ) ) ;
if ( ! requestMd . isDelivered ( ) ) {
sendNextPendingRequest ( context ) ;
}
}
}
private void sendPendingRequests ( TbActorCtx context , UUID sessionId , SessionInfoProto sessionInfo ) {
private void sendPendingRequests ( TbActorCtx context , UUID sessionId , String nodeId ) {
SessionType sessionType = getSessionType ( sessionId ) ;
if ( ! toDeviceRpcPendingMap . isEmpty ( ) ) {
log . debug ( "[{}] Pushing {} pending RPC messages to new async session [{}]" , deviceId , toDeviceRpcPendingMap . size ( ) , sessionId ) ;
@ -309,20 +331,33 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
log . debug ( "[{}] No pending RPC messages for new async session [{}]" , deviceId , sessionId ) ;
}
Set < Integer > sentOneWayIds = new HashSet < > ( ) ;
if ( sessionType = = SessionType . ASYNC ) {
toDeviceRpcPendingMap . entrySet ( ) . forEach ( processPendingRpc ( context , sessionId , sessionInfo . getNodeId ( ) , sentOneWayIds ) ) ;
if ( rpcSequential ) {
getFirstRpc ( ) . ifPresent ( processPendingRpc ( context , sessionId , nodeId , sentOneWayIds ) ) ;
} else if ( sessionType = = SessionType . ASYNC ) {
toDeviceRpcPendingMap . entrySet ( ) . forEach ( processPendingRpc ( context , sessionId , nodeId , sentOneWayIds ) ) ;
} else {
toDeviceRpcPendingMap . entrySet ( ) . stream ( ) . findFirst ( ) . ifPresent ( processPendingRpc ( context , sessionId , sessionInfo . getNodeId ( ) , sentOneWayIds ) ) ;
toDeviceRpcPendingMap . entrySet ( ) . stream ( ) . findFirst ( ) . ifPresent ( processPendingRpc ( context , sessionId , nodeId , sentOneWayIds ) ) ;
}
sentOneWayIds . stream ( ) . filter ( id - > ! toDeviceRpcPendingMap . get ( id ) . getMsg ( ) . getMsg ( ) . isPersisted ( ) ) . forEach ( toDeviceRpcPendingMap : : remove ) ;
}
private Optional < Map . Entry < Integer , ToDeviceRpcRequestMetadata > > getFirstRpc ( ) {
return toDeviceRpcPendingMap . entrySet ( ) . stream ( ) . filter ( e - > ! e . getValue ( ) . isDelivered ( ) ) . findFirst ( ) ;
}
private void sendNextPendingRequest ( TbActorCtx context ) {
if ( rpcSequential ) {
rpcSubscriptions . forEach ( ( id , s ) - > sendPendingRequests ( context , id , s . getNodeId ( ) ) ) ;
}
}
private Consumer < Map . Entry < Integer , ToDeviceRpcRequestMetadata > > processPendingRpc ( TbActorCtx context , UUID sessionId , String nodeId , Set < Integer > sentOneWayIds ) {
return entry - > {
ToDeviceRpcRequest request = entry . getValue ( ) . getMsg ( ) . getMsg ( ) ;
ToDeviceRpcRequestBody body = request . getBody ( ) ;
if ( request . isOneway ( ) ) {
if ( request . isOneway ( ) & & ! rpcSequential ) {
sentOneWayIds . add ( entry . getKey ( ) ) ;
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor ( new FromDeviceRpcResponse ( request . getId ( ) , null , null ) ) ;
}
@ -355,7 +390,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
processSubscriptionCommands ( context , sessionInfo , msg . getSubscribeToRPC ( ) ) ;
}
if ( msg . hasSendPendingRPC ( ) ) {
sendPendingRequests ( context , getSessionId ( sessionInfo ) , sessionInfo ) ;
sendPendingRequests ( context , getSessionId ( sessionInfo ) , sessionInfo . getNodeId ( ) ) ;
}
if ( msg . hasGetAttributes ( ) ) {
handleGetAttributesRequest ( context , sessionInfo , msg . getGetAttributes ( ) ) ;
@ -369,8 +404,8 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
if ( msg . hasClaimDevice ( ) ) {
handleClaimDeviceMsg ( context , sessionInfo , msg . getClaimDevice ( ) ) ;
}
if ( msg . hasPersisted RpcResponseMsg ( ) ) {
processPersisted RpcResponses ( context , sessionInfo , msg . getPersisted RpcResponseMsg ( ) ) ;
if ( msg . hasRpcResponseStatus Msg ( ) ) {
processRpcResponseStatu s ( context , sessionInfo , msg . getRpcResponseStatus Msg ( ) ) ;
}
if ( msg . hasUplinkNotificationMsg ( ) ) {
processUplinkNotificationMsg ( context , sessionInfo , msg . getUplinkNotificationMsg ( ) ) ;
@ -530,28 +565,66 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
boolean success = requestMd ! = null ;
if ( success ) {
boolean hasError = StringUtils . isNotEmpty ( responseMsg . getError ( ) ) ;
String payload = hasError ? responseMsg . getError ( ) : responseMsg . getPayload ( ) ;
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor (
new FromDeviceRpcResponse ( requestMd . getMsg ( ) . getMsg ( ) . getId ( ) ,
payload , null ) ) ;
if ( requestMd . getMsg ( ) . getMsg ( ) . isPersisted ( ) ) {
RpcStatus status = hasError ? RpcStatus . FAILED : RpcStatus . SUCCESSFUL ;
JsonNode response ;
try {
response = JacksonUtil . toJsonNode ( payload ) ;
} catch ( IllegalArgumentException e ) {
response = JacksonUtil . newObjectNode ( ) . put ( "error" , payload ) ;
try {
String payload = hasError ? responseMsg . getError ( ) : responseMsg . getPayload ( ) ;
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor (
new FromDeviceRpcResponse ( requestMd . getMsg ( ) . getMsg ( ) . getId ( ) ,
payload , null ) ) ;
if ( requestMd . getMsg ( ) . getMsg ( ) . isPersisted ( ) ) {
RpcStatus status = hasError ? RpcStatus . FAILED : RpcStatus . SUCCESSFUL ;
JsonNode response ;
try {
response = JacksonUtil . toJsonNode ( payload ) ;
} catch ( IllegalArgumentException e ) {
response = JacksonUtil . newObjectNode ( ) . put ( "error" , payload ) ;
}
systemContext . getTbRpcService ( ) . save ( tenantId , new RpcId ( requestMd . getMsg ( ) . getMsg ( ) . getId ( ) ) , status , response ) ;
}
} finally {
if ( hasError ) {
sendNextPendingRequest ( context ) ;
}
systemContext . getTbRpcService ( ) . save ( tenantId , new RpcId ( requestMd . getMsg ( ) . getMsg ( ) . getId ( ) ) , status , response ) ;
}
} else {
log . debug ( "[{}] Rpc command response [{}] is stale!" , deviceId , responseMsg . getRequestId ( ) ) ;
}
}
private void processPersisted RpcResponses ( TbActorCtx context , SessionInfoProto sessionInfo , ToDevicePersisted RpcResponseMsg responseMsg ) {
private void processRpcResponseStatu s ( TbActorCtx context , SessionInfoProto sessionInfo , ToDeviceRpcResponseStatus Msg responseMsg ) {
UUID rpcId = new UUID ( responseMsg . getRequestIdMSB ( ) , responseMsg . getRequestIdLSB ( ) ) ;
systemContext . getTbRpcService ( ) . save ( tenantId , new RpcId ( rpcId ) , RpcStatus . valueOf ( responseMsg . getStatus ( ) ) , null ) ;
RpcStatus status = RpcStatus . valueOf ( responseMsg . getStatus ( ) ) ;
ToDeviceRpcRequestMetadata md = toDeviceRpcPendingMap . get ( responseMsg . getRequestId ( ) ) ;
if ( md ! = null ) {
if ( status . equals ( RpcStatus . DELIVERED ) ) {
if ( md . getMsg ( ) . getMsg ( ) . isOneway ( ) ) {
toDeviceRpcPendingMap . remove ( responseMsg . getRequestId ( ) ) ;
if ( rpcSequential ) {
systemContext . getTbCoreDeviceRpcService ( ) . processRpcResponseFromDeviceActor ( new FromDeviceRpcResponse ( rpcId , null , null ) ) ;
}
} else {
md . setDelivered ( true ) ;
}
} else if ( status . equals ( RpcStatus . TIMEOUT ) ) {
Integer maxRpcRetries = md . getMsg ( ) . getMsg ( ) . getRetries ( ) ;
maxRpcRetries = maxRpcRetries = = null ? systemContext . getMaxRpcRetries ( ) : Math . min ( maxRpcRetries , systemContext . getMaxRpcRetries ( ) ) ;
if ( maxRpcRetries < = md . getRetries ( ) ) {
toDeviceRpcPendingMap . remove ( responseMsg . getRequestId ( ) ) ;
status = RpcStatus . FAILED ;
} else {
md . setRetries ( md . getRetries ( ) + 1 ) ;
}
}
if ( md . getMsg ( ) . getMsg ( ) . isPersisted ( ) ) {
systemContext . getTbRpcService ( ) . save ( tenantId , new RpcId ( rpcId ) , status , null ) ;
}
if ( status ! = RpcStatus . SENT ) {
sendNextPendingRequest ( context ) ;
}
} else {
log . info ( "[{}][{}] Rpc has already removed from pending map." , deviceId , rpcId ) ;
}
}
private void processSubscriptionCommands ( TbActorCtx context , SessionInfoProto sessionInfo , SubscribeToAttributeUpdatesMsg subscribeCmd ) {
@ -588,7 +661,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
sessionMD . setSubscribedToRPC ( true ) ;
log . debug ( "[{}] Registering rpc subscription for session [{}]" , deviceId , sessionId ) ;
rpcSubscriptions . put ( sessionId , sessionMD . getSessionInfo ( ) ) ;
sendPendingRequests ( context , sessionId , sessionInfo ) ;
sendPendingRequests ( context , sessionId , sessionInfo . getNodeId ( ) ) ;
dumpSessions ( ) ;
}
}
@ -625,20 +698,22 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
UUID sessionId = getSessionId ( sessionInfoProto ) ;
Objects . requireNonNull ( sessionId ) ;
SessionInfoMetaData sessionMD = sessions . computeIfAbsent ( sessionId ,
id - > new SessionInfoMetaData ( new SessionInfo ( SessionType . ASYNC , sessionInfoProto . getNodeId ( ) ) , subscriptionInfo . getLastActivityTime ( ) ) ) ;
sessionMD . setLastActivityTime ( subscriptionInfo . getLastActivityTime ( ) ) ;
sessionMD . setSubscribedToAttributes ( subscriptionInfo . getAttribute Subscription ( ) ) ;
sessionMD . setSubscribedToRPC ( subscriptionInfo . getRpcSubscription ( ) ) ;
if ( subscriptionInfo . getAttributeSubscription ( ) ) {
attributeSubscriptions . putIfAbsent ( sessionId , sessionMD . getSessionInfo ( ) ) ;
}
if ( subscriptionInfo . getRpcSubscription ( ) ) {
rpcSubscriptions . putIfAbsent ( sessionId , sessionMD . getSessionInfo ( ) ) ;
SessionInfoMetaData sessionMD = sessions . get ( sessionId ) ;
if ( sessionMD ! = null ) {
sessionMD . setLastActivityTime ( subscriptionInfo . getLastActivityTime ( ) ) ;
sessionMD . setSubscribedToAttributes ( subscriptionInfo . getAttributeSubscription ( ) ) ;
sessionMD . setSubscribedToRPC ( subscriptionInfo . getRpc Subscription ( ) ) ;
if ( subscriptionInfo . getAttributeSubscription ( ) ) {
attributeSubscriptions . putIfAbsent ( sessionId , sessionMD . getSessionInfo ( ) ) ;
}
if ( subscriptionInfo . getRpcSubscription ( ) ) {
rpcSubscriptions . putIfAbsent ( sessionId , sessionMD . getSessionInfo ( ) ) ;
}
}
systemContext . getDeviceStateService ( ) . onDeviceActivity ( tenantId , deviceId , subscriptionInfo . getLastActivityTime ( ) ) ;
dumpSessions ( ) ;
if ( sessionMD ! = null ) {
dumpSessions ( ) ;
}
}
void processCredentialsUpdate ( TbActorMsg msg ) {
@ -856,7 +931,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
void init ( TbActorCtx ctx ) {
schedulePeriodicMsgWithDelay ( ctx , SessionTimeoutCheckMsg . instance ( ) , systemContext . getSessionReportTimeout ( ) , systemContext . getSessionReportTimeout ( ) ) ;
PageLink pageLink = new PageLink ( 1024 ) ;
PageLink pageLink = new PageLink ( 1024 , 0 , null , new SortOrder ( "createdTime" ) ) ;
PageData < Rpc > pageData ;
do {
pageData = systemContext . getTbRpcService ( ) . findAllByDeviceIdAndStatus ( tenantId , deviceId , RpcStatus . QUEUED , pageLink ) ;
@ -864,7 +939,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
ToDeviceRpcRequest msg = JacksonUtil . convertValue ( rpc . getRequest ( ) , ToDeviceRpcRequest . class ) ;
long timeout = rpc . getExpirationTime ( ) - System . currentTimeMillis ( ) ;
if ( timeout < = 0 ) {
rpc . setStatus ( RpcStatus . TIMEOUT ) ;
rpc . setStatus ( RpcStatus . EXPIRED ) ;
systemContext . getTbRpcService ( ) . save ( tenantId , rpc ) ;
} else {
registerPendingRpcRequest ( ctx , new ToDeviceRpcRequestActorMsg ( systemContext . getServiceId ( ) , msg ) , false , creteToDeviceRpcRequestMsg ( msg ) , timeout ) ;