diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 2c09221148..163c325f52 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1233,7 +1233,7 @@ queue: pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRY_PAUSE:5}" # Time in seconds to wait in consumer thread before retries; max-pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_MAX_RETRY_PAUSE:5}" # Max allowed time in seconds for pause between retries. # Delay between Queue update/delete and actual topic deletion. The delay is for Rule Engines to have time to unsubscribe from the topics, and for other services to stop publishing - topic_deletion_delay: "${TB_QUEUE_RULE_ENGINE_TOPIC_DELETION_DELAY_SECS:60}" + topic_deletion_delay: "${TB_QUEUE_RULE_ENGINE_TOPIC_DELETION_DELAY_SECS:180}" transport: # For high priority notifications that require minimum latency and processing time notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}" diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java index 86146dabd1..a152e0c466 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java @@ -162,7 +162,9 @@ public abstract class AbstractTbQueueConsumerTemplate i @Override public void unsubscribe() { - log.info("unsubscribe topic and stop consumer {}", getTopic()); + log.info("Unsubscribing from topics and stopping consumer for topics {}", partitions.stream() + .map(TopicPartitionInfo::getFullTopicName) + .collect(Collectors.joining(", "))); stopped = true; consumerLock.lock(); try { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java index c17a563d46..00bb7aa541 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java @@ -114,7 +114,6 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue @Override protected void doUnsubscribe() { - log.info("unsubscribe topic and close consumer for topic {}", getTopic()); if (consumer != null) { consumer.unsubscribe(); consumer.close();