@ -20,12 +20,16 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode ;
import com.google.common.util.concurrent.Futures ;
import com.google.common.util.concurrent.ListenableFuture ;
import com.google.common.util.concurrent.ListeningExecutorService ;
import com.google.common.util.concurrent.MoreExecutors ;
import com.google.protobuf.ByteString ;
import lombok.RequiredArgsConstructor ;
import lombok.extern.slf4j.Slf4j ;
import org.springframework.beans.factory.annotation.Value ;
import org.springframework.stereotype.Service ;
import org.springframework.util.ConcurrentReferenceHashMap ;
import org.thingsboard.common.util.JacksonUtil ;
import org.thingsboard.common.util.ThingsBoardExecutors ;
import org.thingsboard.server.cache.ota.OtaPackageDataCache ;
import org.thingsboard.server.cluster.TbClusterService ;
import org.thingsboard.server.common.data.ApiUsageState ;
@ -104,10 +108,11 @@ import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.profile.TbDeviceProfileCache ;
import org.thingsboard.server.service.resource.TbResourceService ;
import javax.annotation.PostConstruct ;
import javax.annotation.PreDestroy ;
import java.util.List ;
import java.util.Optional ;
import java.util.UUID ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.ConcurrentMap ;
import java.util.concurrent.locks.Lock ;
import java.util.concurrent.locks.ReentrantLock ;
@ -144,12 +149,28 @@ public class DefaultTransportApiService implements TransportApiService {
private final OtaPackageDataCache otaPackageDataCache ;
private final QueueService queueService ;
private final ConcurrentMap < String , ReentrantLock > deviceCreationLocks = new ConcurrentHashMap < > ( ) ;
private final ConcurrentMap < String , ReentrantLock > deviceCreationLocks = new ConcurrentReferenceHashMap < > ( 16 , ConcurrentReferenceHashMap . ReferenceType . WEAK ) ;
@Value ( "${queue.transport_api.max_core_handler_threads:16}" )
private int maxCoreHandlerThreads ;
ListeningExecutorService handlerExecutor ;
private static boolean checkIsMqttCredentials ( DeviceCredentials credentials ) {
return credentials ! = null & & DeviceCredentialsType . MQTT_BASIC . equals ( credentials . getCredentialsType ( ) ) ;
}
@PostConstruct
public void init ( ) {
handlerExecutor = MoreExecutors . listeningDecorator ( ThingsBoardExecutors . newWorkStealingPool ( maxCoreHandlerThreads , "transport-api-service-core-handler" ) ) ;
}
@PreDestroy
public void destroy ( ) {
if ( handlerExecutor ! = null ) {
handlerExecutor . shutdownNow ( ) ;
}
}
@Override
public ListenableFuture < TbProtoQueueMsg < TransportApiResponseMsg > > handle ( TbProtoQueueMsg < TransportApiRequestMsg > tbProtoQueueMsg ) {
TransportApiRequestMsg transportApiRequestMsg = tbProtoQueueMsg . getValue ( ) ;
@ -157,16 +178,19 @@ public class DefaultTransportApiService implements TransportApiService {
if ( transportApiRequestMsg . hasValidateTokenRequestMsg ( ) ) {
ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg . getValidateTokenRequestMsg ( ) ;
result = validateCredentials ( msg . getToken ( ) , DeviceCredentialsType . ACCESS_TOKEN ) ;
final String token = msg . getToken ( ) ;
result = Futures . transformAsync ( handlerExecutor . submit ( ( ) - > validateCredentials ( token , DeviceCredentialsType . ACCESS_TOKEN ) ) , future - > future , MoreExecutors . directExecutor ( ) ) ;
} else if ( transportApiRequestMsg . hasValidateBasicMqttCredRequestMsg ( ) ) {
TransportProtos . ValidateBasicMqttCredRequestMsg msg = transportApiRequestMsg . getValidateBasicMqttCredRequestMsg ( ) ;
result = validateCredentials ( msg ) ;
result = Futures . transformAsync ( handlerExecutor . submit ( ( ) - > validateCredentials ( msg ) ) , future - > future , MoreExecutors . directExecutor ( ) ) ;
} else if ( transportApiRequestMsg . hasValidateX509CertRequestMsg ( ) ) {
ValidateDeviceX509CertRequestMsg msg = transportApiRequestMsg . getValidateX509CertRequestMsg ( ) ;
result = validateCredentials ( msg . getHash ( ) , DeviceCredentialsType . X509_CERTIFICATE ) ;
final String hash = msg . getHash ( ) ;
result = Futures . transformAsync ( handlerExecutor . submit ( ( ) - > validateCredentials ( hash , DeviceCredentialsType . X509_CERTIFICATE ) ) , future - > future , MoreExecutors . directExecutor ( ) ) ;
} else if ( transportApiRequestMsg . hasValidateOrCreateX509CertRequestMsg ( ) ) {
TransportProtos . ValidateOrCreateDeviceX509CertRequestMsg msg = transportApiRequestMsg . getValidateOrCreateX509CertRequestMsg ( ) ;
result = validateOrCreateDeviceX509Certificate ( msg . getCertificateChain ( ) ) ;
final String certChain = msg . getCertificateChain ( ) ;
result = Futures . transformAsync ( handlerExecutor . submit ( ( ) - > validateOrCreateDeviceX509Certificate ( certChain ) ) , future - > future , MoreExecutors . directExecutor ( ) ) ;
} else if ( transportApiRequestMsg . hasGetOrCreateDeviceRequestMsg ( ) ) {
result = handle ( transportApiRequestMsg . getGetOrCreateDeviceRequestMsg ( ) ) ;
} else if ( transportApiRequestMsg . hasEntityProfileRequestMsg ( ) ) {
@ -175,7 +199,8 @@ public class DefaultTransportApiService implements TransportApiService {
result = handle ( transportApiRequestMsg . getLwM2MRequestMsg ( ) ) ;
} else if ( transportApiRequestMsg . hasValidateDeviceLwM2MCredentialsRequestMsg ( ) ) {
ValidateDeviceLwM2MCredentialsRequestMsg msg = transportApiRequestMsg . getValidateDeviceLwM2MCredentialsRequestMsg ( ) ;
result = validateCredentials ( msg . getCredentialsId ( ) , DeviceCredentialsType . LWM2M_CREDENTIALS ) ;
final String credentialsId = msg . getCredentialsId ( ) ;
result = Futures . transformAsync ( handlerExecutor . submit ( ( ) - > validateCredentials ( credentialsId , DeviceCredentialsType . LWM2M_CREDENTIALS ) ) , future - > future , MoreExecutors . directExecutor ( ) ) ;
} else if ( transportApiRequestMsg . hasProvisionDeviceRequestMsg ( ) ) {
result = handle ( transportApiRequestMsg . getProvisionDeviceRequestMsg ( ) ) ;
} else if ( transportApiRequestMsg . hasResourceRequestMsg ( ) ) {