From d6b516e708d449b0c82b381502f8a6bc087d2e9c Mon Sep 17 00:00:00 2001 From: Viacheslav Klimov Date: Wed, 22 Jun 2022 18:06:50 +0300 Subject: [PATCH 01/12] VC queue message chunking --- .../impl/BaseEntityImportService.java | 1 + .../DefaultEntitiesVersionControlService.java | 18 +-- .../DefaultGitVersionControlQueueService.java | 117 ++++++++++++++---- .../src/main/resources/thingsboard.yml | 1 + common/cluster-api/src/main/proto/queue.proto | 16 ++- .../server/common/data/StringUtils.java | 6 + .../common/data/sync/vc/EntityDataDiff.java | 1 - .../data/sync/vc/VersionedEntityInfo.java | 1 - .../common/util/CollectionsUtil.java | 8 ++ .../DefaultClusterVersionControlService.java | 60 +++++++-- .../server/service/sync/vc/PendingCommit.java | 13 +- .../src/main/resources/tb-vc-executor.yml | 1 + ui-ngx/src/app/shared/models/vc.models.ts | 1 - 13 files changed, 195 insertions(+), 49 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java index 26c5a869f9..285401e3bd 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java @@ -160,6 +160,7 @@ public abstract class BaseEntityImportService importResult; try { importResult = exportImportService.importEntity(ctx, entityData); @@ -391,6 +392,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont ctx.getImportedEntities().computeIfAbsent(entityType, t -> new HashSet<>()) .add(importResult.getSavedEntity().getId()); } + log.debug("Imported {} pack for tenant {}", entityType, ctx.getTenantId()); offset += limit; } while (entityDataList.size() == limit); } @@ -456,18 +458,20 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont EntityId externalId = ((ExportableEntity) entity).getExternalId(); if (externalId == null) externalId = entityId; - return transformAsync(gitServiceQueue.getEntity(user.getTenantId(), versionId, externalId), + return transform(gitServiceQueue.getEntity(user.getTenantId(), versionId, externalId), otherVersion -> { SimpleEntitiesExportCtx ctx = new SimpleEntitiesExportCtx(user, null, null, EntityExportSettings.builder() .exportRelations(otherVersion.hasRelations()) .exportAttributes(otherVersion.hasAttributes()) .exportCredentials(otherVersion.hasCredentials()) .build()); - EntityExportData currentVersion = exportImportService.exportEntity(ctx, entityId); - return transform(gitServiceQueue.getContentsDiff(user.getTenantId(), - JacksonUtil.toPrettyString(currentVersion.sort()), - JacksonUtil.toPrettyString(otherVersion.sort())), - rawDiff -> new EntityDataDiff(currentVersion, otherVersion, rawDiff), MoreExecutors.directExecutor()); + EntityExportData currentVersion; + try { + currentVersion = exportImportService.exportEntity(ctx, entityId); + } catch (ThingsboardException e) { + throw new RuntimeException(e); + } + return new EntityDataDiff(currentVersion.sort(), otherVersion.sort()); }, MoreExecutors.directExecutor()); } diff --git a/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java b/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java index 99246d282f..a1c79a2aac 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java @@ -15,7 +15,10 @@ */ package org.thingsboard.server.service.sync.vc; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.ByteString; import lombok.SneakyThrows; @@ -23,6 +26,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.CollectionsUtil; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.EntityType; @@ -70,12 +74,15 @@ import org.thingsboard.server.service.sync.vc.data.VersionsDiffGitRequest; import org.thingsboard.server.service.sync.vc.data.VoidGitRequest; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -92,9 +99,12 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu private final SchedulerComponent scheduler; private final Map> pendingRequestMap = new HashMap<>(); + private final Map> chunkedMsgs = new ConcurrentHashMap<>(); @Value("${queue.vc.request-timeout:60000}") private int requestTimeout; + @Value("${queue.vc.msg-chunk-size:500000}") + private int msgChunkSize; public DefaultGitVersionControlQueueService(TbServiceInfoProvider serviceInfoProvider, TbClusterService clusterService, DataDecodingEncodingService encodingService, @@ -114,24 +124,39 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu CommitGitRequest commit = new CommitGitRequest(user.getTenantId(), request); registerAndSend(commit, builder -> builder.setCommitRequest( buildCommitRequest(commit).setPrepareMsg(getCommitPrepareMsg(user, request)).build() - ).build(), wrap(future, commit)); + ).build(), wrap(future, commit, commit.getRequestId())); return future; } + @SuppressWarnings("UnstableApiUsage") @Override public ListenableFuture addToCommit(CommitGitRequest commit, EntityExportData> entityData) { - SettableFuture future = SettableFuture.create(); - String path = getRelativePath(entityData.getEntityType(), entityData.getExternalId()); String entityDataJson = JacksonUtil.toPrettyString(entityData.sort()); - registerAndSend(commit, builder -> builder.setCommitRequest( - buildCommitRequest(commit).setAddMsg( - TransportProtos.AddMsg.newBuilder() - .setRelativePath(path).setEntityDataJson(entityDataJson).build() - ).build() - ).build(), wrap(future, null)); - return future; + Iterable entityDataChunks = StringUtils.split(entityDataJson, msgChunkSize); + String chunkedMsgId = UUID.randomUUID().toString(); + int chunksCount = Iterables.size(entityDataChunks); + + AtomicInteger chunkIndex = new AtomicInteger(); + List> futures = new ArrayList<>(); + entityDataChunks.forEach(chunk -> { + SettableFuture chunkFuture = SettableFuture.create(); + log.trace("[{}] sending chunk {} for 'addToCommit'", chunkedMsgId, chunkIndex.get()); + registerAndSend(commit, builder -> builder.setCommitRequest( + buildCommitRequest(commit).setAddMsg( + TransportProtos.AddMsg.newBuilder() + .setRelativePath(path).setEntityDataJsonChunk(chunk) + .setChunkedMsgId(chunkedMsgId).setChunkIndex(chunkIndex.getAndIncrement()) + .setChunksCount(chunksCount).build() + ).build() + ).build(), wrap(chunkFuture, null, commit.getRequestId())); + futures.add(chunkFuture); + }); + return Futures.transform(Futures.allAsList(futures), r -> { + log.trace("[{}] sent all chunks for 'addToCommit'", chunkedMsgId); + return null; + }, MoreExecutors.directExecutor()); } @Override @@ -144,7 +169,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu buildCommitRequest(commit).setDeleteMsg( TransportProtos.DeleteMsg.newBuilder().setRelativePath(path).build() ).build() - ).build(), wrap(future, null)); + ).build(), wrap(future, null, commit.getRequestId())); return future; } @@ -220,7 +245,6 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu @Override public ListenableFuture> listEntitiesAtVersion(TenantId tenantId, String branch, String versionId, EntityType entityType) { return listEntitiesAtVersion(tenantId, ListEntitiesRequestMsg.newBuilder() - .setBranchName(branch) .setVersionId(versionId) .setEntityType(entityType.name()) .build()); @@ -229,7 +253,6 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu @Override public ListenableFuture> listEntitiesAtVersion(TenantId tenantId, String branch, String versionId) { return listEntitiesAtVersion(tenantId, ListEntitiesRequestMsg.newBuilder() - .setBranchName(branch) .setVersionId(versionId) .build()); } @@ -289,9 +312,11 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu var requestBody = enrichFunction.apply(newRequestProto(request, settings)); log.trace("[{}][{}] PUSHING request: {}", request.getTenantId(), request.getRequestId(), requestBody); clusterService.pushMsgToVersionControl(request.getTenantId(), requestBody, callback); - request.setTimeoutTask(scheduler.schedule(() -> { - processTimeout(request.getRequestId()); - }, requestTimeout, TimeUnit.MILLISECONDS)); + if (request.getTimeoutTask() == null) { + request.setTimeoutTask(scheduler.schedule(() -> { + processTimeout(request.getRequestId()); + }, requestTimeout, TimeUnit.MILLISECONDS)); + } } else { throw new RuntimeException("Future is already done!"); } @@ -355,15 +380,15 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu @Override public void processResponse(VersionControlResponseMsg vcResponseMsg) { UUID requestId = new UUID(vcResponseMsg.getRequestIdMSB(), vcResponseMsg.getRequestIdLSB()); - PendingGitRequest request = pendingRequestMap.remove(requestId); + PendingGitRequest request = pendingRequestMap.get(requestId); if (request == null) { log.debug("[{}] received stale response: {}", requestId, vcResponseMsg); return; } else { log.debug("[{}] processing response: {}", requestId, vcResponseMsg); - request.getTimeoutTask().cancel(true); } var future = request.getFuture(); + boolean completed = true; if (!StringUtils.isEmpty(vcResponseMsg.getError())) { future.setException(new RuntimeException(vcResponseMsg.getError())); } else { @@ -391,12 +416,39 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu var listVersionsResponse = vcResponseMsg.getListVersionsResponse(); ((ListVersionsGitRequest) request).getFuture().set(toPageData(listVersionsResponse)); } else if (vcResponseMsg.hasEntityContentResponse()) { - var data = vcResponseMsg.getEntityContentResponse().getData(); - ((EntityContentGitRequest) request).getFuture().set(toData(data)); + TransportProtos.EntityContentResponseMsg responseMsg = vcResponseMsg.getEntityContentResponse(); + String[] msgChunks = chunkedMsgs.computeIfAbsent(requestId, id -> new HashMap<>()) + .computeIfAbsent(responseMsg.getChunkedMsgId(), id -> new String[responseMsg.getChunksCount()]); + msgChunks[responseMsg.getChunkIndex()] = responseMsg.getData(); + log.trace("[{}] received chunk {} for 'getEntity'", responseMsg.getChunkedMsgId(), responseMsg.getChunkIndex()); + if (CollectionsUtil.countNonNull(msgChunks) == responseMsg.getChunksCount()) { + log.trace("[{}] collected all chunks for 'getEntity'", responseMsg.getChunkedMsgId()); + String data = String.join("", msgChunks); + ((EntityContentGitRequest) request).getFuture().set(toData(data)); + } else { + completed = false; + } } else if (vcResponseMsg.hasEntitiesContentResponse()) { - var dataList = vcResponseMsg.getEntitiesContentResponse().getDataList(); - ((EntitiesContentGitRequest) request).getFuture() - .set(dataList.stream().map(this::toData).collect(Collectors.toList())); + TransportProtos.EntitiesContentResponseMsg responseMsg = vcResponseMsg.getEntitiesContentResponse(); + TransportProtos.EntityContentResponseMsg item = responseMsg.getItem(); + if (responseMsg.getItemsCount() > 0) { + Map chunkedItems = chunkedMsgs.computeIfAbsent(requestId, id -> new HashMap<>()); + String[] itemChunks = chunkedItems.computeIfAbsent(item.getChunkedMsgId(), id -> { + return new String[item.getChunksCount()]; + }); + itemChunks[item.getChunkIndex()] = item.getData(); + if (chunkedItems.size() == responseMsg.getItemsCount() && chunkedItems.values().stream() + .allMatch(chunks -> CollectionsUtil.countNonNull(chunks) == chunks.length)) { + ((EntitiesContentGitRequest) request).getFuture().set(chunkedItems.values().stream() + .map(chunks -> String.join("", chunks)) + .map(this::toData) + .collect(Collectors.toList())); + } else { + completed = false; + } + } else { + ((EntitiesContentGitRequest) request).getFuture().set(Collections.emptyList()); + } } else if (vcResponseMsg.hasVersionsDiffResponse()) { TransportProtos.VersionsDiffResponseMsg diffResponse = vcResponseMsg.getVersionsDiffResponse(); List entityVersionsDiffList = diffResponse.getDiffList().stream() @@ -416,16 +468,29 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu ((ContentsDiffGitRequest) request).getFuture().set(diff); } } + if (completed) { + removePendingRequest(requestId); + } } private void processTimeout(UUID requestId) { - PendingGitRequest pendingRequest = pendingRequestMap.remove(requestId); + PendingGitRequest pendingRequest = removePendingRequest(requestId); if (pendingRequest != null) { log.debug("[{}] request timed out ({} ms}", requestId, requestTimeout); pendingRequest.getFuture().setException(new TimeoutException("Request timed out")); } } + private PendingGitRequest removePendingRequest(UUID requestId) { + PendingGitRequest pendingRequest = pendingRequestMap.remove(requestId); + if (pendingRequest != null && pendingRequest.getTimeoutTask() != null) { + pendingRequest.getTimeoutTask().cancel(true); + pendingRequest.setTimeoutTask(null); + } + chunkedMsgs.remove(requestId); + return pendingRequest; + } + private PageData toPageData(TransportProtos.ListVersionsResponseMsg listVersionsResponse) { var listVersions = listVersionsResponse.getVersionsList().stream().map(this::getEntityVersion).collect(Collectors.toList()); return new PageData<>(listVersions, listVersionsResponse.getTotalPages(), listVersionsResponse.getTotalElements(), listVersionsResponse.getHasNext()); @@ -458,16 +523,18 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu }; } - private static TbQueueCallback wrap(SettableFuture future, T value) { + private TbQueueCallback wrap(SettableFuture future, T value, UUID requestId) { return new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { future.set(value); + removePendingRequest(requestId); } @Override public void onFailure(Throwable t) { future.setException(t); + removePendingRequest(requestId); } }; } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 07233bb241..645daa58e7 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1024,6 +1024,7 @@ queue: poll-interval: "${TB_QUEUE_VC_INTERVAL_MS:25}" pack-processing-timeout: "${TB_QUEUE_VC_PACK_PROCESSING_TIMEOUT_MS:60000}" request-timeout: "${TB_QUEUE_VC_REQUEST_TIMEOUT:60000}" + msg-chunk-size: "${TB_QUEUE_VC_MSG_CHUNK_SIZE:500000}" js: # JS Eval request topic request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js_eval.requests}" diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index 3079978bce..6ab364012d 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -707,7 +707,10 @@ message PrepareMsg { message AddMsg { string relativePath = 1; - string entityDataJson = 2; + string entityDataJsonChunk = 2; + string chunkedMsgId = 3; + int32 chunkIndex = 4; + int32 chunksCount = 5; } message DeleteMsg { @@ -747,9 +750,8 @@ message ListVersionsResponseMsg { } message ListEntitiesRequestMsg { - string branchName = 1; - string versionId = 2; - string entityType = 3; + string versionId = 1; + string entityType = 2; } message VersionedEntityInfoProto { @@ -778,6 +780,9 @@ message EntityContentRequestMsg { message EntityContentResponseMsg { string data = 1; + string chunkedMsgId = 2; + int32 chunkIndex = 3; + int32 chunksCount = 4; } message EntitiesContentRequestMsg { @@ -788,7 +793,8 @@ message EntitiesContentRequestMsg { } message EntitiesContentResponseMsg { - repeated string data = 1; + EntityContentResponseMsg item = 1; + int32 itemsCount = 2; } message VersionsDiffRequestMsg { diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/StringUtils.java b/common/data/src/main/java/org/thingsboard/server/common/data/StringUtils.java index ff3d3c0fd4..f76757f522 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/StringUtils.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/StringUtils.java @@ -15,6 +15,8 @@ */ package org.thingsboard.server.common.data; +import com.google.common.base.Splitter; + import static org.apache.commons.lang3.StringUtils.repeat; public class StringUtils { @@ -92,4 +94,8 @@ public class StringUtils { return input.substring(0, startIndexInclusive) + obfuscatedPart + input.substring(endIndexExclusive); } + public static Iterable split(String value, int maxPartSize) { + return Splitter.fixedLength(maxPartSize).split(value); + } + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/EntityDataDiff.java b/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/EntityDataDiff.java index 800583b0b8..dae2b99106 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/EntityDataDiff.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/EntityDataDiff.java @@ -24,5 +24,4 @@ import org.thingsboard.server.common.data.sync.ie.EntityExportData; public class EntityDataDiff { private EntityExportData currentVersion; private EntityExportData otherVersion; - private String rawDiff; } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/VersionedEntityInfo.java b/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/VersionedEntityInfo.java index fd278cde1f..ce8c91f4ae 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/VersionedEntityInfo.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/VersionedEntityInfo.java @@ -23,7 +23,6 @@ import org.thingsboard.server.common.data.id.EntityId; @NoArgsConstructor public class VersionedEntityInfo { private EntityId externalId; - // etc.. public VersionedEntityInfo(EntityId externalId) { this.externalId = externalId; diff --git a/common/util/src/main/java/org/thingsboard/common/util/CollectionsUtil.java b/common/util/src/main/java/org/thingsboard/common/util/CollectionsUtil.java index 7e39f9a273..adcffb8d21 100644 --- a/common/util/src/main/java/org/thingsboard/common/util/CollectionsUtil.java +++ b/common/util/src/main/java/org/thingsboard/common/util/CollectionsUtil.java @@ -39,4 +39,12 @@ public class CollectionsUtil { return isNotEmpty(collection) && collection.contains(element); } + public static int countNonNull(T[] array) { + int count = 0; + for (T t : array) { + if (t != null) count++; + } + return count; + } + } diff --git a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java index b28618372e..20188d8145 100644 --- a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java +++ b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.sync.vc; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -28,6 +29,7 @@ import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.event.EventListener; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.CollectionsUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.StringUtils; @@ -88,6 +90,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; @@ -122,6 +125,8 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe private long packProcessingTimeout; @Value("${vc.git.io_pool_size:3}") private int ioPoolSize; + @Value("${queue.vc.msg-chunk-size:500000}") + private int msgChunkSize; //We need to manually manage the threads since tasks for particular tenant need to be processed sequentially. private final List ioThreads = new ArrayList<>(); @@ -269,19 +274,49 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe String path = getRelativePath(entityType, null); var ids = vcService.listEntitiesAtVersion(ctx.getTenantId(), request.getVersionId(), path) .stream().skip(request.getOffset()).limit(request.getLimit()).collect(Collectors.toList()); - var response = EntitiesContentResponseMsg.newBuilder(); - for (VersionedEntityInfo info : ids) { - var data = vcService.getFileContentAtCommit(ctx.getTenantId(), - getRelativePath(info.getExternalId().getEntityType(), info.getExternalId().getId().toString()), request.getVersionId()); - response.addData(data); + if (!ids.isEmpty()) { + for (VersionedEntityInfo info : ids) { + var data = vcService.getFileContentAtCommit(ctx.getTenantId(), + getRelativePath(info.getExternalId().getEntityType(), info.getExternalId().getId().toString()), request.getVersionId()); + + Iterable dataChunks = StringUtils.split(data, msgChunkSize); + String chunkedMsgId = UUID.randomUUID().toString(); + int chunksCount = Iterables.size(dataChunks); + AtomicInteger chunkIndex = new AtomicInteger(); + dataChunks.forEach(chunk -> { + EntitiesContentResponseMsg.Builder response = EntitiesContentResponseMsg.newBuilder() + .setItemsCount(ids.size()) + .setItem(EntityContentResponseMsg.newBuilder() + .setData(chunk) + .setChunkedMsgId(chunkedMsgId) + .setChunksCount(chunksCount) + .setChunkIndex(chunkIndex.getAndIncrement()) + .build()); + reply(ctx, Optional.empty(), builder -> builder.setEntitiesContentResponse(response)); + }); + } + } else { + reply(ctx, Optional.empty(), builder -> builder.setEntitiesContentResponse( + EntitiesContentResponseMsg.newBuilder() + .setItemsCount(0))); } - reply(ctx, Optional.empty(), builder -> builder.setEntitiesContentResponse(response)); } private void handleEntityContentRequest(VersionControlRequestCtx ctx, EntityContentRequestMsg request) throws IOException { String path = getRelativePath(EntityType.valueOf(request.getEntityType()), new UUID(request.getEntityIdMSB(), request.getEntityIdLSB()).toString()); String data = vcService.getFileContentAtCommit(ctx.getTenantId(), path, request.getVersionId()); - reply(ctx, Optional.empty(), builder -> builder.setEntityContentResponse(EntityContentResponseMsg.newBuilder().setData(data))); + + Iterable dataChunks = StringUtils.split(data, msgChunkSize); + String chunkedMsgId = UUID.randomUUID().toString(); + int chunksCount = Iterables.size(dataChunks); + + AtomicInteger chunkIndex = new AtomicInteger(); + dataChunks.forEach(chunk -> { + log.trace("[{}] sending chunk {} for 'getEntity'", chunkedMsgId, chunkIndex.get()); + reply(ctx, Optional.empty(), builder -> builder.setEntityContentResponse(EntityContentResponseMsg.newBuilder() + .setData(chunk).setChunkedMsgId(chunkedMsgId).setChunksCount(chunksCount) + .setChunkIndex(chunkIndex.getAndIncrement()))); + }); } private void handleListVersions(VersionControlRequestCtx ctx, ListVersionsRequestMsg request) throws Exception { @@ -412,7 +447,16 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe } private void addToCommit(VersionControlRequestCtx ctx, PendingCommit commit, AddMsg addMsg) throws IOException { - vcService.add(commit, addMsg.getRelativePath(), addMsg.getEntityDataJson()); + log.trace("[{}] received chunk {} for 'addToCommit'", addMsg.getChunkedMsgId(), addMsg.getChunkIndex()); + Map chunkedMsgs = commit.getChunkedMsgs(); + String[] msgChunks = chunkedMsgs.computeIfAbsent(addMsg.getChunkedMsgId(), id -> new String[addMsg.getChunksCount()]); + msgChunks[addMsg.getChunkIndex()] = addMsg.getEntityDataJsonChunk(); + if (CollectionsUtil.countNonNull(msgChunks) == msgChunks.length) { + log.trace("[{}] collected all chunks for 'addToCommit'", addMsg.getChunkedMsgId()); + String entityDataJson = String.join("", msgChunks); + chunkedMsgs.remove(addMsg.getChunkedMsgId()); + vcService.add(commit, addMsg.getRelativePath(), entityDataJson); + } } private void doAbortCurrentCommit(TenantId tenantId, PendingCommit current) { diff --git a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/PendingCommit.java b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/PendingCommit.java index ccd5fc685e..20b1991c9f 100644 --- a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/PendingCommit.java +++ b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/PendingCommit.java @@ -18,7 +18,9 @@ package org.thingsboard.server.service.sync.vc; import lombok.Data; import org.thingsboard.server.common.data.id.TenantId; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; @Data public class PendingCommit { @@ -31,9 +33,10 @@ public class PendingCommit { private String versionName; private String authorName; - private String authorEmail; + private Map chunkedMsgs; + public PendingCommit(TenantId tenantId, String nodeId, UUID txId, String branch, String versionName, String authorName, String authorEmail) { this.tenantId = tenantId; this.nodeId = nodeId; @@ -44,4 +47,12 @@ public class PendingCommit { this.authorEmail = authorEmail; this.workingBranch = txId.toString(); } + + public Map getChunkedMsgs() { + if (chunkedMsgs == null) { + chunkedMsgs = new ConcurrentHashMap<>(); + } + return chunkedMsgs; + } + } diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index a97c4cfd37..1344ea194a 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -168,6 +168,7 @@ queue: partitions: "${TB_QUEUE_VC_PARTITIONS:10}" poll-interval: "${TB_QUEUE_VC_INTERVAL_MS:25}" pack-processing-timeout: "${TB_QUEUE_VC_PACK_PROCESSING_TIMEOUT_MS:60000}" + msg-chunk-size: "${TB_QUEUE_VC_MSG_CHUNK_SIZE:500000}" vc: # Pool size for handling export tasks diff --git a/ui-ngx/src/app/shared/models/vc.models.ts b/ui-ngx/src/app/shared/models/vc.models.ts index 98f66f74a9..aa41efba63 100644 --- a/ui-ngx/src/app/shared/models/vc.models.ts +++ b/ui-ngx/src/app/shared/models/vc.models.ts @@ -228,7 +228,6 @@ export interface RuleChainExportData extends EntityExportData { export interface EntityDataDiff { currentVersion: EntityExportData; otherVersion: EntityExportData; - rawDiff: string; } export function entityExportDataToJsonString(data: EntityExportData): string { From 7b8d39f66190a8abe91423935f164e846ed90826 Mon Sep 17 00:00:00 2001 From: Viacheslav Klimov Date: Thu, 23 Jun 2022 12:42:03 +0300 Subject: [PATCH 02/12] Undo e.setExternalId(null) in cleanupForComparison --- .../service/sync/ie/importing/impl/BaseEntityImportService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java index 285401e3bd..26c5a869f9 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java @@ -160,7 +160,6 @@ public abstract class BaseEntityImportService Date: Thu, 23 Jun 2022 16:24:09 +0300 Subject: [PATCH 03/12] Fix rule chain comparing on import --- .../service/sync/ie/exporting/impl/BaseEntityExportService.java | 2 -- .../service/sync/ie/importing/impl/RuleChainImportService.java | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/exporting/impl/BaseEntityExportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/exporting/impl/BaseEntityExportService.java index 169b7f273c..01626368dc 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/exporting/impl/BaseEntityExportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/exporting/impl/BaseEntityExportService.java @@ -41,8 +41,6 @@ public abstract class BaseEntityExportService(); } - ; - public abstract Set getSupportedEntityTypes(); protected void replaceUuidsRecursively(EntitiesExportCtx ctx, JsonNode node, Set skipFieldsSet) { diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/RuleChainImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/RuleChainImportService.java index dadea35e68..c66f4a6b08 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/RuleChainImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/RuleChainImportService.java @@ -119,7 +119,7 @@ public class RuleChainImportService extends BaseEntityImportService Date: Thu, 23 Jun 2022 17:00:55 +0300 Subject: [PATCH 04/12] Always update widgets on import --- .../sync/ie/importing/impl/WidgetsBundleImportService.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/WidgetsBundleImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/WidgetsBundleImportService.java index 550f00952a..f76b3cb9c5 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/WidgetsBundleImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/WidgetsBundleImportService.java @@ -86,6 +86,11 @@ public class WidgetsBundleImportService extends BaseEntityImportService Date: Thu, 23 Jun 2022 17:28:26 +0300 Subject: [PATCH 05/12] Not update defaultQueueId of device profile on import --- .../sync/ie/importing/impl/DeviceProfileImportService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/DeviceProfileImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/DeviceProfileImportService.java index 26bde001f5..1e26940be4 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/DeviceProfileImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/DeviceProfileImportService.java @@ -52,6 +52,7 @@ public class DeviceProfileImportService extends BaseEntityImportService Date: Thu, 23 Jun 2022 18:34:38 +0300 Subject: [PATCH 06/12] Fix comparing of dashboard to existing one on import --- .../sync/ie/importing/impl/DashboardImportService.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/DashboardImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/DashboardImportService.java index 2534ffc1b2..f5e550b433 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/DashboardImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/DashboardImportService.java @@ -117,6 +117,11 @@ public class DashboardImportService extends BaseEntityImportService exportData, Dashboard prepared, Dashboard existing) { + return super.compare(ctx, exportData, prepared, existing) || !prepared.getConfiguration().equals(existing.getConfiguration()); + } + @Override protected void onEntitySaved(SecurityUser user, Dashboard savedDashboard, Dashboard oldDashboard) throws ThingsboardException { super.onEntitySaved(user, savedDashboard, oldDashboard); From 60deed2a73f1596268c37f83e90c26c612c29f5d Mon Sep 17 00:00:00 2001 From: Viacheslav Klimov Date: Fri, 24 Jun 2022 13:14:51 +0300 Subject: [PATCH 07/12] Use TbNotificationEntityService in VC services --- .../entitiy/DefaultTbNotificationEntityService.java | 2 +- .../service/entitiy/TbNotificationEntityService.java | 2 +- .../entitiy/tenant/DefaultTbTenantService.java | 2 +- .../sync/ie/DefaultEntitiesExportImportService.java | 9 ++++----- .../sync/ie/importing/impl/AssetImportService.java | 11 ----------- .../ie/importing/impl/BaseEntityImportService.java | 5 ++--- .../sync/ie/importing/impl/CustomerImportService.java | 8 -------- .../ie/importing/impl/DashboardImportService.java | 8 -------- .../sync/ie/importing/impl/DeviceImportService.java | 5 +++-- .../ie/importing/impl/DeviceProfileImportService.java | 4 +--- .../ie/importing/impl/EntityViewImportService.java | 6 +++--- .../ie/importing/impl/RuleChainImportService.java | 4 +++- .../ie/importing/impl/WidgetsBundleImportService.java | 1 - 13 files changed, 19 insertions(+), 48 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java index 7ee7696f1a..76477c974b 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java @@ -128,7 +128,7 @@ public class DefaultTbNotificationEntityService implements TbNotificationEntityS } @Override - public void notifyCreateOruUpdateTenant(Tenant tenant, ComponentLifecycleEvent event) { + public void notifyCreateOrUpdateTenant(Tenant tenant, ComponentLifecycleEvent event) { tbClusterService.onTenantChange(tenant, null); tbClusterService.broadcastEntityStateChangeEvent(tenant.getId(), tenant.getId(), event); } diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/TbNotificationEntityService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/TbNotificationEntityService.java index c1131761dc..5ce91046d6 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/TbNotificationEntityService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/TbNotificationEntityService.java @@ -71,7 +71,7 @@ public interface TbNotificationEntityService { E entity, ActionType actionType, SecurityUser user, Object... additionalInfo); - void notifyCreateOruUpdateTenant(Tenant tenant, ComponentLifecycleEvent event); + void notifyCreateOrUpdateTenant(Tenant tenant, ComponentLifecycleEvent event); void notifyDeleteTenant(Tenant tenant); diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/tenant/DefaultTbTenantService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/tenant/DefaultTbTenantService.java index 22baed8366..d3c12cb3f8 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/tenant/DefaultTbTenantService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/tenant/DefaultTbTenantService.java @@ -54,7 +54,7 @@ public class DefaultTbTenantService extends AbstractTbEntityService implements T installScripts.createDefaultEdgeRuleChains(savedTenant.getId()); } tenantProfileCache.evict(savedTenant.getId()); - notificationEntityService.notifyCreateOruUpdateTenant(savedTenant, created ? + notificationEntityService.notifyCreateOrUpdateTenant(savedTenant, created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); TenantProfile oldTenantProfile = oldTenant != null ? tenantProfileService.findTenantProfileById(TenantId.SYS_TENANT_ID, oldTenant.getTenantProfileId()) : null; diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/DefaultEntitiesExportImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/DefaultEntitiesExportImportService.java index b2abe2a581..2b96ebd4ed 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/DefaultEntitiesExportImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/DefaultEntitiesExportImportService.java @@ -34,6 +34,7 @@ import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.action.EntityActionService; import org.thingsboard.server.service.apiusage.RateLimitService; +import org.thingsboard.server.service.entitiy.TbNotificationEntityService; import org.thingsboard.server.service.sync.ie.exporting.EntityExportService; import org.thingsboard.server.service.sync.ie.exporting.impl.BaseEntityExportService; import org.thingsboard.server.service.sync.ie.exporting.impl.DefaultEntityExportService; @@ -57,9 +58,9 @@ public class DefaultEntitiesExportImportService implements EntitiesExportImportS private final Map> exportServices = new HashMap<>(); private final Map> importServices = new HashMap<>(); - private final EntityActionService entityActionService; private final RelationService relationService; private final RateLimitService rateLimitService; + private final TbNotificationEntityService entityNotificationService; protected static final List SUPPORTED_ENTITY_TYPES = List.of( EntityType.CUSTOMER, EntityType.ASSET, EntityType.RULE_CHAIN, @@ -109,10 +110,8 @@ public class DefaultEntitiesExportImportService implements EntitiesExportImportS relationService.saveRelations(ctx.getTenantId(), new ArrayList<>(ctx.getRelations())); for (EntityRelation relation : ctx.getRelations()) { - entityActionService.logEntityAction(ctx.getUser(), relation.getFrom(), null, null, - ActionType.RELATION_ADD_OR_UPDATE, null, relation); - entityActionService.logEntityAction(ctx.getUser(), relation.getTo(), null, null, - ActionType.RELATION_ADD_OR_UPDATE, null, relation); + entityNotificationService.notifyCreateOrUpdateOrDeleteRelation(ctx.getTenantId(), null, + relation, ctx.getUser(), ActionType.RELATION_ADD_OR_UPDATE, null, relation); } } diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/AssetImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/AssetImportService.java index f2e61b8f2d..3225668004 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/AssetImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/AssetImportService.java @@ -19,14 +19,11 @@ import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.asset.Asset; -import org.thingsboard.server.common.data.edge.EdgeEventActionType; -import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.sync.ie.EntityExportData; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.queue.util.TbCoreComponent; -import org.thingsboard.server.service.security.model.SecurityUser; import org.thingsboard.server.service.sync.vc.data.EntitiesImportCtx; @Service @@ -52,14 +49,6 @@ public class AssetImportService extends BaseEntityImportService Date: Fri, 24 Jun 2022 13:57:06 +0300 Subject: [PATCH 08/12] Fix event for deleted relations on import --- .../sync/ie/importing/impl/BaseEntityImportService.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java index 7caea242f8..d491571272 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java @@ -212,10 +212,8 @@ public abstract class BaseEntityImportService { - entityActionService.logEntityAction(ctx.getUser(), existingRelation.getFrom(), null, null, - ActionType.RELATION_DELETED, null, existingRelation); - entityActionService.logEntityAction(ctx.getUser(), existingRelation.getTo(), null, null, - ActionType.RELATION_DELETED, null, existingRelation); + entityNotificationService.notifyCreateOrUpdateOrDeleteRelation(tenantId, null, + existingRelation, ctx.getUser(), ActionType.RELATION_DELETED, null, existingRelation); }); } else if (Objects.equal(relation.getAdditionalInfo(), existingRelation.getAdditionalInfo())) { relationsMap.remove(relation); From 8dc799709cf536ad9a3a2054bb13ec72a17ca581 Mon Sep 17 00:00:00 2001 From: Viacheslav Klimov Date: Fri, 24 Jun 2022 17:01:27 +0300 Subject: [PATCH 09/12] Fix entity relations import --- .../exporting/impl/DefaultEntityExportService.java | 8 ++++---- .../ie/importing/impl/BaseEntityImportService.java | 13 +++++++------ 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/exporting/impl/DefaultEntityExportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/exporting/impl/DefaultEntityExportService.java index 7e236370df..a00a7c09aa 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/exporting/impl/DefaultEntityExportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/exporting/impl/DefaultEntityExportService.java @@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.sync.ie.AttributeExportData; import org.thingsboard.server.common.data.sync.ie.EntityExportData; import org.thingsboard.server.dao.attributes.AttributesService; -import org.thingsboard.server.dao.relation.RelationService; +import org.thingsboard.server.dao.relation.RelationDao; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.sync.ie.exporting.EntityExportService; import org.thingsboard.server.service.sync.ie.exporting.ExportableEntitiesService; @@ -55,7 +55,7 @@ public class DefaultEntityExportService exportRelations(EntitiesExportCtx ctx, E entity) throws ThingsboardException { List relations = new ArrayList<>(); - List inboundRelations = relationService.findByTo(ctx.getTenantId(), entity.getId(), RelationTypeGroup.COMMON); + List inboundRelations = relationDao.findAllByTo(ctx.getTenantId(), entity.getId(), RelationTypeGroup.COMMON); relations.addAll(inboundRelations); - List outboundRelations = relationService.findByFrom(ctx.getTenantId(), entity.getId(), RelationTypeGroup.COMMON); + List outboundRelations = relationDao.findAllByFrom(ctx.getTenantId(), entity.getId(), RelationTypeGroup.COMMON); relations.addAll(outboundRelations); return relations; } diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java index d491571272..50349d0d1e 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java @@ -26,10 +26,8 @@ import org.springframework.context.annotation.Lazy; import org.springframework.transaction.annotation.Transactional; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.cluster.TbClusterService; -import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.ExportableEntity; -import org.thingsboard.server.common.data.HasCustomerId; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.EntityId; @@ -49,13 +47,13 @@ import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.sync.ie.AttributeExportData; import org.thingsboard.server.common.data.sync.ie.EntityExportData; import org.thingsboard.server.common.data.sync.ie.EntityImportResult; +import org.thingsboard.server.dao.relation.RelationDao; import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.service.action.EntityActionService; import org.thingsboard.server.service.entitiy.TbNotificationEntityService; import org.thingsboard.server.service.security.model.SecurityUser; import org.thingsboard.server.service.sync.ie.exporting.ExportableEntitiesService; import org.thingsboard.server.service.sync.ie.importing.EntityImportService; -import org.thingsboard.server.service.sync.vc.data.EntitiesExportCtx; import org.thingsboard.server.service.sync.vc.data.EntitiesImportCtx; import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; @@ -79,6 +77,8 @@ public abstract class BaseEntityImportService existingRelations = new ArrayList<>(); - existingRelations.addAll(relationService.findByTo(tenantId, entity.getId(), RelationTypeGroup.COMMON)); - existingRelations.addAll(relationService.findByFrom(tenantId, entity.getId(), RelationTypeGroup.COMMON)); + existingRelations.addAll(relationDao.findAllByTo(tenantId, entity.getId(), RelationTypeGroup.COMMON)); + existingRelations.addAll(relationDao.findAllByFrom(tenantId, entity.getId(), RelationTypeGroup.COMMON)); + // dao is used here instead of service to avoid getting cached values, because relationService.deleteRelation will evict value from cache only after transaction is committed for (EntityRelation existingRelation : existingRelations) { EntityRelation relation = relationsMap.get(existingRelation); if (relation == null) { importResult.setUpdatedRelatedEntities(true); - relationService.deleteRelation(tenantId, existingRelation); + relationService.deleteRelation(ctx.getTenantId(), existingRelation.getFrom(), existingRelation.getTo(), existingRelation.getType(), existingRelation.getTypeGroup()); importResult.addSendEventsCallback(() -> { entityNotificationService.notifyCreateOrUpdateOrDeleteRelation(tenantId, null, existingRelation, ctx.getUser(), ActionType.RELATION_DELETED, null, existingRelation); From 850aed8b82f55443c1e1998d01dff56997736536 Mon Sep 17 00:00:00 2001 From: Viacheslav Klimov Date: Fri, 24 Jun 2022 17:12:57 +0300 Subject: [PATCH 10/12] Add Swagger api params description for EntitiesVersionControlController --- .../controller/ControllerConstants.java | 1 + .../EntitiesVersionControlController.java | 27 +++++++++++++++---- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java index 6a616671eb..a425b0968a 100644 --- a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java +++ b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java @@ -138,6 +138,7 @@ public class ControllerConstants { protected static final String ENTITY_VERSION_TEXT_SEARCH_DESCRIPTION = "The case insensitive 'substring' filter based on the entity version name."; protected static final String VERSION_ID_PARAM_DESCRIPTION = "Version id, for example fd82625bdd7d6131cf8027b44ee967012ecaf990. Represents commit hash."; + protected static final String BRANCH_PARAM_DESCRIPTION = "The name of the working branch, for example 'master'"; protected static final String MARKDOWN_CODE_BLOCK_START = "```json\n"; protected static final String MARKDOWN_CODE_BLOCK_END = "\n```"; diff --git a/application/src/main/java/org/thingsboard/server/controller/EntitiesVersionControlController.java b/application/src/main/java/org/thingsboard/server/controller/EntitiesVersionControlController.java index dfefce4b12..2e37bcbe59 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EntitiesVersionControlController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EntitiesVersionControlController.java @@ -58,6 +58,9 @@ import java.util.List; import java.util.UUID; import java.util.stream.Collectors; +import static org.thingsboard.server.controller.ControllerConstants.BRANCH_PARAM_DESCRIPTION; +import static org.thingsboard.server.controller.ControllerConstants.ENTITY_ID_PARAM_DESCRIPTION; +import static org.thingsboard.server.controller.ControllerConstants.ENTITY_TYPE_PARAM_DESCRIPTION; import static org.thingsboard.server.controller.ControllerConstants.MARKDOWN_CODE_BLOCK_END; import static org.thingsboard.server.controller.ControllerConstants.MARKDOWN_CODE_BLOCK_START; import static org.thingsboard.server.controller.ControllerConstants.NEW_LINE; @@ -230,8 +233,11 @@ public class EntitiesVersionControlController extends BaseController { MARKDOWN_CODE_BLOCK_END + TENANT_AUTHORITY_PARAGRAPH) @GetMapping(value = "/version/{entityType}/{externalEntityUuid}", params = {"branch", "pageSize", "page"}) - public DeferredResult> listEntityVersions(@PathVariable EntityType entityType, + public DeferredResult> listEntityVersions(@ApiParam(value = ENTITY_TYPE_PARAM_DESCRIPTION, required = true) + @PathVariable EntityType entityType, + @ApiParam(value = "A string value representing external entity id. This is `externalId` property of an entity, or otherwise if not set - simply id of this entity.") @PathVariable UUID externalEntityUuid, + @ApiParam(value = BRANCH_PARAM_DESCRIPTION) @RequestParam String branch, @ApiParam(value = PAGE_SIZE_DESCRIPTION, required = true) @RequestParam int pageSize, @@ -256,7 +262,9 @@ public class EntitiesVersionControlController extends BaseController { "The response structure is the same as for `listEntityVersions` API method." + TENANT_AUTHORITY_PARAGRAPH) @GetMapping(value = "/version/{entityType}", params = {"branch", "pageSize", "page"}) - public DeferredResult> listEntityTypeVersions(@PathVariable EntityType entityType, + public DeferredResult> listEntityTypeVersions(@ApiParam(value = ENTITY_TYPE_PARAM_DESCRIPTION, required = true) + @PathVariable EntityType entityType, + @ApiParam(value = BRANCH_PARAM_DESCRIPTION, required = true) @RequestParam String branch, @ApiParam(value = PAGE_SIZE_DESCRIPTION, required = true) @RequestParam int pageSize, @@ -279,7 +287,8 @@ public class EntitiesVersionControlController extends BaseController { "The response format is the same as for `listEntityVersions` API method." + TENANT_AUTHORITY_PARAGRAPH) @GetMapping(value = "/version", params = {"branch", "pageSize", "page"}) - public DeferredResult> listVersions(@RequestParam String branch, + public DeferredResult> listVersions(@ApiParam(value = BRANCH_PARAM_DESCRIPTION, required = true) + @RequestParam String branch, @ApiParam(value = PAGE_SIZE_DESCRIPTION, required = true) @RequestParam int pageSize, @ApiParam(value = PAGE_NUMBER_DESCRIPTION, required = true) @@ -302,9 +311,11 @@ public class EntitiesVersionControlController extends BaseController { "Entities order will be the same as in the repository." + TENANT_AUTHORITY_PARAGRAPH) @GetMapping(value = "/entity/{entityType}/{versionId}", params = {"branch"}) - public DeferredResult> listEntitiesAtVersion(@PathVariable EntityType entityType, + public DeferredResult> listEntitiesAtVersion(@ApiParam(value = ENTITY_TYPE_PARAM_DESCRIPTION, required = true) + @PathVariable EntityType entityType, @ApiParam(value = VERSION_ID_PARAM_DESCRIPTION, required = true) @PathVariable String versionId, + @ApiParam(value = BRANCH_PARAM_DESCRIPTION, required = true) @RequestParam String branch) throws Exception { accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ); return wrapFuture(versionControlService.listEntitiesAtVersion(getTenantId(), branch, versionId, entityType)); @@ -318,6 +329,7 @@ public class EntitiesVersionControlController extends BaseController { @GetMapping(value = "/entity/{versionId}", params = {"branch"}) public DeferredResult> listAllEntitiesAtVersion(@ApiParam(value = VERSION_ID_PARAM_DESCRIPTION, required = true) @PathVariable String versionId, + @ApiParam(value = BRANCH_PARAM_DESCRIPTION, required = true) @RequestParam String branch) throws Exception { accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ); return wrapFuture(versionControlService.listAllEntitiesAtVersion(getTenantId(), branch, versionId)); @@ -332,7 +344,9 @@ public class EntitiesVersionControlController extends BaseController { @GetMapping("/info/{versionId}/{entityType}/{externalEntityUuid}") public DeferredResult getEntityDataInfo(@ApiParam(value = VERSION_ID_PARAM_DESCRIPTION, required = true) @PathVariable String versionId, + @ApiParam(value = ENTITY_TYPE_PARAM_DESCRIPTION, required = true) @PathVariable EntityType entityType, + @ApiParam(value = "A string value representing external entity id", required = true) @PathVariable UUID externalEntityUuid) throws Exception { accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ); EntityId entityId = EntityIdFactory.getByTypeAndUuid(entityType, externalEntityUuid); @@ -344,8 +358,11 @@ public class EntitiesVersionControlController extends BaseController { "Entity data structure is the same as stored in a repository. " + TENANT_AUTHORITY_PARAGRAPH) @GetMapping(value = "/diff/{entityType}/{internalEntityUuid}", params = {"branch", "versionId"}) - public DeferredResult compareEntityDataToVersion(@PathVariable EntityType entityType, + public DeferredResult compareEntityDataToVersion(@ApiParam(value = ENTITY_TYPE_PARAM_DESCRIPTION, required = true) + @PathVariable EntityType entityType, + @ApiParam(value = ENTITY_ID_PARAM_DESCRIPTION, required = true) @PathVariable UUID internalEntityUuid, + @ApiParam(value = BRANCH_PARAM_DESCRIPTION) @RequestParam String branch, @ApiParam(value = VERSION_ID_PARAM_DESCRIPTION, required = true) @RequestParam String versionId) throws Exception { From e9c9e24e2f1f036db73daacb933bf0f788ec7558 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Mon, 27 Jun 2022 10:35:50 +0300 Subject: [PATCH 11/12] Improvements to the chunks processing --- .../DefaultGitVersionControlQueueService.java | 73 +++++++++---------- .../vc/GitVersionControlQueueService.java | 2 - common/cluster-api/src/main/proto/queue.proto | 11 --- .../DefaultClusterVersionControlService.java | 8 -- 4 files changed, 35 insertions(+), 59 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java b/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java index 5a726c871f..6eaa2df74f 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java @@ -79,6 +79,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -125,7 +126,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu CommitGitRequest commit = new CommitGitRequest(user.getTenantId(), request); registerAndSend(commit, builder -> builder.setCommitRequest( buildCommitRequest(commit).setPrepareMsg(getCommitPrepareMsg(user, request)).build() - ).build(), wrap(future, commit, commit.getRequestId())); + ).build(), wrap(future, commit)); return future; } @@ -151,7 +152,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu .setChunkedMsgId(chunkedMsgId).setChunkIndex(chunkIndex.getAndIncrement()) .setChunksCount(chunksCount).build() ).build() - ).build(), wrap(chunkFuture, null, commit.getRequestId())); + ).build(), wrap(chunkFuture, null)); futures.add(chunkFuture); }); return Futures.transform(Futures.allAsList(futures), r -> { @@ -170,7 +171,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu buildCommitRequest(commit).setDeleteMsg( TransportProtos.DeleteMsg.newBuilder().setRelativePath(path).build() ).build() - ).build(), wrap(future, null, commit.getRequestId())); + ).build(), wrap(future, null)); return future; } @@ -280,18 +281,11 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu .build())); } - @Override - public ListenableFuture getContentsDiff(TenantId tenantId, String content1, String content2) { - ContentsDiffGitRequest request = new ContentsDiffGitRequest(tenantId, content1, content2); - return sendRequest(request, builder -> builder.setContentsDiffRequest(TransportProtos.ContentsDiffRequestMsg.newBuilder() - .setContent1(content1) - .setContent2(content2))); - } - @Override @SuppressWarnings("rawtypes") public ListenableFuture getEntity(TenantId tenantId, String versionId, EntityId entityId) { EntityContentGitRequest request = new EntityContentGitRequest(tenantId, versionId, entityId); + chunkedMsgs.put(request.getRequestId(), new HashMap<>()); registerAndSend(request, builder -> builder.setEntityContentRequest(EntityContentRequestMsg.newBuilder() .setVersionId(versionId) .setEntityType(entityId.getEntityType().name()) @@ -314,9 +308,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu log.trace("[{}][{}] PUSHING request: {}", request.getTenantId(), request.getRequestId(), requestBody); clusterService.pushMsgToVersionControl(request.getTenantId(), requestBody, callback); if (request.getTimeoutTask() == null) { - request.setTimeoutTask(scheduler.schedule(() -> { - processTimeout(request.getRequestId()); - }, requestTimeout, TimeUnit.MILLISECONDS)); + request.setTimeoutTask(scheduler.schedule(() -> processTimeout(request.getRequestId()), requestTimeout, TimeUnit.MILLISECONDS)); } } else { throw new RuntimeException("Future is already done!"); @@ -335,7 +327,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu @SuppressWarnings("rawtypes") public ListenableFuture> getEntities(TenantId tenantId, String versionId, EntityType entityType, int offset, int limit) { EntitiesContentGitRequest request = new EntitiesContentGitRequest(tenantId, versionId, entityType); - + chunkedMsgs.put(request.getRequestId(), new HashMap<>()); registerAndSend(request, builder -> builder.setEntitiesContentRequest(EntitiesContentRequestMsg.newBuilder() .setVersionId(versionId) .setEntityType(entityType.name()) @@ -418,14 +410,11 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu ((ListVersionsGitRequest) request).getFuture().set(toPageData(listVersionsResponse)); } else if (vcResponseMsg.hasEntityContentResponse()) { TransportProtos.EntityContentResponseMsg responseMsg = vcResponseMsg.getEntityContentResponse(); - String[] msgChunks = chunkedMsgs.computeIfAbsent(requestId, id -> new HashMap<>()) - .computeIfAbsent(responseMsg.getChunkedMsgId(), id -> new String[responseMsg.getChunksCount()]); - msgChunks[responseMsg.getChunkIndex()] = responseMsg.getData(); log.trace("[{}] received chunk {} for 'getEntity'", responseMsg.getChunkedMsgId(), responseMsg.getChunkIndex()); - if (CollectionsUtil.countNonNull(msgChunks) == responseMsg.getChunksCount()) { + var joined = joinChunks(requestId, responseMsg, 1); + if (joined.isPresent()) { log.trace("[{}] collected all chunks for 'getEntity'", responseMsg.getChunkedMsgId()); - String data = String.join("", msgChunks); - ((EntityContentGitRequest) request).getFuture().set(toData(data)); + ((EntityContentGitRequest) request).getFuture().set(joined.get().get(0)); } else { completed = false; } @@ -433,17 +422,9 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu TransportProtos.EntitiesContentResponseMsg responseMsg = vcResponseMsg.getEntitiesContentResponse(); TransportProtos.EntityContentResponseMsg item = responseMsg.getItem(); if (responseMsg.getItemsCount() > 0) { - Map chunkedItems = chunkedMsgs.computeIfAbsent(requestId, id -> new HashMap<>()); - String[] itemChunks = chunkedItems.computeIfAbsent(item.getChunkedMsgId(), id -> { - return new String[item.getChunksCount()]; - }); - itemChunks[item.getChunkIndex()] = item.getData(); - if (chunkedItems.size() == responseMsg.getItemsCount() && chunkedItems.values().stream() - .allMatch(chunks -> CollectionsUtil.countNonNull(chunks) == chunks.length)) { - ((EntitiesContentGitRequest) request).getFuture().set(chunkedItems.values().stream() - .map(chunks -> String.join("", chunks)) - .map(this::toData) - .collect(Collectors.toList())); + var joined = joinChunks(requestId, item, responseMsg.getItemsCount()); + if (joined.isPresent()) { + ((EntitiesContentGitRequest) request).getFuture().set(joined.get()); } else { completed = false; } @@ -464,9 +445,6 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu .build()) .collect(Collectors.toList()); ((VersionsDiffGitRequest) request).getFuture().set(entityVersionsDiffList); - } else if (vcResponseMsg.hasContentsDiffResponse()) { - String diff = vcResponseMsg.getContentsDiffResponse().getDiff(); - ((ContentsDiffGitRequest) request).getFuture().set(diff); } } if (completed) { @@ -474,6 +452,25 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu } } + @SuppressWarnings("rawtypes") + private Optional> joinChunks(UUID requestId, TransportProtos.EntityContentResponseMsg responseMsg, int expectedMsgCount) { + var chunksMap = chunkedMsgs.get(requestId); + if (chunksMap == null) { + return Optional.empty(); + } + String[] msgChunks = chunksMap.computeIfAbsent(responseMsg.getChunkedMsgId(), id -> new String[responseMsg.getChunksCount()]); + msgChunks[responseMsg.getChunkIndex()] = responseMsg.getData(); + if (chunksMap.size() == expectedMsgCount && chunksMap.values().stream() + .allMatch(chunks -> CollectionsUtil.countNonNull(chunks) == chunks.length)) { + return Optional.of(chunksMap.values().stream() + .map(chunks -> String.join("", chunks)) + .map(this::toData) + .collect(Collectors.toList())); + } else { + return Optional.empty(); + } + } + private void processTimeout(UUID requestId) { PendingGitRequest pendingRequest = removePendingRequest(requestId); if (pendingRequest != null) { @@ -515,6 +512,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu return JacksonUtil.fromString(data, EntityExportData.class); } + //The future will be completed when the corresponding result arrives from kafka private static TbQueueCallback wrap(SettableFuture future) { return new TbQueueCallback() { @Override @@ -528,18 +526,17 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu }; } - private TbQueueCallback wrap(SettableFuture future, T value, UUID requestId) { + //The future will be completed when the request is successfully sent to kafka + private TbQueueCallback wrap(SettableFuture future, T value) { return new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { future.set(value); - removePendingRequest(requestId); } @Override public void onFailure(Throwable t) { future.setException(t); - removePendingRequest(requestId); } }; } diff --git a/application/src/main/java/org/thingsboard/server/service/sync/vc/GitVersionControlQueueService.java b/application/src/main/java/org/thingsboard/server/service/sync/vc/GitVersionControlQueueService.java index a1e64da8f1..cc83479896 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/vc/GitVersionControlQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/vc/GitVersionControlQueueService.java @@ -64,8 +64,6 @@ public interface GitVersionControlQueueService { ListenableFuture> getVersionsDiff(TenantId tenantId, EntityType entityType, EntityId externalId, String versionId1, String versionId2); - ListenableFuture getContentsDiff(TenantId tenantId, String rawEntityData1, String rawEntityData2); - ListenableFuture initRepository(TenantId tenantId, RepositorySettings settings); ListenableFuture testRepository(TenantId tenantId, RepositorySettings settings); diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index 855ddb9299..1b6bc43f74 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -829,15 +829,6 @@ message EntityVersionsDiff { string rawDiff = 6; } -message ContentsDiffRequestMsg { - string content1 = 1; - string content2 = 2; -} - -message ContentsDiffResponseMsg { - string diff = 1; -} - message GenericRepositoryRequestMsg {} message GenericRepositoryResponseMsg {} @@ -859,7 +850,6 @@ message ToVersionControlServiceMsg { EntityContentRequestMsg entityContentRequest = 14; EntitiesContentRequestMsg entitiesContentRequest = 15; VersionsDiffRequestMsg versionsDiffRequest = 16; - ContentsDiffRequestMsg contentsDiffRequest = 17; } message VersionControlResponseMsg { @@ -874,7 +864,6 @@ message VersionControlResponseMsg { EntityContentResponseMsg entityContentResponse = 9; EntitiesContentResponseMsg entitiesContentResponse = 10; VersionsDiffResponseMsg versionsDiffResponse = 11; - ContentsDiffResponseMsg contentsDiffResponse = 12; } /** diff --git a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java index d8d137ae5b..7d8e81d1c4 100644 --- a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java +++ b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java @@ -257,8 +257,6 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe handleEntitiesContentRequest(ctx, msg.getEntitiesContentRequest()); } else if (msg.hasVersionsDiffRequest()) { handleVersionsDiffRequest(ctx, msg.getVersionsDiffRequest()); - } else if (msg.hasContentsDiffRequest()) { - handleContentsDiffRequest(ctx, msg.getContentsDiffRequest()); } } } @@ -394,12 +392,6 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe .addAllDiff(diffList))); } - private void handleContentsDiffRequest(VersionControlRequestCtx ctx, TransportProtos.ContentsDiffRequestMsg request) throws IOException { - String diff = vcService.getContentsDiff(ctx.getTenantId(), request.getContent1(), request.getContent2()); - reply(ctx, builder -> builder.setContentsDiffResponse(TransportProtos.ContentsDiffResponseMsg.newBuilder() - .setDiff(diff))); - } - private void handleCommitRequest(VersionControlRequestCtx ctx, CommitRequestMsg request) throws Exception { var tenantId = ctx.getTenantId(); UUID txId = UUID.fromString(request.getTxId()); From 48c3ce70476536f3a4ba17de25a9bb70343fde9f Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Mon, 27 Jun 2022 11:48:58 +0300 Subject: [PATCH 12/12] Fix cache value serialization issue --- .../sync/vc/DefaultEntitiesVersionControlService.java | 2 ++ .../server/cache/SimpleTbCacheValueWrapper.java | 2 ++ .../server/common/data/sync/vc/EntityLoadError.java | 5 ++++- .../server/common/data/sync/vc/EntityTypeLoadResult.java | 6 +++++- docker/.gitignore | 8 ++++++++ 5 files changed, 21 insertions(+), 2 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultEntitiesVersionControlService.java b/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultEntitiesVersionControlService.java index 74c5c78dae..5bfc696cc1 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultEntitiesVersionControlService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultEntitiesVersionControlService.java @@ -174,9 +174,11 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont private T getStatus(SecurityUser user, UUID requestId, Function getter) throws ThingsboardException { var cacheEntry = taskCache.get(requestId); if (cacheEntry == null || cacheEntry.get() == null) { + log.debug("[{}] No cache record: {}", requestId, cacheEntry); throw new ThingsboardException(ThingsboardErrorCode.ITEM_NOT_FOUND); } else { var entry = cacheEntry.get(); + log.debug("[{}] Cache get: {}", requestId, entry); var result = getter.apply(entry); if (result == null) { throw new ThingsboardException(ThingsboardErrorCode.BAD_REQUEST_PARAMS); diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/SimpleTbCacheValueWrapper.java b/common/cache/src/main/java/org/thingsboard/server/cache/SimpleTbCacheValueWrapper.java index f5607fb760..58ccfd8fb9 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/SimpleTbCacheValueWrapper.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/SimpleTbCacheValueWrapper.java @@ -17,8 +17,10 @@ package org.thingsboard.server.cache; import lombok.AccessLevel; import lombok.RequiredArgsConstructor; +import lombok.ToString; import org.springframework.cache.Cache; +@ToString @RequiredArgsConstructor(access = AccessLevel.PRIVATE) public class SimpleTbCacheValueWrapper implements TbCacheValueWrapper { diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/EntityLoadError.java b/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/EntityLoadError.java index bc3475c89e..cd3dc5e0e2 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/EntityLoadError.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/EntityLoadError.java @@ -20,12 +20,15 @@ import lombok.Builder; import lombok.Data; import org.thingsboard.server.common.data.id.EntityId; +import java.io.Serializable; import java.util.List; @Data @Builder @JsonInclude(JsonInclude.Include.NON_NULL) -public class EntityLoadError { +public class EntityLoadError implements Serializable { + + private static final long serialVersionUID = 7538450180582109391L; private String type; private EntityId source; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/EntityTypeLoadResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/EntityTypeLoadResult.java index 84a28d0770..06df7ebb83 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/EntityTypeLoadResult.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/sync/vc/EntityTypeLoadResult.java @@ -21,11 +21,15 @@ import lombok.Data; import lombok.NoArgsConstructor; import org.thingsboard.server.common.data.EntityType; +import java.io.Serializable; + @Data @AllArgsConstructor @NoArgsConstructor @Builder -public class EntityTypeLoadResult { +public class EntityTypeLoadResult implements Serializable { + private static final long serialVersionUID = -8428039809651395241L; + private EntityType entityType; private int created; private int updated; diff --git a/docker/.gitignore b/docker/.gitignore index 9c4c778f28..c9172ae6ce 100644 --- a/docker/.gitignore +++ b/docker/.gitignore @@ -6,4 +6,12 @@ tb-node/postgres/** tb-node/cassandra/** tb-transports/*/log tb-vc-executor/log/** +tb-node/redis-cluster-data-0/** +tb-node/redis-cluster-data-1/** +tb-node/redis-cluster-data-2/** +tb-node/redis-cluster-data-3/** +tb-node/redis-cluster-data-4/** +tb-node/redis-cluster-data-5/** +tb-node/redis-data/** + !.env