diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java index 90d4056afc..3620ad3639 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java @@ -97,7 +97,10 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta .scheduler(eventConsumer.getScheduler()) .taskExecutor(eventConsumer.getTaskExecutor()) .build(); - super.stateService = new KafkaQueueStateService<>(eventConsumer, stateConsumer); + super.stateService = KafkaQueueStateService., TbProtoQueueMsg>builder() + .eventConsumer(eventConsumer) + .stateConsumer(stateConsumer) + .build(); this.stateProducer = (TbKafkaProducerTemplate>) queueFactory.createCalculatedFieldStateProducer(); } diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java index 7d5a0cb0fd..44ca548a68 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java @@ -59,7 +59,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; import org.thingsboard.server.queue.discovery.HashPartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TopicService; -import org.thingsboard.server.queue.edqs.EdqsQueue; import org.thingsboard.server.queue.environment.DistributedLock; import org.thingsboard.server.queue.environment.DistributedLockService; import org.thingsboard.server.queue.provider.EdqsClientQueueFactory; @@ -96,7 +95,7 @@ public class DefaultEdqsService implements EdqsService { private void init() { executor = ThingsBoardExecutors.newWorkStealingPool(12, getClass()); eventsProducer = EdqsProducer.builder() - .producer(queueFactory.createEdqsMsgProducer(EdqsQueue.EVENTS)) + .producer(queueFactory.createEdqsEventsProducer()) .partitionService(edqsPartitionService) .build(); syncLock = distributedLockService.getLock("edqs_sync"); diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java b/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java index ad7b7b970d..239fd9dc42 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java @@ -19,7 +19,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.edqs.EdqsConfig; -import org.thingsboard.server.queue.edqs.EdqsQueue; import org.thingsboard.server.queue.kafka.TbKafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaSettings; @@ -37,7 +36,7 @@ public class KafkaEdqsSyncService extends EdqsSyncService { TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, Collections.emptyMap()); this.syncNeeded = kafkaAdmin.areAllTopicsEmpty(IntStream.range(0, edqsConfig.getPartitions()) .mapToObj(partition -> TopicPartitionInfo.builder() - .topic(EdqsQueue.EVENTS.getTopic()) + .topic(edqsConfig.getEventsTopic()) .partition(partition) .build().getFullTopicName()) .collect(Collectors.toSet())); diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 7737f9d027..7ede65e375 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1748,6 +1748,10 @@ queue: partitions: "${TB_EDQS_PARTITIONS:12}" # EDQS partitioning strategy: tenant (partition is resolved by tenant id) or none (no specific strategy, resolving by message key) partitioning_strategy: "${TB_EDQS_PARTITIONING_STRATEGY:tenant}" + # EDQS events topic + events_topic: "${TB_EDQS_EVENTS_TOPIC:edqs.events}" + # EDQS state topic + state_topic: "${TB_EDQS_STATE_TOPIC:edqs.state}" # EDQS requests topic requests_topic: "${TB_EDQS_REQUESTS_TOPIC:edqs.requests}" # EDQS responses topic diff --git a/application/src/test/java/org/thingsboard/server/controller/EntityQueryControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/EntityQueryControllerTest.java index 011399e883..90b7ea7c7b 100644 --- a/application/src/test/java/org/thingsboard/server/controller/EntityQueryControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/EntityQueryControllerTest.java @@ -17,8 +17,6 @@ package org.thingsboard.server.controller; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; -import org.awaitility.Awaitility; -import com.fasterxml.jackson.databind.node.ObjectNode; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -73,7 +71,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.function.BiPredicate; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java index fb15c3fc73..07575220eb 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java @@ -62,7 +62,6 @@ import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.edqs.EdqsComponent; import org.thingsboard.server.queue.edqs.EdqsConfig; import org.thingsboard.server.queue.edqs.EdqsConfig.EdqsPartitioningStrategy; -import org.thingsboard.server.queue.edqs.EdqsQueue; import org.thingsboard.server.queue.edqs.EdqsQueueFactory; import org.thingsboard.server.queue.util.AfterStartUp; @@ -123,8 +122,8 @@ public class EdqsProcessor implements TbQueueHandler, }; eventConsumer = PartitionedQueueConsumerManager.>create() - .queueKey(new QueueKey(ServiceType.EDQS, EdqsQueue.EVENTS.getTopic())) - .topic(EdqsQueue.EVENTS.getTopic()) + .queueKey(new QueueKey(ServiceType.EDQS, config.getEventsTopic())) + .topic(config.getEventsTopic()) .pollInterval(config.getPollInterval()) .msgPackProcessor((msgs, consumer, config) -> { for (TbProtoQueueMsg queueMsg : msgs) { @@ -133,14 +132,14 @@ public class EdqsProcessor implements TbQueueHandler, } try { ToEdqsMsg msg = queueMsg.getValue(); - process(msg, EdqsQueue.EVENTS); + process(msg, true); } catch (Exception t) { log.error("Failed to process message: {}", queueMsg, t); } } consumer.commit(); }) - .consumerCreator((config, partitionId) -> queueFactory.createEdqsMsgConsumer(EdqsQueue.EVENTS)) + .consumerCreator((config, partitionId) -> queueFactory.createEdqsEventsConsumer()) .queueAdmin(queueFactory.getEdqsQueueAdmin()) .consumerExecutor(consumersExecutor) .taskExecutor(taskExecutor) @@ -165,7 +164,7 @@ public class EdqsProcessor implements TbQueueHandler, try { Set newPartitions = event.getNewPartitions().get(new QueueKey(ServiceType.EDQS)); - stateService.process(withTopic(newPartitions, EdqsQueue.STATE.getTopic())); + stateService.process(withTopic(newPartitions, config.getStateTopic())); // eventsConsumer's partitions are updated by stateService responseTemplate.subscribe(withTopic(newPartitions, config.getRequestsTopic())); // TODO: we subscribe to partitions before we are ready. implement consumer-per-partition version for request template @@ -235,7 +234,7 @@ public class EdqsProcessor implements TbQueueHandler, return response; } - public void process(ToEdqsMsg edqsMsg, EdqsQueue queue) { + public void process(ToEdqsMsg edqsMsg, boolean backup) { log.trace("Processing message: {}", edqsMsg); if (edqsMsg.hasEventMsg()) { EdqsEventMsg eventMsg = edqsMsg.getEventMsg(); @@ -252,7 +251,7 @@ public class EdqsProcessor implements TbQueueHandler, } else if (!ObjectType.unversionedTypes.contains(objectType)) { log.warn("[{}] {} {} doesn't have version", tenantId, objectType, key); } - if (queue != EdqsQueue.STATE) { + if (backup) { stateService.save(tenantId, objectType, key, eventType, edqsMsg); } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/repo/TenantRepo.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/repo/TenantRepo.java index 870574a786..9ef34ce129 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/repo/TenantRepo.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/repo/TenantRepo.java @@ -150,8 +150,9 @@ public class TenantRepo { } } else if (RelationTypeGroup.DASHBOARD.equals(entity.getTypeGroup())) { if (EntityRelation.CONTAINS_TYPE.equals(entity.getType()) && entity.getFrom().getEntityType() == EntityType.CUSTOMER) { - ((CustomerData) getEntityMap(EntityType.CUSTOMER).computeIfAbsent(entity.getFrom().getId(), CustomerData::new)) - .addOrUpdate(getEntityMap(EntityType.DASHBOARD).get(entity.getTo().getId())); + CustomerData customerData = (CustomerData) getOrCreate(entity.getFrom()); + EntityData dashboardData = getOrCreate(entity.getTo()); + customerData.addOrUpdate(dashboardData); } } } finally { @@ -170,8 +171,13 @@ public class TenantRepo { } } else if (RelationTypeGroup.DASHBOARD.equals(entityRelation.getTypeGroup())) { if (EntityRelation.CONTAINS_TYPE.equals(entityRelation.getType()) && entityRelation.getFrom().getEntityType() == EntityType.CUSTOMER) { - ((CustomerData) getEntityMap(EntityType.CUSTOMER).computeIfAbsent(entityRelation.getFrom().getId(), CustomerData::new)) - .remove(getEntityMap(EntityType.DASHBOARD).get(entityRelation.getTo().getId())); + CustomerData customerData = (CustomerData) get(entityRelation.getFrom()); + if (customerData != null) { + EntityData dashboardData = get(entityRelation.getTo()); + if (dashboardData != null) { + customerData.remove(dashboardData); + } + } } } } @@ -197,13 +203,13 @@ public class TenantRepo { entityData.setCustomerId(newCustomerId); if (entityIdMismatch(oldCustomerId, newCustomerId)) { if (oldCustomerId != null) { - CustomerData old = (CustomerData) getEntityMap(EntityType.CUSTOMER).get(oldCustomerId); + CustomerData old = (CustomerData) get(EntityType.CUSTOMER, oldCustomerId); if (old != null) { old.remove(entityData); } } if (newCustomerId != null) { - CustomerData newData = (CustomerData) getEntityMap(EntityType.CUSTOMER).computeIfAbsent(newCustomerId, CustomerData::new); + CustomerData newData = (CustomerData) getOrCreate(EntityType.CUSTOMER, newCustomerId); newData.addOrUpdate(entityData); } } @@ -217,7 +223,7 @@ public class TenantRepo { try { UUID entityId = entity.getFields().getId(); EntityType entityType = entity.getType(); - EntityData removed = getEntityMap(entityType).remove(entityId); + EntityData removed = get(entityType, entityId); if (removed != null) { if (removed.getFields() != null) { getEntitySet(entityType).remove(removed); @@ -225,7 +231,7 @@ public class TenantRepo { edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.fromEntityType(entityType), EdqsEventType.DELETED)); UUID customerId = removed.getCustomerId(); if (customerId != null) { - CustomerData customerData = (CustomerData) getEntityMap(EntityType.CUSTOMER).get(customerId); + CustomerData customerData = (CustomerData) get(EntityType.CUSTOMER, customerId); if (customerData != null) { customerData.remove(removed); } @@ -303,7 +309,11 @@ public class TenantRepo { } private EntityData get(EntityId entityId) { - return getEntityMap(entityId.getEntityType()).get(entityId.getId()); + return get(entityId.getEntityType(), entityId.getId()); + } + + private EntityData get(EntityType entityType, UUID entityId) { + return getEntityMap(entityType).get(entityId); } private EntityData constructEntityData(EntityType entityType, UUID id) { @@ -425,7 +435,7 @@ public class TenantRepo { EntityType entityType = entityId.getEntityType(); return switch (entityType) { case CUSTOMER, TENANT -> { - EntityFields fields = getEntityMap(entityType).get(entityId.getId()).getFields(); + EntityFields fields = get(entityId).getFields(); yield fields != null ? fields.getName() : ""; } default -> throw new RuntimeException("Unsupported entity type: " + entityType); diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java index 21515f1ad8..efdb1ead1c 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java @@ -37,10 +37,13 @@ import org.thingsboard.server.queue.common.state.KafkaQueueStateService; import org.thingsboard.server.queue.common.state.QueueStateService; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.edqs.EdqsConfig; -import org.thingsboard.server.queue.edqs.EdqsQueue; -import org.thingsboard.server.queue.edqs.EdqsQueueFactory; import org.thingsboard.server.queue.edqs.KafkaEdqsComponent; +import org.thingsboard.server.queue.edqs.KafkaEdqsQueueFactory; +import org.thingsboard.server.queue.kafka.TbKafkaAdmin; +import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; @@ -55,7 +58,7 @@ public class KafkaEdqsStateService implements EdqsStateService { private final EdqsConfig config; private final EdqsPartitionService partitionService; - private final EdqsQueueFactory queueFactory; + private final KafkaEdqsQueueFactory queueFactory; @Autowired @Lazy private EdqsProcessor edqsProcessor; @@ -71,15 +74,16 @@ public class KafkaEdqsStateService implements EdqsStateService { @Override public void init(PartitionedQueueConsumerManager> eventConsumer) { + TbKafkaAdmin queueAdmin = queueFactory.getEdqsQueueAdmin(); stateConsumer = PartitionedQueueConsumerManager.>create() - .queueKey(new QueueKey(ServiceType.EDQS, EdqsQueue.STATE.getTopic())) - .topic(EdqsQueue.STATE.getTopic()) + .queueKey(new QueueKey(ServiceType.EDQS, config.getStateTopic())) + .topic(config.getStateTopic()) .pollInterval(config.getPollInterval()) .msgPackProcessor((msgs, consumer, config) -> { for (TbProtoQueueMsg queueMsg : msgs) { try { ToEdqsMsg msg = queueMsg.getValue(); - edqsProcessor.process(msg, EdqsQueue.STATE); + edqsProcessor.process(msg, false); if (stateReadCount.incrementAndGet() % 100000 == 0) { log.info("[state] Processed {} msgs", stateReadCount.get()); } @@ -89,15 +93,15 @@ public class KafkaEdqsStateService implements EdqsStateService { } consumer.commit(); }) - .consumerCreator((config, partitionId) -> queueFactory.createEdqsMsgConsumer(EdqsQueue.STATE, null)) // not using consumer group management - .queueAdmin(queueFactory.getEdqsQueueAdmin()) + .consumerCreator((config, partitionId) -> queueFactory.createEdqsStateConsumer()) + .queueAdmin(queueAdmin) .consumerExecutor(eventConsumer.getConsumerExecutor()) .taskExecutor(eventConsumer.getTaskExecutor()) .scheduler(eventConsumer.getScheduler()) .uncaughtErrorHandler(edqsProcessor.getErrorHandler()) .build(); - queueStateService = new KafkaQueueStateService<>(eventConsumer, stateConsumer); + TbKafkaConsumerTemplate> eventsToBackupKafkaConsumer = queueFactory.createEdqsEventsToBackupConsumer(); eventsToBackupConsumer = QueueConsumerManager.>builder() .name("edqs-events-to-backup-consumer") .pollInterval(config.getPollInterval()) @@ -135,15 +139,35 @@ public class KafkaEdqsStateService implements EdqsStateService { } consumer.commit(); }) - .consumerCreator(() -> queueFactory.createEdqsMsgConsumer(EdqsQueue.EVENTS, "events-to-backup-consumer-group")) // shared by all instances consumer group + .consumerCreator(() -> eventsToBackupKafkaConsumer) .consumerExecutor(eventConsumer.getConsumerExecutor()) .threadPrefix("edqs-events-to-backup") .build(); stateProducer = EdqsProducer.builder() - .producer(queueFactory.createEdqsMsgProducer(EdqsQueue.STATE)) + .producer(queueFactory.createEdqsStateProducer()) .partitionService(partitionService) .build(); + + queueStateService = KafkaQueueStateService., TbProtoQueueMsg>builder() + .eventConsumer(eventConsumer) + .stateConsumer(stateConsumer) + .eventsStartOffsetsProvider(() -> { + // taking start offsets for events topics from the events-to-backup consumer group, + // since eventConsumer doesn't use consumer group management and thus offset tracking + // (because we need to be able to consume the same topic-partition by multiple instances) + Map offsets = new HashMap<>(); + try { + queueAdmin.getConsumerGroupOffsets(eventsToBackupKafkaConsumer.getGroupId()) + .forEach((topicPartition, offsetAndMetadata) -> { + offsets.put(topicPartition.topic(), offsetAndMetadata.offset()); + }); + } catch (Exception e) { + log.error("Failed to get consumer group offsets for {}", eventsToBackupKafkaConsumer.getGroupId(), e); + } + return offsets; + }) + .build(); } @Override @@ -151,7 +175,7 @@ public class KafkaEdqsStateService implements EdqsStateService { if (queueStateService.getPartitions().isEmpty()) { Set allPartitions = IntStream.range(0, config.getPartitions()) .mapToObj(partition -> TopicPartitionInfo.builder() - .topic(EdqsQueue.EVENTS.getTopic()) + .topic(config.getEventsTopic()) .partition(partition) .build()) .collect(Collectors.toSet()); diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java index 383115ddf1..cde21edfaf 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java @@ -29,7 +29,6 @@ import org.thingsboard.server.edqs.util.EdqsRocksDb; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; -import org.thingsboard.server.queue.edqs.EdqsQueue; import org.thingsboard.server.queue.edqs.InMemoryEdqsComponent; import java.util.Set; @@ -61,14 +60,14 @@ public class LocalEdqsStateService implements EdqsStateService { try { ToEdqsMsg edqsMsg = ToEdqsMsg.parseFrom(value); log.trace("[{}] Restored msg from RocksDB: {}", key, edqsMsg); - processor.process(edqsMsg, EdqsQueue.STATE); + processor.process(edqsMsg, false); } catch (Exception e) { log.error("[{}] Failed to restore value", key, e); } }); log.info("Restore completed"); } - eventConsumer.update(withTopic(partitions, EdqsQueue.EVENTS.getTopic())); + eventConsumer.update(withTopic(partitions, eventConsumer.getTopic())); this.partitions = partitions; } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java index f09826e9b6..b18debaf49 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java @@ -90,9 +90,7 @@ public class TopicPartitionInfo { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; TopicPartitionInfo that = (TopicPartitionInfo) o; - return topic.equals(that.topic) && - Objects.equals(tenantId, that.tenantId) && - Objects.equals(partition, that.partition) && + return Objects.equals(partition, that.partition) && fullTopicName.equals(that.fullTopicName); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java index 14394bbbe9..ef9728344c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java @@ -26,6 +26,7 @@ import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.UpdateConfigTask; import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.UpdatePartitionsTask; import org.thingsboard.server.queue.discovery.QueueKey; +import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; import java.util.Collection; import java.util.Collections; @@ -43,6 +44,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.function.Function; @Slf4j public class MainQueueConsumerManager { @@ -296,7 +298,7 @@ public class MainQueueConsumerManager removedPartitions) { @@ -304,13 +306,19 @@ public class MainQueueConsumerManager Optional.ofNullable(consumers.remove(tpi)).ifPresent(TbQueueConsumerTask::awaitCompletion)); } - protected void addPartitions(Set partitions, Consumer onStop) { + protected void addPartitions(Set partitions, Consumer onStop, Function startOffsetProvider) { partitions.forEach(tpi -> { Integer partitionId = tpi.getPartition().orElse(-1); String key = queueKey + "-" + partitionId; Runnable callback = onStop != null ? () -> onStop.accept(tpi) : null; - TbQueueConsumerTask consumer = new TbQueueConsumerTask<>(key, () -> consumerCreator.apply(config, partitionId), callback); + TbQueueConsumerTask consumer = new TbQueueConsumerTask<>(key, () -> { + TbQueueConsumer queueConsumer = consumerCreator.apply(config, partitionId); + if (startOffsetProvider != null && queueConsumer instanceof TbKafkaConsumerTemplate kafkaConsumer) { + kafkaConsumer.setStartOffsetProvider(startOffsetProvider); + } + return queueConsumer; + }, callback); consumers.put(tpi, consumer); consumer.subscribe(Set.of(tpi)); launchConsumer(consumer); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java index f25a98adf4..0de1e53753 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java @@ -33,6 +33,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.function.Function; @Slf4j public class PartitionedQueueConsumerManager extends MainQueueConsumerManager { @@ -57,7 +58,7 @@ public class PartitionedQueueConsumerManager extends MainQ protected void processTask(TbQueueConsumerManagerTask task) { if (task instanceof AddPartitionsTask addPartitionsTask) { log.info("[{}] Added partitions: {}", queueKey, addPartitionsTask.partitions()); - consumerWrapper.addPartitions(addPartitionsTask.partitions(), addPartitionsTask.onStop()); + consumerWrapper.addPartitions(addPartitionsTask.partitions(), addPartitionsTask.onStop(), addPartitionsTask.startOffsetProvider()); } else if (task instanceof RemovePartitionsTask removePartitionsTask) { log.info("[{}] Removed partitions: {}", queueKey, removePartitionsTask.partitions()); consumerWrapper.removePartitions(removePartitionsTask.partitions()); @@ -76,11 +77,11 @@ public class PartitionedQueueConsumerManager extends MainQ } public void addPartitions(Set partitions) { - addPartitions(partitions, null); + addPartitions(partitions, null, null); } - public void addPartitions(Set partitions, Consumer onStop) { - addTask(new AddPartitionsTask(partitions, onStop)); + public void addPartitions(Set partitions, Consumer onStop, Function startOffsetProvider) { + addTask(new AddPartitionsTask(partitions, onStop, startOffsetProvider)); } public void removePartitions(Set partitions) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerManagerTask.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerManagerTask.java index e0dd9b808b..a287a391af 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerManagerTask.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/TbQueueConsumerManagerTask.java @@ -20,6 +20,7 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import java.util.Set; import java.util.function.Consumer; +import java.util.function.Function; public interface TbQueueConsumerManagerTask { @@ -46,7 +47,9 @@ public interface TbQueueConsumerManagerTask { } } - record AddPartitionsTask(Set partitions, Consumer onStop) implements TbQueueConsumerManagerTask { + record AddPartitionsTask(Set partitions, + Consumer onStop, + Function startOffsetProvider) implements TbQueueConsumerManagerTask { @Override public QueueTaskType getType() { return QueueTaskType.ADD_PARTITIONS; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/state/KafkaQueueStateService.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/state/KafkaQueueStateService.java index 9adc6bb996..bf02afe86c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/state/KafkaQueueStateService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/state/KafkaQueueStateService.java @@ -15,13 +15,16 @@ */ package org.thingsboard.server.queue.common.state; +import lombok.Builder; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; import org.thingsboard.server.queue.discovery.QueueKey; +import java.util.Map; import java.util.Set; +import java.util.function.Supplier; import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTopic; @@ -29,14 +32,21 @@ import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTop public class KafkaQueueStateService extends QueueStateService { private final PartitionedQueueConsumerManager stateConsumer; + private final Supplier> eventsStartOffsetsProvider; - public KafkaQueueStateService(PartitionedQueueConsumerManager eventConsumer, PartitionedQueueConsumerManager stateConsumer) { + @Builder + public KafkaQueueStateService(PartitionedQueueConsumerManager eventConsumer, + PartitionedQueueConsumerManager stateConsumer, + Supplier> eventsStartOffsetsProvider) { super(eventConsumer); this.stateConsumer = stateConsumer; + this.eventsStartOffsetsProvider = eventsStartOffsetsProvider; } @Override protected void addPartitions(QueueKey queueKey, Set partitions) { + Map eventsStartOffsets = eventsStartOffsetsProvider != null ? eventsStartOffsetsProvider.get() : null; // remembering the offsets before subscribing to states + Set statePartitions = withTopic(partitions, stateConsumer.getTopic()); partitionsInProgress.addAll(statePartitions); stateConsumer.addPartitions(statePartitions, statePartition -> { @@ -51,12 +61,12 @@ public class KafkaQueueStateService TopicPartitionInfo eventPartition = statePartition.withTopic(eventConsumer.getTopic()); if (this.partitions.get(queueKey).contains(eventPartition)) { - eventConsumer.addPartitions(Set.of(eventPartition)); + eventConsumer.addPartitions(Set.of(eventPartition), null, eventsStartOffsets != null ? eventsStartOffsets::get : null); } } finally { readLock.unlock(); } - }); + }, null); } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsConfig.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsConfig.java index e4e1e81815..401b451f59 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsConfig.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsConfig.java @@ -30,6 +30,10 @@ public class EdqsConfig { @Value("#{'${queue.edqs.partitioning_strategy:tenant}'.toUpperCase()}") private EdqsPartitioningStrategy partitioningStrategy; + @Value("${queue.edqs.events_topic:edqs.events}") + private String eventsTopic; + @Value("${queue.edqs.state_topic:edqs.state}") + private String stateTopic; @Value("${queue.edqs.requests_topic:edqs.requests}") private String requestsTopic; @Value("${queue.edqs.responses_topic:edqs.responses}") diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueue.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueue.java deleted file mode 100644 index d859b50994..0000000000 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueue.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Copyright © 2016-2025 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.queue.edqs; - -import lombok.Getter; - -@Getter -public enum EdqsQueue { - - EVENTS("edqs.events", false, false), - STATE("edqs.state", true, true); - - private final String topic; - private final boolean readFromBeginning; - private final boolean stopWhenRead; - - EdqsQueue(String topic, boolean readFromBeginning, boolean stopWhenRead) { - this.topic = topic; - this.readFromBeginning = readFromBeginning; - this.stopWhenRead = stopWhenRead; - } - -} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueueFactory.java index b5541c740b..5c0d68779a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueueFactory.java @@ -25,11 +25,13 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; public interface EdqsQueueFactory { - TbQueueConsumer> createEdqsMsgConsumer(EdqsQueue queue); + TbQueueConsumer> createEdqsEventsConsumer(); - TbQueueConsumer> createEdqsMsgConsumer(EdqsQueue queue, String group); + TbQueueConsumer> createEdqsEventsToBackupConsumer(); - TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue); + TbQueueConsumer> createEdqsStateConsumer(); + + TbQueueProducer> createEdqsStateProducer(); TbQueueResponseTemplate, TbProtoQueueMsg> createEdqsResponseTemplate(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsQueueFactory.java index 0801399c14..8c670e66c0 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsQueueFactory.java @@ -43,24 +43,23 @@ public class InMemoryEdqsQueueFactory implements EdqsQueueFactory { private final TbQueueAdmin queueAdmin; @Override - public TbQueueConsumer> createEdqsMsgConsumer(EdqsQueue queue) { - if (queue == EdqsQueue.STATE) { - throw new UnsupportedOperationException(); - } - return new InMemoryTbQueueConsumer<>(storage, queue.getTopic()); + public TbQueueConsumer> createEdqsEventsConsumer() { + return new InMemoryTbQueueConsumer<>(storage, edqsConfig.getEventsTopic()); } @Override - public TbQueueConsumer> createEdqsMsgConsumer(EdqsQueue queue, String group) { - return createEdqsMsgConsumer(queue); + public TbQueueConsumer> createEdqsEventsToBackupConsumer() { + throw new UnsupportedOperationException(); } @Override - public TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue) { - if (queue == EdqsQueue.STATE) { - throw new UnsupportedOperationException(); - } - return new InMemoryTbQueueProducer<>(storage, queue.getTopic()); + public TbQueueConsumer> createEdqsStateConsumer() { + throw new UnsupportedOperationException(); + } + + @Override + public TbQueueProducer> createEdqsStateProducer() { + throw new UnsupportedOperationException(); } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java index 6fdab28133..ab88943b10 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java @@ -19,11 +19,8 @@ import org.springframework.stereotype.Component; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.common.stats.StatsType; -import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; -import org.thingsboard.server.queue.TbQueueAdmin; -import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueResponseTemplate; import org.thingsboard.server.queue.common.DefaultTbQueueResponseTemplate; @@ -71,54 +68,68 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory { } @Override - public TbQueueConsumer> createEdqsMsgConsumer(EdqsQueue queue) { - String consumerGroup = "edqs-" + queue.name().toLowerCase() + "-consumer-group-" + serviceInfoProvider.getServiceId(); - return createEdqsMsgConsumer(queue, consumerGroup); + public TbKafkaConsumerTemplate> createEdqsEventsConsumer() { + return createEdqsMsgConsumer(edqsConfig.getEventsTopic(), + "edqs-events-" + consumerCounter.getAndIncrement() + "-consumer-" + serviceInfoProvider.getServiceId(), + null, // not using consumer group management, offsets from the edqs-events-to-backup-consumer-group are used (see KafkaEdqsStateService) + false, edqsEventsAdmin); } @Override - public TbQueueConsumer> createEdqsMsgConsumer(EdqsQueue queue, String group) { + public TbKafkaConsumerTemplate> createEdqsEventsToBackupConsumer() { + return createEdqsMsgConsumer(edqsConfig.getEventsTopic(), + "edqs-events-to-backup-consumer-" + serviceInfoProvider.getServiceId(), + "edqs-events-to-backup-consumer-group", + false, edqsEventsAdmin); + } + + @Override + public TbKafkaConsumerTemplate> createEdqsStateConsumer() { + return createEdqsMsgConsumer(edqsConfig.getStateTopic(), + "edqs-state-" + consumerCounter.getAndIncrement() + "-consumer-" + serviceInfoProvider.getServiceId(), + null, // not using consumer group management + true, edqsStateAdmin); + } + + public TbKafkaConsumerTemplate> createEdqsMsgConsumer(String topic, String clientId, String group, boolean readFullAndStop, TbKafkaAdmin admin) { return TbKafkaConsumerTemplate.>builder() .settings(kafkaSettings) - .topic(topicService.buildTopicName(queue.getTopic())) - .readFromBeginning(queue.isReadFromBeginning()) - .stopWhenRead(queue.isStopWhenRead()) - .clientId("edqs-" + queue.name().toLowerCase() + "-" + consumerCounter.getAndIncrement() + "-consumer-" + serviceInfoProvider.getServiceId()) + .topic(topicService.buildTopicName(topic)) + .readFromBeginning(readFullAndStop) + .stopWhenRead(readFullAndStop) + .clientId(clientId) .groupId(topicService.buildTopicName(group)) .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdqsMsg.parseFrom(msg.getData()), msg.getHeaders())) - .admin(queue == EdqsQueue.STATE ? edqsStateAdmin : edqsEventsAdmin) + .admin(admin) .statsService(consumerStatsService) .build(); } @Override - public TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue) { + public TbQueueProducer> createEdqsStateProducer() { return TbKafkaProducerTemplate.>builder() - .clientId("edqs-" + queue.name().toLowerCase() + "-producer-" + serviceInfoProvider.getServiceId()) - .defaultTopic(topicService.buildTopicName(queue.getTopic())) + .clientId("edqs-state-producer-" + serviceInfoProvider.getServiceId()) + .defaultTopic(topicService.buildTopicName(edqsConfig.getStateTopic())) .settings(kafkaSettings) - .admin(queue == EdqsQueue.STATE ? edqsStateAdmin : edqsEventsAdmin) + .admin(edqsStateAdmin) .build(); } @Override public TbQueueResponseTemplate, TbProtoQueueMsg> createEdqsResponseTemplate() { - var requestConsumer = TbKafkaConsumerTemplate.>builder() - .settings(kafkaSettings) - .topic(topicService.buildTopicName(edqsConfig.getRequestsTopic())) - .clientId("edqs-requests-consumer-" + serviceInfoProvider.getServiceId()) - .groupId(topicService.buildTopicName("edqs-requests-consumer-group")) - .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToEdqsMsg.parseFrom(msg.getData()), msg.getHeaders())) - .admin(edqsRequestsAdmin) - .statsService(consumerStatsService); + var requestConsumer = createEdqsMsgConsumer(edqsConfig.getRequestsTopic(), + "edqs-requests-consumer-" + serviceInfoProvider.getServiceId(), + "edqs-requests-consumer-group", + false, edqsRequestsAdmin); var responseProducer = TbKafkaProducerTemplate.>builder() .settings(kafkaSettings) .clientId("edqs-response-producer-" + serviceInfoProvider.getServiceId()) .defaultTopic(topicService.buildTopicName(edqsConfig.getResponsesTopic())) - .admin(edqsRequestsAdmin); + .admin(edqsRequestsAdmin) + .build(); return DefaultTbQueueResponseTemplate., TbProtoQueueMsg>builder() - .requestTemplate(requestConsumer.build()) - .responseTemplate(responseProducer.build()) + .requestTemplate(requestConsumer) + .responseTemplate(responseProducer) .maxPendingRequests(edqsConfig.getMaxPendingRequests()) .requestTimeout(edqsConfig.getMaxRequestTimeout()) .pollInterval(edqsConfig.getPollInterval()) @@ -128,7 +139,7 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory { } @Override - public TbQueueAdmin getEdqsQueueAdmin() { + public TbKafkaAdmin getEdqsQueueAdmin() { return edqsEventsAdmin; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java index 6835b44da7..37f881b49c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java @@ -16,6 +16,7 @@ package org.thingsboard.server.queue.kafka; import lombok.Getter; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.ListOffsetsResult; @@ -159,8 +160,7 @@ public class TbKafkaAdmin implements TbQueueAdmin { if (partitionId == null) { return; } - Map oldOffsets = - settings.getAdminClient().listConsumerGroupOffsets(fatGroupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); + Map oldOffsets = getConsumerGroupOffsets(fatGroupId); if (oldOffsets.isEmpty()) { return; } @@ -171,8 +171,7 @@ public class TbKafkaAdmin implements TbQueueAdmin { continue; } var om = consumerOffset.getValue(); - Map newOffsets = - settings.getAdminClient().listConsumerGroupOffsets(newGroupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); + Map newOffsets = getConsumerGroupOffsets(newGroupId); var existingOffset = newOffsets.get(tp); if (existingOffset == null) { @@ -189,6 +188,11 @@ public class TbKafkaAdmin implements TbQueueAdmin { } } + @SneakyThrows + public Map getConsumerGroupOffsets(String groupId) { + return settings.getAdminClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); + } + public boolean isTopicEmpty(String topic) { return areAllTopicsEmpty(Set.of(topic)); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java index 8c4ea788c6..3abed8475a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java @@ -16,6 +16,8 @@ package org.thingsboard.server.queue.kafka; import lombok.Builder; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; @@ -39,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -53,8 +56,11 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue private final TbKafkaDecoder decoder; private final TbKafkaConsumerStatsService statsService; + @Getter private final String groupId; + @Setter + private Function startOffsetProvider; private final boolean readFromBeginning; // reset offset to beginning private final boolean stopWhenRead; // stop consuming when reached end offset remembered on start private int readCount; @@ -185,9 +191,21 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue private void onPartitionsAssigned(Collection partitions) { if (readFromBeginning) { + log.debug("Seeking to beginning for {}", partitions); consumer.seekToBeginning(partitions); + } else if (startOffsetProvider != null) { + partitions.forEach(topicPartition -> { + Long offset = startOffsetProvider.apply(topicPartition.topic()); + if (offset != null) { + log.debug("Seeking to offset {} for {}", offset, topicPartition); + consumer.seek(topicPartition, offset); + } else { + log.info("No start offset provided for {}", topicPartition); + } + }); } if (stopWhenRead) { + log.debug("Getting end offsets for {}", partitions); endOffsets = consumer.endOffsets(partitions).entrySet().stream() .filter(entry -> entry.getValue() > 0) .collect(Collectors.toMap(entry -> entry.getKey().partition(), Map.Entry::getValue)); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/EdqsClientQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/EdqsClientQueueFactory.java index 95be49f82b..f2309f7224 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/EdqsClientQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/EdqsClientQueueFactory.java @@ -20,11 +20,10 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.edqs.EdqsQueue; public interface EdqsClientQueueFactory { - TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue); + TbQueueProducer> createEdqsEventsProducer(); TbQueueRequestTemplate, TbProtoQueueMsg> createEdqsRequestTemplate(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java index bd8b4bd4f8..89d83af826 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java @@ -37,7 +37,6 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.edqs.EdqsConfig; -import org.thingsboard.server.queue.edqs.EdqsQueue; import org.thingsboard.server.queue.memory.InMemoryStorage; import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer; import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer; @@ -239,8 +238,8 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE } @Override - public TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue) { - return new InMemoryTbQueueProducer<>(storage, queue.getTopic()); + public TbQueueProducer> createEdqsEventsProducer() { + return new InMemoryTbQueueProducer<>(storage, edqsConfig.getEventsTopic()); } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index c50344a23f..dcebe085b3 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -54,7 +54,6 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.edqs.EdqsConfig; -import org.thingsboard.server.queue.edqs.EdqsQueue; import org.thingsboard.server.queue.kafka.TbKafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService; import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; @@ -590,10 +589,10 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi } @Override - public TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue) { + public TbQueueProducer> createEdqsEventsProducer() { return TbKafkaProducerTemplate.>builder() - .clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId()) - .defaultTopic(topicService.buildTopicName(queue.getTopic())) + .clientId("edqs-events-producer-" + serviceInfoProvider.getServiceId()) + .defaultTopic(topicService.buildTopicName(edqsConfig.getEventsTopic())) .settings(kafkaSettings) .admin(edqsEventsAdmin) .build(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java index 2a1dc4171f..e9c42b0022 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java @@ -52,7 +52,6 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.edqs.EdqsConfig; -import org.thingsboard.server.queue.edqs.EdqsQueue; import org.thingsboard.server.queue.kafka.TbKafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService; import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; @@ -480,10 +479,10 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { } @Override - public TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue) { + public TbQueueProducer> createEdqsEventsProducer() { return TbKafkaProducerTemplate.>builder() - .clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId()) - .defaultTopic(topicService.buildTopicName(queue.getTopic())) + .clientId("edqs-events-producer-" + serviceInfoProvider.getServiceId()) + .defaultTopic(topicService.buildTopicName(edqsConfig.getEventsTopic())) .settings(kafkaSettings) .admin(edqsEventsAdmin) .build(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index 2e342b0dd2..d43ef5c9ac 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -48,7 +48,7 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TopicService; -import org.thingsboard.server.queue.edqs.EdqsQueue; +import org.thingsboard.server.queue.edqs.EdqsConfig; import org.thingsboard.server.queue.kafka.TbKafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService; import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; @@ -79,6 +79,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { private final TbQueueTransportNotificationSettings transportNotificationSettings; private final TbQueueEdgeSettings edgeSettings; private final TbQueueCalculatedFieldSettings calculatedFieldSettings; + private final EdqsConfig edqsConfig; private final TbQueueAdmin coreAdmin; private final TbKafkaAdmin ruleEngineAdmin; @@ -101,7 +102,9 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbKafkaConsumerStatsService consumerStatsService, TbQueueTransportNotificationSettings transportNotificationSettings, - TbQueueEdgeSettings edgeSettings, TbQueueCalculatedFieldSettings calculatedFieldSettings, + TbQueueEdgeSettings edgeSettings, + TbQueueCalculatedFieldSettings calculatedFieldSettings, + EdqsConfig edqsConfig, TbKafkaTopicConfigs kafkaTopicConfigs) { this.topicService = topicService; this.kafkaSettings = kafkaSettings; @@ -113,6 +116,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { this.transportNotificationSettings = transportNotificationSettings; this.edgeSettings = edgeSettings; this.calculatedFieldSettings = calculatedFieldSettings; + this.edqsConfig = edqsConfig; this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs()); this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs()); @@ -385,10 +389,10 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { } @Override - public TbQueueProducer> createEdqsMsgProducer(EdqsQueue queue) { + public TbQueueProducer> createEdqsEventsProducer() { return TbKafkaProducerTemplate.>builder() - .clientId("edqs-producer-" + queue.name().toLowerCase() + "-" + serviceInfoProvider.getServiceId()) - .defaultTopic(topicService.buildTopicName(queue.getTopic())) + .clientId("edqs-events-producer-" + serviceInfoProvider.getServiceId()) + .defaultTopic(topicService.buildTopicName(edqsConfig.getEventsTopic())) .settings(kafkaSettings) .admin(edqsEventsAdmin) .build(); diff --git a/edqs/src/main/resources/edqs.yml b/edqs/src/main/resources/edqs.yml index 05d942ff23..353d76391b 100644 --- a/edqs/src/main/resources/edqs.yml +++ b/edqs/src/main/resources/edqs.yml @@ -57,6 +57,10 @@ queue: partitions: "${TB_EDQS_PARTITIONS:12}" # EDQS partitioning strategy: tenant (partitions are resolved and distributed by tenant id) or none (partitions are resolved by message key; each instance has all the partitions) partitioning_strategy: "${TB_EDQS_PARTITIONING_STRATEGY:tenant}" + # EDQS events topic + events_topic: "${TB_EDQS_EVENTS_TOPIC:edqs.events}" + # EDQS state topic + state_topic: "${TB_EDQS_STATE_TOPIC:edqs.state}" # EDQS requests topic requests_topic: "${TB_EDQS_REQUESTS_TOPIC:edqs.requests}" # EDQS responses topic