From 3ac0cc241a23af1ecc885055f81f2d4bbcff0f2d Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Wed, 27 Dec 2023 13:47:52 +0200 Subject: [PATCH] TbAwsSqsProducerTemplate: native thread allocation error fix (provided one executor for message sending with limited thread pool size) --- .../queue/sqs/TbAwsSqsProducerTemplate.java | 38 +++++++------------ .../server/queue/sqs/TbAwsSqsSettings.java | 18 +++++++++ 2 files changed, 31 insertions(+), 25 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java index 863e96a714..ab602e1960 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java @@ -20,15 +20,11 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; -import com.amazonaws.services.sqs.AmazonSQS; -import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import com.amazonaws.handlers.AsyncHandler; +import com.amazonaws.services.sqs.AmazonSQSAsync; +import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder; import com.amazonaws.services.sqs.model.SendMessageRequest; import com.amazonaws.services.sqs.model.SendMessageResult; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; @@ -41,16 +37,14 @@ import org.thingsboard.server.queue.common.DefaultTbQueueMsg; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; @Slf4j public class TbAwsSqsProducerTemplate implements TbQueueProducer { private final String defaultTopic; - private final AmazonSQS sqsClient; + private final AmazonSQSAsync sqsClient; private final Gson gson = new Gson(); private final Map queueUrlMap = new ConcurrentHashMap<>(); private final TbQueueAdmin admin; - private ListeningExecutorService producerExecutor; public TbAwsSqsProducerTemplate(TbQueueAdmin admin, TbAwsSqsSettings sqsSettings, String defaultTopic) { this.admin = admin; @@ -64,11 +58,11 @@ public class TbAwsSqsProducerTemplate implements TbQueuePr credentialsProvider = new AWSStaticCredentialsProvider(awsCredentials); } - sqsClient = AmazonSQSClientBuilder.standard() + sqsClient = AmazonSQSAsyncClientBuilder.standard() .withCredentials(credentialsProvider) .withRegion(sqsSettings.getRegion()) + .withExecutorFactory(sqsSettings::getProducerExecutor) .build(); - producerExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); } @Override @@ -91,30 +85,24 @@ public class TbAwsSqsProducerTemplate implements TbQueuePr sendMsgRequest.withMessageGroupId(sqsMsgId); sendMsgRequest.withMessageDeduplicationId(sqsMsgId); - ListenableFuture future = producerExecutor.submit(() -> sqsClient.sendMessage(sendMsgRequest)); - - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(SendMessageResult result) { + sqsClient.sendMessageAsync(sendMsgRequest, new AsyncHandler() { + @Override public void onError(Exception e) { if (callback != null) { - callback.onSuccess(new AwsSqsTbQueueMsgMetadata(result.getSdkHttpMetadata())); + callback.onFailure(e); } } - @Override - public void onFailure(Throwable t) { + @Override public void onSuccess(SendMessageRequest request, + SendMessageResult sendMessageResult) { if (callback != null) { - callback.onFailure(t); + callback.onSuccess(new AwsSqsTbQueueMsgMetadata(sendMessageResult.getSdkHttpMetadata())); } } - }, producerExecutor); + }); } @Override public void stop() { - if (producerExecutor != null) { - producerExecutor.shutdownNow(); - } if (sqsClient != null) { sqsClient.shutdown(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java index 89dc9d7e47..0a3915496b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java @@ -20,6 +20,11 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; +import org.thingsboard.common.util.ThingsBoardThreadFactory; + +import javax.annotation.PostConstruct; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; @Slf4j @ConditionalOnExpression("'${queue.type:null}'=='aws-sqs'") @@ -42,4 +47,17 @@ public class TbAwsSqsSettings { @Value("${queue.aws_sqs.threads_per_topic}") private int threadsPerTopic; + @Value("${queue.aws_sqs.producer_thread_pool_size:0}") + private int threadPoolSize; + + private ExecutorService producerExecutor; + + @PostConstruct + private void init() { + if (threadPoolSize == 0) { + threadPoolSize = 50; //AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE = 50; + } + producerExecutor = Executors.newFixedThreadPool(threadPoolSize, ThingsBoardThreadFactory.forName("aws-sqs-queue-executor")); + } + }