@ -188,7 +188,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
if ( deviceSessionCtx ! = null ) {
deregisterSession ( deviceName , deviceSessionCtx ) ;
} else {
log . debug ( "[{}] Device [{}] was already removed from the gateway session" , sessionId , deviceName ) ;
log . debug ( "[{}][{}][{}] Device [{}] was already removed from the gateway session" , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId , deviceName ) ;
}
}
@ -205,17 +205,17 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
}
protected void processOnConnect ( MqttPublishMessage msg , String deviceName , String deviceType ) {
log . trace ( "[{}] onDeviceConnect: {}" , sessionId , deviceName ) ;
log . trace ( "[{}][{}][{}] onDeviceConnect: [ {}] " , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId , deviceName ) ;
Futures . addCallback ( onDeviceConnect ( deviceName , deviceType ) , new FutureCallback < > ( ) {
@Override
public void onSuccess ( @Nullable T result ) {
ack ( msg , ReturnCode . SUCCESS ) ;
log . trace ( "[{}] onDeviceConnectOk: {}" , sessionId , deviceName ) ;
log . trace ( "[{}][{}][{}] onDeviceConnectOk: [ {}] " , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId , deviceName ) ;
}
@Override
public void onFailure ( Throwable t ) {
log . warn ( "[{}] Failed to process device connect command: {}" , sessionId , deviceName , t ) ;
log . warn ( "[{}][{}][{}] Failed to process device connect command: [ {}] " , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId , deviceName , t ) ;
}
} , context . getExecutor ( ) ) ;
@ -259,7 +259,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
public void onSuccess ( GetOrCreateDeviceFromGatewayResponse msg ) {
T deviceSessionCtx = newDeviceSessionCtx ( msg ) ;
if ( devices . putIfAbsent ( deviceName , deviceSessionCtx ) = = null ) {
log . trace ( "[{}] First got or created device [{}], type [{}] for the gateway session" , sessionId , deviceName , deviceType ) ;
log . trace ( "[{}][{}][{}] First got or created device [{}], type [{}] for the gateway session" , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId , deviceName , deviceType ) ;
SessionInfoProto deviceSessionInfo = deviceSessionCtx . getSessionInfo ( ) ;
transportService . registerAsyncSession ( deviceSessionInfo , deviceSessionCtx ) ;
transportService . process ( TransportProtos . TransportToDeviceActorMsg . newBuilder ( )
@ -275,7 +275,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
@Override
public void onError ( Throwable e ) {
log . warn ( "[{}] Failed to process device connect command: {}" , sessionId , deviceName , e ) ;
log . warn ( "[{}][{}][{}] Failed to process device connect command at getDeviceCreationFuture : [ {}] " , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId , deviceName , e ) ;
futureToSet . setException ( e ) ;
deviceFutures . remove ( deviceName ) ;
}
@ -348,14 +348,14 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
TransportProtos . PostTelemetryMsg postTelemetryMsg = JsonConverter . convertToTelemetryProto ( deviceEntry . getValue ( ) . getAsJsonArray ( ) ) ;
processPostTelemetryMsg ( deviceCtx , postTelemetryMsg , deviceName , msgId ) ;
} catch ( Throwable e ) {
log . warn ( "[{}][{}] Failed to convert telemetry: {}" , gateway . getDeviceId ( ) , deviceName , deviceEntry . getValue ( ) , e ) ;
log . warn ( "[{}][{}][{}] Failed to convert telemetry: [ {}] " , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , deviceName , deviceEntry . getValue ( ) , e ) ;
channel . close ( ) ;
}
}
@Override
public void onFailure ( Throwable t ) {
log . debug ( "[{}] Failed to process device telemetry command: {}" , sessionId , deviceName , t ) ;
log . debug ( "[{}][{}][{}] Failed to process device telemetry command: [ {}] " , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId , deviceName , t ) ;
}
} , context . getExecutor ( ) ) ;
}
@ -380,19 +380,19 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
TransportProtos . PostTelemetryMsg postTelemetryMsg = ProtoConverter . validatePostTelemetryMsg ( msg . toByteArray ( ) ) ;
processPostTelemetryMsg ( deviceCtx , postTelemetryMsg , deviceName , msgId ) ;
} catch ( Throwable e ) {
log . warn ( "[{}][{}] Failed to convert telemetry: {}" , gateway . getDeviceId ( ) , deviceName , msg , e ) ;
log . warn ( "[{}][{}][{}] Failed to convert telemetry: [ {}] " , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , deviceName , msg , e ) ;
channel . close ( ) ;
}
}
@Override
public void onFailure ( Throwable t ) {
log . debug ( "[{}] Failed to process device telemetry command: {}" , sessionId , deviceName , t ) ;
log . debug ( "[{}][{}][{}] Failed to process device telemetry command: [ {}] " , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId , deviceName , t ) ;
}
} , context . getExecutor ( ) ) ;
} ) ;
} else {
log . debug ( "[{}] Devices telemetry messages is empty for: [{}] " , sessionId , gateway . getDeviceId ( ) ) ;
log . debug ( "[{}][{}][{}] Devices telemetry messages is empty" , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId ) ;
throw new IllegalArgumentException ( "[" + sessionId + "] Devices telemetry messages is empty for [" + gateway . getDeviceId ( ) + "]" ) ;
}
} catch ( RuntimeException | InvalidProtocolBufferException e ) {
@ -433,13 +433,13 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
TransportProtos . ClaimDeviceMsg claimDeviceMsg = JsonConverter . convertToClaimDeviceProto ( deviceId , deviceEntry . getValue ( ) ) ;
processClaimDeviceMsg ( deviceCtx , claimDeviceMsg , deviceName , msgId ) ;
} catch ( Throwable e ) {
log . warn ( "[{}][{}] Failed to convert claim message: {}" , gateway . getDeviceId ( ) , deviceName , deviceEntry . getValue ( ) , e ) ;
log . warn ( "[{}][{}][{}] Failed to convert claim message: [ {}] " , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , deviceName , deviceEntry . getValue ( ) , e ) ;
}
}
@Override
public void onFailure ( Throwable t ) {
log . debug ( "[{}] Failed to process device claiming command: {}" , sessionId , deviceName , t ) ;
log . debug ( "[{}][{}][{}] Failed to process device claiming command: [ {}] " , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId , deviceName , t ) ;
}
} , context . getExecutor ( ) ) ;
}
@ -468,18 +468,18 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
TransportProtos . ClaimDeviceMsg claimDeviceMsg = ProtoConverter . convertToClaimDeviceProto ( deviceId , claimRequest . toByteArray ( ) ) ;
processClaimDeviceMsg ( deviceCtx , claimDeviceMsg , deviceName , msgId ) ;
} catch ( Throwable e ) {
log . warn ( "[{}][{}] Failed to convert claim message: {}" , gateway . getDeviceId ( ) , deviceName , claimRequest , e ) ;
log . warn ( "[{}][{}][{}] Failed to convert claim message: [ {}] " , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , deviceName , claimRequest , e ) ;
}
}
@Override
public void onFailure ( Throwable t ) {
log . debug ( "[{}] Failed to process device claiming command: {}" , sessionId , deviceName , t ) ;
log . debug ( "[{}][{}][{}] Failed to process device claiming command: [ {}] " , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId , deviceName , t ) ;
}
} , context . getExecutor ( ) ) ;
} ) ;
} else {
log . debug ( "[{}] Devices claim messages is empty for: [{}] " , sessionId , gateway . getDeviceId ( ) ) ;
log . debug ( "[{}][{}][{}] Devices claim messages is empty" , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId ) ;
throw new IllegalArgumentException ( "[" + sessionId + "] Devices claim messages is empty for [" + gateway . getDeviceId ( ) + "]" ) ;
}
} catch ( RuntimeException | InvalidProtocolBufferException e ) {
@ -510,7 +510,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
@Override
public void onFailure ( Throwable t ) {
log . debug ( "[{}] Failed to process device attributes command: {}" , sessionId , deviceName , t ) ;
log . debug ( "[{}][{}][{}] Failed to process device attributes command: [ {}] " , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId , deviceName , t ) ;
}
} , context . getExecutor ( ) ) ;
}
@ -538,18 +538,18 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
TransportProtos . PostAttributeMsg postAttributeMsg = ProtoConverter . validatePostAttributeMsg ( kvListProto ) ;
processPostAttributesMsg ( deviceCtx , postAttributeMsg , deviceName , msgId ) ;
} catch ( Throwable e ) {
log . warn ( "[{}][{}] Failed to process device attributes command: {}" , gateway . getDeviceId ( ) , deviceName , kvListProto , e ) ;
log . warn ( "[{}][{}][{}] Failed to process device attributes command: [ {}] " , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , deviceName , kvListProto , e ) ;
}
}
@Override
public void onFailure ( Throwable t ) {
log . debug ( "[{}] Failed to process device attributes command: {}" , sessionId , deviceName , t ) ;
log . debug ( "[{}][{}][{}] Failed to process device attributes command: [ {}] " , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId , deviceName , t ) ;
}
} , context . getExecutor ( ) ) ;
} ) ;
} else {
log . debug ( "[{}] Devices attributes keys list is empty for: [{}] " , sessionId , gateway . getDeviceId ( ) ) ;
log . debug ( "[{}][{}][{}] Devices attributes keys list is empty" , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId ) ;
throw new IllegalArgumentException ( "[" + sessionId + "] Devices attributes keys list is empty for [" + gateway . getDeviceId ( ) + "]" ) ;
}
} catch ( RuntimeException | InvalidProtocolBufferException e ) {
@ -618,7 +618,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
@Override
public void onFailure ( Throwable t ) {
log . debug ( "[{}] Failed to process device Rpc response command: {}" , sessionId , deviceName , t ) ;
log . debug ( "[{}][{}][{}] Failed to process device Rpc response command: [ {}] " , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId , deviceName , t ) ;
}
} , context . getExecutor ( ) ) ;
} else {
@ -643,7 +643,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
@Override
public void onFailure ( Throwable t ) {
log . debug ( "[{}] Failed to process device Rpc response command: {}" , sessionId , deviceName , t ) ;
log . debug ( "[{}][{}][{}] Failed to process device Rpc response command: [ {}] " , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId , deviceName , t ) ;
}
} , context . getExecutor ( ) ) ;
} catch ( RuntimeException | InvalidProtocolBufferException e ) {
@ -667,7 +667,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
@Override
public void onFailure ( Throwable t ) {
ack ( mqttMsg , ReturnCode . IMPLEMENTATION_SPECIFIC ) ;
log . debug ( "[{}] Failed to process device attributes request command: {}" , sessionId , deviceName , t ) ;
log . debug ( "[{}][{}][{}] Failed to process device attributes request command: [ {}] " , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId , deviceName , t ) ;
}
} , context . getExecutor ( ) ) ;
}
@ -687,7 +687,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
protected ListenableFuture < T > checkDeviceConnected ( String deviceName ) {
T ctx = devices . get ( deviceName ) ;
if ( ctx = = null ) {
log . debug ( "[{}] Missing device [{}] for the gateway session" , sessionId , deviceName ) ;
log . debug ( "[{}][{}][{}] Missing device [{}] for the gateway session" , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId , deviceName ) ;
return onDeviceConnect ( deviceName , DEFAULT_DEVICE_TYPE ) ;
} else {
return Futures . immediateFuture ( ctx ) ;
@ -733,7 +733,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
}
transportService . deregisterSession ( deviceSessionCtx . getSessionInfo ( ) ) ;
transportService . process ( deviceSessionCtx . getSessionInfo ( ) , SESSION_EVENT_MSG_CLOSED , null ) ;
log . debug ( "[{}] Removed device [{}] from the gateway session" , sessionId , deviceName ) ;
log . debug ( "[{}][{}][{}] Removed device [{}] from the gateway session" , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId , deviceName ) ;
}
public void sendSparkplugStateOnTelemetry ( TransportProtos . SessionInfoProto sessionInfo , String deviceName , SparkplugConnectionState connectionState , long ts ) {
@ -749,7 +749,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
return new TransportServiceCallback < Void > ( ) {
@Override
public void onSuccess ( Void dummy ) {
log . trace ( "[{}][{}] Published msg: {}" , sessionId , deviceName , msg ) ;
log . trace ( "[{}][{}][{}][{}] Published msg: [ {}] " , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId , deviceName , msg ) ;
if ( msgId > 0 ) {
ctx . writeAndFlush ( MqttTransportHandler . createMqttPubAckMsg ( deviceSessionCtx , msgId , ReturnCode . SUCCESS ) ) ;
}
@ -757,7 +757,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
@Override
public void onError ( Throwable e ) {
log . trace ( "[{}] Failed to publish msg: {} for device: {}" , sessionId , msg , deviceName , e ) ;
log . trace ( "[{}][{}][{}] Failed to publish msg: [ {}] for device: [ {}] " , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId , msg , deviceName , e ) ;
ctx . close ( ) ;
}
} ;