@ -80,6 +80,8 @@ import java.util.Set;
import java.util.UUID ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.ConcurrentMap ;
import java.util.concurrent.atomic.AtomicBoolean ;
import java.util.concurrent.atomic.AtomicInteger ;
import java.util.concurrent.locks.Lock ;
import java.util.concurrent.locks.ReentrantLock ;
import java.util.function.Consumer ;
@ -405,18 +407,34 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
protected void onDeviceTelemetryJson ( int msgId , ByteBuf payload ) throws AdaptorException {
JsonElement json = JsonMqttAdaptor . validateJsonPayload ( sessionId , payload ) ;
validateJsonObject ( json ) ;
for ( Map . Entry < String , JsonElement > deviceEntry : json . getAsJsonObject ( ) . entrySet ( ) ) {
if ( ! deviceEntry . getValue ( ) . isJsonArray ( ) ) {
log . warn ( "{}[{}]" , CAN_T_PARSE_VALUE , json ) ;
continue ;
}
List < Map . Entry < String , JsonElement > > deviceEntries = json . getAsJsonObject ( ) . entrySet ( ) . stream ( )
. filter ( entry - > {
final boolean isArray = entry . getValue ( ) . isJsonArray ( ) ;
if ( ! isArray ) {
log . warn ( "{} device='{}' value={}" , CAN_T_PARSE_VALUE , entry . getKey ( ) , entry . getValue ( ) ) ;
}
return isArray ;
} )
. toList ( ) ;
if ( deviceEntries . isEmpty ( ) ) {
log . debug ( "[{}][{}][{}] Devices telemetry message is empty" , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId ) ;
throw new IllegalArgumentException ( "[" + sessionId + "] Devices telemetry message is empty for [" + gateway . getDeviceId ( ) + "]" ) ;
}
AtomicInteger remaining = new AtomicInteger ( deviceEntries . size ( ) ) ;
AtomicBoolean ackSent = new AtomicBoolean ( false ) ;
for ( Map . Entry < String , JsonElement > deviceEntry : deviceEntries ) {
String deviceName = deviceEntry . getKey ( ) ;
process ( deviceName , deviceCtx - > processPostTelemetryMsg ( deviceCtx , deviceEntry . getValue ( ) , deviceName , msgId ) ,
t - > failedToProcessLog ( deviceName , TELEMETRY , t ) ) ;
process ( deviceName , deviceCtx - > processPostTelemetryMsg ( deviceCtx , deviceEntry . getValue ( ) , deviceName , msgId ,
remaining , ackSent ) ,
t - > processFailure ( msgId , deviceName , TELEMETRY , ackSent , t ) ) ;
}
}
private void processPostTelemetryMsg ( T deviceCtx , JsonElement msg , String deviceName , int msgId ) {
private void processPostTelemetryMsg ( T deviceCtx , JsonElement msg , String deviceName , int msgId , AtomicInteger remaining , AtomicBoolean ackSent ) {
try {
long systemTs = System . currentTimeMillis ( ) ;
TbPair < TransportProtos . PostTelemetryMsg , List < GatewayMetadata > > gatewayPayloadPair = JsonConverter . convertToGatewayTelemetry ( msg . getAsJsonArray ( ) , systemTs ) ;
@ -425,10 +443,10 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
if ( ! CollectionUtils . isEmpty ( metadata ) ) {
gatewayMetricsService . process ( deviceSessionCtx . getSessionInfo ( ) , gateway . getDeviceId ( ) , metadata , systemTs ) ;
}
transportService . process ( deviceCtx . getSessionInfo ( ) , postTelemetryMsg , getPubAckCallback ( channel , deviceName , msgId , postTelemetryMsg ) ) ;
transportService . process ( deviceCtx . getSessionInfo ( ) , postTelemetryMsg , getAggregate PubAckCallback ( channel , msgId , deviceName , postTelemetryMsg , remaining , ackSent ) ) ;
} catch ( Throwable e ) {
log . warn ( "[{}][{}][{}] Failed to convert telemetry: [{}]" , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , deviceName , msg , e ) ;
ackOrClose ( msgId ) ;
ackOrClose ( msgId , ackSent ) ;
}
}
@ -441,23 +459,28 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
throw new IllegalArgumentException ( "[" + sessionId + "] Devices telemetry messages is empty for [" + gateway . getDeviceId ( ) + "]" ) ;
}
AtomicInteger remaining = new AtomicInteger ( deviceMsgList . size ( ) ) ;
AtomicBoolean ackSent = new AtomicBoolean ( false ) ;
deviceMsgList . forEach ( telemetryMsg - > {
String deviceName = checkDeviceName ( telemetryMsg . getDeviceName ( ) ) ;
process ( deviceName , deviceCtx - > processPostTelemetryMsg ( deviceCtx , telemetryMsg . getMsg ( ) , deviceName , msgId ) ,
t - > failedToProcessLog ( deviceName , TELEMETRY , t ) ) ;
process ( deviceName , deviceCtx - > processPostTelemetryMsg ( deviceCtx , telemetryMsg . getMsg ( ) , deviceName , msgId ,
remaining , ackSent ) ,
t - > processFailure ( msgId , deviceName , TELEMETRY , ackSent , t ) ) ;
} ) ;
} catch ( RuntimeException | InvalidProtocolBufferException e ) {
throw new AdaptorException ( e ) ;
}
}
protected void processPostTelemetryMsg ( MqttDeviceAwareSessionContext deviceCtx , TransportProtos . PostTelemetryMsg msg , String deviceName , int msgId ) {
protected void processPostTelemetryMsg ( MqttDeviceAwareSessionContext deviceCtx , TransportProtos . PostTelemetryMsg msg , String deviceName , int msgId ,
AtomicInteger remaining , AtomicBoolean ackSent ) {
try {
TransportProtos . PostTelemetryMsg postTelemetryMsg = ProtoConverter . validatePostTelemetryMsg ( msg . toByteArray ( ) ) ;
transportService . process ( deviceCtx . getSessionInfo ( ) , postTelemetryMsg , getPubAckCallback ( channel , deviceName , msgId , postTelemetryMsg ) ) ;
transportService . process ( deviceCtx . getSessionInfo ( ) , postTelemetryMsg , getAggregate PubAckCallback ( channel , msgId , deviceName , postTelemetryMsg , remaining , ackSent ) ) ;
} catch ( Throwable e ) {
log . warn ( "[{}][{}][{}] Failed to convert telemetry: [{}]" , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , deviceName , msg , e ) ;
ackOrClose ( msgId ) ;
ackOrClose ( msgId , ackSent ) ;
}
}
@ -475,26 +498,42 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
private void onDeviceClaimJson ( int msgId , ByteBuf payload ) throws AdaptorException {
JsonElement json = JsonMqttAdaptor . validateJsonPayload ( sessionId , payload ) ;
validateJsonObject ( json ) ;
for ( Map . Entry < String , JsonElement > deviceEntry : json . getAsJsonObject ( ) . entrySet ( ) ) {
if ( ! deviceEntry . getValue ( ) . isJsonObject ( ) ) {
log . warn ( "{}[{}]" , CAN_T_PARSE_VALUE , json ) ;
continue ;
}
List < Map . Entry < String , JsonElement > > deviceEntries = json . getAsJsonObject ( ) . entrySet ( ) . stream ( )
. filter ( entry - > {
boolean isJsonObject = entry . getValue ( ) . isJsonObject ( ) ;
if ( ! isJsonObject ) {
log . warn ( "{} device='{}' value={}" , CAN_T_PARSE_VALUE , entry . getKey ( ) , entry . getValue ( ) ) ;
}
return isJsonObject ;
} )
. toList ( ) ;
if ( deviceEntries . isEmpty ( ) ) {
log . debug ( "[{}][{}][{}] Devices claim message is empty" , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId ) ;
throw new IllegalArgumentException ( "[" + sessionId + "] Devices claim message is empty for [" + gateway . getDeviceId ( ) + "]" ) ;
}
AtomicInteger remaining = new AtomicInteger ( deviceEntries . size ( ) ) ;
AtomicBoolean ackSent = new AtomicBoolean ( false ) ;
for ( Map . Entry < String , JsonElement > deviceEntry : deviceEntries ) {
String deviceName = deviceEntry . getKey ( ) ;
process ( deviceName , deviceCtx - > processClaimDeviceMsg ( deviceCtx , deviceEntry . getValue ( ) , deviceName , msgId ) ,
t - > failedToProcessLog ( deviceName , CLAIMING , t ) ) ;
process ( deviceName , deviceCtx - > processClaimDeviceMsg ( deviceCtx , deviceEntry . getValue ( ) , deviceName , msgId ,
remaining , ackSent ) ,
t - > processFailure ( msgId , deviceName , CLAIMING , ackSent , t ) ) ;
}
}
private void processClaimDeviceMsg ( MqttDeviceAwareSessionContext deviceCtx , JsonElement claimRequest , String deviceName , int msgId ) {
private void processClaimDeviceMsg ( MqttDeviceAwareSessionContext deviceCtx , JsonElement claimRequest , String deviceName , int msgId ,
AtomicInteger remaining , AtomicBoolean ackSent ) {
try {
DeviceId deviceId = deviceCtx . getDeviceId ( ) ;
TransportProtos . ClaimDeviceMsg claimDeviceMsg = JsonConverter . convertToClaimDeviceProto ( deviceId , claimRequest ) ;
transportService . process ( deviceCtx . getSessionInfo ( ) , claimDeviceMsg , getPubAckCallback ( channel , deviceName , msgId , claimDeviceMsg ) ) ;
transportService . process ( deviceCtx . getSessionInfo ( ) , claimDeviceMsg , getAggregate PubAckCallback ( channel , msgId , deviceName , claimDeviceMsg , remaining , ackSent ) ) ;
} catch ( Throwable e ) {
log . warn ( "[{}][{}][{}] Failed to convert claim message: [{}]" , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , deviceName , claimRequest , e ) ;
ackOrClose ( msgId ) ;
ackOrClose ( msgId , ackSent ) ;
}
}
@ -507,49 +546,70 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
throw new IllegalArgumentException ( "[" + sessionId + "] Devices claim messages is empty for [" + gateway . getDeviceId ( ) + "]" ) ;
}
AtomicInteger remaining = new AtomicInteger ( claimMsgList . size ( ) ) ;
AtomicBoolean ackSent = new AtomicBoolean ( false ) ;
claimMsgList . forEach ( claimDeviceMsg - > {
String deviceName = checkDeviceName ( claimDeviceMsg . getDeviceName ( ) ) ;
process ( deviceName , deviceCtx - > processClaimDeviceMsg ( deviceCtx , claimDeviceMsg . getClaimRequest ( ) , deviceName , msgId ) ,
t - > failedToProcessLog ( deviceName , CLAIMING , t ) ) ;
process ( deviceName , deviceCtx - > processClaimDeviceMsg ( deviceCtx , claimDeviceMsg . getClaimRequest ( ) , deviceName , msgId ,
remaining , ackSent ) ,
t - > processFailure ( msgId , deviceName , CLAIMING , ackSent , t ) ) ;
} ) ;
} catch ( RuntimeException | InvalidProtocolBufferException e ) {
throw new AdaptorException ( e ) ;
}
}
private void processClaimDeviceMsg ( MqttDeviceAwareSessionContext deviceCtx , TransportApiProtos . ClaimDevice claimRequest , String deviceName , int msgId ) {
private void processClaimDeviceMsg ( MqttDeviceAwareSessionContext deviceCtx , TransportApiProtos . ClaimDevice claimRequest , String deviceName , int msgId ,
AtomicInteger remaining , AtomicBoolean ackSent ) {
try {
DeviceId deviceId = deviceCtx . getDeviceId ( ) ;
TransportProtos . ClaimDeviceMsg claimDeviceMsg = ProtoConverter . convertToClaimDeviceProto ( deviceId , claimRequest . toByteArray ( ) ) ;
transportService . process ( deviceCtx . getSessionInfo ( ) , claimDeviceMsg , getPubAckCallback ( channel , deviceName , msgId , claimDeviceMsg ) ) ;
transportService . process ( deviceCtx . getSessionInfo ( ) , claimDeviceMsg , getAggregate PubAckCallback ( channel , msgId , deviceName , claimDeviceMsg , remaining , ackSent ) ) ;
} catch ( Throwable e ) {
log . warn ( "[{}][{}][{}] Failed to convert claim message: [{}]" , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , deviceName , claimRequest , e ) ;
ackOrClose ( msgId ) ;
ackOrClose ( msgId , ackSent ) ;
}
}
private void onDeviceAttributesJson ( int msgId , ByteBuf payload ) throws AdaptorException {
JsonElement json = JsonMqttAdaptor . validateJsonPayload ( sessionId , payload ) ;
validateJsonObject ( json ) ;
for ( Map . Entry < String , JsonElement > deviceEntry : json . getAsJsonObject ( ) . entrySet ( ) ) {
if ( ! deviceEntry . getValue ( ) . isJsonObject ( ) ) {
log . warn ( "{}[{}]" , CAN_T_PARSE_VALUE , json ) ;
continue ;
}
List < Map . Entry < String , JsonElement > > deviceEntries = json . getAsJsonObject ( ) . entrySet ( ) . stream ( )
. filter ( entry - > {
boolean isJsonObject = entry . getValue ( ) . isJsonObject ( ) ;
if ( ! isJsonObject ) {
log . warn ( "{} device='{}' value={}" , CAN_T_PARSE_VALUE , entry . getKey ( ) , entry . getValue ( ) ) ;
}
return isJsonObject ;
} )
. toList ( ) ;
if ( deviceEntries . isEmpty ( ) ) {
log . debug ( "[{}][{}][{}] Devices attribute message is empty" , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId ) ;
throw new IllegalArgumentException ( "[" + sessionId + "] Devices attribute message is empty for [" + gateway . getDeviceId ( ) + "]" ) ;
}
AtomicInteger remaining = new AtomicInteger ( deviceEntries . size ( ) ) ;
AtomicBoolean ackSent = new AtomicBoolean ( false ) ;
for ( Map . Entry < String , JsonElement > deviceEntry : deviceEntries ) {
String deviceName = deviceEntry . getKey ( ) ;
process ( deviceName , deviceCtx - > processPostAttributesMsg ( deviceCtx , deviceEntry . getValue ( ) , deviceName , msgId ) ,
t - > failedToProcessLog ( deviceName , ATTRIBUTE , t ) ) ;
process ( deviceName , deviceCtx - > processPostAttributesMsg ( deviceCtx , deviceEntry . getValue ( ) , deviceName , msgId ,
remaining , ackSent ) ,
t - > processFailure ( msgId , deviceName , ATTRIBUTE , ackSent , t ) ) ;
}
}
private void processPostAttributesMsg ( MqttDeviceAwareSessionContext deviceCtx , JsonElement msg , String deviceName , int msgId ) {
private void processPostAttributesMsg ( MqttDeviceAwareSessionContext deviceCtx , JsonElement msg , String deviceName , int msgId ,
AtomicInteger remaining , AtomicBoolean ackSent ) {
try {
TransportProtos . PostAttributeMsg postAttributeMsg = JsonConverter . convertToAttributesProto ( msg . getAsJsonObject ( ) ) ;
transportService . process ( deviceCtx . getSessionInfo ( ) , postAttributeMsg , getPubAckCallback ( channel , deviceName , msgId , postAttributeMsg ) ) ;
transportService . process ( deviceCtx . getSessionInfo ( ) , postAttributeMsg , getAggregate PubAckCallback ( channel , msgId , deviceName , postAttributeMsg , remaining , ackSent ) ) ;
} catch ( Throwable e ) {
log . warn ( "[{}][{}][{}] Failed to process device attributes command: [{}]" , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , deviceName , msg , e ) ;
ackOrClose ( msgId ) ;
ackOrClose ( msgId , ackSent ) ;
}
}
@ -562,23 +622,28 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
throw new IllegalArgumentException ( "[" + sessionId + "] Devices attributes keys list is empty for [" + gateway . getDeviceId ( ) + "]" ) ;
}
AtomicInteger remaining = new AtomicInteger ( attributesMsgList . size ( ) ) ;
AtomicBoolean ackSent = new AtomicBoolean ( false ) ;
attributesMsgList . forEach ( attributesMsg - > {
String deviceName = checkDeviceName ( attributesMsg . getDeviceName ( ) ) ;
process ( deviceName , deviceCtx - > processPostAttributesMsg ( deviceCtx , attributesMsg . getMsg ( ) , deviceName , msgId ) ,
t - > failedToProcessLog ( deviceName , ATTRIBUTE , t ) ) ;
process ( deviceName , deviceCtx - > processPostAttributesMsg ( deviceCtx , attributesMsg . getMsg ( ) , deviceName , msgId ,
remaining , ackSent ) ,
t - > processFailure ( msgId , deviceName , ATTRIBUTE , ackSent , t ) ) ;
} ) ;
} catch ( RuntimeException | InvalidProtocolBufferException e ) {
throw new AdaptorException ( e ) ;
}
}
protected void processPostAttributesMsg ( MqttDeviceAwareSessionContext deviceCtx , TransportProtos . PostAttributeMsg kvListProto , String deviceName , int msgId ) {
protected void processPostAttributesMsg ( MqttDeviceAwareSessionContext deviceCtx , TransportProtos . PostAttributeMsg kvListProto , String deviceName , int msgId ,
AtomicInteger remaining , AtomicBoolean ackSent ) {
try {
TransportProtos . PostAttributeMsg postAttributeMsg = ProtoConverter . validatePostAttributeMsg ( kvListProto ) ;
transportService . process ( deviceCtx . getSessionInfo ( ) , postAttributeMsg , getPubAckCallback ( channel , deviceName , msgId , postAttributeMsg ) ) ;
transportService . process ( deviceCtx . getSessionInfo ( ) , postAttributeMsg , getAggregate PubAckCallback ( channel , msgId , deviceName , postAttributeMsg , remaining , ackSent ) ) ;
} catch ( Throwable e ) {
log . warn ( "[{}][{}][{}] Failed to process device attributes command: [{}]" , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , deviceName , kvListProto , e ) ;
ackOrClose ( msgId ) ;
ackOrClose ( msgId , ackSent ) ;
}
}
@ -647,27 +712,34 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
}
private void onDeviceRpcResponse ( Integer requestId , String data , String deviceName , int msgId ) {
process ( deviceName , deviceCtx - > processRpcResponseMsg ( deviceCtx , requestId , data , deviceName , msgId ) ,
t - > failedToProcessLog ( deviceName , RPC_RESPONSE , t ) ) ;
AtomicInteger remaining = new AtomicInteger ( 1 ) ;
AtomicBoolean ackSent = new AtomicBoolean ( false ) ;
process ( deviceName , deviceCtx - > processRpcResponseMsg ( deviceCtx , requestId , data , deviceName , msgId , remaining , ackSent ) ,
t - > processFailure ( msgId , deviceName , RPC_RESPONSE , ackSent , t ) ) ;
}
private void processRpcResponseMsg ( MqttDeviceAwareSessionContext deviceCtx , Integer requestId , String data , String deviceName , int msgId ) {
private void processRpcResponseMsg ( MqttDeviceAwareSessionContext deviceCtx , Integer requestId , String data , String deviceName ,
int msgId , AtomicInteger remaining , AtomicBoolean ackSent ) {
TransportProtos . ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos . ToDeviceRpcResponseMsg . newBuilder ( )
. setRequestId ( requestId ) . setPayload ( data ) . build ( ) ;
transportService . process ( deviceCtx . getSessionInfo ( ) , rpcResponseMsg , getPubAckCallback ( channel , deviceName , msgId , rpcResponseMsg ) ) ;
transportService . process ( deviceCtx . getSessionInfo ( ) , rpcResponseMsg ,
getAggregatePubAckCallback ( channel , msgId , deviceName , rpcResponseMsg , remaining , ackSent ) ) ;
}
private void processGetAttributeRequestMessage ( MqttPublishMessage mqttMsg , String deviceName , TransportProtos . GetAttributeRequestMsg requestMsg ) {
int msgId = getMsgId ( mqttMsg ) ;
process ( deviceName , deviceCtx - > processGetAttributeRequestMessage ( deviceCtx , requestMsg , deviceName , msgId ) ,
t - > {
failedToProcessLog ( deviceName , ATTRIBUTES_REQUEST , t ) ;
ack ( mqttMsg , MqttReasonCodes . PubAck . IMPLEMENTATION_SPECIFIC_ERROR ) ;
} ) ;
AtomicInteger remaining = new AtomicInteger ( 1 ) ;
AtomicBoolean ackSent = new AtomicBoolean ( false ) ;
process ( deviceName , deviceCtx - > {
processGetAttributeRequestMessage ( deviceCtx , requestMsg , deviceName , msgId , remaining , ackSent ) ;
} ,
t - > processFailure ( msgId , deviceName , ATTRIBUTES_REQUEST , ackSent , MqttReasonCodes . PubAck . IMPLEMENTATION_SPECIFIC_ERROR , t ) ) ;
}
private void processGetAttributeRequestMessage ( T deviceCtx , TransportProtos . GetAttributeRequestMsg requestMsg , String deviceName , int msgId ) {
transportService . process ( deviceCtx . getSessionInfo ( ) , requestMsg , getPubAckCallback ( channel , deviceName , msgId , requestMsg ) ) ;
private void processGetAttributeRequestMessage ( T deviceCtx , TransportProtos . GetAttributeRequestMsg requestMsg ,
String deviceName , int msgId , AtomicInteger remaining , AtomicBoolean ackSent ) {
transportService . process ( deviceCtx . getSessionInfo ( ) , requestMsg ,
getAggregatePubAckCallback ( channel , msgId , deviceName , requestMsg , remaining , ackSent ) ) ;
}
private TransportProtos . GetAttributeRequestMsg toGetAttributeRequestMsg ( int requestId , boolean clientScope , Set < String > keys ) {
@ -718,9 +790,11 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
}
}
protected void ackOrClose ( int msgId ) {
protected void ackOrClose ( int msgId , AtomicBoolean ackSent ) {
if ( MqttVersion . MQTT_5 . equals ( deviceSessionCtx . getMqttVersion ( ) ) ) {
ack ( msgId , MqttReasonCodes . PubAck . PAYLOAD_FORMAT_INVALID ) ;
if ( ackSent . compareAndSet ( false , true ) ) {
ack ( msgId , MqttReasonCodes . PubAck . PAYLOAD_FORMAT_INVALID ) ;
}
} else {
channel . close ( ) ;
}
@ -742,23 +816,38 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
keyValueProtoBuilder . setType ( TransportProtos . KeyValueType . STRING_V ) ;
keyValueProtoBuilder . setStringV ( connectionState . name ( ) ) ;
TransportProtos . PostTelemetryMsg postTelemetryMsg = postTelemetryMsgCreated ( keyValueProtoBuilder . build ( ) , ts ) ;
transportService . process ( sessionInfo , postTelemetryMsg , getPubAckCallback ( channel , deviceName , - 1 , postTelemetryMsg ) ) ;
TransportServiceCallback < Void > pubAckCallback = getAggregatePubAckCallback ( channel , - 1 , deviceName , postTelemetryMsg ,
new AtomicInteger ( 1 ) , new AtomicBoolean ( false ) ) ;
transportService . process ( sessionInfo , postTelemetryMsg , pubAckCallback ) ;
}
public ConcurrentMap < String , T > getDevices ( ) {
public ConcurrentMap < String , T > getDevices ( ) {
return this . devices ;
}
private < T > TransportServiceCallback < Void > getPubAckCallback ( final ChannelHandlerContext ctx , final String deviceName , final int msgId , final T msg ) {
protected < T > TransportServiceCallback < Void > getAggregatePubAckCallback (
final ChannelHandlerContext ctx ,
final int msgId ,
final String deviceName ,
final T msg ,
final AtomicInteger remaining ,
final AtomicBoolean ackSent ) {
return new TransportServiceCallback < Void > ( ) {
@Override
public void onSuccess ( Void dummy ) {
log . trace ( "[{}][{}][{}][{}] Published msg: [{}]" , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId , deviceName , msg ) ;
if ( msgId > 0 ) {
ctx . writeAndFlush ( MqttTransportHandler . createMqttPubAckMsg ( deviceSessionCtx , msgId , MqttReasonCodes . PubAck . SUCCESS . byteValue ( ) ) ) ;
} else {
log . trace ( "[{}][{}][{}] Wrong msg id: [{}]" , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId , msg ) ;
ctx . writeAndFlush ( MqttTransportHandler . createMqttPubAckMsg ( deviceSessionCtx , msgId , MqttReasonCodes . PubAck . UNSPECIFIED_ERROR . byteValue ( ) ) ) ;
if ( remaining . decrementAndGet ( ) = = 0 & & ackSent . compareAndSet ( false , true ) ) {
if ( msgId > 0 ) {
ctx . writeAndFlush ( MqttTransportHandler . createMqttPubAckMsg (
deviceSessionCtx , msgId , MqttReasonCodes . PubAck . SUCCESS . byteValue ( ) ) ) ;
} else {
log . trace ( "[{}][{}][{}] Wrong msg id: [{}]" , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId , msgId ) ;
ctx . writeAndFlush ( MqttTransportHandler . createMqttPubAckMsg (
deviceSessionCtx , msgId , MqttReasonCodes . PubAck . UNSPECIFIED_ERROR . byteValue ( ) ) ) ;
}
}
if ( msgId < = 0 ) {
closeDeviceSession ( deviceName , MqttReasonCodes . Disconnect . MALFORMED_PACKET ) ;
}
}
@ -767,11 +856,20 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
public void onError ( Throwable e ) {
log . trace ( "[{}][{}][{}] Failed to publish msg: [{}] for device: [{}]" , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId , msg , deviceName , e ) ;
if ( e instanceof TbRateLimitsException ) {
if ( ackSent . compareAndSet ( false , true ) ) {
ctx . writeAndFlush ( MqttTransportHandler . createMqttPubAckMsg (
deviceSessionCtx , msgId , MqttReasonCodes . PubAck . QUOTA_EXCEEDED . byteValue ( ) ) ) ;
ctx . close ( ) ;
}
closeDeviceSession ( deviceName , MqttReasonCodes . Disconnect . MESSAGE_RATE_TOO_HIGH ) ;
} else {
if ( ackSent . compareAndSet ( false , true ) ) {
ctx . writeAndFlush ( MqttTransportHandler . createMqttPubAckMsg (
deviceSessionCtx , msgId , MqttReasonCodes . PubAck . UNSPECIFIED_ERROR . byteValue ( ) ) ) ;
ctx . close ( ) ;
}
closeDeviceSession ( deviceName , MqttReasonCodes . Disconnect . UNSPECIFIED_ERROR ) ;
}
ctx . close ( ) ;
}
} ;
}
@ -790,10 +888,20 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
}
}
protected void failedToProcessLog ( String deviceName , String msgType , Throwable t ) {
log . debug ( "[{}][{}][{}] Failed to process device {} command: [{}]" , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId , msgType , deviceName , t ) ;
protected void processFailure ( int msgId , String deviceName , String msgType , AtomicBoolean ackSent , Throwable t ) {
if ( DataConstants . MAXIMUM_NUMBER_OF_DEVICES_REACHED . equals ( t . getMessage ( ) ) ) {
processFailure ( msgId , deviceName , msgType , ackSent , MqttReasonCodes . PubAck . QUOTA_EXCEEDED , t ) ;
} else {
processFailure ( msgId , deviceName , msgType , ackSent , MqttReasonCodes . PubAck . UNSPECIFIED_ERROR , t ) ;
}
}
protected void processFailure ( int msgId , String deviceName , String msgType , AtomicBoolean ackSent , MqttReasonCodes . PubAck pubAck , Throwable t ) {
log . debug ( "[{}][{}][{}] Failed to process device {} command: [{}]" , gateway . getTenantId ( ) , gateway . getDeviceId ( ) , sessionId , msgType , deviceName , t ) ;
if ( ackSent . compareAndSet ( false , true ) ) {
ack ( msgId , pubAck ) ;
}
}
private void closeDeviceSession ( String deviceName , MqttReasonCodes . Disconnect returnCode ) {
try {