From 09d8823205e5d517cec66ff3bd4a84e9d491ee60 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Thu, 30 Apr 2020 20:38:53 +0300 Subject: [PATCH 1/3] Race condition fix --- .../queue/kafka/TbKafkaConsumerTemplate.java | 26 ++++++++++++------- docker/docker-compose.yml | 2 +- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java index 541337579d..6b1c051eeb 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java @@ -81,14 +81,24 @@ public class TbKafkaConsumerTemplate implements TbQueueCon @Override public void subscribe() { - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); - subscribed = false; + consumerLock.lock(); + try { + partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); + subscribed = false; + } finally { + consumerLock.unlock(); + } } @Override public void subscribe(Set partitions) { - this.partitions = partitions; - subscribed = false; + consumerLock.lock(); + try { + this.partitions = partitions; + subscribed = false; + } finally { + consumerLock.unlock(); + } } @Override @@ -100,13 +110,11 @@ public class TbKafkaConsumerTemplate implements TbQueueCon log.debug("Failed to await subscription", e); } } else { + consumerLock.lock(); try { - consumerLock.lock(); - if (!subscribed) { List topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); topicNames.forEach(admin::createTopicIfNotExists); - consumer.unsubscribe(); consumer.subscribe(topicNames); subscribed = true; } @@ -132,8 +140,8 @@ public class TbKafkaConsumerTemplate implements TbQueueCon @Override public void commit() { + consumerLock.lock(); try { - consumerLock.lock(); consumer.commitAsync(); } finally { consumerLock.unlock(); @@ -142,8 +150,8 @@ public class TbKafkaConsumerTemplate implements TbQueueCon @Override public void unsubscribe() { + consumerLock.lock(); try { - consumerLock.lock(); if (consumer != null) { consumer.unsubscribe(); consumer.close(); diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 2f0d0fb3b3..9061f3e2de 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -28,7 +28,7 @@ services: ZOO_SERVERS: server.1=zookeeper:2888:3888;zookeeper:2181 kafka: restart: always - image: "wurstmeister/kafka:2.12-2.2.1" + image: "wurstmeister/kafka:2.12-2.3.0" ports: - "9092:9092" env_file: From c7f282d39385473928f4111d015bc02ce30b07bb Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Fri, 1 May 2020 12:45:06 +0300 Subject: [PATCH 2/3] Refactoring of the Queue Consumers --- .../AbstractTbQueueConsumerTemplate.java | 136 ++++++++++++++++++ .../queue/kafka/TbKafkaConsumerTemplate.java | 106 +++----------- 2 files changed, 159 insertions(+), 83 deletions(-) create mode 100644 common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java new file mode 100644 index 0000000000..084d10fca9 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java @@ -0,0 +1,136 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.common; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.queue.TbQueueConsumer; +import org.thingsboard.server.queue.TbQueueMsg; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +@Slf4j +public abstract class AbstractTbQueueConsumerTemplate implements TbQueueConsumer { + + private volatile boolean subscribed; + protected volatile Set partitions; + protected final Lock consumerLock = new ReentrantLock(); + + @Getter + private final String topic; + + public AbstractTbQueueConsumerTemplate(String topic) { + this.topic = topic; + } + + @Override + public void subscribe() { + consumerLock.lock(); + try { + partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); + subscribed = false; + } finally { + consumerLock.unlock(); + } + } + + @Override + public void subscribe(Set partitions) { + consumerLock.lock(); + try { + this.partitions = partitions; + subscribed = false; + } finally { + consumerLock.unlock(); + } + } + + @Override + public List poll(long durationInMillis) { + if (!subscribed && partitions == null) { + try { + Thread.sleep(durationInMillis); + } catch (InterruptedException e) { + log.debug("Failed to await subscription", e); + } + } else { + consumerLock.lock(); + try { + if (!subscribed) { + doSubscribe(); + subscribed = true; + } + + List records = doPoll(durationInMillis); + if (!records.isEmpty()) { + List result = new ArrayList<>(records.size()); + records.forEach(record -> { + try { + if (record != null) { + result.add(decode(record)); + } + } catch (IOException e) { + log.error("Failed decode record: [{}]", record); + throw new RuntimeException("Failed to decode record: ", e); + } + }); + return result; + } + } finally { + consumerLock.unlock(); + } + } + return Collections.emptyList(); + } + + @Override + public void commit() { + consumerLock.lock(); + try { + doCommit(); + } finally { + consumerLock.unlock(); + } + } + + @Override + public void unsubscribe() { + consumerLock.lock(); + try { + doUnsubscribe(); + } finally { + consumerLock.unlock(); + } + } + + abstract protected List doPoll(long durationInMillis); + + abstract protected T decode(R record) throws IOException; + + abstract protected void doSubscribe(); + + abstract protected void doCommit(); + + abstract protected void doUnsubscribe(); + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java index 6b1c051eeb..fea31854df 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java @@ -16,7 +16,6 @@ package org.thingsboard.server.queue.kafka; import lombok.Builder; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -24,8 +23,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.TbQueueAdmin; -import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueMsg; +import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate; import java.io.IOException; import java.time.Duration; @@ -33,26 +32,17 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Properties; -import java.util.Set; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; /** * Created by ashvayka on 24.09.18. */ @Slf4j -public class TbKafkaConsumerTemplate implements TbQueueConsumer { +public class TbKafkaConsumerTemplate extends AbstractTbQueueConsumerTemplate, T> { private final TbQueueAdmin admin; private final KafkaConsumer consumer; private final TbKafkaDecoder decoder; - private volatile boolean subscribed; - private volatile Set partitions; - private final Lock consumerLock; - - @Getter - private final String topic; @Builder private TbKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder decoder, @@ -60,6 +50,7 @@ public class TbKafkaConsumerTemplate implements TbQueueCon boolean autoCommit, int autoCommitIntervalMs, int maxPollRecords, TbQueueAdmin admin) { + super(topic); Properties props = settings.toProps(); props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); if (groupId != null) { @@ -75,94 +66,43 @@ public class TbKafkaConsumerTemplate implements TbQueueCon this.admin = admin; this.consumer = new KafkaConsumer<>(props); this.decoder = decoder; - this.topic = topic; - this.consumerLock = new ReentrantLock(); } @Override - public void subscribe() { - consumerLock.lock(); - try { - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); - subscribed = false; - } finally { - consumerLock.unlock(); - } + protected void doSubscribe() { + List topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); + topicNames.forEach(admin::createTopicIfNotExists); + consumer.subscribe(topicNames); } @Override - public void subscribe(Set partitions) { - consumerLock.lock(); - try { - this.partitions = partitions; - subscribed = false; - } finally { - consumerLock.unlock(); + protected List> doPoll(long durationInMillis) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(durationInMillis)); + if (records.isEmpty()) { + return Collections.emptyList(); + } else { + List> recordList = new ArrayList<>(256); + records.forEach(recordList::add); + return recordList; } } @Override - public List poll(long durationInMillis) { - if (!subscribed && partitions == null) { - try { - Thread.sleep(durationInMillis); - } catch (InterruptedException e) { - log.debug("Failed to await subscription", e); - } - } else { - consumerLock.lock(); - try { - if (!subscribed) { - List topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); - topicNames.forEach(admin::createTopicIfNotExists); - consumer.subscribe(topicNames); - subscribed = true; - } - - ConsumerRecords records = consumer.poll(Duration.ofMillis(durationInMillis)); - if (records.count() > 0) { - List result = new ArrayList<>(); - records.forEach(record -> { - try { - result.add(decode(record)); - } catch (IOException e) { - log.error("Failed decode record: [{}]", record); - } - }); - return result; - } - } finally { - consumerLock.unlock(); - } - } - return Collections.emptyList(); + public T decode(ConsumerRecord record) throws IOException { + return decoder.decode(new KafkaTbQueueMsg(record)); } @Override - public void commit() { - consumerLock.lock(); - try { - consumer.commitAsync(); - } finally { - consumerLock.unlock(); - } + protected void doCommit() { + consumer.commitAsync(); } @Override - public void unsubscribe() { - consumerLock.lock(); - try { - if (consumer != null) { - consumer.unsubscribe(); - consumer.close(); - } - } finally { - consumerLock.unlock(); + protected void doUnsubscribe() { + if (consumer != null) { + consumer.unsubscribe(); + consumer.close(); } } - public T decode(ConsumerRecord record) throws IOException { - return decoder.decode(new KafkaTbQueueMsg(record)); - } - } From 8d5c38b743a91c2ad3739c25c47f93ece5480f08 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Fri, 1 May 2020 17:43:13 +0300 Subject: [PATCH 3/3] Queue refactoring --- .../processing/AbstractConsumerService.java | 1 - .../BatchTbRuleEngineSubmitStrategy.java | 5 +- .../BurstTbRuleEngineSubmitStrategy.java | 10 +- ...lByEntityIdTbRuleEngineSubmitStrategy.java | 8 - ...lByTenantIdTbRuleEngineSubmitStrategy.java | 1 - .../SequentialTbRuleEngineSubmitStrategy.java | 6 +- ...TbRuleEngineProcessingStrategyFactory.java | 8 +- .../TbServiceBusConsumerTemplate.java | 116 +++++--------- ...stractParallelTbQueueConsumerTemplate.java | 53 +++++++ .../AbstractTbQueueConsumerTemplate.java | 20 ++- .../queue/kafka/TbKafkaConsumerTemplate.java | 3 +- .../pubsub/TbPubSubConsumerTemplate.java | 102 ++++--------- .../rabbitmq/TbRabbitMqConsumerTemplate.java | 107 ++++--------- .../queue/sqs/TbAwsSqsConsumerTemplate.java | 142 ++++++------------ 14 files changed, 232 insertions(+), 350 deletions(-) create mode 100644 common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractParallelTbQueueConsumerTemplate.java diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java index c2705fcbdc..4007c9e17d 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java @@ -23,7 +23,6 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; -import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionChangeEvent; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/BatchTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/BatchTbRuleEngineSubmitStrategy.java index d0b1f7f99a..b9741d2433 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/BatchTbRuleEngineSubmitStrategy.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/BatchTbRuleEngineSubmitStrategy.java @@ -23,7 +23,6 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; @@ -77,8 +76,8 @@ public class BatchTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitS } } int submitSize = pendingPack.size(); - if (log.isInfoEnabled() && submitSize > 0) { - log.info("[{}] submitting [{}] messages to rule engine", queueName, submitSize); + if (log.isDebugEnabled() && submitSize > 0) { + log.debug("[{}] submitting [{}] messages to rule engine", queueName, submitSize); } pendingPack.forEach(msgConsumer); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/BurstTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/BurstTbRuleEngineSubmitStrategy.java index ffd1dd49d1..3420933d3a 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/BurstTbRuleEngineSubmitStrategy.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/BurstTbRuleEngineSubmitStrategy.java @@ -19,14 +19,8 @@ import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import java.util.ArrayList; -import java.util.List; import java.util.UUID; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; -import java.util.function.Function; -import java.util.stream.Collectors; @Slf4j public class BurstTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy { @@ -37,8 +31,8 @@ public class BurstTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitS @Override public void submitAttempt(BiConsumer> msgConsumer) { - if (log.isInfoEnabled()) { - log.info("[{}] submitting [{}] messages to rule engine", queueName, orderedMsgList.size()); + if (log.isDebugEnabled()) { + log.debug("[{}] submitting [{}] messages to rule engine", queueName, orderedMsgList.size()); } orderedMsgList.forEach(pair -> msgConsumer.accept(pair.uuid, pair.msg)); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByEntityIdTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByEntityIdTbRuleEngineSubmitStrategy.java index ae5993cb1c..473810b86c 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByEntityIdTbRuleEngineSubmitStrategy.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByEntityIdTbRuleEngineSubmitStrategy.java @@ -15,26 +15,18 @@ */ package org.thingsboard.server.service.queue.processing; -import com.google.protobuf.InvalidProtocolBufferException; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.EntityIdFactory; -import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.common.msg.gen.MsgProtos; -import org.thingsboard.server.common.msg.queue.TbMsgCallback; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; -import java.util.stream.Collectors; @Slf4j public abstract class SequentialByEntityIdTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByTenantIdTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByTenantIdTbRuleEngineSubmitStrategy.java index b258c6db1b..37e9419edd 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByTenantIdTbRuleEngineSubmitStrategy.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByTenantIdTbRuleEngineSubmitStrategy.java @@ -30,6 +30,5 @@ public class SequentialByTenantIdTbRuleEngineSubmitStrategy extends SequentialBy @Override protected EntityId getEntityId(TransportProtos.ToRuleEngineMsg msg) { return new TenantId(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB())); - } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialTbRuleEngineSubmitStrategy.java index ef45b983fc..125a1d8ef8 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialTbRuleEngineSubmitStrategy.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialTbRuleEngineSubmitStrategy.java @@ -19,8 +19,6 @@ import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import java.util.LinkedHashMap; -import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; @@ -63,8 +61,8 @@ public class SequentialTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSu if (idx < listSize) { IdMsgPair pair = orderedMsgList.get(idx); expectedMsgId = pair.uuid; - if (log.isInfoEnabled()) { - log.info("[{}] submitting [{}] message to rule engine", queueName, pair.msg); + if (log.isDebugEnabled()) { + log.debug("[{}] submitting [{}] message to rule engine", queueName, pair.msg); } msgConsumer.accept(pair.uuid, pair.msg); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java index bbf283e962..80b0523a81 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java @@ -82,10 +82,10 @@ public class TbRuleEngineProcessingStrategyFactory { retryCount++; double failedCount = result.getFailedMap().size() + result.getPendingMap().size(); if (maxRetries > 0 && retryCount > maxRetries) { - log.info("[{}] Skip reprocess of the rule engine pack due to max retries", queueName); + log.debug("[{}] Skip reprocess of the rule engine pack due to max retries", queueName); return new TbRuleEngineProcessingDecision(true, null); } else if (maxAllowedFailurePercentage > 0 && (failedCount / initialTotalCount) > maxAllowedFailurePercentage) { - log.info("[{}] Skip reprocess of the rule engine pack due to max allowed failure percentage", queueName); + log.debug("[{}] Skip reprocess of the rule engine pack due to max allowed failure percentage", queueName); return new TbRuleEngineProcessingDecision(true, null); } else { ConcurrentMap> toReprocess = new ConcurrentHashMap<>(initialTotalCount); @@ -98,7 +98,7 @@ public class TbRuleEngineProcessingStrategyFactory { if (retrySuccessful) { result.getSuccessMap().forEach(toReprocess::put); } - log.info("[{}] Going to reprocess {} messages", queueName, toReprocess.size()); + log.debug("[{}] Going to reprocess {} messages", queueName, toReprocess.size()); if (log.isTraceEnabled()) { toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); } @@ -126,7 +126,7 @@ public class TbRuleEngineProcessingStrategyFactory { @Override public TbRuleEngineProcessingDecision analyze(TbRuleEngineProcessingResult result) { if (!result.isSuccess()) { - log.info("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size()); + log.debug("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size()); } if (log.isTraceEnabled()) { result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusConsumerTemplate.java index cca599d59a..4db5ade728 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusConsumerTemplate.java @@ -31,9 +31,9 @@ import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.springframework.util.CollectionUtils; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.TbQueueAdmin; -import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.TbQueueMsgDecoder; +import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate; import org.thingsboard.server.queue.common.DefaultTbQueueMsg; import java.time.Duration; @@ -50,102 +50,72 @@ import java.util.stream.Collectors; import java.util.stream.Stream; @Slf4j -public class TbServiceBusConsumerTemplate implements TbQueueConsumer { +public class TbServiceBusConsumerTemplate extends AbstractTbQueueConsumerTemplate { private final TbQueueAdmin admin; - private final String topic; private final TbQueueMsgDecoder decoder; private final TbServiceBusSettings serviceBusSettings; private final Gson gson = new Gson(); private Set receivers; - private volatile Set partitions; - private volatile boolean subscribed; - private volatile boolean stopped = false; private Map> pendingMessages = new ConcurrentHashMap<>(); private volatile int messagesPerQueue; public TbServiceBusConsumerTemplate(TbQueueAdmin admin, TbServiceBusSettings serviceBusSettings, String topic, TbQueueMsgDecoder decoder) { + super(topic); this.admin = admin; this.decoder = decoder; - this.topic = topic; this.serviceBusSettings = serviceBusSettings; } @Override - public String getTopic() { - return topic; + protected List doPoll(long durationInMillis) { + List>> messageFutures = + receivers.stream() + .map(receiver -> receiver + .receiveAsync(messagesPerQueue, Duration.ofMillis(durationInMillis)) + .whenComplete((messages, err) -> { + if (!CollectionUtils.isEmpty(messages)) { + pendingMessages.put(receiver, messages); + } else if (err != null) { + log.error("Failed to receive messages.", err); + } + })) + .collect(Collectors.toList()); + try { + return fromList(messageFutures) + .get() + .stream() + .flatMap(messages -> CollectionUtils.isEmpty(messages) ? Stream.empty() : messages.stream()) + .collect(Collectors.toList()); + } catch (InterruptedException | ExecutionException e) { + if (stopped) { + log.info("[{}] Service Bus consumer is stopped.", getTopic()); + } else { + log.error("Failed to receive messages", e); + } + return Collections.emptyList(); + } } @Override - public void subscribe() { - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); - subscribed = false; + protected void doSubscribe(List topicNames) { + createReceivers(); + messagesPerQueue = receivers.size() / partitions.size(); } @Override - public void subscribe(Set partitions) { - this.partitions = partitions; - subscribed = false; + protected void doCommit() { + pendingMessages.forEach((receiver, msgs) -> + msgs.forEach(msg -> receiver.completeMessageAsync(msg.getDeliveryTag(), TransactionContext.NULL_TXN))); + pendingMessages.clear(); } @Override - public void unsubscribe() { - stopped = true; + protected void doUnsubscribe() { receivers.forEach(CoreMessageReceiver::closeAsync); } - @Override - public List poll(long durationInMillis) { - if (!subscribed && partitions == null) { - try { - Thread.sleep(durationInMillis); - } catch (InterruptedException e) { - log.debug("Failed to await subscription", e); - } - } else { - if (!subscribed) { - createReceivers(); - messagesPerQueue = receivers.size() / partitions.size(); - subscribed = true; - } - - List>> messageFutures = - receivers.stream() - .map(receiver -> receiver - .receiveAsync(messagesPerQueue, Duration.ofMillis(durationInMillis)) - .whenComplete((messages, err) -> { - if (!CollectionUtils.isEmpty(messages)) { - pendingMessages.put(receiver, messages); - } else if (err != null) { - log.error("Failed to receive messages.", err); - } - })) - .collect(Collectors.toList()); - try { - return fromList(messageFutures) - .get() - .stream() - .flatMap(messages -> CollectionUtils.isEmpty(messages) ? Stream.empty() : messages.stream()) - .map(message -> { - try { - return decode(message); - } catch (InvalidProtocolBufferException e) { - log.error("Failed to parse message.", e); - throw new RuntimeException("Failed to parse message.", e); - } - }).collect(Collectors.toList()); - } catch (InterruptedException | ExecutionException e) { - if (stopped) { - log.info("[{}] Service Bus consumer is stopped.", topic); - } else { - log.error("Failed to receive messages", e); - } - } - } - return Collections.emptyList(); - } - private void createReceivers() { List> receiverFutures = partitions.stream() .map(TopicPartitionInfo::getFullTopicName) @@ -167,7 +137,7 @@ public class TbServiceBusConsumerTemplate implements TbQue receivers = new HashSet<>(fromList(receiverFutures).get()); } catch (InterruptedException | ExecutionException e) { if (stopped) { - log.info("[{}] Service Bus consumer is stopped.", topic); + log.info("[{}] Service Bus consumer is stopped.", getTopic()); } else { log.error("Failed to create receivers", e); } @@ -196,13 +166,7 @@ public class TbServiceBusConsumerTemplate implements TbQue } @Override - public void commit() { - pendingMessages.forEach((receiver, msgs) -> - msgs.forEach(msg -> receiver.completeMessageAsync(msg.getDeliveryTag(), TransactionContext.NULL_TXN))); - pendingMessages.clear(); - } - - private T decode(MessageWithDeliveryTag data) throws InvalidProtocolBufferException { + protected T decode(MessageWithDeliveryTag data) throws InvalidProtocolBufferException { DefaultTbQueueMsg msg = gson.fromJson(new String(((Data) data.getMessage().getBody()).getValue().getArray()), DefaultTbQueueMsg.class); return decoder.decode(msg); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractParallelTbQueueConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractParallelTbQueueConsumerTemplate.java new file mode 100644 index 0000000000..bb83a79250 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractParallelTbQueueConsumerTemplate.java @@ -0,0 +1,53 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.common; + +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.queue.TbQueueMsg; + +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +@Slf4j +public abstract class AbstractParallelTbQueueConsumerTemplate extends AbstractTbQueueConsumerTemplate { + + protected ListeningExecutorService consumerExecutor; + + public AbstractParallelTbQueueConsumerTemplate(String topic) { + super(topic); + } + + protected void initNewExecutor(int threadPoolSize) { + if (consumerExecutor != null) { + consumerExecutor.shutdown(); + try { + consumerExecutor.awaitTermination(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + log.trace("Interrupted while waiting for consumer executor to stop"); + } + } + consumerExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadPoolSize)); + } + + protected void shutdownExecutor() { + if (consumerExecutor != null) { + consumerExecutor.shutdownNow(); + } + } + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java index 084d10fca9..c8cc545601 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java @@ -28,11 +28,13 @@ import java.util.List; import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; @Slf4j public abstract class AbstractTbQueueConsumerTemplate implements TbQueueConsumer { private volatile boolean subscribed; + protected volatile boolean stopped = false; protected volatile Set partitions; protected final Lock consumerLock = new ReentrantLock(); @@ -74,10 +76,12 @@ public abstract class AbstractTbQueueConsumerTemplate i log.debug("Failed to await subscription", e); } } else { + long pollStartTs = System.currentTimeMillis(); consumerLock.lock(); try { if (!subscribed) { - doSubscribe(); + List topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); + doSubscribe(topicNames); subscribed = true; } @@ -95,6 +99,17 @@ public abstract class AbstractTbQueueConsumerTemplate i } }); return result; + } else { + long pollDuration = System.currentTimeMillis() - pollStartTs; + if (pollDuration < durationInMillis) { + try { + Thread.sleep(durationInMillis - pollDuration); + } catch (InterruptedException e) { + if (!stopped) { + log.error("Failed to wait.", e); + } + } + } } } finally { consumerLock.unlock(); @@ -115,6 +130,7 @@ public abstract class AbstractTbQueueConsumerTemplate i @Override public void unsubscribe() { + stopped = true; consumerLock.lock(); try { doUnsubscribe(); @@ -127,7 +143,7 @@ public abstract class AbstractTbQueueConsumerTemplate i abstract protected T decode(R record) throws IOException; - abstract protected void doSubscribe(); + abstract protected void doSubscribe(List topicNames); abstract protected void doCommit(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java index fea31854df..75635de7a4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java @@ -69,8 +69,7 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue } @Override - protected void doSubscribe() { - List topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); + protected void doSubscribe( List topicNames) { topicNames.forEach(admin::createTopicIfNotExists); consumer.subscribe(topicNames); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java index 5dc795739a..7302d19ff7 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.queue.pubsub; +import com.amazonaws.services.sqs.model.Message; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub; @@ -35,11 +36,14 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.TbQueueMsgDecoder; +import org.thingsboard.server.queue.common.AbstractParallelTbQueueConsumerTemplate; +import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate; import org.thingsboard.server.queue.common.DefaultTbQueueMsg; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedHashSet; import java.util.List; import java.util.Objects; import java.util.Set; @@ -47,10 +51,11 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @Slf4j -public class TbPubSubConsumerTemplate implements TbQueueConsumer { +public class TbPubSubConsumerTemplate extends AbstractParallelTbQueueConsumerTemplate { private final Gson gson = new Gson(); private final TbQueueAdmin admin; @@ -58,23 +63,18 @@ public class TbPubSubConsumerTemplate implements TbQueueCo private final TbQueueMsgDecoder decoder; private final TbPubSubSettings pubSubSettings; - private volatile boolean subscribed; - private volatile Set partitions; private volatile Set subscriptionNames; private final List acknowledgeRequests = new CopyOnWriteArrayList<>(); - private ExecutorService consumerExecutor; private final SubscriberStub subscriber; - private volatile boolean stopped; - private volatile int messagesPerTopic; public TbPubSubConsumerTemplate(TbQueueAdmin admin, TbPubSubSettings pubSubSettings, String topic, TbQueueMsgDecoder decoder) { + super(topic); this.admin = admin; this.pubSubSettings = pubSubSettings; this.topic = topic; this.decoder = decoder; - try { SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder() @@ -84,89 +84,50 @@ public class TbPubSubConsumerTemplate implements TbQueueCo .setMaxInboundMessageSize(pubSubSettings.getMaxMsgSize()) .build()) .build(); - this.subscriber = GrpcSubscriberStub.create(subscriberStubSettings); } catch (IOException e) { log.error("Failed to create subscriber.", e); throw new RuntimeException("Failed to create subscriber.", e); } - stopped = false; } @Override - public String getTopic() { - return topic; + protected List doPoll(long durationInMillis) { + try { + List messages = receiveMessages(); + if (!messages.isEmpty()) { + return messages.stream().map(ReceivedMessage::getMessage).collect(Collectors.toList()); + } + } catch (ExecutionException | InterruptedException e) { + if (stopped) { + log.info("[{}] Pub/Sub consumer is stopped.", topic); + } else { + log.error("Failed to receive messages", e); + } + } + return Collections.emptyList(); } @Override - public void subscribe() { - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); - subscribed = false; + protected void doSubscribe(List topicNames) { + subscriptionNames = new LinkedHashSet<>(topicNames); + subscriptionNames.forEach(admin::createTopicIfNotExists); + initNewExecutor(subscriptionNames.size() + 1); + messagesPerTopic = pubSubSettings.getMaxMessages() / subscriptionNames.size(); } @Override - public void subscribe(Set partitions) { - this.partitions = partitions; - subscribed = false; + protected void doCommit() { + acknowledgeRequests.forEach(subscriber.acknowledgeCallable()::futureCall); + acknowledgeRequests.clear(); } @Override - public void unsubscribe() { - stopped = true; - if (consumerExecutor != null) { - consumerExecutor.shutdownNow(); - } - + protected void doUnsubscribe() { if (subscriber != null) { subscriber.close(); } - } - - @Override - public List poll(long durationInMillis) { - if (!subscribed && partitions == null) { - try { - Thread.sleep(durationInMillis); - } catch (InterruptedException e) { - log.debug("Failed to await subscription", e); - } - } else { - if (!subscribed) { - subscriptionNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toSet()); - subscriptionNames.forEach(admin::createTopicIfNotExists); - consumerExecutor = Executors.newFixedThreadPool(subscriptionNames.size()); - messagesPerTopic = pubSubSettings.getMaxMessages() / subscriptionNames.size(); - subscribed = true; - } - List messages; - try { - messages = receiveMessages(); - if (!messages.isEmpty()) { - List result = new ArrayList<>(); - messages.forEach(msg -> { - try { - result.add(decode(msg.getMessage())); - } catch (InvalidProtocolBufferException e) { - log.error("Failed decode record: [{}]", msg); - } - }); - return result; - } - } catch (ExecutionException | InterruptedException e) { - if (stopped) { - log.info("[{}] Pub/Sub consumer is stopped.", topic); - } else { - log.error("Failed to receive messages", e); - } - } - } - return Collections.emptyList(); - } - - @Override - public void commit() { - acknowledgeRequests.forEach(subscriber.acknowledgeCallable()::futureCall); - acknowledgeRequests.clear(); + shutdownExecutor(); } private List receiveMessages() throws ExecutionException, InterruptedException { @@ -211,6 +172,7 @@ public class TbPubSubConsumerTemplate implements TbQueueCo return transform.get(); } + @Override public T decode(PubsubMessage message) throws InvalidProtocolBufferException { DefaultTbQueueMsg msg = gson.fromJson(message.getData().toStringUtf8(), DefaultTbQueueMsg.class); return decoder.decode(msg); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqConsumerTemplate.java index 25d7719163..45dc9d6a05 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqConsumerTemplate.java @@ -23,9 +23,9 @@ import com.rabbitmq.client.GetResponse; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.TbQueueAdmin; -import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.TbQueueMsgDecoder; +import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate; import org.thingsboard.server.queue.common.DefaultTbQueueMsg; import java.io.IOException; @@ -37,33 +37,26 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @Slf4j -public class TbRabbitMqConsumerTemplate implements TbQueueConsumer { +public class TbRabbitMqConsumerTemplate extends AbstractTbQueueConsumerTemplate { private final Gson gson = new Gson(); private final TbQueueAdmin admin; - private final String topic; private final TbQueueMsgDecoder decoder; - private final TbRabbitMqSettings rabbitMqSettings; private final Channel channel; private final Connection connection; - private volatile Set partitions; - private volatile boolean subscribed; private volatile Set queues; - private volatile boolean stopped; public TbRabbitMqConsumerTemplate(TbQueueAdmin admin, TbRabbitMqSettings rabbitMqSettings, String topic, TbQueueMsgDecoder decoder) { + super(topic); this.admin = admin; this.decoder = decoder; - this.topic = topic; - this.rabbitMqSettings = rabbitMqSettings; try { connection = rabbitMqSettings.getConnectionFactory().newConnection(); } catch (IOException | TimeoutException e) { log.error("Failed to create connection.", e); throw new RuntimeException("Failed to create connection.", e); } - try { channel = connection.createChannel(); } catch (IOException e) { @@ -74,25 +67,42 @@ public class TbRabbitMqConsumerTemplate implements TbQueue } @Override - public String getTopic() { - return topic; + protected List doPoll(long durationInMillis) { + List result = queues.stream() + .map(queue -> { + try { + return channel.basicGet(queue, false); + } catch (IOException e) { + log.error("Failed to get messages from queue: [{}]", queue); + throw new RuntimeException("Failed to get messages from queue.", e); + } + }).filter(Objects::nonNull).collect(Collectors.toList()); + if (result.size() > 0) { + return result; + } else { + return Collections.emptyList(); + } } @Override - public void subscribe() { - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); - subscribed = false; + protected void doSubscribe(List topicNames) { + queues = partitions.stream() + .map(TopicPartitionInfo::getFullTopicName) + .collect(Collectors.toSet()); + queues.forEach(admin::createTopicIfNotExists); } @Override - public void subscribe(Set partitions) { - this.partitions = partitions; - subscribed = false; + protected void doCommit() { + try { + channel.basicAck(0, true); + } catch (IOException e) { + log.error("Failed to ack messages.", e); + } } @Override - public void unsubscribe() { - stopped = true; + protected void doUnsubscribe() { if (channel != null) { try { channel.close(); @@ -109,63 +119,6 @@ public class TbRabbitMqConsumerTemplate implements TbQueue } } - @Override - public List poll(long durationInMillis) { - if (!subscribed && partitions == null) { - try { - Thread.sleep(durationInMillis); - } catch (InterruptedException e) { - log.debug("Failed to await subscription", e); - } - } else { - if (!subscribed) { - queues = partitions.stream() - .map(TopicPartitionInfo::getFullTopicName) - .collect(Collectors.toSet()); - - queues.forEach(admin::createTopicIfNotExists); - subscribed = true; - } - - List result = queues.stream() - .map(queue -> { - try { - return channel.basicGet(queue, false); - } catch (IOException e) { - log.error("Failed to get messages from queue: [{}]", queue); - throw new RuntimeException("Failed to get messages from queue.", e); - } - }).filter(Objects::nonNull).map(message -> { - try { - return decode(message); - } catch (InvalidProtocolBufferException e) { - log.error("Failed to decode message: [{}].", message); - throw new RuntimeException("Failed to decode message.", e); - } - }).collect(Collectors.toList()); - if (result.size() > 0) { - return result; - } - } - try { - Thread.sleep(durationInMillis); - } catch (InterruptedException e) { - if (!stopped) { - log.error("Failed to wait.", e); - } - } - return Collections.emptyList(); - } - - @Override - public void commit() { - try { - channel.basicAck(0, true); - } catch (IOException e) { - log.error("Failed to ack messages.", e); - } - } - public T decode(GetResponse message) throws InvalidProtocolBufferException { DefaultTbQueueMsg msg = gson.fromJson(new String(message.getBody()), DefaultTbQueueMsg.class); return decoder.decode(msg); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsConsumerTemplate.java index 3e71388844..317dd93902 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsConsumerTemplate.java @@ -25,21 +25,17 @@ import com.amazonaws.services.sqs.model.Message; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.Gson; import com.google.protobuf.InvalidProtocolBufferException; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.TbQueueAdmin; -import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.TbQueueMsgDecoder; +import org.thingsboard.server.queue.common.AbstractParallelTbQueueConsumerTemplate; import org.thingsboard.server.queue.common.DefaultTbQueueMsg; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -47,34 +43,28 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @Slf4j -public class TbAwsSqsConsumerTemplate implements TbQueueConsumer { +public class TbAwsSqsConsumerTemplate extends AbstractParallelTbQueueConsumerTemplate { private static final int MAX_NUM_MSGS = 10; private final Gson gson = new Gson(); private final TbQueueAdmin admin; private final AmazonSQS sqsClient; - private final String topic; private final TbQueueMsgDecoder decoder; private final TbAwsSqsSettings sqsSettings; private final List pendingMessages = new CopyOnWriteArrayList<>(); private volatile Set queueUrls; - private volatile Set partitions; - private ListeningExecutorService consumerExecutor; - private volatile boolean subscribed; - private volatile boolean stopped = false; public TbAwsSqsConsumerTemplate(TbQueueAdmin admin, TbAwsSqsSettings sqsSettings, String topic, TbQueueMsgDecoder decoder) { + super(topic); this.admin = admin; this.decoder = decoder; - this.topic = topic; this.sqsSettings = sqsSettings; AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey()); @@ -87,81 +77,64 @@ public class TbAwsSqsConsumerTemplate implements TbQueueCo } @Override - public String getTopic() { - return topic; + protected void doSubscribe(List topicNames) { + queueUrls = topicNames.stream().map(this::getQueueUrl).collect(Collectors.toSet()); + initNewExecutor(queueUrls.size() * sqsSettings.getThreadsPerTopic() + 1); } @Override - public void subscribe() { - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); - subscribed = false; + protected List doPoll(long durationInMillis) { + if (!pendingMessages.isEmpty()) { + log.warn("Present {} non committed messages.", pendingMessages.size()); + return Collections.emptyList(); + } + int duration = (int) TimeUnit.MILLISECONDS.toSeconds(durationInMillis); + List>> futureList = queueUrls + .stream() + .map(url -> poll(url, duration)) + .collect(Collectors.toList()); + ListenableFuture>> futureResult = Futures.allAsList(futureList); + try { + return futureResult.get().stream() + .flatMap(List::stream) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } catch (InterruptedException | ExecutionException e) { + if (stopped) { + log.info("[{}] Aws SQS consumer is stopped.", getTopic()); + } else { + log.error("Failed to pool messages.", e); + } + return Collections.emptyList(); + } } @Override - public void subscribe(Set partitions) { - this.partitions = partitions; - subscribed = false; + public T decode(Message message) throws InvalidProtocolBufferException { + DefaultTbQueueMsg msg = gson.fromJson(message.getBody(), DefaultTbQueueMsg.class); + return decoder.decode(msg); } @Override - public void unsubscribe() { - stopped = true; - - if (sqsClient != null) { - sqsClient.shutdown(); - } - if (consumerExecutor != null) { - consumerExecutor.shutdownNow(); - } + protected void doCommit() { + pendingMessages.forEach(msg -> + consumerExecutor.submit(() -> { + List entries = msg.getMessages() + .stream() + .map(message -> new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getReceiptHandle())) + .collect(Collectors.toList()); + sqsClient.deleteMessageBatch(msg.getUrl(), entries); + })); + pendingMessages.clear(); } @Override - public List poll(long durationInMillis) { - if (!subscribed && partitions == null) { - try { - Thread.sleep(durationInMillis); - } catch (InterruptedException e) { - log.debug("Failed to await subscription", e); - } - } else { - if (!subscribed) { - List topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); - queueUrls = topicNames.stream().map(this::getQueueUrl).collect(Collectors.toSet()); - consumerExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(queueUrls.size() * sqsSettings.getThreadsPerTopic() + 1)); - subscribed = true; - } - - if (!pendingMessages.isEmpty()) { - log.warn("Present {} non committed messages.", pendingMessages.size()); - return Collections.emptyList(); - } - - List>> futureList = queueUrls - .stream() - .map(url -> poll(url, (int) TimeUnit.MILLISECONDS.toSeconds(durationInMillis))) - .collect(Collectors.toList()); - ListenableFuture>> futureResult = Futures.allAsList(futureList); - try { - return futureResult.get().stream() - .flatMap(List::stream) - .map(msg -> { - try { - return decode(msg); - } catch (IOException e) { - log.error("Failed to decode message: [{}]", msg); - return null; - } - }).filter(Objects::nonNull) - .collect(Collectors.toList()); - } catch (InterruptedException | ExecutionException e) { - if (stopped) { - log.info("[{}] Aws SQS consumer is stopped.", topic); - } else { - log.error("Failed to pool messages.", e); - } - } + protected void doUnsubscribe() { + stopped = true; + if (sqsClient != null) { + sqsClient.shutdown(); } - return Collections.emptyList(); + shutdownExecutor(); } private ListenableFuture> poll(String url, int waitTimeSeconds) { @@ -194,25 +167,6 @@ public class TbAwsSqsConsumerTemplate implements TbQueueCo }, consumerExecutor); } - @Override - public void commit() { - pendingMessages.forEach(msg -> - consumerExecutor.submit(() -> { - List entries = msg.getMessages() - .stream() - .map(message -> new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getReceiptHandle())) - .collect(Collectors.toList()); - sqsClient.deleteMessageBatch(msg.getUrl(), entries); - })); - - pendingMessages.clear(); - } - - public T decode(Message message) throws InvalidProtocolBufferException { - DefaultTbQueueMsg msg = gson.fromJson(message.getBody(), DefaultTbQueueMsg.class); - return decoder.decode(msg); - } - @Data private static class AwsSqsMsgWrapper { private final String url;