diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java index ba4eeb6ca4..337bb271e5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java @@ -24,11 +24,15 @@ import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.CreateQueueRequest; import com.amazonaws.services.sqs.model.GetQueueUrlResult; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.util.PropertyUtils; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.Function; import java.util.stream.Collectors; @@ -38,6 +42,8 @@ public class TbAwsSqsAdmin implements TbQueueAdmin { private final Map attributes; private final AmazonSQS sqsClient; private final Map queues; + @Getter + private ExecutorService producerExecutor; public TbAwsSqsAdmin(TbAwsSqsSettings sqsSettings, Map attributes) { this.attributes = attributes; @@ -49,6 +55,11 @@ public class TbAwsSqsAdmin implements TbQueueAdmin { AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey()); credentialsProvider = new AWSStaticCredentialsProvider(awsCredentials); } + int threadPoolSize = sqsSettings.getThreadPoolSize(); + if (threadPoolSize == 0) { + threadPoolSize = 50; //AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE = 50; + } + producerExecutor = Executors.newFixedThreadPool(threadPoolSize, ThingsBoardThreadFactory.forName("aws-sqs-queue-executor")); sqsClient = AmazonSQSClientBuilder.standard() .withCredentials(credentialsProvider) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java index ab602e1960..f5bca724d0 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java @@ -44,10 +44,10 @@ public class TbAwsSqsProducerTemplate implements TbQueuePr private final AmazonSQSAsync sqsClient; private final Gson gson = new Gson(); private final Map queueUrlMap = new ConcurrentHashMap<>(); - private final TbQueueAdmin admin; + private final TbAwsSqsAdmin admin; public TbAwsSqsProducerTemplate(TbQueueAdmin admin, TbAwsSqsSettings sqsSettings, String defaultTopic) { - this.admin = admin; + this.admin = (TbAwsSqsAdmin) admin; this.defaultTopic = defaultTopic; AWSCredentialsProvider credentialsProvider; @@ -61,7 +61,7 @@ public class TbAwsSqsProducerTemplate implements TbQueuePr sqsClient = AmazonSQSAsyncClientBuilder.standard() .withCredentials(credentialsProvider) .withRegion(sqsSettings.getRegion()) - .withExecutorFactory(sqsSettings::getProducerExecutor) + .withExecutorFactory(this.admin::getProducerExecutor) .build(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java index 0a3915496b..1f8e301c87 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java @@ -50,14 +50,4 @@ public class TbAwsSqsSettings { @Value("${queue.aws_sqs.producer_thread_pool_size:0}") private int threadPoolSize; - private ExecutorService producerExecutor; - - @PostConstruct - private void init() { - if (threadPoolSize == 0) { - threadPoolSize = 50; //AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE = 50; - } - producerExecutor = Executors.newFixedThreadPool(threadPoolSize, ThingsBoardThreadFactory.forName("aws-sqs-queue-executor")); - } - }