diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java index 9e3d38cf32..72b8c2252a 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java @@ -17,7 +17,6 @@ package org.thingsboard.server.service.entitiy.queue; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.TenantProfile; @@ -29,7 +28,6 @@ import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfi import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.queue.QueueService; import org.thingsboard.server.queue.TbQueueAdmin; -import org.thingsboard.server.queue.scheduler.SchedulerComponent; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.entitiy.AbstractTbEntityService; @@ -37,7 +35,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @Slf4j @@ -49,10 +46,6 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb private final QueueService queueService; private final TbClusterService tbClusterService; private final TbQueueAdmin tbQueueAdmin; - private final SchedulerComponent scheduler; - - @Value("${queue.rule-engine.topic_deletion_delay:60}") - private int topicDeletionDelay; @Override public Queue saveQueue(Queue queue) { @@ -121,14 +114,7 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb } else { log.info("Removed [{}] partitions from [{}] queue", oldPartitions - currentPartitions, queue.getName()); tbClusterService.onQueueChange(queue); - - scheduler.schedule(() -> { - for (int i = currentPartitions; i < oldPartitions; i++) { - String fullTopicName = new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(); - log.info("Removed partition [{}]", fullTopicName); - tbQueueAdmin.deleteTopic(fullTopicName); - } - }, topicDeletionDelay, TimeUnit.SECONDS); + // TODO: move all the messages left in old partitions and delete topics } } else if (!oldQueue.equals(queue)) { tbClusterService.onQueueChange(queue); @@ -137,21 +123,7 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb private void onQueueDeleted(Queue queue) { tbClusterService.onQueueDelete(queue); - // queueStatsService.deleteQueueStatsByQueueId(tenantId, queueId); - - scheduler.schedule(() -> { - for (int i = 0; i < queue.getPartitions(); i++) { - String fullTopicName = new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(); - log.info("Deleting queue [{}]", fullTopicName); - try { - tbQueueAdmin.deleteTopic(fullTopicName); - } catch (Exception e) { - log.error("Failed to delete queue [{}]", fullTopicName); - } - } - }, topicDeletionDelay, TimeUnit.SECONDS); - notificationEntityService.notifySendMsgToEdgeService(queue.getTenantId(), queue.getId(), EdgeEventActionType.DELETED); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 76631aaa95..5a2db8019e 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -591,11 +591,6 @@ public class DefaultTbClusterService implements TbClusterService { tbTransportServices.removeAll(tbCoreServices); tbCoreServices.removeAll(tbRuleEngineServices); - for (String ruleEngineServiceId : tbRuleEngineServices) { - TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, ruleEngineServiceId); - producerProvider.getRuleEngineNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), ruleEngineMsg), null); - toRuleEngineNfs.incrementAndGet(); - } for (String coreServiceId : tbCoreServices) { TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, coreServiceId); producerProvider.getTbCoreNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), coreMsg), null); @@ -606,5 +601,10 @@ public class DefaultTbClusterService implements TbClusterService { producerProvider.getTransportNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), transportMsg), null); toTransportNfs.incrementAndGet(); } + for (String ruleEngineServiceId : tbRuleEngineServices) { + TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, ruleEngineServiceId); + producerProvider.getRuleEngineNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), ruleEngineMsg), null); + toRuleEngineNfs.incrementAndGet(); + } } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index 598b41035f..ffcbc6ff60 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -23,11 +23,14 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.QueueId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.data.rpc.RpcError; import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.gen.MsgProtos; import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; import org.thingsboard.server.common.msg.queue.RuleEngineException; import org.thingsboard.server.common.msg.queue.RuleNodeInfo; @@ -42,12 +45,14 @@ import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; +import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; +import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory; import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.TbRuleEngineComponent; @@ -107,13 +112,14 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< private final TbRuleEngineDeviceRpcService tbDeviceRpcService; private final TbServiceInfoProvider serviceInfoProvider; private final QueueService queueService; - // private final TenantId tenantId; + private final TbQueueProducerProvider producerProvider; + private final TbQueueAdmin queueAdmin; private final ConcurrentMap>> consumers = new ConcurrentHashMap<>(); private final ConcurrentMap consumerConfigurations = new ConcurrentHashMap<>(); private final ConcurrentMap consumerStats = new ConcurrentHashMap<>(); private final ConcurrentMap topicsConsumerPerPartition = new ConcurrentHashMap<>(); final ExecutorService submitExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-submit")); - final ScheduledExecutorService repartitionExecutor = Executors.newScheduledThreadPool(1, ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-repartition")); + final ScheduledExecutorService repartitionExecutor = Executors.newScheduledThreadPool(2, ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-repartition")); public DefaultTbRuleEngineConsumerService(TbRuleEngineProcessingStrategyFactory processingStrategyFactory, TbRuleEngineSubmitStrategyFactory submitStrategyFactory, @@ -128,7 +134,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< TbTenantProfileCache tenantProfileCache, TbApiUsageStateService apiUsageStateService, PartitionService partitionService, ApplicationEventPublisher eventPublisher, - TbServiceInfoProvider serviceInfoProvider, QueueService queueService) { + TbServiceInfoProvider serviceInfoProvider, QueueService queueService, + TbQueueProducerProvider producerProvider, TbQueueAdmin queueAdmin) { super(actorContext, encodingService, tenantProfileCache, deviceProfileCache, assetProfileCache, apiUsageStateService, partitionService, eventPublisher, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer(), Optional.empty()); this.statisticsService = statisticsService; this.tbRuleEngineQueueFactory = tbRuleEngineQueueFactory; @@ -138,6 +145,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< this.statsFactory = statsFactory; this.serviceInfoProvider = serviceInfoProvider; this.queueService = queueService; + this.producerProvider = producerProvider; + this.queueAdmin = queueAdmin; } @PostConstruct @@ -230,7 +239,6 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< launchConsumer(consumer, consumerConfigurations.get(queueKey), consumerStats.get(queueKey), "" + queueKey + "-" + tpi.getPartition().orElse(-999999)); consumer.subscribe(Collections.singleton(tpi)); }); - } finally { tbTopicWithConsumerPerPartition.getLock().unlock(); } @@ -264,7 +272,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< void consumerLoop(TbQueueConsumer> consumer, org.thingsboard.server.common.data.queue.Queue configuration, TbRuleEngineConsumerStats stats, String threadSuffix) { updateCurrentThreadName(threadSuffix); - while (!stopped && !consumer.isStopped()) { + while (!stopped && !consumer.isStopped() && !consumer.isDeleted()) { try { List> msgs = consumer.poll(configuration.getPollInterval()); if (msgs.isEmpty()) { @@ -314,6 +322,10 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< } } } + + if (consumer.isDeleted()) { + processQueueDeletion(configuration, consumer); + } log.info("TB Rule Engine Consumer stopped."); } @@ -448,22 +460,22 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB())); QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId); + partitionService.removeQueue(queueDeleteMsg); Queue queue = consumerConfigurations.remove(queueKey); if (queue != null) { if (queue.isConsumerPerPartition()) { TbTopicWithConsumerPerPartition tbTopicWithConsumerPerPartition = topicsConsumerPerPartition.remove(queueKey); if (tbTopicWithConsumerPerPartition != null) { - tbTopicWithConsumerPerPartition.getConsumers().values().forEach(TbQueueConsumer::unsubscribe); + tbTopicWithConsumerPerPartition.getConsumers().values().forEach(TbQueueConsumer::onQueueDelete); tbTopicWithConsumerPerPartition.getConsumers().clear(); } } else { TbQueueConsumer> consumer = consumers.remove(queueKey); if (consumer != null) { - consumer.unsubscribe(); + consumer.onQueueDelete(); } } } - partitionService.removeQueue(queueDeleteMsg); } private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) { @@ -482,6 +494,49 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< actorContext.tell(msg); } + private void processQueueDeletion(Queue queue, TbQueueConsumer> consumer) { + long startTs = System.currentTimeMillis(); + long timeout = TimeUnit.SECONDS.toMillis(30); + try { + int n = 0; + while ((System.currentTimeMillis() - startTs <= timeout)) { + List> msgs = consumer.poll(queue.getPollInterval()); + if (!msgs.isEmpty()) { + for (TbProtoQueueMsg msg : msgs) { + try { + MsgProtos.TbMsgProto tbMsgProto = MsgProtos.TbMsgProto.parseFrom(msg.getValue().getTbMsg().toByteArray()); + EntityId originator = EntityIdFactory.getByTypeAndUuid(tbMsgProto.getEntityType(), new UUID(tbMsgProto.getEntityIdMSB(), tbMsgProto.getEntityIdLSB())); + + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, queue.getName(), TenantId.SYS_TENANT_ID, originator); + producerProvider.getRuleEngineMsgProducer().send(tpi, msg, null); + n++; + } catch (Throwable e) { + log.debug("Failed to move message to system {}: {}", consumer.getTopic(), msg, e); + } + } + consumer.commit(); + } else { + break; + } + } + if (n > 0) { + log.info("Moved {} messages from {} to system {}", n, consumer.getFullTopicNames(), consumer.getTopic()); + } + + consumer.unsubscribe(); + for (String topic : consumer.getFullTopicNames()) { + try { + queueAdmin.deleteTopic(topic); + log.info("Deleted topic {}", topic); + } catch (Exception e) { + log.error("Failed to delete topic {} after unsubscribing", topic, e); + } + } + } catch (Exception e) { + log.error("Failed to process deletion of {} ({})", consumer.getTopic(), queue.getTenantId(), e); + } + } + @Scheduled(fixedDelayString = "${queue.rule-engine.stats.print-interval-ms}") public void printStats() { if (statsEnabled) { diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 96ee2793ed..faf1ebcbea 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1233,8 +1233,6 @@ queue: failure-percentage: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRY_PAUSE:5}" # Time in seconds to wait in consumer thread before retries; max-pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_MAX_RETRY_PAUSE:5}" # Max allowed time in seconds for pause between retries. - # Delay between Queue update/delete and actual topic deletion. The delay is for Rule Engines to have time to unsubscribe from the topics, and for other services to stop publishing - topic_deletion_delay: "${TB_QUEUE_RULE_ENGINE_TOPIC_DELETION_DELAY_SECS:180}" transport: # For high priority notifications that require minimum latency and processing time notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}" diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueConsumer.java b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueConsumer.java index 04439fc85d..73bea7642f 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueConsumer.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueConsumer.java @@ -36,4 +36,10 @@ public interface TbQueueConsumer { boolean isStopped(); + void onQueueDelete(); + + boolean isDeleted(); + + List getFullTopicNames(); + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java index 5b0e84c2ed..8e91c1d134 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java @@ -44,6 +44,7 @@ public abstract class AbstractTbQueueConsumerTemplate i protected volatile Set partitions; protected final ReentrantLock consumerLock = new ReentrantLock(); //NonfairSync final Queue> subscribeQueue = new ConcurrentLinkedQueue<>(); + protected volatile boolean deleted = false; @Getter private final String topic; @@ -94,7 +95,7 @@ public abstract class AbstractTbQueueConsumerTemplate i partitions = subscribeQueue.poll(); } if (!subscribed) { - List topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); + List topicNames = getFullTopicNames(); doSubscribe(topicNames); subscribed = true; } @@ -191,6 +192,21 @@ public abstract class AbstractTbQueueConsumerTemplate i abstract protected void doUnsubscribe(); + @Override + public void onQueueDelete() { + deleted = true; + } + + @Override + public boolean isDeleted() { + return deleted; + } + + @Override + public List getFullTopicNames() { + return partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); + } + protected boolean isLongPollingSupported() { return false; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java index d2a54128e3..55f1721c46 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java @@ -159,6 +159,7 @@ public class TbKafkaSettings { props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java index 081202315e..a642d7e9cf 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java @@ -103,4 +103,18 @@ public class InMemoryTbQueueConsumer implements TbQueueCon return stopped; } + @Override + public void onQueueDelete() { + } + + @Override + public boolean isDeleted() { + return false; + } + + @Override + public List getFullTopicNames() { + return partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); + } + }