Browse Source

EDQS consumer management refactoring

pull/13023/head
ViacheslavKlimov 1 year ago
parent
commit
c50d7a3988
  1. 5
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java
  2. 3
      application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java
  3. 3
      application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java
  4. 4
      application/src/main/resources/thingsboard.yml
  5. 3
      application/src/test/java/org/thingsboard/server/controller/EntityQueryControllerTest.java
  6. 15
      common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java
  7. 30
      common/edqs/src/main/java/org/thingsboard/server/edqs/repo/TenantRepo.java
  8. 48
      common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java
  9. 5
      common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java
  10. 4
      common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java
  11. 14
      common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java
  12. 9
      common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java
  13. 5
      common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerManagerTask.java
  14. 16
      common/queue/src/main/java/org/thingsboard/server/queue/common/state/KafkaQueueStateService.java
  15. 4
      common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsConfig.java
  16. 36
      common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueue.java
  17. 8
      common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueueFactory.java
  18. 23
      common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsQueueFactory.java
  19. 67
      common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java
  20. 12
      common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java
  21. 18
      common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java
  22. 3
      common/queue/src/main/java/org/thingsboard/server/queue/provider/EdqsClientQueueFactory.java
  23. 5
      common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java
  24. 7
      common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java
  25. 7
      common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java
  26. 14
      common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java
  27. 4
      edqs/src/main/resources/edqs.yml

5
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java

@ -97,7 +97,10 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
.scheduler(eventConsumer.getScheduler())
.taskExecutor(eventConsumer.getTaskExecutor())
.build();
super.stateService = new KafkaQueueStateService<>(eventConsumer, stateConsumer);
super.stateService = KafkaQueueStateService.<TbProtoQueueMsg<ToCalculatedFieldMsg>, TbProtoQueueMsg<CalculatedFieldStateProto>>builder()
.eventConsumer(eventConsumer)
.stateConsumer(stateConsumer)
.build();
this.stateProducer = (TbKafkaProducerTemplate<TbProtoQueueMsg<CalculatedFieldStateProto>>) queueFactory.createCalculatedFieldStateProducer();
}

3
application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java

@ -59,7 +59,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.discovery.HashPartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.edqs.EdqsQueue;
import org.thingsboard.server.queue.environment.DistributedLock;
import org.thingsboard.server.queue.environment.DistributedLockService;
import org.thingsboard.server.queue.provider.EdqsClientQueueFactory;
@ -96,7 +95,7 @@ public class DefaultEdqsService implements EdqsService {
private void init() {
executor = ThingsBoardExecutors.newWorkStealingPool(12, getClass());
eventsProducer = EdqsProducer.builder()
.producer(queueFactory.createEdqsMsgProducer(EdqsQueue.EVENTS))
.producer(queueFactory.createEdqsEventsProducer())
.partitionService(edqsPartitionService)
.build();
syncLock = distributedLockService.getLock("edqs_sync");

3
application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java

@ -19,7 +19,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.edqs.EdqsQueue;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
@ -37,7 +36,7 @@ public class KafkaEdqsSyncService extends EdqsSyncService {
TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, Collections.emptyMap());
this.syncNeeded = kafkaAdmin.areAllTopicsEmpty(IntStream.range(0, edqsConfig.getPartitions())
.mapToObj(partition -> TopicPartitionInfo.builder()
.topic(EdqsQueue.EVENTS.getTopic())
.topic(edqsConfig.getEventsTopic())
.partition(partition)
.build().getFullTopicName())
.collect(Collectors.toSet()));

4
application/src/main/resources/thingsboard.yml

@ -1748,6 +1748,10 @@ queue:
partitions: "${TB_EDQS_PARTITIONS:12}"
# EDQS partitioning strategy: tenant (partition is resolved by tenant id) or none (no specific strategy, resolving by message key)
partitioning_strategy: "${TB_EDQS_PARTITIONING_STRATEGY:tenant}"
# EDQS events topic
events_topic: "${TB_EDQS_EVENTS_TOPIC:edqs.events}"
# EDQS state topic
state_topic: "${TB_EDQS_STATE_TOPIC:edqs.state}"
# EDQS requests topic
requests_topic: "${TB_EDQS_REQUESTS_TOPIC:edqs.requests}"
# EDQS responses topic

3
application/src/test/java/org/thingsboard/server/controller/EntityQueryControllerTest.java

@ -17,8 +17,6 @@ package org.thingsboard.server.controller;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import org.awaitility.Awaitility;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -73,7 +71,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;

15
common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java

@ -62,7 +62,6 @@ import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.edqs.EdqsComponent;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.edqs.EdqsConfig.EdqsPartitioningStrategy;
import org.thingsboard.server.queue.edqs.EdqsQueue;
import org.thingsboard.server.queue.edqs.EdqsQueueFactory;
import org.thingsboard.server.queue.util.AfterStartUp;
@ -123,8 +122,8 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
};
eventConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToEdqsMsg>>create()
.queueKey(new QueueKey(ServiceType.EDQS, EdqsQueue.EVENTS.getTopic()))
.topic(EdqsQueue.EVENTS.getTopic())
.queueKey(new QueueKey(ServiceType.EDQS, config.getEventsTopic()))
.topic(config.getEventsTopic())
.pollInterval(config.getPollInterval())
.msgPackProcessor((msgs, consumer, config) -> {
for (TbProtoQueueMsg<ToEdqsMsg> queueMsg : msgs) {
@ -133,14 +132,14 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
}
try {
ToEdqsMsg msg = queueMsg.getValue();
process(msg, EdqsQueue.EVENTS);
process(msg, true);
} catch (Exception t) {
log.error("Failed to process message: {}", queueMsg, t);
}
}
consumer.commit();
})
.consumerCreator((config, partitionId) -> queueFactory.createEdqsMsgConsumer(EdqsQueue.EVENTS))
.consumerCreator((config, partitionId) -> queueFactory.createEdqsEventsConsumer())
.queueAdmin(queueFactory.getEdqsQueueAdmin())
.consumerExecutor(consumersExecutor)
.taskExecutor(taskExecutor)
@ -165,7 +164,7 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
try {
Set<TopicPartitionInfo> newPartitions = event.getNewPartitions().get(new QueueKey(ServiceType.EDQS));
stateService.process(withTopic(newPartitions, EdqsQueue.STATE.getTopic()));
stateService.process(withTopic(newPartitions, config.getStateTopic()));
// eventsConsumer's partitions are updated by stateService
responseTemplate.subscribe(withTopic(newPartitions, config.getRequestsTopic())); // TODO: we subscribe to partitions before we are ready. implement consumer-per-partition version for request template
@ -235,7 +234,7 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
return response;
}
public void process(ToEdqsMsg edqsMsg, EdqsQueue queue) {
public void process(ToEdqsMsg edqsMsg, boolean backup) {
log.trace("Processing message: {}", edqsMsg);
if (edqsMsg.hasEventMsg()) {
EdqsEventMsg eventMsg = edqsMsg.getEventMsg();
@ -252,7 +251,7 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
} else if (!ObjectType.unversionedTypes.contains(objectType)) {
log.warn("[{}] {} {} doesn't have version", tenantId, objectType, key);
}
if (queue != EdqsQueue.STATE) {
if (backup) {
stateService.save(tenantId, objectType, key, eventType, edqsMsg);
}

30
common/edqs/src/main/java/org/thingsboard/server/edqs/repo/TenantRepo.java

@ -150,8 +150,9 @@ public class TenantRepo {
}
} else if (RelationTypeGroup.DASHBOARD.equals(entity.getTypeGroup())) {
if (EntityRelation.CONTAINS_TYPE.equals(entity.getType()) && entity.getFrom().getEntityType() == EntityType.CUSTOMER) {
((CustomerData) getEntityMap(EntityType.CUSTOMER).computeIfAbsent(entity.getFrom().getId(), CustomerData::new))
.addOrUpdate(getEntityMap(EntityType.DASHBOARD).get(entity.getTo().getId()));
CustomerData customerData = (CustomerData) getOrCreate(entity.getFrom());
EntityData<?> dashboardData = getOrCreate(entity.getTo());
customerData.addOrUpdate(dashboardData);
}
}
} finally {
@ -170,8 +171,13 @@ public class TenantRepo {
}
} else if (RelationTypeGroup.DASHBOARD.equals(entityRelation.getTypeGroup())) {
if (EntityRelation.CONTAINS_TYPE.equals(entityRelation.getType()) && entityRelation.getFrom().getEntityType() == EntityType.CUSTOMER) {
((CustomerData) getEntityMap(EntityType.CUSTOMER).computeIfAbsent(entityRelation.getFrom().getId(), CustomerData::new))
.remove(getEntityMap(EntityType.DASHBOARD).get(entityRelation.getTo().getId()));
CustomerData customerData = (CustomerData) get(entityRelation.getFrom());
if (customerData != null) {
EntityData<?> dashboardData = get(entityRelation.getTo());
if (dashboardData != null) {
customerData.remove(dashboardData);
}
}
}
}
}
@ -197,13 +203,13 @@ public class TenantRepo {
entityData.setCustomerId(newCustomerId);
if (entityIdMismatch(oldCustomerId, newCustomerId)) {
if (oldCustomerId != null) {
CustomerData old = (CustomerData) getEntityMap(EntityType.CUSTOMER).get(oldCustomerId);
CustomerData old = (CustomerData) get(EntityType.CUSTOMER, oldCustomerId);
if (old != null) {
old.remove(entityData);
}
}
if (newCustomerId != null) {
CustomerData newData = (CustomerData) getEntityMap(EntityType.CUSTOMER).computeIfAbsent(newCustomerId, CustomerData::new);
CustomerData newData = (CustomerData) getOrCreate(EntityType.CUSTOMER, newCustomerId);
newData.addOrUpdate(entityData);
}
}
@ -217,7 +223,7 @@ public class TenantRepo {
try {
UUID entityId = entity.getFields().getId();
EntityType entityType = entity.getType();
EntityData<?> removed = getEntityMap(entityType).remove(entityId);
EntityData<?> removed = get(entityType, entityId);
if (removed != null) {
if (removed.getFields() != null) {
getEntitySet(entityType).remove(removed);
@ -225,7 +231,7 @@ public class TenantRepo {
edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.fromEntityType(entityType), EdqsEventType.DELETED));
UUID customerId = removed.getCustomerId();
if (customerId != null) {
CustomerData customerData = (CustomerData) getEntityMap(EntityType.CUSTOMER).get(customerId);
CustomerData customerData = (CustomerData) get(EntityType.CUSTOMER, customerId);
if (customerData != null) {
customerData.remove(removed);
}
@ -303,7 +309,11 @@ public class TenantRepo {
}
private EntityData<?> get(EntityId entityId) {
return getEntityMap(entityId.getEntityType()).get(entityId.getId());
return get(entityId.getEntityType(), entityId.getId());
}
private EntityData<?> get(EntityType entityType, UUID entityId) {
return getEntityMap(entityType).get(entityId);
}
private EntityData<?> constructEntityData(EntityType entityType, UUID id) {
@ -425,7 +435,7 @@ public class TenantRepo {
EntityType entityType = entityId.getEntityType();
return switch (entityType) {
case CUSTOMER, TENANT -> {
EntityFields fields = getEntityMap(entityType).get(entityId.getId()).getFields();
EntityFields fields = get(entityId).getFields();
yield fields != null ? fields.getName() : "";
}
default -> throw new RuntimeException("Unsupported entity type: " + entityType);

48
common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java

@ -37,10 +37,13 @@ import org.thingsboard.server.queue.common.state.KafkaQueueStateService;
import org.thingsboard.server.queue.common.state.QueueStateService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.edqs.EdqsQueue;
import org.thingsboard.server.queue.edqs.EdqsQueueFactory;
import org.thingsboard.server.queue.edqs.KafkaEdqsComponent;
import org.thingsboard.server.queue.edqs.KafkaEdqsQueueFactory;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
@ -55,7 +58,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
private final EdqsConfig config;
private final EdqsPartitionService partitionService;
private final EdqsQueueFactory queueFactory;
private final KafkaEdqsQueueFactory queueFactory;
@Autowired @Lazy
private EdqsProcessor edqsProcessor;
@ -71,15 +74,16 @@ public class KafkaEdqsStateService implements EdqsStateService {
@Override
public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer) {
TbKafkaAdmin queueAdmin = queueFactory.getEdqsQueueAdmin();
stateConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToEdqsMsg>>create()
.queueKey(new QueueKey(ServiceType.EDQS, EdqsQueue.STATE.getTopic()))
.topic(EdqsQueue.STATE.getTopic())
.queueKey(new QueueKey(ServiceType.EDQS, config.getStateTopic()))
.topic(config.getStateTopic())
.pollInterval(config.getPollInterval())
.msgPackProcessor((msgs, consumer, config) -> {
for (TbProtoQueueMsg<ToEdqsMsg> queueMsg : msgs) {
try {
ToEdqsMsg msg = queueMsg.getValue();
edqsProcessor.process(msg, EdqsQueue.STATE);
edqsProcessor.process(msg, false);
if (stateReadCount.incrementAndGet() % 100000 == 0) {
log.info("[state] Processed {} msgs", stateReadCount.get());
}
@ -89,15 +93,15 @@ public class KafkaEdqsStateService implements EdqsStateService {
}
consumer.commit();
})
.consumerCreator((config, partitionId) -> queueFactory.createEdqsMsgConsumer(EdqsQueue.STATE, null)) // not using consumer group management
.queueAdmin(queueFactory.getEdqsQueueAdmin())
.consumerCreator((config, partitionId) -> queueFactory.createEdqsStateConsumer())
.queueAdmin(queueAdmin)
.consumerExecutor(eventConsumer.getConsumerExecutor())
.taskExecutor(eventConsumer.getTaskExecutor())
.scheduler(eventConsumer.getScheduler())
.uncaughtErrorHandler(edqsProcessor.getErrorHandler())
.build();
queueStateService = new KafkaQueueStateService<>(eventConsumer, stateConsumer);
TbKafkaConsumerTemplate<TbProtoQueueMsg<ToEdqsMsg>> eventsToBackupKafkaConsumer = queueFactory.createEdqsEventsToBackupConsumer();
eventsToBackupConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToEdqsMsg>>builder()
.name("edqs-events-to-backup-consumer")
.pollInterval(config.getPollInterval())
@ -135,15 +139,35 @@ public class KafkaEdqsStateService implements EdqsStateService {
}
consumer.commit();
})
.consumerCreator(() -> queueFactory.createEdqsMsgConsumer(EdqsQueue.EVENTS, "events-to-backup-consumer-group")) // shared by all instances consumer group
.consumerCreator(() -> eventsToBackupKafkaConsumer)
.consumerExecutor(eventConsumer.getConsumerExecutor())
.threadPrefix("edqs-events-to-backup")
.build();
stateProducer = EdqsProducer.builder()
.producer(queueFactory.createEdqsMsgProducer(EdqsQueue.STATE))
.producer(queueFactory.createEdqsStateProducer())
.partitionService(partitionService)
.build();
queueStateService = KafkaQueueStateService.<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<ToEdqsMsg>>builder()
.eventConsumer(eventConsumer)
.stateConsumer(stateConsumer)
.eventsStartOffsetsProvider(() -> {
// taking start offsets for events topics from the events-to-backup consumer group,
// since eventConsumer doesn't use consumer group management and thus offset tracking
// (because we need to be able to consume the same topic-partition by multiple instances)
Map<String, Long> offsets = new HashMap<>();
try {
queueAdmin.getConsumerGroupOffsets(eventsToBackupKafkaConsumer.getGroupId())
.forEach((topicPartition, offsetAndMetadata) -> {
offsets.put(topicPartition.topic(), offsetAndMetadata.offset());
});
} catch (Exception e) {
log.error("Failed to get consumer group offsets for {}", eventsToBackupKafkaConsumer.getGroupId(), e);
}
return offsets;
})
.build();
}
@Override
@ -151,7 +175,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
if (queueStateService.getPartitions().isEmpty()) {
Set<TopicPartitionInfo> allPartitions = IntStream.range(0, config.getPartitions())
.mapToObj(partition -> TopicPartitionInfo.builder()
.topic(EdqsQueue.EVENTS.getTopic())
.topic(config.getEventsTopic())
.partition(partition)
.build())
.collect(Collectors.toSet());

5
common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java

@ -29,7 +29,6 @@ import org.thingsboard.server.edqs.util.EdqsRocksDb;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.edqs.EdqsQueue;
import org.thingsboard.server.queue.edqs.InMemoryEdqsComponent;
import java.util.Set;
@ -61,14 +60,14 @@ public class LocalEdqsStateService implements EdqsStateService {
try {
ToEdqsMsg edqsMsg = ToEdqsMsg.parseFrom(value);
log.trace("[{}] Restored msg from RocksDB: {}", key, edqsMsg);
processor.process(edqsMsg, EdqsQueue.STATE);
processor.process(edqsMsg, false);
} catch (Exception e) {
log.error("[{}] Failed to restore value", key, e);
}
});
log.info("Restore completed");
}
eventConsumer.update(withTopic(partitions, EdqsQueue.EVENTS.getTopic()));
eventConsumer.update(withTopic(partitions, eventConsumer.getTopic()));
this.partitions = partitions;
}

4
common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java

@ -90,9 +90,7 @@ public class TopicPartitionInfo {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TopicPartitionInfo that = (TopicPartitionInfo) o;
return topic.equals(that.topic) &&
Objects.equals(tenantId, that.tenantId) &&
Objects.equals(partition, that.partition) &&
return Objects.equals(partition, that.partition) &&
fullTopicName.equals(that.fullTopicName);
}

14
common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java

@ -26,6 +26,7 @@ import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.UpdateConfigTask;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.UpdatePartitionsTask;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
import java.util.Collection;
import java.util.Collections;
@ -43,6 +44,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
@Slf4j
public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfig> {
@ -296,7 +298,7 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
log.info("[{}] Added partitions: {}, removed partitions: {}", queueKey, addedPartitions, removedPartitions);
removePartitions(removedPartitions);
addPartitions(addedPartitions, null);
addPartitions(addedPartitions, null, null);
}
protected void removePartitions(Set<TopicPartitionInfo> removedPartitions) {
@ -304,13 +306,19 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
removedPartitions.forEach((tpi) -> Optional.ofNullable(consumers.remove(tpi)).ifPresent(TbQueueConsumerTask::awaitCompletion));
}
protected void addPartitions(Set<TopicPartitionInfo> partitions, Consumer<TopicPartitionInfo> onStop) {
protected void addPartitions(Set<TopicPartitionInfo> partitions, Consumer<TopicPartitionInfo> onStop, Function<String, Long> startOffsetProvider) {
partitions.forEach(tpi -> {
Integer partitionId = tpi.getPartition().orElse(-1);
String key = queueKey + "-" + partitionId;
Runnable callback = onStop != null ? () -> onStop.accept(tpi) : null;
TbQueueConsumerTask<M> consumer = new TbQueueConsumerTask<>(key, () -> consumerCreator.apply(config, partitionId), callback);
TbQueueConsumerTask<M> consumer = new TbQueueConsumerTask<>(key, () -> {
TbQueueConsumer<M> queueConsumer = consumerCreator.apply(config, partitionId);
if (startOffsetProvider != null && queueConsumer instanceof TbKafkaConsumerTemplate<M> kafkaConsumer) {
kafkaConsumer.setStartOffsetProvider(startOffsetProvider);
}
return queueConsumer;
}, callback);
consumers.put(tpi, consumer);
consumer.subscribe(Set.of(tpi));
launchConsumer(consumer);

9
common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java

@ -33,6 +33,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
@Slf4j
public class PartitionedQueueConsumerManager<M extends TbQueueMsg> extends MainQueueConsumerManager<M, QueueConfig> {
@ -57,7 +58,7 @@ public class PartitionedQueueConsumerManager<M extends TbQueueMsg> extends MainQ
protected void processTask(TbQueueConsumerManagerTask task) {
if (task instanceof AddPartitionsTask addPartitionsTask) {
log.info("[{}] Added partitions: {}", queueKey, addPartitionsTask.partitions());
consumerWrapper.addPartitions(addPartitionsTask.partitions(), addPartitionsTask.onStop());
consumerWrapper.addPartitions(addPartitionsTask.partitions(), addPartitionsTask.onStop(), addPartitionsTask.startOffsetProvider());
} else if (task instanceof RemovePartitionsTask removePartitionsTask) {
log.info("[{}] Removed partitions: {}", queueKey, removePartitionsTask.partitions());
consumerWrapper.removePartitions(removePartitionsTask.partitions());
@ -76,11 +77,11 @@ public class PartitionedQueueConsumerManager<M extends TbQueueMsg> extends MainQ
}
public void addPartitions(Set<TopicPartitionInfo> partitions) {
addPartitions(partitions, null);
addPartitions(partitions, null, null);
}
public void addPartitions(Set<TopicPartitionInfo> partitions, Consumer<TopicPartitionInfo> onStop) {
addTask(new AddPartitionsTask(partitions, onStop));
public void addPartitions(Set<TopicPartitionInfo> partitions, Consumer<TopicPartitionInfo> onStop, Function<String, Long> startOffsetProvider) {
addTask(new AddPartitionsTask(partitions, onStop, startOffsetProvider));
}
public void removePartitions(Set<TopicPartitionInfo> partitions) {

5
common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerManagerTask.java

@ -20,6 +20,7 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
public interface TbQueueConsumerManagerTask {
@ -46,7 +47,9 @@ public interface TbQueueConsumerManagerTask {
}
}
record AddPartitionsTask(Set<TopicPartitionInfo> partitions, Consumer<TopicPartitionInfo> onStop) implements TbQueueConsumerManagerTask {
record AddPartitionsTask(Set<TopicPartitionInfo> partitions,
Consumer<TopicPartitionInfo> onStop,
Function<String, Long> startOffsetProvider) implements TbQueueConsumerManagerTask {
@Override
public QueueTaskType getType() {
return QueueTaskType.ADD_PARTITIONS;

16
common/queue/src/main/java/org/thingsboard/server/queue/common/state/KafkaQueueStateService.java

@ -15,13 +15,16 @@
*/
package org.thingsboard.server.queue.common.state;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.discovery.QueueKey;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTopic;
@ -29,14 +32,21 @@ import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTop
public class KafkaQueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> extends QueueStateService<E, S> {
private final PartitionedQueueConsumerManager<S> stateConsumer;
private final Supplier<Map<String, Long>> eventsStartOffsetsProvider;
public KafkaQueueStateService(PartitionedQueueConsumerManager<E> eventConsumer, PartitionedQueueConsumerManager<S> stateConsumer) {
@Builder
public KafkaQueueStateService(PartitionedQueueConsumerManager<E> eventConsumer,
PartitionedQueueConsumerManager<S> stateConsumer,
Supplier<Map<String, Long>> eventsStartOffsetsProvider) {
super(eventConsumer);
this.stateConsumer = stateConsumer;
this.eventsStartOffsetsProvider = eventsStartOffsetsProvider;
}
@Override
protected void addPartitions(QueueKey queueKey, Set<TopicPartitionInfo> partitions) {
Map<String, Long> eventsStartOffsets = eventsStartOffsetsProvider != null ? eventsStartOffsetsProvider.get() : null; // remembering the offsets before subscribing to states
Set<TopicPartitionInfo> statePartitions = withTopic(partitions, stateConsumer.getTopic());
partitionsInProgress.addAll(statePartitions);
stateConsumer.addPartitions(statePartitions, statePartition -> {
@ -51,12 +61,12 @@ public class KafkaQueueStateService<E extends TbQueueMsg, S extends TbQueueMsg>
TopicPartitionInfo eventPartition = statePartition.withTopic(eventConsumer.getTopic());
if (this.partitions.get(queueKey).contains(eventPartition)) {
eventConsumer.addPartitions(Set.of(eventPartition));
eventConsumer.addPartitions(Set.of(eventPartition), null, eventsStartOffsets != null ? eventsStartOffsets::get : null);
}
} finally {
readLock.unlock();
}
});
}, null);
}
@Override

4
common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsConfig.java

@ -30,6 +30,10 @@ public class EdqsConfig {
@Value("#{'${queue.edqs.partitioning_strategy:tenant}'.toUpperCase()}")
private EdqsPartitioningStrategy partitioningStrategy;
@Value("${queue.edqs.events_topic:edqs.events}")
private String eventsTopic;
@Value("${queue.edqs.state_topic:edqs.state}")
private String stateTopic;
@Value("${queue.edqs.requests_topic:edqs.requests}")
private String requestsTopic;
@Value("${queue.edqs.responses_topic:edqs.responses}")

36
common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueue.java

@ -1,36 +0,0 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.edqs;
import lombok.Getter;
@Getter
public enum EdqsQueue {
EVENTS("edqs.events", false, false),
STATE("edqs.state", true, true);
private final String topic;
private final boolean readFromBeginning;
private final boolean stopWhenRead;
EdqsQueue(String topic, boolean readFromBeginning, boolean stopWhenRead) {
this.topic = topic;
this.readFromBeginning = readFromBeginning;
this.stopWhenRead = stopWhenRead;
}
}

8
common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueueFactory.java

@ -25,11 +25,13 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
public interface EdqsQueueFactory {
TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgConsumer(EdqsQueue queue);
TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsEventsConsumer();
TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgConsumer(EdqsQueue queue, String group);
TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsEventsToBackupConsumer();
TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgProducer(EdqsQueue queue);
TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsStateConsumer();
TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsStateProducer();
TbQueueResponseTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> createEdqsResponseTemplate();

23
common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsQueueFactory.java

@ -43,24 +43,23 @@ public class InMemoryEdqsQueueFactory implements EdqsQueueFactory {
private final TbQueueAdmin queueAdmin;
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgConsumer(EdqsQueue queue) {
if (queue == EdqsQueue.STATE) {
throw new UnsupportedOperationException();
}
return new InMemoryTbQueueConsumer<>(storage, queue.getTopic());
public TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsEventsConsumer() {
return new InMemoryTbQueueConsumer<>(storage, edqsConfig.getEventsTopic());
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgConsumer(EdqsQueue queue, String group) {
return createEdqsMsgConsumer(queue);
public TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsEventsToBackupConsumer() {
throw new UnsupportedOperationException();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgProducer(EdqsQueue queue) {
if (queue == EdqsQueue.STATE) {
throw new UnsupportedOperationException();
}
return new InMemoryTbQueueProducer<>(storage, queue.getTopic());
public TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsStateConsumer() {
throw new UnsupportedOperationException();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsStateProducer() {
throw new UnsupportedOperationException();
}
@Override

67
common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java

@ -19,11 +19,8 @@ import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.stats.StatsType;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueResponseTemplate;
import org.thingsboard.server.queue.common.DefaultTbQueueResponseTemplate;
@ -71,54 +68,68 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory {
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgConsumer(EdqsQueue queue) {
String consumerGroup = "edqs-" + queue.name().toLowerCase() + "-consumer-group-" + serviceInfoProvider.getServiceId();
return createEdqsMsgConsumer(queue, consumerGroup);
public TbKafkaConsumerTemplate<TbProtoQueueMsg<ToEdqsMsg>> createEdqsEventsConsumer() {
return createEdqsMsgConsumer(edqsConfig.getEventsTopic(),
"edqs-events-" + consumerCounter.getAndIncrement() + "-consumer-" + serviceInfoProvider.getServiceId(),
null, // not using consumer group management, offsets from the edqs-events-to-backup-consumer-group are used (see KafkaEdqsStateService)
false, edqsEventsAdmin);
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgConsumer(EdqsQueue queue, String group) {
public TbKafkaConsumerTemplate<TbProtoQueueMsg<ToEdqsMsg>> createEdqsEventsToBackupConsumer() {
return createEdqsMsgConsumer(edqsConfig.getEventsTopic(),
"edqs-events-to-backup-consumer-" + serviceInfoProvider.getServiceId(),
"edqs-events-to-backup-consumer-group",
false, edqsEventsAdmin);
}
@Override
public TbKafkaConsumerTemplate<TbProtoQueueMsg<ToEdqsMsg>> createEdqsStateConsumer() {
return createEdqsMsgConsumer(edqsConfig.getStateTopic(),
"edqs-state-" + consumerCounter.getAndIncrement() + "-consumer-" + serviceInfoProvider.getServiceId(),
null, // not using consumer group management
true, edqsStateAdmin);
}
public TbKafkaConsumerTemplate<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgConsumer(String topic, String clientId, String group, boolean readFullAndStop, TbKafkaAdmin admin) {
return TbKafkaConsumerTemplate.<TbProtoQueueMsg<ToEdqsMsg>>builder()
.settings(kafkaSettings)
.topic(topicService.buildTopicName(queue.getTopic()))
.readFromBeginning(queue.isReadFromBeginning())
.stopWhenRead(queue.isStopWhenRead())
.clientId("edqs-" + queue.name().toLowerCase() + "-" + consumerCounter.getAndIncrement() + "-consumer-" + serviceInfoProvider.getServiceId())
.topic(topicService.buildTopicName(topic))
.readFromBeginning(readFullAndStop)
.stopWhenRead(readFullAndStop)
.clientId(clientId)
.groupId(topicService.buildTopicName(group))
.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdqsMsg.parseFrom(msg.getData()), msg.getHeaders()))
.admin(queue == EdqsQueue.STATE ? edqsStateAdmin : edqsEventsAdmin)
.admin(admin)
.statsService(consumerStatsService)
.build();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgProducer(EdqsQueue queue) {
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsStateProducer() {
return TbKafkaProducerTemplate.<TbProtoQueueMsg<ToEdqsMsg>>builder()
.clientId("edqs-" + queue.name().toLowerCase() + "-producer-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(queue.getTopic()))
.clientId("edqs-state-producer-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(edqsConfig.getStateTopic()))
.settings(kafkaSettings)
.admin(queue == EdqsQueue.STATE ? edqsStateAdmin : edqsEventsAdmin)
.admin(edqsStateAdmin)
.build();
}
@Override
public TbQueueResponseTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> createEdqsResponseTemplate() {
var requestConsumer = TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>>builder()
.settings(kafkaSettings)
.topic(topicService.buildTopicName(edqsConfig.getRequestsTopic()))
.clientId("edqs-requests-consumer-" + serviceInfoProvider.getServiceId())
.groupId(topicService.buildTopicName("edqs-requests-consumer-group"))
.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToEdqsMsg.parseFrom(msg.getData()), msg.getHeaders()))
.admin(edqsRequestsAdmin)
.statsService(consumerStatsService);
var requestConsumer = createEdqsMsgConsumer(edqsConfig.getRequestsTopic(),
"edqs-requests-consumer-" + serviceInfoProvider.getServiceId(),
"edqs-requests-consumer-group",
false, edqsRequestsAdmin);
var responseProducer = TbKafkaProducerTemplate.<TbProtoQueueMsg<FromEdqsMsg>>builder()
.settings(kafkaSettings)
.clientId("edqs-response-producer-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(edqsConfig.getResponsesTopic()))
.admin(edqsRequestsAdmin);
.admin(edqsRequestsAdmin)
.build();
return DefaultTbQueueResponseTemplate.<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>>builder()
.requestTemplate(requestConsumer.build())
.responseTemplate(responseProducer.build())
.requestTemplate(requestConsumer)
.responseTemplate(responseProducer)
.maxPendingRequests(edqsConfig.getMaxPendingRequests())
.requestTimeout(edqsConfig.getMaxRequestTimeout())
.pollInterval(edqsConfig.getPollInterval())
@ -128,7 +139,7 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory {
}
@Override
public TbQueueAdmin getEdqsQueueAdmin() {
public TbKafkaAdmin getEdqsQueueAdmin() {
return edqsEventsAdmin;
}

12
common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java

@ -16,6 +16,7 @@
package org.thingsboard.server.queue.kafka;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
@ -159,8 +160,7 @@ public class TbKafkaAdmin implements TbQueueAdmin {
if (partitionId == null) {
return;
}
Map<TopicPartition, OffsetAndMetadata> oldOffsets =
settings.getAdminClient().listConsumerGroupOffsets(fatGroupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
Map<TopicPartition, OffsetAndMetadata> oldOffsets = getConsumerGroupOffsets(fatGroupId);
if (oldOffsets.isEmpty()) {
return;
}
@ -171,8 +171,7 @@ public class TbKafkaAdmin implements TbQueueAdmin {
continue;
}
var om = consumerOffset.getValue();
Map<TopicPartition, OffsetAndMetadata> newOffsets =
settings.getAdminClient().listConsumerGroupOffsets(newGroupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
Map<TopicPartition, OffsetAndMetadata> newOffsets = getConsumerGroupOffsets(newGroupId);
var existingOffset = newOffsets.get(tp);
if (existingOffset == null) {
@ -189,6 +188,11 @@ public class TbKafkaAdmin implements TbQueueAdmin {
}
}
@SneakyThrows
public Map<TopicPartition, OffsetAndMetadata> getConsumerGroupOffsets(String groupId) {
return settings.getAdminClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
}
public boolean isTopicEmpty(String topic) {
return areAllTopicsEmpty(Set.of(topic));
}

18
common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java

@ -16,6 +16,8 @@
package org.thingsboard.server.queue.kafka;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@ -39,6 +41,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -53,8 +56,11 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
private final TbKafkaDecoder<T> decoder;
private final TbKafkaConsumerStatsService statsService;
@Getter
private final String groupId;
@Setter
private Function<String, Long> startOffsetProvider;
private final boolean readFromBeginning; // reset offset to beginning
private final boolean stopWhenRead; // stop consuming when reached end offset remembered on start
private int readCount;
@ -185,9 +191,21 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
private void onPartitionsAssigned(Collection<TopicPartition> partitions) {
if (readFromBeginning) {
log.debug("Seeking to beginning for {}", partitions);
consumer.seekToBeginning(partitions);
} else if (startOffsetProvider != null) {
partitions.forEach(topicPartition -> {
Long offset = startOffsetProvider.apply(topicPartition.topic());
if (offset != null) {
log.debug("Seeking to offset {} for {}", offset, topicPartition);
consumer.seek(topicPartition, offset);
} else {
log.info("No start offset provided for {}", topicPartition);
}
});
}
if (stopWhenRead) {
log.debug("Getting end offsets for {}", partitions);
endOffsets = consumer.endOffsets(partitions).entrySet().stream()
.filter(entry -> entry.getValue() > 0)
.collect(Collectors.toMap(entry -> entry.getKey().partition(), Map.Entry::getValue));

3
common/queue/src/main/java/org/thingsboard/server/queue/provider/EdqsClientQueueFactory.java

@ -20,11 +20,10 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.edqs.EdqsQueue;
public interface EdqsClientQueueFactory {
TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgProducer(EdqsQueue queue);
TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsEventsProducer();
TbQueueRequestTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> createEdqsRequestTemplate();

5
common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java

@ -37,7 +37,6 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.edqs.EdqsQueue;
import org.thingsboard.server.queue.memory.InMemoryStorage;
import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer;
import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer;
@ -239,8 +238,8 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgProducer(EdqsQueue queue) {
return new InMemoryTbQueueProducer<>(storage, queue.getTopic());
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsEventsProducer() {
return new InMemoryTbQueueProducer<>(storage, edqsConfig.getEventsTopic());
}
@Override

7
common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java

@ -54,7 +54,6 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.edqs.EdqsQueue;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
@ -590,10 +589,10 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgProducer(EdqsQueue queue) {
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsEventsProducer() {
return TbKafkaProducerTemplate.<TbProtoQueueMsg<ToEdqsMsg>>builder()
.clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(queue.getTopic()))
.clientId("edqs-events-producer-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(edqsConfig.getEventsTopic()))
.settings(kafkaSettings)
.admin(edqsEventsAdmin)
.build();

7
common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java

@ -52,7 +52,6 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.edqs.EdqsQueue;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
@ -480,10 +479,10 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgProducer(EdqsQueue queue) {
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsEventsProducer() {
return TbKafkaProducerTemplate.<TbProtoQueueMsg<ToEdqsMsg>>builder()
.clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(queue.getTopic()))
.clientId("edqs-events-producer-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(edqsConfig.getEventsTopic()))
.settings(kafkaSettings)
.admin(edqsEventsAdmin)
.build();

14
common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java

@ -48,7 +48,7 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.edqs.EdqsQueue;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
@ -79,6 +79,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueEdgeSettings edgeSettings;
private final TbQueueCalculatedFieldSettings calculatedFieldSettings;
private final EdqsConfig edqsConfig;
private final TbQueueAdmin coreAdmin;
private final TbKafkaAdmin ruleEngineAdmin;
@ -101,7 +102,9 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbKafkaConsumerStatsService consumerStatsService,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbQueueEdgeSettings edgeSettings, TbQueueCalculatedFieldSettings calculatedFieldSettings,
TbQueueEdgeSettings edgeSettings,
TbQueueCalculatedFieldSettings calculatedFieldSettings,
EdqsConfig edqsConfig,
TbKafkaTopicConfigs kafkaTopicConfigs) {
this.topicService = topicService;
this.kafkaSettings = kafkaSettings;
@ -113,6 +116,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
this.transportNotificationSettings = transportNotificationSettings;
this.edgeSettings = edgeSettings;
this.calculatedFieldSettings = calculatedFieldSettings;
this.edqsConfig = edqsConfig;
this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
@ -385,10 +389,10 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsMsgProducer(EdqsQueue queue) {
public TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsEventsProducer() {
return TbKafkaProducerTemplate.<TbProtoQueueMsg<ToEdqsMsg>>builder()
.clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(queue.getTopic()))
.clientId("edqs-events-producer-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(edqsConfig.getEventsTopic()))
.settings(kafkaSettings)
.admin(edqsEventsAdmin)
.build();

4
edqs/src/main/resources/edqs.yml

@ -57,6 +57,10 @@ queue:
partitions: "${TB_EDQS_PARTITIONS:12}"
# EDQS partitioning strategy: tenant (partitions are resolved and distributed by tenant id) or none (partitions are resolved by message key; each instance has all the partitions)
partitioning_strategy: "${TB_EDQS_PARTITIONING_STRATEGY:tenant}"
# EDQS events topic
events_topic: "${TB_EDQS_EVENTS_TOPIC:edqs.events}"
# EDQS state topic
state_topic: "${TB_EDQS_STATE_TOPIC:edqs.state}"
# EDQS requests topic
requests_topic: "${TB_EDQS_REQUESTS_TOPIC:edqs.requests}"
# EDQS responses topic

Loading…
Cancel
Save