@ -29,25 +29,35 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.DeviceProfile ;
import org.thingsboard.server.common.data.DeviceTransportType ;
import org.thingsboard.server.common.data.EntityType ;
import org.thingsboard.server.common.data.Tenant ;
import org.thingsboard.server.common.data.id.DeviceId ;
import org.thingsboard.server.common.data.id.DeviceProfileId ;
import org.thingsboard.server.common.data.id.RuleChainId ;
import org.thingsboard.server.common.data.id.TenantId ;
import org.thingsboard.server.common.data.id.TenantProfileId ;
import org.thingsboard.server.common.msg.TbMsg ;
import org.thingsboard.server.common.msg.TbMsgMetaData ;
import org.thingsboard.server.common.msg.queue.ServiceQueue ;
import org.thingsboard.server.common.msg.queue.ServiceType ;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo ;
import org.thingsboard.server.common.msg.session.SessionMsgType ;
import org.thingsboard.server.common.msg.tools.TbRateLimits ;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException ;
import org.thingsboard.server.common.stats.MessagesStats ;
import org.thingsboard.server.common.stats.StatsFactory ;
import org.thingsboard.server.common.stats.StatsType ;
import org.thingsboard.server.common.transport.SessionMsgListener ;
import org.thingsboard.server.common.transport.TransportProfileCache ;
import org.thingsboard.server.common.transport.TransportDevice ProfileCache ;
import org.thingsboard.server.common.transport.TransportService ;
import org.thingsboard.server.common.transport.TransportServiceCallback ;
import org.thingsboard.server.common.transport.TransportTenantProfileCache ;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse ;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo ;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse ;
import org.thingsboard.server.common.transport.limits.TransportRateLimit ;
import org.thingsboard.server.common.transport.limits.TransportRateLimitService ;
import org.thingsboard.server.common.transport.limits.TransportRateLimitType ;
import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult ;
import org.thingsboard.server.common.transport.util.DataDecodingEncodingService ;
import org.thingsboard.server.common.transport.util.JsonUtils ;
import org.thingsboard.server.gen.transport.TransportProtos ;
import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceRequestMsg ;
@ -69,15 +79,13 @@ import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider ;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider ;
import org.thingsboard.server.queue.provider.TbTransportQueueFactory ;
import org.thingsboard.server.common.stats.MessagesStats ;
import org.thingsboard.server.common.stats.StatsFactory ;
import org.thingsboard.server.common.stats.StatsType ;
import javax.annotation.PostConstruct ;
import javax.annotation.PreDestroy ;
import java.util.Collections ;
import java.util.List ;
import java.util.Map ;
import java.util.Optional ;
import java.util.Random ;
import java.util.UUID ;
import java.util.concurrent.ConcurrentHashMap ;
@ -98,12 +106,6 @@ import java.util.concurrent.atomic.AtomicInteger;
@ConditionalOnExpression ( "('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true') || '${service.type:null}'=='tb-transport'" )
public class DefaultTransportService implements TransportService {
@Value ( "${transport.rate_limits.enabled}" )
private boolean rateLimitEnabled ;
@Value ( "${transport.rate_limits.tenant}" )
private String perTenantLimitsConf ;
@Value ( "${transport.rate_limits.device}" )
private String perDevicesLimitsConf ;
@Value ( "${transport.sessions.inactivity_timeout}" )
private long sessionInactivityTimeout ;
@Value ( "${transport.sessions.report_timeout}" )
@ -119,7 +121,10 @@ public class DefaultTransportService implements TransportService {
private final PartitionService partitionService ;
private final TbServiceInfoProvider serviceInfoProvider ;
private final StatsFactory statsFactory ;
private final TransportProfileCache transportProfileCache ;
private final TransportDeviceProfileCache deviceProfileCache ;
private final TransportTenantProfileCache tenantProfileCache ;
private final TransportRateLimitService rateLimitService ;
private final DataDecodingEncodingService dataDecodingEncodingService ;
protected TbQueueRequestTemplate < TbProtoQueueMsg < TransportApiRequestMsg > , TbProtoQueueMsg < TransportApiResponseMsg > > transportApiRequestTemplate ;
protected TbQueueProducer < TbProtoQueueMsg < ToRuleEngineMsg > > ruleEngineMsgProducer ;
@ -132,14 +137,11 @@ public class DefaultTransportService implements TransportService {
protected ScheduledExecutorService schedulerExecutor ;
protected ExecutorService transportCallbackExecutor ;
private ExecutorService mainConsumerExecutor ;
private final ConcurrentMap < UUID , SessionMetaData > sessions = new ConcurrentHashMap < > ( ) ;
private final Map < String , RpcRequestMetadata > toServerRpcPendingMap = new ConcurrentHashMap < > ( ) ;
//TODO 3.2: @ybondarenko Implement cleanup of this maps.
private final ConcurrentMap < TenantId , TbRateLimits > perTenantLimits = new ConcurrentHashMap < > ( ) ;
private final ConcurrentMap < DeviceId , TbRateLimits > perDeviceLimits = new ConcurrentHashMap < > ( ) ;
private ExecutorService mainConsumerExecutor = Executors . newSingleThreadExecutor ( ThingsBoardThreadFactory . forName ( "transport-consumer" ) ) ;
private volatile boolean stopped = false ;
public DefaultTransportService ( TbServiceInfoProvider serviceInfoProvider ,
@ -147,22 +149,22 @@ public class DefaultTransportService implements TransportService {
TbQueueProducerProvider producerProvider ,
PartitionService partitionService ,
StatsFactory statsFactory ,
TransportProfileCache transportProfileCache ) {
TransportDeviceProfileCache deviceProfileCache ,
TransportTenantProfileCache tenantProfileCache ,
TransportRateLimitService rateLimitService , DataDecodingEncodingService dataDecodingEncodingService ) {
this . serviceInfoProvider = serviceInfoProvider ;
this . queueProvider = queueProvider ;
this . producerProvider = producerProvider ;
this . partitionService = partitionService ;
this . statsFactory = statsFactory ;
this . transportProfileCache = transportProfileCache ;
this . deviceProfileCache = deviceProfileCache ;
this . tenantProfileCache = tenantProfileCache ;
this . rateLimitService = rateLimitService ;
this . dataDecodingEncodingService = dataDecodingEncodingService ;
}
@PostConstruct
public void init ( ) {
if ( rateLimitEnabled ) {
//Just checking the configuration parameters
new TbRateLimits ( perTenantLimitsConf ) ;
new TbRateLimits ( perDevicesLimitsConf ) ;
}
this . ruleEngineProducerStats = statsFactory . createMessagesStats ( StatsType . RULE_ENGINE . getName ( ) + ".producer" ) ;
this . tbCoreProducerStats = statsFactory . createMessagesStats ( StatsType . CORE . getName ( ) + ".producer" ) ;
this . transportApiStats = statsFactory . createMessagesStats ( StatsType . TRANSPORT . getName ( ) + ".producer" ) ;
@ -177,6 +179,7 @@ public class DefaultTransportService implements TransportService {
TopicPartitionInfo tpi = partitionService . getNotificationsTopic ( ServiceType . TB_TRANSPORT , serviceInfoProvider . getServiceId ( ) ) ;
transportNotificationsConsumer . subscribe ( Collections . singleton ( tpi ) ) ;
transportApiRequestTemplate . init ( ) ;
mainConsumerExecutor = Executors . newSingleThreadExecutor ( ThingsBoardThreadFactory . forName ( "transport-consumer" ) ) ;
mainConsumerExecutor . execute ( ( ) - > {
while ( ! stopped ) {
try {
@ -208,10 +211,6 @@ public class DefaultTransportService implements TransportService {
@PreDestroy
public void destroy ( ) {
if ( rateLimitEnabled ) {
perTenantLimits . clear ( ) ;
perDeviceLimits . clear ( ) ;
}
stopped = true ;
if ( transportNotificationsConsumer ! = null ) {
@ -232,7 +231,7 @@ public class DefaultTransportService implements TransportService {
}
@Override
public ScheduledExecutorService getSchedulerExecutor ( ) {
public ScheduledExecutorService getSchedulerExecutor ( ) {
return this . schedulerExecutor ;
}
@ -242,12 +241,12 @@ public class DefaultTransportService implements TransportService {
}
@Override
public TransportProtos . GetTenantRoutingInfo ResponseMsg getRoutingInfo ( TransportProtos . GetTenantRoutingInfo RequestMsg msg ) {
public TransportProtos . GetEntityProfile ResponseMsg getRoutingInfo ( TransportProtos . GetEntityProfile RequestMsg msg ) {
TbProtoQueueMsg < TransportProtos . TransportApiRequestMsg > protoMsg =
new TbProtoQueueMsg < > ( UUID . randomUUID ( ) , TransportProtos . TransportApiRequestMsg . newBuilder ( ) . setGetTenantRoutingInfo RequestMsg ( msg ) . build ( ) ) ;
new TbProtoQueueMsg < > ( UUID . randomUUID ( ) , TransportProtos . TransportApiRequestMsg . newBuilder ( ) . setEntityProfile RequestMsg ( msg ) . build ( ) ) ;
try {
TbProtoQueueMsg < TransportApiResponseMsg > response = transportApiRequestTemplate . send ( protoMsg ) . get ( ) ;
return response . getValue ( ) . getGetTenantRoutingInfo ResponseMsg ( ) ;
return response . getValue ( ) . getEntityProfile ResponseMsg ( ) ;
} catch ( InterruptedException | ExecutionException e ) {
throw new RuntimeException ( e ) ;
}
@ -289,7 +288,7 @@ public class DefaultTransportService implements TransportService {
result . deviceInfo ( tdi ) ;
ByteString profileBody = msg . getProfileBody ( ) ;
if ( profileBody ! = null & & ! profileBody . isEmpty ( ) ) {
DeviceProfile profile = transport ProfileCache. getOrCreate ( tdi . getDeviceProfileId ( ) , profileBody ) ;
DeviceProfile profile = device ProfileCache. getOrCreate ( tdi . getDeviceProfileId ( ) , profileBody ) ;
if ( transportType ! = DeviceTransportType . DEFAULT
& & profile ! = null & & profile . getTransportType ( ) ! = DeviceTransportType . DEFAULT & & profile . getTransportType ( ) ! = transportType ) {
log . debug ( "[{}] Device profile [{}] has different transport type: {}, expected: {}" , tdi . getDeviceId ( ) , tdi . getDeviceProfileId ( ) , profile . getTransportType ( ) , transportType ) ;
@ -315,7 +314,7 @@ public class DefaultTransportService implements TransportService {
result . deviceInfo ( tdi ) ;
ByteString profileBody = msg . getProfileBody ( ) ;
if ( profileBody ! = null & & ! profileBody . isEmpty ( ) ) {
result . deviceProfile ( transport ProfileCache. getOrCreate ( tdi . getDeviceProfileId ( ) , profileBody ) ) ;
result . deviceProfile ( device ProfileCache. getOrCreate ( tdi . getDeviceProfileId ( ) , profileBody ) ) ;
}
}
return result . build ( ) ;
@ -339,8 +338,8 @@ public class DefaultTransportService implements TransportService {
log . trace ( "Processing msg: {}" , requestMsg ) ;
TbProtoQueueMsg < TransportApiRequestMsg > protoMsg = new TbProtoQueueMsg < > ( UUID . randomUUID ( ) , TransportApiRequestMsg . newBuilder ( ) . setProvisionDeviceRequestMsg ( requestMsg ) . build ( ) ) ;
ListenableFuture < ProvisionDeviceResponseMsg > response = Futures . transform ( transportApiRequestTemplate . send ( protoMsg ) , tmp - >
tmp . getValue ( ) . getProvisionDeviceResponseMsg ( )
, MoreExecutors . directExecutor ( ) ) ;
tmp . getValue ( ) . getProvisionDeviceResponseMsg ( )
, MoreExecutors . directExecutor ( ) ) ;
AsyncCallbackTemplate . withCallback ( response , callback : : onSuccess , callback : : onError , transportCallbackExecutor ) ;
}
@ -580,12 +579,11 @@ public class DefaultTransportService implements TransportService {
if ( log . isTraceEnabled ( ) ) {
log . trace ( "[{}] Processing msg: {}" , toSessionId ( sessionInfo ) , msg ) ;
}
if ( ! rateLimitEnabled ) {
return true ;
}
TenantId tenantId = new TenantId ( new UUID ( sessionInfo . getTenantIdMSB ( ) , sessionInfo . getTenantIdLSB ( ) ) ) ;
TbRateLimits rateLimits = perTenantLimits . computeIfAbsent ( tenantId , id - > new TbRateLimits ( perTenantLimitsConf ) ) ;
if ( ! rateLimits . tryConsume ( ) ) {
TransportRateLimit tenantRateLimit = rateLimitService . getRateLimit ( tenantId , TransportRateLimitType . TENANT_MAX_MSGS ) ;
if ( ! tenantRateLimit . tryConsume ( ) ) {
if ( callback ! = null ) {
callback . onError ( new TbRateLimitsException ( EntityType . TENANT ) ) ;
}
@ -595,8 +593,8 @@ public class DefaultTransportService implements TransportService {
return false ;
}
DeviceId deviceId = new DeviceId ( new UUID ( sessionInfo . getDeviceIdMSB ( ) , sessionInfo . getDeviceIdLSB ( ) ) ) ;
rateLimits = perDeviceLimits . computeIfAbsent ( deviceId , id - > new TbRateLimits ( perDevicesLimitsConf ) ) ;
if ( ! rateLimits . tryConsume ( ) ) {
TransportRateLimit deviceRateLimit = rateLimitService . getRateLimit ( tenantId , deviceId , TransportRateLimitType . DEVICE_MAX_MSGS ) ;
if ( ! deviceRateLimit . tryConsume ( ) ) {
if ( callback ! = null ) {
callback . onError ( new TbRateLimitsException ( EntityType . DEVICE ) ) ;
}
@ -637,16 +635,40 @@ public class DefaultTransportService implements TransportService {
deregisterSession ( md . getSessionInfo ( ) ) ;
}
} else {
if ( toSessionMsg . hasDeviceProfileUpdateMsg ( ) ) {
DeviceProfile deviceProfile = transportProfileCache . put ( toSessionMsg . getDeviceProfileUpdateMsg ( ) . getData ( ) ) ;
if ( deviceProfile ! = null ) {
onProfileUpdate ( deviceProfile ) ;
if ( toSessionMsg . hasEntityUpdateMsg ( ) ) {
TransportProtos . EntityUpdateMsg msg = toSessionMsg . getEntityUpdateMsg ( ) ;
EntityType entityType = EntityType . valueOf ( msg . getEntityType ( ) ) ;
if ( EntityType . DEVICE_PROFILE . equals ( entityType ) ) {
DeviceProfile deviceProfile = deviceProfileCache . put ( msg . getData ( ) ) ;
if ( deviceProfile ! = null ) {
onProfileUpdate ( deviceProfile ) ;
}
} else if ( EntityType . TENANT_PROFILE . equals ( entityType ) ) {
TenantProfileUpdateResult update = tenantProfileCache . put ( msg . getData ( ) ) ;
rateLimitService . update ( update ) ;
} else if ( EntityType . TENANT . equals ( entityType ) ) {
Optional < Tenant > profileOpt = dataDecodingEncodingService . decode ( msg . getData ( ) . toByteArray ( ) ) ;
if ( profileOpt . isPresent ( ) ) {
Tenant tenant = profileOpt . get ( ) ;
boolean updated = tenantProfileCache . put ( tenant . getId ( ) , tenant . getTenantProfileId ( ) ) ;
if ( updated ) {
rateLimitService . update ( tenant . getId ( ) ) ;
}
}
}
} else if ( toSessionMsg . hasEntityDeleteMsg ( ) ) {
TransportProtos . EntityDeleteMsg msg = toSessionMsg . getEntityDeleteMsg ( ) ;
EntityType entityType = EntityType . valueOf ( msg . getEntityType ( ) ) ;
UUID entityUuid = new UUID ( msg . getEntityIdMSB ( ) , msg . getEntityIdLSB ( ) ) ;
if ( EntityType . DEVICE_PROFILE . equals ( entityType ) ) {
deviceProfileCache . evict ( new DeviceProfileId ( new UUID ( msg . getEntityIdMSB ( ) , msg . getEntityIdLSB ( ) ) ) ) ;
} else if ( EntityType . TENANT_PROFILE . equals ( entityType ) ) {
tenantProfileCache . remove ( new TenantProfileId ( entityUuid ) ) ;
} else if ( EntityType . TENANT . equals ( entityType ) ) {
rateLimitService . remove ( new TenantId ( entityUuid ) ) ;
} else if ( EntityType . DEVICE . equals ( entityType ) ) {
rateLimitService . remove ( new DeviceId ( entityUuid ) ) ;
}
} else if ( toSessionMsg . hasDeviceProfileDeleteMsg ( ) ) {
transportProfileCache . evict ( new DeviceProfileId ( new UUID (
toSessionMsg . getDeviceProfileDeleteMsg ( ) . getProfileIdMSB ( ) ,
toSessionMsg . getDeviceProfileDeleteMsg ( ) . getProfileIdLSB ( )
) ) ) ;
} else {
//TODO: should we notify the device actor about missed session?
log . debug ( "[{}] Missing session." , sessionId ) ;
@ -654,38 +676,6 @@ public class DefaultTransportService implements TransportService {
}
}
@Override
public void getDeviceProfile ( DeviceProfileId deviceProfileId , TransportServiceCallback < DeviceProfile > callback ) {
DeviceProfile deviceProfile = transportProfileCache . get ( deviceProfileId ) ;
if ( deviceProfile ! = null ) {
callback . onSuccess ( deviceProfile ) ;
} else {
log . trace ( "Processing device profile request: [{}]" , deviceProfileId ) ;
TransportProtos . GetDeviceProfileRequestMsg msg = TransportProtos . GetDeviceProfileRequestMsg . newBuilder ( )
. setProfileIdMSB ( deviceProfileId . getId ( ) . getMostSignificantBits ( ) )
. setProfileIdLSB ( deviceProfileId . getId ( ) . getLeastSignificantBits ( ) )
. build ( ) ;
TbProtoQueueMsg < TransportApiRequestMsg > protoMsg = new TbProtoQueueMsg < > ( UUID . randomUUID ( ) ,
TransportApiRequestMsg . newBuilder ( ) . setGetDeviceProfileRequestMsg ( msg ) . build ( ) ) ;
AsyncCallbackTemplate . withCallback ( transportApiRequestTemplate . send ( protoMsg ) ,
response - > {
ByteString devProfileBody = response . getValue ( ) . getGetDeviceProfileResponseMsg ( ) . getData ( ) ;
if ( devProfileBody ! = null & & ! devProfileBody . isEmpty ( ) ) {
DeviceProfile profile = transportProfileCache . put ( devProfileBody ) ;
if ( profile ! = null ) {
callback . onSuccess ( profile ) ;
} else {
log . warn ( "Failed to decode device profile: {}" , devProfileBody ) ;
callback . onError ( new IllegalArgumentException ( "Failed to decode device profile!" ) ) ;
}
} else {
log . warn ( "Failed to find device profile: [{}]" , deviceProfileId ) ;
callback . onError ( new IllegalArgumentException ( "Failed to find device profile!" ) ) ;
}
} , callback : : onError , transportCallbackExecutor ) ;
}
}
@Override
public void onProfileUpdate ( DeviceProfile deviceProfile ) {
long deviceProfileIdMSB = deviceProfile . getId ( ) . getId ( ) . getMostSignificantBits ( ) ;
@ -750,7 +740,7 @@ public class DefaultTransportService implements TransportService {
private RuleChainId resolveRuleChainId ( TransportProtos . SessionInfoProto sessionInfo ) {
DeviceProfileId deviceProfileId = new DeviceProfileId ( new UUID ( sessionInfo . getDeviceProfileIdMSB ( ) , sessionInfo . getDeviceProfileIdLSB ( ) ) ) ;
DeviceProfile deviceProfile = transport ProfileCache. get ( deviceProfileId ) ;
DeviceProfile deviceProfile = device ProfileCache. get ( deviceProfileId ) ;
RuleChainId ruleChainId ;
if ( deviceProfile = = null ) {
log . warn ( "[{}] Device profile is null!" , deviceProfileId ) ;
@ -779,7 +769,7 @@ public class DefaultTransportService implements TransportService {
}
}
private class StatsCallback implements TbQueueCallback {
private static class StatsCallback implements TbQueueCallback {
private final TbQueueCallback callback ;
private final MessagesStats stats ;