Browse Source

added ability to sync EDQS entities by entity type

pull/14595/head
dashevchenko 6 months ago
parent
commit
f19540271f
  1. 44
      application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java
  2. 70
      application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java
  3. 32
      application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncState.java
  4. 8
      common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsSyncRequest.java

44
application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java

@ -73,6 +73,7 @@ import org.thingsboard.server.queue.util.AfterStartUp;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -150,12 +151,10 @@ public class DefaultEdqsService implements EdqsService {
executor.submit(() -> {
try {
EdqsSyncState syncState = getSyncState();
if (edqsSyncService.isSyncNeeded() || syncState == null || syncState.getStatus() != EdqsSyncStatus.FINISHED) {
if (hashPartitionService.isSystemPartitionMine(ServiceType.TB_CORE)) {
processSystemRequest(ToCoreEdqsRequest.builder()
.syncRequest(new EdqsSyncRequest())
.build());
}
if (edqsSyncService.isSyncNeeded() || syncState == null) {
requestEdqsSync(new EdqsSyncRequest());
} else if (syncState.getStatus() != EdqsSyncStatus.FINISHED) {
requestEdqsSync(new EdqsSyncRequest(syncState.getObjectTypes()));
} else {
// only if topic/RocksDB is not empty and sync is finished
onSyncStatusUpdate(EdqsSyncStatus.FINISHED);
@ -166,11 +165,19 @@ public class DefaultEdqsService implements EdqsService {
});
}
private void requestEdqsSync(EdqsSyncRequest syncRequest) {
if (hashPartitionService.isSystemPartitionMine(ServiceType.TB_CORE)) {
processSystemRequest(ToCoreEdqsRequest.builder()
.syncRequest(syncRequest)
.build());
}
}
@Override
public void processSystemRequest(ToCoreEdqsRequest request) {
log.info("Processing system request {}", request);
if (request.getSyncRequest() != null) {
saveSyncState(EdqsSyncStatus.REQUESTED);
saveSyncState(EdqsSyncStatus.REQUESTED, request.getSyncRequest().getObjectTypes());
}
broadcast(ToCoreEdqsMsg.builder()
.syncRequest(request.getSyncRequest())
@ -198,7 +205,8 @@ public class DefaultEdqsService implements EdqsService {
onSyncStatusUpdate(msg.getSyncStatus());
}
if (msg.getSyncRequest() != null) {
EdqsSyncRequest syncRequest = msg.getSyncRequest();
if (syncRequest != null) {
syncLock.lock();
try {
EdqsSyncState syncState = getSyncState();
@ -209,15 +217,15 @@ public class DefaultEdqsService implements EdqsService {
return;
}
}
saveSyncState(EdqsSyncStatus.STARTED);
saveSyncState(EdqsSyncStatus.STARTED, syncRequest.getObjectTypes());
edqsSyncService.sync();
edqsSyncService.sync(syncRequest);
saveSyncState(EdqsSyncStatus.FINISHED);
broadcastSyncStatusUpdate(EdqsSyncStatus.FINISHED);
} catch (Exception e) {
log.error("Failed to complete sync", e);
saveSyncState(EdqsSyncStatus.FAILED);
saveSyncState(EdqsSyncStatus.FAILED, syncRequest.getObjectTypes());
broadcastSyncStatusUpdate(EdqsSyncStatus.FAILED);
} finally {
syncLock.unlock();
@ -369,7 +377,12 @@ public class DefaultEdqsService implements EdqsService {
@SneakyThrows
private void saveSyncState(EdqsSyncStatus status) {
EdqsSyncState state = new EdqsSyncState(status);
saveSyncState(status, null);
}
@SneakyThrows
private void saveSyncState(EdqsSyncStatus status, Set<ObjectType> objectTypes) {
EdqsSyncState state = new EdqsSyncState(status, objectTypes);
log.info("New EDQS sync state: {}", state);
attributesService.save(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, AttributeScope.SERVER_SCOPE, new BaseAttributeKvEntry(
new JsonDataEntry("edqsSyncState", JacksonUtil.toString(state)),
@ -383,11 +396,4 @@ public class DefaultEdqsService implements EdqsService {
eventsProducer.stop();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
private static class EdqsSyncState {
private EdqsSyncStatus status;
}
}

70
application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java

@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.ObjectType;
import org.thingsboard.server.common.data.edqs.AttributeKv;
import org.thingsboard.server.common.data.edqs.EdqsEventType;
import org.thingsboard.server.common.data.edqs.EdqsObject;
import org.thingsboard.server.common.data.edqs.EdqsSyncRequest;
import org.thingsboard.server.common.data.edqs.Entity;
import org.thingsboard.server.common.data.edqs.LatestTsKv;
import org.thingsboard.server.common.data.edqs.fields.EntityFields;
@ -83,21 +84,38 @@ public abstract class EdqsSyncService {
public abstract boolean isSyncNeeded();
public void sync() {
public void sync(EdqsSyncRequest syncRequest) {
log.info("Synchronizing data to EDQS");
long startTs = System.currentTimeMillis();
counters.clear();
syncTenantEntities();
syncRelations();
loadKeyDictionary();
syncAttributes();
syncLatestTimeseries();
if (syncRequest.getObjectTypes() != null && !syncRequest.getObjectTypes().isEmpty()) {
log.info("Sync request for entity types: {}", syncRequest.getObjectTypes());
for (ObjectType objectType : syncRequest.getObjectTypes()) {
syncObjectType(objectType);
}
} else {
log.info("Sync request for all entity types");
syncTenantEntities();
syncRelations();
loadKeyDictionary();
syncAttributes();
syncLatestTimeseries();
}
counters.clear();
log.info("Finished synchronizing data to EDQS in {} ms", (System.currentTimeMillis() - startTs));
}
private void syncObjectType(ObjectType objectType) {
switch (objectType) {
case RELATION -> syncRelations();
case ATTRIBUTE_KV -> syncAttributes();
case LATEST_TS_KV -> syncLatestTimeseries();
default -> syncTenantEntity(objectType);
}
}
private void process(TenantId tenantId, ObjectType type, EdqsObject object) {
AtomicInteger counter = counters.computeIfAbsent(type, t -> new AtomicInteger());
if (counter.incrementAndGet() % 10000 == 0) {
@ -108,26 +126,30 @@ public abstract class EdqsSyncService {
private void syncTenantEntities() {
for (ObjectType type : edqsTenantTypes) {
log.info("Synchronizing {} entities to EDQS", type);
long ts = System.currentTimeMillis();
EntityType entityType = type.toEntityType();
Dao<?> dao = entityDaoRegistry.getDao(entityType);
UUID lastId = UUID.fromString("00000000-0000-0000-0000-000000000000");
while (true) {
var batch = dao.findNextBatch(lastId, entityBatchSize);
if (batch.isEmpty()) {
break;
}
for (EntityFields entityFields : batch) {
TenantId tenantId = TenantId.fromUUID(entityFields.getTenantId());
entityInfoMap.put(entityFields.getId(), new EntityIdInfo(entityType, tenantId));
process(tenantId, type, new Entity(entityType, entityFields));
}
EntityFields lastRecord = batch.get(batch.size() - 1);
lastId = lastRecord.getId();
syncTenantEntity(type);
}
}
private void syncTenantEntity(ObjectType type) {
log.info("Synchronizing {} entities to EDQS", type);
long ts = System.currentTimeMillis();
EntityType entityType = type.toEntityType();
Dao<?> dao = entityDaoRegistry.getDao(entityType);
UUID lastId = UUID.fromString("00000000-0000-0000-0000-000000000000");
while (true) {
var batch = dao.findNextBatch(lastId, entityBatchSize);
if (batch.isEmpty()) {
break;
}
for (EntityFields entityFields : batch) {
TenantId tenantId = TenantId.fromUUID(entityFields.getTenantId());
entityInfoMap.put(entityFields.getId(), new EntityIdInfo(entityType, tenantId));
process(tenantId, type, new Entity(entityType, entityFields));
}
log.info("Finished synchronizing {} entities to EDQS in {} ms", type, (System.currentTimeMillis() - ts));
EntityFields lastRecord = batch.get(batch.size() - 1);
lastId = lastRecord.getId();
}
log.info("Finished synchronizing {} entities to EDQS in {} ms", type, (System.currentTimeMillis() - ts));
}
private void syncRelations() {

32
application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncState.java

@ -0,0 +1,32 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edqs;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.thingsboard.server.common.data.ObjectType;
import org.thingsboard.server.common.data.edqs.EdqsState.EdqsSyncStatus;
import java.util.Set;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class EdqsSyncState {
private EdqsSyncStatus status;
private Set<ObjectType> objectTypes;
}

8
common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsSyncRequest.java

@ -16,9 +16,17 @@
package org.thingsboard.server.common.data.edqs;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.thingsboard.server.common.data.ObjectType;
import java.util.Set;
@Data
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties
public class EdqsSyncRequest {
Set<ObjectType> objectTypes;
}

Loading…
Cancel
Save