Browse Source

Added syncInProgress as edge attribute

pull/15111/head
Volodymyr Babak 3 months ago
parent
commit
e22a9d3e50
  1. 4
      application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java
  2. 28
      application/src/main/java/org/thingsboard/server/service/edge/rpc/AttributeSaveCallback.java
  3. 37
      application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java
  4. 20
      application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java
  5. 25
      application/src/test/java/org/thingsboard/server/edge/EdgeTest.java
  6. 2
      common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java
  7. 1
      common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
  8. 20
      dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java

4
application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java

@ -78,6 +78,7 @@ import org.thingsboard.server.service.edge.rpc.processor.telemetry.TelemetryEdge
import org.thingsboard.server.service.edge.rpc.processor.user.UserProcessor;
import org.thingsboard.server.service.edge.rpc.sync.EdgeRequestsService;
import org.thingsboard.server.service.executors.GrpcCallbackExecutorService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import java.util.EnumMap;
import java.util.List;
@ -104,6 +105,9 @@ public class EdgeContextComponent {
}
// services
@Autowired
private TelemetrySubscriptionService tsSubService;
@Autowired
private AdminSettingsService adminSettingsService;

28
application/src/main/java/org/thingsboard/server/service/edge/rpc/AttributeSaveCallback.java

@ -0,0 +1,28 @@
package org.thingsboard.server.service.edge.rpc;
import com.google.common.util.concurrent.FutureCallback;
import jakarta.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
@Slf4j
@AllArgsConstructor
public class AttributeSaveCallback implements FutureCallback<Void> {
private final TenantId tenantId;
private final EdgeId edgeId;
private final String key;
private final Object value;
@Override
public void onSuccess(@Nullable Void result) {
log.trace("[{}][{}] Successfully updated attribute [{}] with value [{}]", tenantId, edgeId, key, value);
}
@Override
public void onFailure(Throwable t) {
log.warn("[{}][{}] Failed to update attribute [{}] with value [{}]", tenantId, edgeId, key, value, t);
}
}

37
application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java

