Browse Source

moved producer executor to TbAwsSqsAdmin

pull/9915/head
dashevchenko 2 years ago
parent
commit
63a85161b2
  1. 11
      common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java
  2. 6
      common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java
  3. 10
      common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java

11
common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java

@ -24,11 +24,15 @@ import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.GetQueueUrlResult;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.util.PropertyUtils;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -38,6 +42,8 @@ public class TbAwsSqsAdmin implements TbQueueAdmin {
private final Map<String, String> attributes;
private final AmazonSQS sqsClient;
private final Map<String, String> queues;
@Getter
private ExecutorService producerExecutor;
public TbAwsSqsAdmin(TbAwsSqsSettings sqsSettings, Map<String, String> attributes) {
this.attributes = attributes;
@ -49,6 +55,11 @@ public class TbAwsSqsAdmin implements TbQueueAdmin {
AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey());
credentialsProvider = new AWSStaticCredentialsProvider(awsCredentials);
}
int threadPoolSize = sqsSettings.getThreadPoolSize();
if (threadPoolSize == 0) {
threadPoolSize = 50; //AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE = 50;
}
producerExecutor = Executors.newFixedThreadPool(threadPoolSize, ThingsBoardThreadFactory.forName("aws-sqs-queue-executor"));
sqsClient = AmazonSQSClientBuilder.standard()
.withCredentials(credentialsProvider)

6
common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java

@ -44,10 +44,10 @@ public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
private final AmazonSQSAsync sqsClient;
private final Gson gson = new Gson();
private final Map<String, String> queueUrlMap = new ConcurrentHashMap<>();
private final TbQueueAdmin admin;
private final TbAwsSqsAdmin admin;
public TbAwsSqsProducerTemplate(TbQueueAdmin admin, TbAwsSqsSettings sqsSettings, String defaultTopic) {
this.admin = admin;
this.admin = (TbAwsSqsAdmin) admin;
this.defaultTopic = defaultTopic;
AWSCredentialsProvider credentialsProvider;
@ -61,7 +61,7 @@ public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
sqsClient = AmazonSQSAsyncClientBuilder.standard()
.withCredentials(credentialsProvider)
.withRegion(sqsSettings.getRegion())
.withExecutorFactory(sqsSettings::getProducerExecutor)
.withExecutorFactory(this.admin::getProducerExecutor)
.build();
}

10
common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsSettings.java

@ -50,14 +50,4 @@ public class TbAwsSqsSettings {
@Value("${queue.aws_sqs.producer_thread_pool_size:0}")
private int threadPoolSize;
private ExecutorService producerExecutor;
@PostConstruct
private void init() {
if (threadPoolSize == 0) {
threadPoolSize = 50; //AmazonSQSAsyncClient.DEFAULT_THREAD_POOL_SIZE = 50;
}
producerExecutor = Executors.newFixedThreadPool(threadPoolSize, ThingsBoardThreadFactory.forName("aws-sqs-queue-executor"));
}
}

Loading…
Cancel
Save