diff --git a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java index 89a7480708..dfe9870763 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java @@ -20,12 +20,16 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import org.springframework.util.ConcurrentReferenceHashMap; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.cache.ota.OtaPackageDataCache; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.ApiUsageState; @@ -104,10 +108,11 @@ import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.resource.TbResourceService; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.List; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -144,12 +149,28 @@ public class DefaultTransportApiService implements TransportApiService { private final OtaPackageDataCache otaPackageDataCache; private final QueueService queueService; - private final ConcurrentMap deviceCreationLocks = new ConcurrentHashMap<>(); + private final ConcurrentMap deviceCreationLocks = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK); + + @Value("${queue.transport_api.max_core_handler_threads:16}") + private int maxCoreHandlerThreads; + + ListeningExecutorService handlerExecutor; private static boolean checkIsMqttCredentials(DeviceCredentials credentials) { return credentials != null && DeviceCredentialsType.MQTT_BASIC.equals(credentials.getCredentialsType()); } + @PostConstruct + public void init() { + handlerExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(maxCoreHandlerThreads, "transport-api-service-core-handler")); + } + @PreDestroy + public void destroy() { + if (handlerExecutor != null) { + handlerExecutor.shutdownNow(); + } + } + @Override public ListenableFuture> handle(TbProtoQueueMsg tbProtoQueueMsg) { TransportApiRequestMsg transportApiRequestMsg = tbProtoQueueMsg.getValue(); @@ -157,16 +178,19 @@ public class DefaultTransportApiService implements TransportApiService { if (transportApiRequestMsg.hasValidateTokenRequestMsg()) { ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg(); - result = validateCredentials(msg.getToken(), DeviceCredentialsType.ACCESS_TOKEN); + final String token = msg.getToken(); + result = Futures.transformAsync(handlerExecutor.submit(() -> validateCredentials(token, DeviceCredentialsType.ACCESS_TOKEN)), future -> future, MoreExecutors.directExecutor()); } else if (transportApiRequestMsg.hasValidateBasicMqttCredRequestMsg()) { TransportProtos.ValidateBasicMqttCredRequestMsg msg = transportApiRequestMsg.getValidateBasicMqttCredRequestMsg(); - result = validateCredentials(msg); + result = Futures.transformAsync(handlerExecutor.submit(() -> validateCredentials(msg)), future -> future, MoreExecutors.directExecutor()); } else if (transportApiRequestMsg.hasValidateX509CertRequestMsg()) { ValidateDeviceX509CertRequestMsg msg = transportApiRequestMsg.getValidateX509CertRequestMsg(); - result = validateCredentials(msg.getHash(), DeviceCredentialsType.X509_CERTIFICATE); + final String hash = msg.getHash(); + result = Futures.transformAsync(handlerExecutor.submit(() -> validateCredentials(hash, DeviceCredentialsType.X509_CERTIFICATE)), future -> future, MoreExecutors.directExecutor()); } else if (transportApiRequestMsg.hasValidateOrCreateX509CertRequestMsg()) { TransportProtos.ValidateOrCreateDeviceX509CertRequestMsg msg = transportApiRequestMsg.getValidateOrCreateX509CertRequestMsg(); - result = validateOrCreateDeviceX509Certificate(msg.getCertificateChain()); + final String certChain = msg.getCertificateChain(); + result = Futures.transformAsync(handlerExecutor.submit(() -> validateOrCreateDeviceX509Certificate(certChain)), future -> future, MoreExecutors.directExecutor()); } else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) { result = handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg()); } else if (transportApiRequestMsg.hasEntityProfileRequestMsg()) { @@ -175,7 +199,8 @@ public class DefaultTransportApiService implements TransportApiService { result = handle(transportApiRequestMsg.getLwM2MRequestMsg()); } else if (transportApiRequestMsg.hasValidateDeviceLwM2MCredentialsRequestMsg()) { ValidateDeviceLwM2MCredentialsRequestMsg msg = transportApiRequestMsg.getValidateDeviceLwM2MCredentialsRequestMsg(); - result = validateCredentials(msg.getCredentialsId(), DeviceCredentialsType.LWM2M_CREDENTIALS); + final String credentialsId = msg.getCredentialsId(); + result = Futures.transformAsync(handlerExecutor.submit(() -> validateCredentials(credentialsId, DeviceCredentialsType.LWM2M_CREDENTIALS)), future -> future, MoreExecutors.directExecutor()); } else if (transportApiRequestMsg.hasProvisionDeviceRequestMsg()) { result = handle(transportApiRequestMsg.getProvisionDeviceRequestMsg()); } else if (transportApiRequestMsg.hasResourceRequestMsg()) { diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index be01c95fad..fe38f1a4c8 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1205,6 +1205,7 @@ queue: max_pending_requests: "${TB_QUEUE_TRANSPORT_MAX_PENDING_REQUESTS:10000}" max_requests_timeout: "${TB_QUEUE_TRANSPORT_MAX_REQUEST_TIMEOUT:10000}" max_callback_threads: "${TB_QUEUE_TRANSPORT_MAX_CALLBACK_THREADS:100}" + max_core_handler_threads: "${TB_QUEUE_TRANSPORT_MAX_CORE_HANDLER_THREADS:16}" request_poll_interval: "${TB_QUEUE_TRANSPORT_REQUEST_POLL_INTERVAL_MS:25}" response_poll_interval: "${TB_QUEUE_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}" core: