Browse Source

Log topics list on unsubscribe

pull/8988/head
ViacheslavKlimov 3 years ago
parent
commit
fa61783ac6
  1. 2
      application/src/main/resources/thingsboard.yml
  2. 4
      common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java
  3. 1
      common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java

2
application/src/main/resources/thingsboard.yml

@ -1233,7 +1233,7 @@ queue:
pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRY_PAUSE:5}" # Time in seconds to wait in consumer thread before retries;
max-pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_MAX_RETRY_PAUSE:5}" # Max allowed time in seconds for pause between retries.
# Delay between Queue update/delete and actual topic deletion. The delay is for Rule Engines to have time to unsubscribe from the topics, and for other services to stop publishing
topic_deletion_delay: "${TB_QUEUE_RULE_ENGINE_TOPIC_DELETION_DELAY_SECS:60}"
topic_deletion_delay: "${TB_QUEUE_RULE_ENGINE_TOPIC_DELETION_DELAY_SECS:180}"
transport:
# For high priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}"

4
common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java

@ -162,7 +162,9 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
@Override
public void unsubscribe() {
log.info("unsubscribe topic and stop consumer {}", getTopic());
log.info("Unsubscribing from topics and stopping consumer for topics {}", partitions.stream()
.map(TopicPartitionInfo::getFullTopicName)
.collect(Collectors.joining(", ")));
stopped = true;
consumerLock.lock();
try {

1
common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java

@ -114,7 +114,6 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
@Override
protected void doUnsubscribe() {
log.info("unsubscribe topic and close consumer for topic {}", getTopic());
if (consumer != null) {
consumer.unsubscribe();
consumer.close();

Loading…
Cancel
Save