@ -135,9 +135,6 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
@Lazy
private EdgeContextComponent ctx;
@Autowired
private TelemetrySubscriptionService tsSubService;
@Autowired
private TbClusterService clusterService;
@ -553,14 +550,14 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
private void save(TenantId tenantId, EdgeId edgeId, String key, long value) {
log.debug("[{}][{}] Updating long edge telemetry [{}] [{}]", tenantId, edgeId, key, value);
if (persistToTelemetry) {
tsSubService.saveTimeseries(TimeseriesSaveRequest.builder()
ctx.getTsSubService().saveTimeseries(TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.entityId(edgeId)
.entry(new LongDataEntry(key, value))
.callback(new AttributeSaveCallback(tenantId, edgeId, key, value))
.build());
} else {
tsSubService.saveAttributes(AttributesSaveRequest.builder()
ctx.getTsSubService().saveAttributes(AttributesSaveRequest.builder()
.tenantId(tenantId)
.entityId(edgeId)
.scope(AttributeScope.SERVER_SCOPE)
@ -573,14 +570,14 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
private void save(TenantId tenantId, EdgeId edgeId, String key, boolean value) {
log.debug("[{}][{}] Updating boolean edge telemetry [{}] [{}]", tenantId, edgeId, key, value);
if (persistToTelemetry) {
tsSubService.saveTimeseries(TimeseriesSaveRequest.builder()
ctx.getTsSubService().saveTimeseries(TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.entityId(edgeId)
.entry(new BooleanDataEntry(key, value))
.callback(new AttributeSaveCallback(tenantId, edgeId, key, value))
.build());
} else {
tsSubService.saveAttributes(AttributesSaveRequest.builder()
ctx.getTsSubService().saveAttributes(AttributesSaveRequest.builder()
.tenantId(tenantId)
.entityId(edgeId)
.scope(AttributeScope.SERVER_SCOPE)
@ -590,32 +587,6 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
}
}
private static class AttributeSaveCallback implements FutureCallback<Void> {
private final TenantId tenantId;
private final EdgeId edgeId;
private final String key;
private final Object value;
AttributeSaveCallback(TenantId tenantId, EdgeId edgeId, String key, Object value) {
this.tenantId = tenantId;
this.edgeId = edgeId;
this.key = key;
this.value = value;
}
@Override
public void onSuccess(@Nullable Void result) {
log.trace("[{}][{}] Successfully updated attribute [{}] with value [{}]", tenantId, edgeId, key, value);
}
@Override
public void onFailure(Throwable t) {
log.warn("[{}][{}] Failed to update attribute [{}] with value [{}]", tenantId, edgeId, key, value, t);
}
}
private void pushRuleEngineMessage(TenantId tenantId, Edge edge, long ts, TbMsgType msgType) {
try {
EdgeId edgeId = edge.getId();

20
application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java

@ -25,6 +25,7 @@ import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.data.util.Pair;
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EdgeUtils;
@ -37,6 +38,7 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.limit.LimitedApi;
@ -195,7 +197,7 @@ public abstract class EdgeGrpcSession implements Closeable {
}
startSyncProcess(fullSync);
} else {
syncInProgress = false;
updateSyncInProgress(false);
}
}
if (requestMsg.getMsgType().equals(RequestMsgType.UPLINK_RPC_MESSAGE)) {
@ -252,7 +254,7 @@ public abstract class EdgeGrpcSession implements Closeable {
public void startSyncProcess(boolean fullSync) {
if (!syncInProgress) {
log.info("[{}][{}][{}] Staring edge sync process", tenantId, edge.getId(), sessionId);
syncInProgress = true;
updateSyncInProgress(true);
interruptGeneralProcessingOnSync();
doSync(new EdgeSyncCursor(ctx, edge, fullSync));
} else {
@ -398,6 +400,18 @@ public abstract class EdgeGrpcSession implements Closeable {
ctx.getAttributesService().save(tenantId, edge.getId(), AttributeScope.SERVER_SCOPE, attributeKvEntry);
}
private void updateSyncInProgress(Boolean value) {
this.syncInProgress = value;
ctx.getTsSubService().saveAttributes(AttributesSaveRequest.builder()
.tenantId(tenantId)
.entityId(edge.getId())
.scope(AttributeScope.SERVER_SCOPE)
.entry(new BooleanDataEntry(DataConstants.EDGE_SYNC_IN_PROGRESS_ATTR_KEY, value))
.callback(new AttributeSaveCallback(tenantId, edge.getId(), DataConstants.EDGE_SYNC_IN_PROGRESS_ATTR_KEY, value))
.build());
}
private void interruptGeneralProcessingOnSync() {
log.debug("[{}][{}][{}] Sync process started. General processing interrupted!", tenantId, edge.getId(), sessionId);
stopCurrentSendDownlinkMsgsTask(true);
@ -766,7 +780,7 @@ public abstract class EdgeGrpcSession implements Closeable {
}
private void markSyncCompletedSendEdgeEventUpdate() {
syncInProgress = false;
updateSyncInProgress(false);
ctx.getClusterService().onEdgeEventUpdate(new EdgeEventUpdateMsg(edge.getTenantId(), edge.getId()));
}

25
application/src/test/java/org/thingsboard/server/edge/EdgeTest.java

@ -15,10 +15,12 @@
*/
package org.thingsboard.server.edge;
import com.fasterxml.jackson.databind.JsonNode;
import org.junit.Assert;
import org.junit.Test;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
@ -26,10 +28,14 @@ import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.gen.edge.v1.CustomerUpdateMsg;
import org.thingsboard.server.gen.edge.v1.EdgeConfiguration;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.service.subscription.TbAttributeSubscriptionScope;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
@DaoSqlTest
public class EdgeTest extends AbstractEdgeTest {
@ -76,4 +82,23 @@ public class EdgeTest extends AbstractEdgeTest {
Assert.assertEquals(savedCustomer.getUuidId().getMostSignificantBits(), customerUpdateMsg.getIdMSB());
Assert.assertEquals(savedCustomer.getUuidId().getLeastSignificantBits(), customerUpdateMsg.getIdLSB());
}
@Test
public void testSyncEdge_attributeUpdated() throws Exception {
getWsClient().subscribeForAttributes(edge.getId(), TbAttributeSubscriptionScope.SERVER_SCOPE.name(), List.of(DataConstants.EDGE_SYNC_IN_PROGRESS_ATTR_KEY));
doPost("/api/edge/sync/" + edge.getId());
// wait for sync to start
waitForEdgeSyncInProgressEqualsValue(true);
// wait for sync to end
waitForEdgeSyncInProgressEqualsValue(false);
}
private void waitForEdgeSyncInProgressEqualsValue(Boolean value) {
getWsClient().registerWaitForUpdate();
JsonNode update = JacksonUtil.toJsonNode(getWsClient().waitForUpdate());
assertThat(update.get("data").get(DataConstants.EDGE_SYNC_IN_PROGRESS_ATTR_KEY).get(0).get(1).asBoolean()).isEqualTo(value);
}
}

2
common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java

@ -51,6 +51,8 @@ public interface EdgeService extends EntityDaoService {
Edge saveEdge(Edge edge);
Edge saveEdge(Edge edge, boolean publishEvent);
Edge assignEdgeToCustomer(TenantId tenantId, EdgeId edgeId, CustomerId customerId);
Edge unassignEdgeFromCustomer(TenantId tenantId, EdgeId edgeId);

1
common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java

@ -127,6 +127,7 @@ public class DataConstants {
public static final String EDGE_MSG_SOURCE = "edge";
public static final String MSG_SOURCE_KEY = "source";
public static final String EDGE_VERSION_ATTR_KEY = "edgeVersion";
public static final String EDGE_SYNC_IN_PROGRESS_ATTR_KEY = "syncInProgress";
public static final String LAST_CONNECTED_GATEWAY = "lastConnectedGateway";

20
dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java

@ -203,18 +203,25 @@ public class EdgeServiceImpl extends AbstractCachedEntityService<EdgeCacheKey, E
@Override
public Edge saveEdge(Edge edge) {
return saveEntity(edge, () -> doSaveEdge(edge));
return saveEdge(edge, true);
}
private Edge doSaveEdge(Edge edge) {
@Override
public Edge saveEdge(Edge edge, boolean publishEvent) {
return saveEntity(edge, () -> doSaveEdge(edge, publishEvent));
}
private Edge doSaveEdge(Edge edge, boolean publishEvent) {
log.trace("Executing saveEdge [{}]", edge);
Edge oldEdge = edgeValidator.validate(edge, Edge::getTenantId);
EdgeCacheEvictEvent evictEvent = new EdgeCacheEvictEvent(edge.getTenantId(), edge.getName(), oldEdge != null ? oldEdge.getName() : null);
try {
Edge savedEdge = edgeDao.save(edge.getTenantId(), edge);
publishEvictEvent(evictEvent);
eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(savedEdge.getTenantId())
.entityId(savedEdge.getId()).entity(savedEdge).created(edge.getId() == null).build());
if (publishEvent) {
eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(savedEdge.getTenantId())
.entityId(savedEdge.getId()).entity(savedEdge).created(edge.getId() == null).build());
}
if (edge.getId() == null) {
countService.publishCountEntityEvictEvent(savedEdge.getTenantId(), EntityType.EDGE);
}
@ -239,9 +246,10 @@ public class EdgeServiceImpl extends AbstractCachedEntityService<EdgeCacheKey, E
return edge;
}
edge.setCustomerId(customerId);
Edge result = saveEdge(edge, false);
eventPublisher.publishEvent(ActionEntityEvent.builder().tenantId(tenantId).entityId(edgeId)
.body(JacksonUtil.toString(customerId)).actionType(ActionType.ASSIGNED_TO_CUSTOMER).build());
return saveEdge(edge);
return result;
}
@Override
@ -253,7 +261,7 @@ public class EdgeServiceImpl extends AbstractCachedEntityService<EdgeCacheKey, E
return edge;
}
edge.setCustomerId(null);
Edge result = saveEdge(edge);
Edge result = saveEdge(edge, false);
eventPublisher.publishEvent(ActionEntityEvent.builder().tenantId(tenantId).entityId(edgeId)
.body(JacksonUtil.toString(customerId)).actionType(ActionType.UNASSIGNED_FROM_CUSTOMER).build());
return result;

Loading…
Cancel
Save