diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java index 617902bd3d..57cbace546 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java @@ -73,6 +73,7 @@ import org.thingsboard.server.queue.util.AfterStartUp; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -150,12 +151,10 @@ public class DefaultEdqsService implements EdqsService { executor.submit(() -> { try { EdqsSyncState syncState = getSyncState(); - if (edqsSyncService.isSyncNeeded() || syncState == null || syncState.getStatus() != EdqsSyncStatus.FINISHED) { - if (hashPartitionService.isSystemPartitionMine(ServiceType.TB_CORE)) { - processSystemRequest(ToCoreEdqsRequest.builder() - .syncRequest(new EdqsSyncRequest()) - .build()); - } + if (edqsSyncService.isSyncNeeded() || syncState == null) { + requestEdqsSync(new EdqsSyncRequest()); + } else if (syncState.getStatus() != EdqsSyncStatus.FINISHED) { + requestEdqsSync(new EdqsSyncRequest(syncState.getObjectTypes())); } else { // only if topic/RocksDB is not empty and sync is finished onSyncStatusUpdate(EdqsSyncStatus.FINISHED); @@ -166,11 +165,19 @@ public class DefaultEdqsService implements EdqsService { }); } + private void requestEdqsSync(EdqsSyncRequest syncRequest) { + if (hashPartitionService.isSystemPartitionMine(ServiceType.TB_CORE)) { + processSystemRequest(ToCoreEdqsRequest.builder() + .syncRequest(syncRequest) + .build()); + } + } + @Override public void processSystemRequest(ToCoreEdqsRequest request) { log.info("Processing system request {}", request); if (request.getSyncRequest() != null) { - saveSyncState(EdqsSyncStatus.REQUESTED); + saveSyncState(EdqsSyncStatus.REQUESTED, request.getSyncRequest().getObjectTypes()); } broadcast(ToCoreEdqsMsg.builder() .syncRequest(request.getSyncRequest()) @@ -198,7 +205,8 @@ public class DefaultEdqsService implements EdqsService { onSyncStatusUpdate(msg.getSyncStatus()); } - if (msg.getSyncRequest() != null) { + EdqsSyncRequest syncRequest = msg.getSyncRequest(); + if (syncRequest != null) { syncLock.lock(); try { EdqsSyncState syncState = getSyncState(); @@ -209,15 +217,15 @@ public class DefaultEdqsService implements EdqsService { return; } } - saveSyncState(EdqsSyncStatus.STARTED); + saveSyncState(EdqsSyncStatus.STARTED, syncRequest.getObjectTypes()); - edqsSyncService.sync(); + edqsSyncService.sync(syncRequest); saveSyncState(EdqsSyncStatus.FINISHED); broadcastSyncStatusUpdate(EdqsSyncStatus.FINISHED); } catch (Exception e) { log.error("Failed to complete sync", e); - saveSyncState(EdqsSyncStatus.FAILED); + saveSyncState(EdqsSyncStatus.FAILED, syncRequest.getObjectTypes()); broadcastSyncStatusUpdate(EdqsSyncStatus.FAILED); } finally { syncLock.unlock(); @@ -369,7 +377,12 @@ public class DefaultEdqsService implements EdqsService { @SneakyThrows private void saveSyncState(EdqsSyncStatus status) { - EdqsSyncState state = new EdqsSyncState(status); + saveSyncState(status, null); + } + + @SneakyThrows + private void saveSyncState(EdqsSyncStatus status, Set objectTypes) { + EdqsSyncState state = new EdqsSyncState(status, objectTypes); log.info("New EDQS sync state: {}", state); attributesService.save(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, AttributeScope.SERVER_SCOPE, new BaseAttributeKvEntry( new JsonDataEntry("edqsSyncState", JacksonUtil.toString(state)), @@ -383,11 +396,4 @@ public class DefaultEdqsService implements EdqsService { eventsProducer.stop(); } - @Data - @AllArgsConstructor - @NoArgsConstructor - private static class EdqsSyncState { - private EdqsSyncStatus status; - } - } diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java b/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java index 88f344d048..4395ed4f1c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.edqs.AttributeKv; import org.thingsboard.server.common.data.edqs.EdqsEventType; import org.thingsboard.server.common.data.edqs.EdqsObject; +import org.thingsboard.server.common.data.edqs.EdqsSyncRequest; import org.thingsboard.server.common.data.edqs.Entity; import org.thingsboard.server.common.data.edqs.LatestTsKv; import org.thingsboard.server.common.data.edqs.fields.EntityFields; @@ -83,21 +84,38 @@ public abstract class EdqsSyncService { public abstract boolean isSyncNeeded(); - public void sync() { + public void sync(EdqsSyncRequest syncRequest) { log.info("Synchronizing data to EDQS"); long startTs = System.currentTimeMillis(); counters.clear(); - syncTenantEntities(); - syncRelations(); - loadKeyDictionary(); - syncAttributes(); - syncLatestTimeseries(); + if (syncRequest.getObjectTypes() != null && !syncRequest.getObjectTypes().isEmpty()) { + log.info("Sync request for entity types: {}", syncRequest.getObjectTypes()); + for (ObjectType objectType : syncRequest.getObjectTypes()) { + syncObjectType(objectType); + } + } else { + log.info("Sync request for all entity types"); + syncTenantEntities(); + syncRelations(); + loadKeyDictionary(); + syncAttributes(); + syncLatestTimeseries(); + } counters.clear(); log.info("Finished synchronizing data to EDQS in {} ms", (System.currentTimeMillis() - startTs)); } + private void syncObjectType(ObjectType objectType) { + switch (objectType) { + case RELATION -> syncRelations(); + case ATTRIBUTE_KV -> syncAttributes(); + case LATEST_TS_KV -> syncLatestTimeseries(); + default -> syncTenantEntity(objectType); + } + } + private void process(TenantId tenantId, ObjectType type, EdqsObject object) { AtomicInteger counter = counters.computeIfAbsent(type, t -> new AtomicInteger()); if (counter.incrementAndGet() % 10000 == 0) { @@ -108,26 +126,30 @@ public abstract class EdqsSyncService { private void syncTenantEntities() { for (ObjectType type : edqsTenantTypes) { - log.info("Synchronizing {} entities to EDQS", type); - long ts = System.currentTimeMillis(); - EntityType entityType = type.toEntityType(); - Dao dao = entityDaoRegistry.getDao(entityType); - UUID lastId = UUID.fromString("00000000-0000-0000-0000-000000000000"); - while (true) { - var batch = dao.findNextBatch(lastId, entityBatchSize); - if (batch.isEmpty()) { - break; - } - for (EntityFields entityFields : batch) { - TenantId tenantId = TenantId.fromUUID(entityFields.getTenantId()); - entityInfoMap.put(entityFields.getId(), new EntityIdInfo(entityType, tenantId)); - process(tenantId, type, new Entity(entityType, entityFields)); - } - EntityFields lastRecord = batch.get(batch.size() - 1); - lastId = lastRecord.getId(); + syncTenantEntity(type); + } + } + + private void syncTenantEntity(ObjectType type) { + log.info("Synchronizing {} entities to EDQS", type); + long ts = System.currentTimeMillis(); + EntityType entityType = type.toEntityType(); + Dao dao = entityDaoRegistry.getDao(entityType); + UUID lastId = UUID.fromString("00000000-0000-0000-0000-000000000000"); + while (true) { + var batch = dao.findNextBatch(lastId, entityBatchSize); + if (batch.isEmpty()) { + break; + } + for (EntityFields entityFields : batch) { + TenantId tenantId = TenantId.fromUUID(entityFields.getTenantId()); + entityInfoMap.put(entityFields.getId(), new EntityIdInfo(entityType, tenantId)); + process(tenantId, type, new Entity(entityType, entityFields)); } - log.info("Finished synchronizing {} entities to EDQS in {} ms", type, (System.currentTimeMillis() - ts)); + EntityFields lastRecord = batch.get(batch.size() - 1); + lastId = lastRecord.getId(); } + log.info("Finished synchronizing {} entities to EDQS in {} ms", type, (System.currentTimeMillis() - ts)); } private void syncRelations() { diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncState.java b/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncState.java new file mode 100644 index 0000000000..07945cea8b --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncState.java @@ -0,0 +1,32 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edqs; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.thingsboard.server.common.data.ObjectType; +import org.thingsboard.server.common.data.edqs.EdqsState.EdqsSyncStatus; + +import java.util.Set; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class EdqsSyncState { + private EdqsSyncStatus status; + private Set objectTypes; +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsSyncRequest.java b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsSyncRequest.java index 12f7068c71..dc8292c55e 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsSyncRequest.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsSyncRequest.java @@ -16,9 +16,17 @@ package org.thingsboard.server.common.data.edqs; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; +import org.thingsboard.server.common.data.ObjectType; + +import java.util.Set; @Data +@AllArgsConstructor +@NoArgsConstructor @JsonIgnoreProperties public class EdqsSyncRequest { + Set objectTypes; }