Browse Source

Consumer group per isolated tenant's queue; remove sleeping for Kafka consumer

pull/8988/head
ViacheslavKlimov 3 years ago
parent
commit
55775f2815
  1. 8
      common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java
  2. 6
      common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java
  3. 2
      common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java
  4. 2
      common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java

8
common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java

@ -103,7 +103,9 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
consumerLock.unlock();
}
if (records.isEmpty()) { return sleepAndReturnEmpty(startNanos, durationInMillis); }
if (records.isEmpty() && !isLongPollingSupported()) {
return sleepAndReturnEmpty(startNanos, durationInMillis);
}
return decodeRecords(records);
}
@ -189,4 +191,8 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
abstract protected void doUnsubscribe();
protected boolean isLongPollingSupported() {
return false;
}
}

6
common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java

@ -122,4 +122,10 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
statsService.unregisterClientGroup(groupId);
}
}
@Override
public boolean isLongPollingSupported() {
return true;
}
}

2
common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java

@ -187,7 +187,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(configuration.getTopic());
consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
consumerBuilder.groupId("re-" + queueName + "-consumer");
consumerBuilder.groupId("re-" + queueName + (!configuration.getTenantId().isSysTenantId() ? "-" + configuration.getTenantId() : "") + "-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(ruleEngineAdmin);
consumerBuilder.statsService(consumerStatsService);

2
common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java

@ -166,7 +166,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(configuration.getTopic());
consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
consumerBuilder.groupId("re-" + queueName + "-consumer");
consumerBuilder.groupId("re-" + queueName + (!configuration.getTenantId().isSysTenantId() ? "-" + configuration.getTenantId() : "") + "-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(ruleEngineAdmin);
consumerBuilder.statsService(consumerStatsService);

Loading…
Cancel
Save