From e22a9d3e5028f850210190b1fa93e7be76dce82b Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 25 Feb 2026 17:06:00 +0200 Subject: [PATCH] Added syncInProgress as edge attribute --- .../service/edge/EdgeContextComponent.java | 4 ++ .../edge/rpc/AttributeSaveCallback.java | 28 ++++++++++++++ .../service/edge/rpc/EdgeGrpcService.java | 37 ++----------------- .../service/edge/rpc/EdgeGrpcSession.java | 20 ++++++++-- .../org/thingsboard/server/edge/EdgeTest.java | 25 +++++++++++++ .../server/dao/edge/EdgeService.java | 2 + .../server/common/data/DataConstants.java | 1 + .../server/dao/edge/EdgeServiceImpl.java | 20 +++++++--- 8 files changed, 95 insertions(+), 42 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/edge/rpc/AttributeSaveCallback.java diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java index a3b9ba2810..c03c6affe2 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java @@ -78,6 +78,7 @@ import org.thingsboard.server.service.edge.rpc.processor.telemetry.TelemetryEdge import org.thingsboard.server.service.edge.rpc.processor.user.UserProcessor; import org.thingsboard.server.service.edge.rpc.sync.EdgeRequestsService; import org.thingsboard.server.service.executors.GrpcCallbackExecutorService; +import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import java.util.EnumMap; import java.util.List; @@ -104,6 +105,9 @@ public class EdgeContextComponent { } // services + @Autowired + private TelemetrySubscriptionService tsSubService; + @Autowired private AdminSettingsService adminSettingsService; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/AttributeSaveCallback.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/AttributeSaveCallback.java new file mode 100644 index 0000000000..df8a51879e --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/AttributeSaveCallback.java @@ -0,0 +1,28 @@ +package org.thingsboard.server.service.edge.rpc; + +import com.google.common.util.concurrent.FutureCallback; +import jakarta.annotation.Nullable; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.TenantId; + +@Slf4j +@AllArgsConstructor +public class AttributeSaveCallback implements FutureCallback { + + private final TenantId tenantId; + private final EdgeId edgeId; + private final String key; + private final Object value; + + @Override + public void onSuccess(@Nullable Void result) { + log.trace("[{}][{}] Successfully updated attribute [{}] with value [{}]", tenantId, edgeId, key, value); + } + + @Override + public void onFailure(Throwable t) { + log.warn("[{}][{}] Failed to update attribute [{}] with value [{}]", tenantId, edgeId, key, value, t); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 76070bec53..c5822cc494 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -135,9 +135,6 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i @Lazy private EdgeContextComponent ctx; - @Autowired - private TelemetrySubscriptionService tsSubService; - @Autowired private TbClusterService clusterService; @@ -553,14 +550,14 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private void save(TenantId tenantId, EdgeId edgeId, String key, long value) { log.debug("[{}][{}] Updating long edge telemetry [{}] [{}]", tenantId, edgeId, key, value); if (persistToTelemetry) { - tsSubService.saveTimeseries(TimeseriesSaveRequest.builder() + ctx.getTsSubService().saveTimeseries(TimeseriesSaveRequest.builder() .tenantId(tenantId) .entityId(edgeId) .entry(new LongDataEntry(key, value)) .callback(new AttributeSaveCallback(tenantId, edgeId, key, value)) .build()); } else { - tsSubService.saveAttributes(AttributesSaveRequest.builder() + ctx.getTsSubService().saveAttributes(AttributesSaveRequest.builder() .tenantId(tenantId) .entityId(edgeId) .scope(AttributeScope.SERVER_SCOPE) @@ -573,14 +570,14 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private void save(TenantId tenantId, EdgeId edgeId, String key, boolean value) { log.debug("[{}][{}] Updating boolean edge telemetry [{}] [{}]", tenantId, edgeId, key, value); if (persistToTelemetry) { - tsSubService.saveTimeseries(TimeseriesSaveRequest.builder() + ctx.getTsSubService().saveTimeseries(TimeseriesSaveRequest.builder() .tenantId(tenantId) .entityId(edgeId) .entry(new BooleanDataEntry(key, value)) .callback(new AttributeSaveCallback(tenantId, edgeId, key, value)) .build()); } else { - tsSubService.saveAttributes(AttributesSaveRequest.builder() + ctx.getTsSubService().saveAttributes(AttributesSaveRequest.builder() .tenantId(tenantId) .entityId(edgeId) .scope(AttributeScope.SERVER_SCOPE) @@ -590,32 +587,6 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } } - private static class AttributeSaveCallback implements FutureCallback { - - private final TenantId tenantId; - private final EdgeId edgeId; - private final String key; - private final Object value; - - AttributeSaveCallback(TenantId tenantId, EdgeId edgeId, String key, Object value) { - this.tenantId = tenantId; - this.edgeId = edgeId; - this.key = key; - this.value = value; - } - - @Override - public void onSuccess(@Nullable Void result) { - log.trace("[{}][{}] Successfully updated attribute [{}] with value [{}]", tenantId, edgeId, key, value); - } - - @Override - public void onFailure(Throwable t) { - log.warn("[{}][{}] Failed to update attribute [{}] with value [{}]", tenantId, edgeId, key, value, t); - } - - } - private void pushRuleEngineMessage(TenantId tenantId, Edge edge, long ts, TbMsgType msgType) { try { EdgeId edgeId = edge.getId(); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 2b026f207f..41a5916e7b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -25,6 +25,7 @@ import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.Nullable; import org.springframework.data.util.Pair; +import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EdgeUtils; @@ -37,6 +38,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributesSaveResult; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; +import org.thingsboard.server.common.data.kv.BooleanDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.limit.LimitedApi; @@ -195,7 +197,7 @@ public abstract class EdgeGrpcSession implements Closeable { } startSyncProcess(fullSync); } else { - syncInProgress = false; + updateSyncInProgress(false); } } if (requestMsg.getMsgType().equals(RequestMsgType.UPLINK_RPC_MESSAGE)) { @@ -252,7 +254,7 @@ public abstract class EdgeGrpcSession implements Closeable { public void startSyncProcess(boolean fullSync) { if (!syncInProgress) { log.info("[{}][{}][{}] Staring edge sync process", tenantId, edge.getId(), sessionId); - syncInProgress = true; + updateSyncInProgress(true); interruptGeneralProcessingOnSync(); doSync(new EdgeSyncCursor(ctx, edge, fullSync)); } else { @@ -398,6 +400,18 @@ public abstract class EdgeGrpcSession implements Closeable { ctx.getAttributesService().save(tenantId, edge.getId(), AttributeScope.SERVER_SCOPE, attributeKvEntry); } + private void updateSyncInProgress(Boolean value) { + this.syncInProgress = value; + + ctx.getTsSubService().saveAttributes(AttributesSaveRequest.builder() + .tenantId(tenantId) + .entityId(edge.getId()) + .scope(AttributeScope.SERVER_SCOPE) + .entry(new BooleanDataEntry(DataConstants.EDGE_SYNC_IN_PROGRESS_ATTR_KEY, value)) + .callback(new AttributeSaveCallback(tenantId, edge.getId(), DataConstants.EDGE_SYNC_IN_PROGRESS_ATTR_KEY, value)) + .build()); + } + private void interruptGeneralProcessingOnSync() { log.debug("[{}][{}][{}] Sync process started. General processing interrupted!", tenantId, edge.getId(), sessionId); stopCurrentSendDownlinkMsgsTask(true); @@ -766,7 +780,7 @@ public abstract class EdgeGrpcSession implements Closeable { } private void markSyncCompletedSendEdgeEventUpdate() { - syncInProgress = false; + updateSyncInProgress(false); ctx.getClusterService().onEdgeEventUpdate(new EdgeEventUpdateMsg(edge.getTenantId(), edge.getId())); } diff --git a/application/src/test/java/org/thingsboard/server/edge/EdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/EdgeTest.java index 334680f7a7..f4b204b3e6 100644 --- a/application/src/test/java/org/thingsboard/server/edge/EdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/EdgeTest.java @@ -15,10 +15,12 @@ */ package org.thingsboard.server.edge; +import com.fasterxml.jackson.databind.JsonNode; import org.junit.Assert; import org.junit.Test; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Customer; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; @@ -26,10 +28,14 @@ import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.gen.edge.v1.CustomerUpdateMsg; import org.thingsboard.server.gen.edge.v1.EdgeConfiguration; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; +import org.thingsboard.server.service.subscription.TbAttributeSubscriptionScope; +import java.util.List; import java.util.Optional; import java.util.UUID; +import static org.assertj.core.api.Assertions.assertThat; + @DaoSqlTest public class EdgeTest extends AbstractEdgeTest { @@ -76,4 +82,23 @@ public class EdgeTest extends AbstractEdgeTest { Assert.assertEquals(savedCustomer.getUuidId().getMostSignificantBits(), customerUpdateMsg.getIdMSB()); Assert.assertEquals(savedCustomer.getUuidId().getLeastSignificantBits(), customerUpdateMsg.getIdLSB()); } + + @Test + public void testSyncEdge_attributeUpdated() throws Exception { + getWsClient().subscribeForAttributes(edge.getId(), TbAttributeSubscriptionScope.SERVER_SCOPE.name(), List.of(DataConstants.EDGE_SYNC_IN_PROGRESS_ATTR_KEY)); + + doPost("/api/edge/sync/" + edge.getId()); + + // wait for sync to start + waitForEdgeSyncInProgressEqualsValue(true); + + // wait for sync to end + waitForEdgeSyncInProgressEqualsValue(false); + } + + private void waitForEdgeSyncInProgressEqualsValue(Boolean value) { + getWsClient().registerWaitForUpdate(); + JsonNode update = JacksonUtil.toJsonNode(getWsClient().waitForUpdate()); + assertThat(update.get("data").get(DataConstants.EDGE_SYNC_IN_PROGRESS_ATTR_KEY).get(0).get(1).asBoolean()).isEqualTo(value); + } } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java index 7bf03a6a69..52ff515918 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java @@ -51,6 +51,8 @@ public interface EdgeService extends EntityDaoService { Edge saveEdge(Edge edge); + Edge saveEdge(Edge edge, boolean publishEvent); + Edge assignEdgeToCustomer(TenantId tenantId, EdgeId edgeId, CustomerId customerId); Edge unassignEdgeFromCustomer(TenantId tenantId, EdgeId edgeId); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index de46575e06..04f3143834 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -127,6 +127,7 @@ public class DataConstants { public static final String EDGE_MSG_SOURCE = "edge"; public static final String MSG_SOURCE_KEY = "source"; public static final String EDGE_VERSION_ATTR_KEY = "edgeVersion"; + public static final String EDGE_SYNC_IN_PROGRESS_ATTR_KEY = "syncInProgress"; public static final String LAST_CONNECTED_GATEWAY = "lastConnectedGateway"; diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java index fa955fa72e..91ee06aed2 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java @@ -203,18 +203,25 @@ public class EdgeServiceImpl extends AbstractCachedEntityService doSaveEdge(edge)); + return saveEdge(edge, true); } - private Edge doSaveEdge(Edge edge) { + @Override + public Edge saveEdge(Edge edge, boolean publishEvent) { + return saveEntity(edge, () -> doSaveEdge(edge, publishEvent)); + } + + private Edge doSaveEdge(Edge edge, boolean publishEvent) { log.trace("Executing saveEdge [{}]", edge); Edge oldEdge = edgeValidator.validate(edge, Edge::getTenantId); EdgeCacheEvictEvent evictEvent = new EdgeCacheEvictEvent(edge.getTenantId(), edge.getName(), oldEdge != null ? oldEdge.getName() : null); try { Edge savedEdge = edgeDao.save(edge.getTenantId(), edge); publishEvictEvent(evictEvent); - eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(savedEdge.getTenantId()) - .entityId(savedEdge.getId()).entity(savedEdge).created(edge.getId() == null).build()); + if (publishEvent) { + eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(savedEdge.getTenantId()) + .entityId(savedEdge.getId()).entity(savedEdge).created(edge.getId() == null).build()); + } if (edge.getId() == null) { countService.publishCountEntityEvictEvent(savedEdge.getTenantId(), EntityType.EDGE); } @@ -239,9 +246,10 @@ public class EdgeServiceImpl extends AbstractCachedEntityService