diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java index c2705fcbdc..4007c9e17d 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java @@ -23,7 +23,6 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; -import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionChangeEvent; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/BatchTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/BatchTbRuleEngineSubmitStrategy.java index d0b1f7f99a..b9741d2433 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/BatchTbRuleEngineSubmitStrategy.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/BatchTbRuleEngineSubmitStrategy.java @@ -23,7 +23,6 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; @@ -77,8 +76,8 @@ public class BatchTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitS } } int submitSize = pendingPack.size(); - if (log.isInfoEnabled() && submitSize > 0) { - log.info("[{}] submitting [{}] messages to rule engine", queueName, submitSize); + if (log.isDebugEnabled() && submitSize > 0) { + log.debug("[{}] submitting [{}] messages to rule engine", queueName, submitSize); } pendingPack.forEach(msgConsumer); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/BurstTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/BurstTbRuleEngineSubmitStrategy.java index ffd1dd49d1..3420933d3a 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/BurstTbRuleEngineSubmitStrategy.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/BurstTbRuleEngineSubmitStrategy.java @@ -19,14 +19,8 @@ import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import java.util.ArrayList; -import java.util.List; import java.util.UUID; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; -import java.util.function.Function; -import java.util.stream.Collectors; @Slf4j public class BurstTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy { @@ -37,8 +31,8 @@ public class BurstTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitS @Override public void submitAttempt(BiConsumer> msgConsumer) { - if (log.isInfoEnabled()) { - log.info("[{}] submitting [{}] messages to rule engine", queueName, orderedMsgList.size()); + if (log.isDebugEnabled()) { + log.debug("[{}] submitting [{}] messages to rule engine", queueName, orderedMsgList.size()); } orderedMsgList.forEach(pair -> msgConsumer.accept(pair.uuid, pair.msg)); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByEntityIdTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByEntityIdTbRuleEngineSubmitStrategy.java index ae5993cb1c..473810b86c 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByEntityIdTbRuleEngineSubmitStrategy.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByEntityIdTbRuleEngineSubmitStrategy.java @@ -15,26 +15,18 @@ */ package org.thingsboard.server.service.queue.processing; -import com.google.protobuf.InvalidProtocolBufferException; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.EntityIdFactory; -import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.common.msg.gen.MsgProtos; -import org.thingsboard.server.common.msg.queue.TbMsgCallback; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; -import java.util.stream.Collectors; @Slf4j public abstract class SequentialByEntityIdTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByTenantIdTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByTenantIdTbRuleEngineSubmitStrategy.java index b258c6db1b..37e9419edd 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByTenantIdTbRuleEngineSubmitStrategy.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialByTenantIdTbRuleEngineSubmitStrategy.java @@ -30,6 +30,5 @@ public class SequentialByTenantIdTbRuleEngineSubmitStrategy extends SequentialBy @Override protected EntityId getEntityId(TransportProtos.ToRuleEngineMsg msg) { return new TenantId(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB())); - } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialTbRuleEngineSubmitStrategy.java index ef45b983fc..125a1d8ef8 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialTbRuleEngineSubmitStrategy.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/SequentialTbRuleEngineSubmitStrategy.java @@ -19,8 +19,6 @@ import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import java.util.LinkedHashMap; -import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; @@ -63,8 +61,8 @@ public class SequentialTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSu if (idx < listSize) { IdMsgPair pair = orderedMsgList.get(idx); expectedMsgId = pair.uuid; - if (log.isInfoEnabled()) { - log.info("[{}] submitting [{}] message to rule engine", queueName, pair.msg); + if (log.isDebugEnabled()) { + log.debug("[{}] submitting [{}] message to rule engine", queueName, pair.msg); } msgConsumer.accept(pair.uuid, pair.msg); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java index bbf283e962..80b0523a81 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java @@ -82,10 +82,10 @@ public class TbRuleEngineProcessingStrategyFactory { retryCount++; double failedCount = result.getFailedMap().size() + result.getPendingMap().size(); if (maxRetries > 0 && retryCount > maxRetries) { - log.info("[{}] Skip reprocess of the rule engine pack due to max retries", queueName); + log.debug("[{}] Skip reprocess of the rule engine pack due to max retries", queueName); return new TbRuleEngineProcessingDecision(true, null); } else if (maxAllowedFailurePercentage > 0 && (failedCount / initialTotalCount) > maxAllowedFailurePercentage) { - log.info("[{}] Skip reprocess of the rule engine pack due to max allowed failure percentage", queueName); + log.debug("[{}] Skip reprocess of the rule engine pack due to max allowed failure percentage", queueName); return new TbRuleEngineProcessingDecision(true, null); } else { ConcurrentMap> toReprocess = new ConcurrentHashMap<>(initialTotalCount); @@ -98,7 +98,7 @@ public class TbRuleEngineProcessingStrategyFactory { if (retrySuccessful) { result.getSuccessMap().forEach(toReprocess::put); } - log.info("[{}] Going to reprocess {} messages", queueName, toReprocess.size()); + log.debug("[{}] Going to reprocess {} messages", queueName, toReprocess.size()); if (log.isTraceEnabled()) { toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); } @@ -126,7 +126,7 @@ public class TbRuleEngineProcessingStrategyFactory { @Override public TbRuleEngineProcessingDecision analyze(TbRuleEngineProcessingResult result) { if (!result.isSuccess()) { - log.info("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size()); + log.debug("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size()); } if (log.isTraceEnabled()) { result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java index f108de4a43..3355c55d7e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java @@ -67,6 +67,7 @@ public class TbServiceBusAdmin implements TbQueueAdmin { try { QueueDescription queueDescription = new QueueDescription(topic); + queueDescription.setRequiresDuplicateDetection(false); setQueueConfigs(queueDescription); client.createQueue(queueDescription); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusConsumerTemplate.java index cca599d59a..734c79d927 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusConsumerTemplate.java @@ -31,9 +31,9 @@ import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.springframework.util.CollectionUtils; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.TbQueueAdmin; -import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.TbQueueMsgDecoder; +import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate; import org.thingsboard.server.queue.common.DefaultTbQueueMsg; import java.time.Duration; @@ -50,102 +50,72 @@ import java.util.stream.Collectors; import java.util.stream.Stream; @Slf4j -public class TbServiceBusConsumerTemplate implements TbQueueConsumer { +public class TbServiceBusConsumerTemplate extends AbstractTbQueueConsumerTemplate { private final TbQueueAdmin admin; - private final String topic; private final TbQueueMsgDecoder decoder; private final TbServiceBusSettings serviceBusSettings; private final Gson gson = new Gson(); private Set receivers; - private volatile Set partitions; - private volatile boolean subscribed; - private volatile boolean stopped = false; - private Map> pendingMessages = new ConcurrentHashMap<>(); + private final Map> pendingMessages = new ConcurrentHashMap<>(); private volatile int messagesPerQueue; public TbServiceBusConsumerTemplate(TbQueueAdmin admin, TbServiceBusSettings serviceBusSettings, String topic, TbQueueMsgDecoder decoder) { + super(topic); this.admin = admin; this.decoder = decoder; - this.topic = topic; this.serviceBusSettings = serviceBusSettings; } @Override - public String getTopic() { - return topic; + protected List doPoll(long durationInMillis) { + List>> messageFutures = + receivers.stream() + .map(receiver -> receiver + .receiveAsync(messagesPerQueue, Duration.ofMillis(durationInMillis)) + .whenComplete((messages, err) -> { + if (!CollectionUtils.isEmpty(messages)) { + pendingMessages.put(receiver, messages); + } else if (err != null) { + log.error("Failed to receive messages.", err); + } + })) + .collect(Collectors.toList()); + try { + return fromList(messageFutures) + .get() + .stream() + .flatMap(messages -> CollectionUtils.isEmpty(messages) ? Stream.empty() : messages.stream()) + .collect(Collectors.toList()); + } catch (InterruptedException | ExecutionException e) { + if (stopped) { + log.info("[{}] Service Bus consumer is stopped.", getTopic()); + } else { + log.error("Failed to receive messages", e); + } + return Collections.emptyList(); + } } @Override - public void subscribe() { - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); - subscribed = false; + protected void doSubscribe(List topicNames) { + createReceivers(); + messagesPerQueue = receivers.size() / partitions.size(); } @Override - public void subscribe(Set partitions) { - this.partitions = partitions; - subscribed = false; + protected void doCommit() { + pendingMessages.forEach((receiver, msgs) -> + msgs.forEach(msg -> receiver.completeMessageAsync(msg.getDeliveryTag(), TransactionContext.NULL_TXN))); + pendingMessages.clear(); } @Override - public void unsubscribe() { - stopped = true; + protected void doUnsubscribe() { receivers.forEach(CoreMessageReceiver::closeAsync); } - @Override - public List poll(long durationInMillis) { - if (!subscribed && partitions == null) { - try { - Thread.sleep(durationInMillis); - } catch (InterruptedException e) { - log.debug("Failed to await subscription", e); - } - } else { - if (!subscribed) { - createReceivers(); - messagesPerQueue = receivers.size() / partitions.size(); - subscribed = true; - } - - List>> messageFutures = - receivers.stream() - .map(receiver -> receiver - .receiveAsync(messagesPerQueue, Duration.ofMillis(durationInMillis)) - .whenComplete((messages, err) -> { - if (!CollectionUtils.isEmpty(messages)) { - pendingMessages.put(receiver, messages); - } else if (err != null) { - log.error("Failed to receive messages.", err); - } - })) - .collect(Collectors.toList()); - try { - return fromList(messageFutures) - .get() - .stream() - .flatMap(messages -> CollectionUtils.isEmpty(messages) ? Stream.empty() : messages.stream()) - .map(message -> { - try { - return decode(message); - } catch (InvalidProtocolBufferException e) { - log.error("Failed to parse message.", e); - throw new RuntimeException("Failed to parse message.", e); - } - }).collect(Collectors.toList()); - } catch (InterruptedException | ExecutionException e) { - if (stopped) { - log.info("[{}] Service Bus consumer is stopped.", topic); - } else { - log.error("Failed to receive messages", e); - } - } - } - return Collections.emptyList(); - } - private void createReceivers() { List> receiverFutures = partitions.stream() .map(TopicPartitionInfo::getFullTopicName) @@ -167,7 +137,7 @@ public class TbServiceBusConsumerTemplate implements TbQue receivers = new HashSet<>(fromList(receiverFutures).get()); } catch (InterruptedException | ExecutionException e) { if (stopped) { - log.info("[{}] Service Bus consumer is stopped.", topic); + log.info("[{}] Service Bus consumer is stopped.", getTopic()); } else { log.error("Failed to create receivers", e); } @@ -196,13 +166,7 @@ public class TbServiceBusConsumerTemplate implements TbQue } @Override - public void commit() { - pendingMessages.forEach((receiver, msgs) -> - msgs.forEach(msg -> receiver.completeMessageAsync(msg.getDeliveryTag(), TransactionContext.NULL_TXN))); - pendingMessages.clear(); - } - - private T decode(MessageWithDeliveryTag data) throws InvalidProtocolBufferException { + protected T decode(MessageWithDeliveryTag data) throws InvalidProtocolBufferException { DefaultTbQueueMsg msg = gson.fromJson(new String(((Data) data.getMessage().getBody()).getValue().getArray()), DefaultTbQueueMsg.class); return decoder.decode(msg); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusProducerTemplate.java index 5d9a931378..3c5228198d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusProducerTemplate.java @@ -33,6 +33,7 @@ import org.thingsboard.server.queue.common.DefaultTbQueueMsg; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -42,14 +43,14 @@ public class TbServiceBusProducerTemplate implements TbQue private final Gson gson = new Gson(); private final TbQueueAdmin admin; private final TbServiceBusSettings serviceBusSettings; - private final Map clients = new HashMap<>(); - private ExecutorService executorService; + private final Map clients = new ConcurrentHashMap<>(); + private final ExecutorService executorService; public TbServiceBusProducerTemplate(TbQueueAdmin admin, TbServiceBusSettings serviceBusSettings, String defaultTopic) { this.admin = admin; this.defaultTopic = defaultTopic; this.serviceBusSettings = serviceBusSettings; - executorService = Executors.newSingleThreadExecutor(); + executorService = Executors.newCachedThreadPool(); } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractParallelTbQueueConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractParallelTbQueueConsumerTemplate.java new file mode 100644 index 0000000000..bb83a79250 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractParallelTbQueueConsumerTemplate.java @@ -0,0 +1,53 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.common; + +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.queue.TbQueueMsg; + +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +@Slf4j +public abstract class AbstractParallelTbQueueConsumerTemplate extends AbstractTbQueueConsumerTemplate { + + protected ListeningExecutorService consumerExecutor; + + public AbstractParallelTbQueueConsumerTemplate(String topic) { + super(topic); + } + + protected void initNewExecutor(int threadPoolSize) { + if (consumerExecutor != null) { + consumerExecutor.shutdown(); + try { + consumerExecutor.awaitTermination(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + log.trace("Interrupted while waiting for consumer executor to stop"); + } + } + consumerExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadPoolSize)); + } + + protected void shutdownExecutor() { + if (consumerExecutor != null) { + consumerExecutor.shutdownNow(); + } + } + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java new file mode 100644 index 0000000000..c8cc545601 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java @@ -0,0 +1,152 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.common; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.queue.TbQueueConsumer; +import org.thingsboard.server.queue.TbQueueMsg; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +@Slf4j +public abstract class AbstractTbQueueConsumerTemplate implements TbQueueConsumer { + + private volatile boolean subscribed; + protected volatile boolean stopped = false; + protected volatile Set partitions; + protected final Lock consumerLock = new ReentrantLock(); + + @Getter + private final String topic; + + public AbstractTbQueueConsumerTemplate(String topic) { + this.topic = topic; + } + + @Override + public void subscribe() { + consumerLock.lock(); + try { + partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); + subscribed = false; + } finally { + consumerLock.unlock(); + } + } + + @Override + public void subscribe(Set partitions) { + consumerLock.lock(); + try { + this.partitions = partitions; + subscribed = false; + } finally { + consumerLock.unlock(); + } + } + + @Override + public List poll(long durationInMillis) { + if (!subscribed && partitions == null) { + try { + Thread.sleep(durationInMillis); + } catch (InterruptedException e) { + log.debug("Failed to await subscription", e); + } + } else { + long pollStartTs = System.currentTimeMillis(); + consumerLock.lock(); + try { + if (!subscribed) { + List topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); + doSubscribe(topicNames); + subscribed = true; + } + + List records = doPoll(durationInMillis); + if (!records.isEmpty()) { + List result = new ArrayList<>(records.size()); + records.forEach(record -> { + try { + if (record != null) { + result.add(decode(record)); + } + } catch (IOException e) { + log.error("Failed decode record: [{}]", record); + throw new RuntimeException("Failed to decode record: ", e); + } + }); + return result; + } else { + long pollDuration = System.currentTimeMillis() - pollStartTs; + if (pollDuration < durationInMillis) { + try { + Thread.sleep(durationInMillis - pollDuration); + } catch (InterruptedException e) { + if (!stopped) { + log.error("Failed to wait.", e); + } + } + } + } + } finally { + consumerLock.unlock(); + } + } + return Collections.emptyList(); + } + + @Override + public void commit() { + consumerLock.lock(); + try { + doCommit(); + } finally { + consumerLock.unlock(); + } + } + + @Override + public void unsubscribe() { + stopped = true; + consumerLock.lock(); + try { + doUnsubscribe(); + } finally { + consumerLock.unlock(); + } + } + + abstract protected List doPoll(long durationInMillis); + + abstract protected T decode(R record) throws IOException; + + abstract protected void doSubscribe(List topicNames); + + abstract protected void doCommit(); + + abstract protected void doUnsubscribe(); + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java index 541337579d..1b5619fd26 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java @@ -16,16 +16,14 @@ package org.thingsboard.server.queue.kafka; import lombok.Builder; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.TbQueueAdmin; -import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueMsg; +import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate; import java.io.IOException; import java.time.Duration; @@ -33,26 +31,16 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Properties; -import java.util.Set; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; /** * Created by ashvayka on 24.09.18. */ @Slf4j -public class TbKafkaConsumerTemplate implements TbQueueConsumer { +public class TbKafkaConsumerTemplate extends AbstractTbQueueConsumerTemplate, T> { private final TbQueueAdmin admin; private final KafkaConsumer consumer; private final TbKafkaDecoder decoder; - private volatile boolean subscribed; - private volatile Set partitions; - private final Lock consumerLock; - - @Getter - private final String topic; @Builder private TbKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder decoder, @@ -60,6 +48,7 @@ public class TbKafkaConsumerTemplate implements TbQueueCon boolean autoCommit, int autoCommitIntervalMs, int maxPollRecords, TbQueueAdmin admin) { + super(topic); Properties props = settings.toProps(); props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); if (groupId != null) { @@ -75,86 +64,42 @@ public class TbKafkaConsumerTemplate implements TbQueueCon this.admin = admin; this.consumer = new KafkaConsumer<>(props); this.decoder = decoder; - this.topic = topic; - this.consumerLock = new ReentrantLock(); - } - - @Override - public void subscribe() { - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); - subscribed = false; } @Override - public void subscribe(Set partitions) { - this.partitions = partitions; - subscribed = false; + protected void doSubscribe(List topicNames) { + topicNames.forEach(admin::createTopicIfNotExists); + consumer.subscribe(topicNames); } @Override - public List poll(long durationInMillis) { - if (!subscribed && partitions == null) { - try { - Thread.sleep(durationInMillis); - } catch (InterruptedException e) { - log.debug("Failed to await subscription", e); - } + protected List> doPoll(long durationInMillis) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(durationInMillis)); + if (records.isEmpty()) { + return Collections.emptyList(); } else { - try { - consumerLock.lock(); - - if (!subscribed) { - List topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); - topicNames.forEach(admin::createTopicIfNotExists); - consumer.unsubscribe(); - consumer.subscribe(topicNames); - subscribed = true; - } - - ConsumerRecords records = consumer.poll(Duration.ofMillis(durationInMillis)); - if (records.count() > 0) { - List result = new ArrayList<>(); - records.forEach(record -> { - try { - result.add(decode(record)); - } catch (IOException e) { - log.error("Failed decode record: [{}]", record); - } - }); - return result; - } - } finally { - consumerLock.unlock(); - } + List> recordList = new ArrayList<>(256); + records.forEach(recordList::add); + return recordList; } - return Collections.emptyList(); } @Override - public void commit() { - try { - consumerLock.lock(); - consumer.commitAsync(); - } finally { - consumerLock.unlock(); - } + public T decode(ConsumerRecord record) throws IOException { + return decoder.decode(new KafkaTbQueueMsg(record)); } @Override - public void unsubscribe() { - try { - consumerLock.lock(); - if (consumer != null) { - consumer.unsubscribe(); - consumer.close(); - } - } finally { - consumerLock.unlock(); - } + protected void doCommit() { + consumer.commitAsync(); } - public T decode(ConsumerRecord record) throws IOException { - return decoder.decode(new KafkaTbQueueMsg(record)); + @Override + protected void doUnsubscribe() { + if (consumer != null) { + consumer.unsubscribe(); + consumer.close(); + } } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java index 495c895a91..b5b6126cd5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java @@ -30,27 +30,25 @@ import com.google.pubsub.v1.PullResponse; import com.google.pubsub.v1.ReceivedMessage; import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.TbQueueAdmin; -import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.TbQueueMsgDecoder; +import org.thingsboard.server.queue.common.AbstractParallelTbQueueConsumerTemplate; import org.thingsboard.server.queue.common.DefaultTbQueueMsg; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedHashSet; import java.util.List; import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.stream.Collectors; @Slf4j -public class TbPubSubConsumerTemplate implements TbQueueConsumer { +public class TbPubSubConsumerTemplate extends AbstractParallelTbQueueConsumerTemplate { private final Gson gson = new Gson(); private final TbQueueAdmin admin; @@ -58,23 +56,18 @@ public class TbPubSubConsumerTemplate implements TbQueueCo private final TbQueueMsgDecoder decoder; private final TbPubSubSettings pubSubSettings; - private volatile boolean subscribed; - private volatile Set partitions; private volatile Set subscriptionNames; private final List acknowledgeRequests = new CopyOnWriteArrayList<>(); - private ExecutorService consumerExecutor; private final SubscriberStub subscriber; - private volatile boolean stopped; - private volatile int messagesPerTopic; public TbPubSubConsumerTemplate(TbQueueAdmin admin, TbPubSubSettings pubSubSettings, String topic, TbQueueMsgDecoder decoder) { + super(topic); this.admin = admin; this.pubSubSettings = pubSubSettings; this.topic = topic; this.decoder = decoder; - try { SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder() @@ -84,94 +77,50 @@ public class TbPubSubConsumerTemplate implements TbQueueCo .setMaxInboundMessageSize(pubSubSettings.getMaxMsgSize()) .build()) .build(); - this.subscriber = GrpcSubscriberStub.create(subscriberStubSettings); } catch (IOException e) { log.error("Failed to create subscriber.", e); throw new RuntimeException("Failed to create subscriber.", e); } - stopped = false; } @Override - public String getTopic() { - return topic; + protected List doPoll(long durationInMillis) { + try { + List messages = receiveMessages(); + if (!messages.isEmpty()) { + return messages.stream().map(ReceivedMessage::getMessage).collect(Collectors.toList()); + } + } catch (ExecutionException | InterruptedException e) { + if (stopped) { + log.info("[{}] Pub/Sub consumer is stopped.", topic); + } else { + log.error("Failed to receive messages", e); + } + } + return Collections.emptyList(); } @Override - public void subscribe() { - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); - subscribed = false; + protected void doSubscribe(List topicNames) { + subscriptionNames = new LinkedHashSet<>(topicNames); + subscriptionNames.forEach(admin::createTopicIfNotExists); + initNewExecutor(subscriptionNames.size() + 1); + messagesPerTopic = pubSubSettings.getMaxMessages() / subscriptionNames.size(); } @Override - public void subscribe(Set partitions) { - this.partitions = partitions; - subscribed = false; + protected void doCommit() { + acknowledgeRequests.forEach(subscriber.acknowledgeCallable()::futureCall); + acknowledgeRequests.clear(); } @Override - public void unsubscribe() { - stopped = true; - if (consumerExecutor != null) { - consumerExecutor.shutdownNow(); - } - + protected void doUnsubscribe() { if (subscriber != null) { subscriber.close(); } - } - - @Override - public List poll(long durationInMillis) { - if (!subscribed && partitions == null) { - try { - Thread.sleep(durationInMillis); - } catch (InterruptedException e) { - log.debug("Failed to await subscription", e); - } - } else { - if (!subscribed) { - subscriptionNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toSet()); - subscriptionNames.forEach(admin::createTopicIfNotExists); - - if (consumerExecutor != null) { - consumerExecutor.shutdown(); - } - - consumerExecutor = Executors.newFixedThreadPool(subscriptionNames.size()); - messagesPerTopic = pubSubSettings.getMaxMessages() / subscriptionNames.size(); - subscribed = true; - } - List messages; - try { - messages = receiveMessages(); - if (!messages.isEmpty()) { - List result = new ArrayList<>(); - messages.forEach(msg -> { - try { - result.add(decode(msg.getMessage())); - } catch (InvalidProtocolBufferException e) { - log.error("Failed decode record: [{}]", msg); - } - }); - return result; - } - } catch (ExecutionException | InterruptedException e) { - if (stopped) { - log.info("[{}] Pub/Sub consumer is stopped.", topic); - } else { - log.error("Failed to receive messages", e); - } - } - } - return Collections.emptyList(); - } - - @Override - public void commit() { - acknowledgeRequests.forEach(subscriber.acknowledgeCallable()::futureCall); - acknowledgeRequests.clear(); + shutdownExecutor(); } private List receiveMessages() throws ExecutionException, InterruptedException { @@ -180,7 +129,7 @@ public class TbPubSubConsumerTemplate implements TbQueueCo PullRequest pullRequest = PullRequest.newBuilder() .setMaxMessages(messagesPerTopic) - .setReturnImmediately(false) // return immediately if messages are not available +// .setReturnImmediately(false) // return immediately if messages are not available .setSubscription(subscriptionName) .build(); @@ -216,6 +165,7 @@ public class TbPubSubConsumerTemplate implements TbQueueCo return transform.get(); } + @Override public T decode(PubsubMessage message) throws InvalidProtocolBufferException { DefaultTbQueueMsg msg = gson.fromJson(message.getData().toStringUtf8(), DefaultTbQueueMsg.class); return decoder.decode(msg); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubProducerTemplate.java index 7a073616fd..ea713827e5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubProducerTemplate.java @@ -49,7 +49,7 @@ public class TbPubSubProducerTemplate implements TbQueuePr private final Map publisherMap = new ConcurrentHashMap<>(); - private ExecutorService pubExecutor = Executors.newCachedThreadPool(); + private final ExecutorService pubExecutor = Executors.newCachedThreadPool(); public TbPubSubProducerTemplate(TbQueueAdmin admin, TbPubSubSettings pubSubSettings, String defaultTopic) { this.defaultTopic = defaultTopic; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqAdmin.java index 3bef6deb84..676ee5354f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqAdmin.java @@ -27,13 +27,11 @@ import java.util.concurrent.TimeoutException; @Slf4j public class TbRabbitMqAdmin implements TbQueueAdmin { - private final TbRabbitMqSettings rabbitMqSettings; private final Channel channel; private final Connection connection; private final Map arguments; public TbRabbitMqAdmin(TbRabbitMqSettings rabbitMqSettings, Map arguments) { - this.rabbitMqSettings = rabbitMqSettings; this.arguments = arguments; try { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqConsumerTemplate.java index 25d7719163..45dc9d6a05 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqConsumerTemplate.java @@ -23,9 +23,9 @@ import com.rabbitmq.client.GetResponse; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.TbQueueAdmin; -import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.TbQueueMsgDecoder; +import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate; import org.thingsboard.server.queue.common.DefaultTbQueueMsg; import java.io.IOException; @@ -37,33 +37,26 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @Slf4j -public class TbRabbitMqConsumerTemplate implements TbQueueConsumer { +public class TbRabbitMqConsumerTemplate extends AbstractTbQueueConsumerTemplate { private final Gson gson = new Gson(); private final TbQueueAdmin admin; - private final String topic; private final TbQueueMsgDecoder decoder; - private final TbRabbitMqSettings rabbitMqSettings; private final Channel channel; private final Connection connection; - private volatile Set partitions; - private volatile boolean subscribed; private volatile Set queues; - private volatile boolean stopped; public TbRabbitMqConsumerTemplate(TbQueueAdmin admin, TbRabbitMqSettings rabbitMqSettings, String topic, TbQueueMsgDecoder decoder) { + super(topic); this.admin = admin; this.decoder = decoder; - this.topic = topic; - this.rabbitMqSettings = rabbitMqSettings; try { connection = rabbitMqSettings.getConnectionFactory().newConnection(); } catch (IOException | TimeoutException e) { log.error("Failed to create connection.", e); throw new RuntimeException("Failed to create connection.", e); } - try { channel = connection.createChannel(); } catch (IOException e) { @@ -74,25 +67,42 @@ public class TbRabbitMqConsumerTemplate implements TbQueue } @Override - public String getTopic() { - return topic; + protected List doPoll(long durationInMillis) { + List result = queues.stream() + .map(queue -> { + try { + return channel.basicGet(queue, false); + } catch (IOException e) { + log.error("Failed to get messages from queue: [{}]", queue); + throw new RuntimeException("Failed to get messages from queue.", e); + } + }).filter(Objects::nonNull).collect(Collectors.toList()); + if (result.size() > 0) { + return result; + } else { + return Collections.emptyList(); + } } @Override - public void subscribe() { - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); - subscribed = false; + protected void doSubscribe(List topicNames) { + queues = partitions.stream() + .map(TopicPartitionInfo::getFullTopicName) + .collect(Collectors.toSet()); + queues.forEach(admin::createTopicIfNotExists); } @Override - public void subscribe(Set partitions) { - this.partitions = partitions; - subscribed = false; + protected void doCommit() { + try { + channel.basicAck(0, true); + } catch (IOException e) { + log.error("Failed to ack messages.", e); + } } @Override - public void unsubscribe() { - stopped = true; + protected void doUnsubscribe() { if (channel != null) { try { channel.close(); @@ -109,63 +119,6 @@ public class TbRabbitMqConsumerTemplate implements TbQueue } } - @Override - public List poll(long durationInMillis) { - if (!subscribed && partitions == null) { - try { - Thread.sleep(durationInMillis); - } catch (InterruptedException e) { - log.debug("Failed to await subscription", e); - } - } else { - if (!subscribed) { - queues = partitions.stream() - .map(TopicPartitionInfo::getFullTopicName) - .collect(Collectors.toSet()); - - queues.forEach(admin::createTopicIfNotExists); - subscribed = true; - } - - List result = queues.stream() - .map(queue -> { - try { - return channel.basicGet(queue, false); - } catch (IOException e) { - log.error("Failed to get messages from queue: [{}]", queue); - throw new RuntimeException("Failed to get messages from queue.", e); - } - }).filter(Objects::nonNull).map(message -> { - try { - return decode(message); - } catch (InvalidProtocolBufferException e) { - log.error("Failed to decode message: [{}].", message); - throw new RuntimeException("Failed to decode message.", e); - } - }).collect(Collectors.toList()); - if (result.size() > 0) { - return result; - } - } - try { - Thread.sleep(durationInMillis); - } catch (InterruptedException e) { - if (!stopped) { - log.error("Failed to wait.", e); - } - } - return Collections.emptyList(); - } - - @Override - public void commit() { - try { - channel.basicAck(0, true); - } catch (IOException e) { - log.error("Failed to ack messages.", e); - } - } - public T decode(GetResponse message) throws InvalidProtocolBufferException { DefaultTbQueueMsg msg = gson.fromJson(new String(message.getBody()), DefaultTbQueueMsg.class); return decoder.decode(msg); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqProducerTemplate.java index 91b46213a5..a58b817f78 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqProducerTemplate.java @@ -30,6 +30,8 @@ import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.DefaultTbQueueMsg; import java.io.IOException; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.TimeoutException; @@ -39,10 +41,12 @@ public class TbRabbitMqProducerTemplate implements TbQueue private final Gson gson = new Gson(); private final TbQueueAdmin admin; private final TbRabbitMqSettings rabbitMqSettings; - private ListeningExecutorService producerExecutor; + private final ListeningExecutorService producerExecutor; private final Channel channel; private final Connection connection; + private final Set topics = ConcurrentHashMap.newKeySet(); + public TbRabbitMqProducerTemplate(TbQueueAdmin admin, TbRabbitMqSettings rabbitMqSettings, String defaultTopic) { this.admin = admin; this.defaultTopic = defaultTopic; @@ -75,6 +79,7 @@ public class TbRabbitMqProducerTemplate implements TbQueue @Override public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) { + createTopicIfNotExist(tpi); AMQP.BasicProperties properties = new AMQP.BasicProperties(); try { channel.basicPublish(rabbitMqSettings.getExchangeName(), tpi.getFullTopicName(), properties, gson.toJson(new DefaultTbQueueMsg(msg)).getBytes()); @@ -110,4 +115,11 @@ public class TbRabbitMqProducerTemplate implements TbQueue } } + private void createTopicIfNotExist(TopicPartitionInfo tpi) { + if (topics.contains(tpi)) { + return; + } + admin.createTopicIfNotExists(tpi.getFullTopicName()); + topics.add(tpi); + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsConsumerTemplate.java index b66cad1504..f4f279a02b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsConsumerTemplate.java @@ -25,21 +25,17 @@ import com.amazonaws.services.sqs.model.Message; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.Gson; import com.google.protobuf.InvalidProtocolBufferException; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.TbQueueAdmin; -import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.TbQueueMsgDecoder; +import org.thingsboard.server.queue.common.AbstractParallelTbQueueConsumerTemplate; import org.thingsboard.server.queue.common.DefaultTbQueueMsg; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -47,34 +43,28 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @Slf4j -public class TbAwsSqsConsumerTemplate implements TbQueueConsumer { +public class TbAwsSqsConsumerTemplate extends AbstractParallelTbQueueConsumerTemplate { private static final int MAX_NUM_MSGS = 10; private final Gson gson = new Gson(); private final TbQueueAdmin admin; private final AmazonSQS sqsClient; - private final String topic; private final TbQueueMsgDecoder decoder; private final TbAwsSqsSettings sqsSettings; private final List pendingMessages = new CopyOnWriteArrayList<>(); private volatile Set queueUrls; - private volatile Set partitions; - private ListeningExecutorService consumerExecutor; - private volatile boolean subscribed; - private volatile boolean stopped = false; public TbAwsSqsConsumerTemplate(TbQueueAdmin admin, TbAwsSqsSettings sqsSettings, String topic, TbQueueMsgDecoder decoder) { + super(topic); this.admin = admin; this.decoder = decoder; - this.topic = topic; this.sqsSettings = sqsSettings; AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey()); @@ -87,86 +77,60 @@ public class TbAwsSqsConsumerTemplate implements TbQueueCo } @Override - public String getTopic() { - return topic; + protected void doSubscribe(List topicNames) { + queueUrls = topicNames.stream().map(this::getQueueUrl).collect(Collectors.toSet()); + initNewExecutor(queueUrls.size() * sqsSettings.getThreadsPerTopic() + 1); } @Override - public void subscribe() { - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); - subscribed = false; + protected List doPoll(long durationInMillis) { + int duration = (int) TimeUnit.MILLISECONDS.toSeconds(durationInMillis); + List>> futureList = queueUrls + .stream() + .map(url -> poll(url, duration)) + .collect(Collectors.toList()); + ListenableFuture>> futureResult = Futures.allAsList(futureList); + try { + return futureResult.get().stream() + .flatMap(List::stream) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } catch (InterruptedException | ExecutionException e) { + if (stopped) { + log.info("[{}] Aws SQS consumer is stopped.", getTopic()); + } else { + log.error("Failed to pool messages.", e); + } + return Collections.emptyList(); + } } @Override - public void subscribe(Set partitions) { - this.partitions = partitions; - subscribed = false; + public T decode(Message message) throws InvalidProtocolBufferException { + DefaultTbQueueMsg msg = gson.fromJson(message.getBody(), DefaultTbQueueMsg.class); + return decoder.decode(msg); } @Override - public void unsubscribe() { - stopped = true; - - if (sqsClient != null) { - sqsClient.shutdown(); - } - if (consumerExecutor != null) { - consumerExecutor.shutdownNow(); - } + protected void doCommit() { + pendingMessages.forEach(msg -> + consumerExecutor.submit(() -> { + List entries = msg.getMessages() + .stream() + .map(message -> new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getReceiptHandle())) + .collect(Collectors.toList()); + sqsClient.deleteMessageBatch(msg.getUrl(), entries); + })); + pendingMessages.clear(); } @Override - public List poll(long durationInMillis) { - if (!subscribed && partitions == null) { - try { - Thread.sleep(durationInMillis); - } catch (InterruptedException e) { - log.debug("Failed to await subscription", e); - } - } else { - if (!subscribed) { - List topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); - queueUrls = topicNames.stream().map(this::getQueueUrl).collect(Collectors.toSet()); - - if (consumerExecutor != null) { - consumerExecutor.shutdown(); - } - - consumerExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(queueUrls.size() * sqsSettings.getThreadsPerTopic() + 1)); - subscribed = true; - } - - if (!pendingMessages.isEmpty()) { - log.warn("Present {} non committed messages.", pendingMessages.size()); - return Collections.emptyList(); - } - - List>> futureList = queueUrls - .stream() - .map(url -> poll(url, (int) TimeUnit.MILLISECONDS.toSeconds(durationInMillis))) - .collect(Collectors.toList()); - ListenableFuture>> futureResult = Futures.allAsList(futureList); - try { - return futureResult.get().stream() - .flatMap(List::stream) - .map(msg -> { - try { - return decode(msg); - } catch (IOException e) { - log.error("Failed to decode message: [{}]", msg); - return null; - } - }).filter(Objects::nonNull) - .collect(Collectors.toList()); - } catch (InterruptedException | ExecutionException e) { - if (stopped) { - log.info("[{}] Aws SQS consumer is stopped.", topic); - } else { - log.error("Failed to pool messages.", e); - } - } + protected void doUnsubscribe() { + stopped = true; + if (sqsClient != null) { + sqsClient.shutdown(); } - return Collections.emptyList(); + shutdownExecutor(); } private ListenableFuture> poll(String url, int waitTimeSeconds) { @@ -198,25 +162,6 @@ public class TbAwsSqsConsumerTemplate implements TbQueueCo }, consumerExecutor); } - @Override - public void commit() { - pendingMessages.forEach(msg -> - consumerExecutor.submit(() -> { - List entries = msg.getMessages() - .stream() - .map(message -> new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getReceiptHandle())) - .collect(Collectors.toList()); - sqsClient.deleteMessageBatch(msg.getUrl(), entries); - })); - - pendingMessages.clear(); - } - - public T decode(Message message) throws InvalidProtocolBufferException { - DefaultTbQueueMsg msg = gson.fromJson(message.getBody(), DefaultTbQueueMsg.class); - return decoder.decode(msg); - } - @Data private static class AwsSqsMsgWrapper { private final String url; diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 2f0d0fb3b3..9061f3e2de 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -28,7 +28,7 @@ services: ZOO_SERVERS: server.1=zookeeper:2888:3888;zookeeper:2181 kafka: restart: always - image: "wurstmeister/kafka:2.12-2.2.1" + image: "wurstmeister/kafka:2.12-2.3.0" ports: - "9092:9092" env_file: diff --git a/msa/js-executor/queue/awsSqsTemplate.js b/msa/js-executor/queue/awsSqsTemplate.js index 5f95de7d32..a74d9d5b57 100644 --- a/msa/js-executor/queue/awsSqsTemplate.js +++ b/msa/js-executor/queue/awsSqsTemplate.js @@ -28,7 +28,7 @@ const secretAccessKey = config.get('aws_sqs.secret_access_key'); const region = config.get('aws_sqs.region'); const AWS = require('aws-sdk'); const queueProperties = config.get('aws_sqs.queue_properties'); -const poolInterval = config.get('js.response_poll_interval'); +const pollInterval = config.get('js.response_poll_interval'); let queueAttributes = {FifoQueue: 'true'}; let sqsClient; @@ -52,7 +52,12 @@ function AwsSqsProducer() { queueUrls.set(responseTopic, responseQueueUrl); } - let params = {MessageBody: msgBody, QueueUrl: responseQueueUrl, MessageGroupId: 'js_eval', MessageDeduplicationId: uuid()}; + let params = { + MessageBody: msgBody, + QueueUrl: responseQueueUrl, + MessageGroupId: 'js_eval', + MessageDeduplicationId: uuid() + }; return new Promise((resolve, reject) => { sqsClient.sendMessage(params, function (err, data) { @@ -98,6 +103,7 @@ function AwsSqsProducer() { WaitTimeSeconds: poolInterval / 1000 }; while (!stopped) { + let pollStartTs = new Date().getTime(); const messages = await new Promise((resolve, reject) => { sqsClient.receiveMessage(params, function (err, data) { if (err) { @@ -130,6 +136,11 @@ function AwsSqsProducer() { //do nothing } }); + } else { + let pollDuration = new Date().getTime() - pollStartTs; + if (pollDuration < pollInterval) { + await sleep(pollInterval - pollDuration); + } } } } catch (e) { @@ -178,6 +189,12 @@ function parseQueueProperties() { }); } +function sleep(ms) { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} + process.on('exit', () => { stopped = true; logger.info('Aws Sqs client stopped.'); diff --git a/msa/js-executor/queue/rabbitmqTemplate.js b/msa/js-executor/queue/rabbitmqTemplate.js index 732206ff11..2aeb5c02db 100644 --- a/msa/js-executor/queue/rabbitmqTemplate.js +++ b/msa/js-executor/queue/rabbitmqTemplate.js @@ -27,22 +27,22 @@ const vhost = config.get('rabbitmq.virtual_host'); const username = config.get('rabbitmq.username'); const password = config.get('rabbitmq.password'); const queueProperties = config.get('rabbitmq.queue_properties'); -const poolInterval = config.get('js.response_poll_interval'); +const pollInterval = config.get('js.response_poll_interval'); const amqp = require('amqplib/callback_api'); -let queueParams = {durable: false, exclusive: false, autoDelete: false}; +let queueOptions = {durable: false, exclusive: false, autoDelete: false}; let connection; let channel; let stopped = false; -const responseTopics = []; +let queues = []; function RabbitMqProducer() { this.send = async (responseTopic, scriptId, rawResponse, headers) => { - if (!responseTopics.includes(responseTopic)) { + if (!queues.includes(responseTopic)) { await createQueue(responseTopic); - responseTopics.push(responseTopic); + queues.push(responseTopic); } let data = JSON.stringify( @@ -98,6 +98,7 @@ function RabbitMqProducer() { const messageProcessor = new JsInvokeMessageProcessor(new RabbitMqProducer()); while (!stopped) { + let pollStartTs = new Date().getTime(); let message = await new Promise((resolve, reject) => { channel.get(requestTopic, {}, function (err, msg) { if (err) { @@ -112,7 +113,10 @@ function RabbitMqProducer() { messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8'))); channel.ack(message); } else { - await sleep(poolInterval); + let pollDuration = new Date().getTime() - pollStartTs; + if (pollDuration < pollInterval) { + await sleep(pollInterval - pollDuration); + } } } } catch (e) { @@ -123,16 +127,18 @@ function RabbitMqProducer() { })(); function parseQueueProperties() { + let args = {}; const props = queueProperties.split(';'); props.forEach(p => { const delimiterPosition = p.indexOf(':'); - queueParams[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1); + args[p.substring(0, delimiterPosition)] = +p.substring(delimiterPosition + 1); }); + queueOptions['arguments'] = args; } -function createQueue(topic) { +async function createQueue(topic) { return new Promise((resolve, reject) => { - channel.assertQueue(topic, queueParams, function (err) { + channel.assertQueue(topic, queueOptions, function (err) { if (err) { reject(err); } else { diff --git a/msa/js-executor/queue/serviceBusTemplate.js b/msa/js-executor/queue/serviceBusTemplate.js index 20cf664940..99316930e3 100644 --- a/msa/js-executor/queue/serviceBusTemplate.js +++ b/msa/js-executor/queue/serviceBusTemplate.js @@ -140,6 +140,7 @@ function parseQueueProperties() { properties[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1); }); queueOptions = { + DuplicateDetection: 'false', MaxSizeInMegabytes: properties['maxSizeInMb'], DefaultMessageTimeToLive: `PT${properties['messageTimeToLiveInSec']}S`, LockDuration: `PT${properties['lockDurationInSec']}S`