From cfca80defe71af0188e20046d7866697d1a84659 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Fri, 2 Aug 2024 12:48:13 +0300 Subject: [PATCH 1/5] Remove Edge request messages - send multiple data messages in single rpc message --- .../server/controller/EdgeController.java | 20 +++++++++++++- .../service/edge/rpc/EdgeGrpcService.java | 26 ++++++++++++++----- .../service/edge/rpc/EdgeGrpcSession.java | 17 +++++++++--- .../service/edge/rpc/EdgeRpcService.java | 2 ++ .../service/edge/rpc/EdgeSyncCursor.java | 19 ++++++-------- .../edge/rpc/processor/BaseEdgeProcessor.java | 2 +- .../processor/device/DeviceEdgeProcessor.java | 24 +++++------------ .../rule/RuleChainEdgeProcessor.java | 15 ++++++++--- .../rpc/processor/user/UserEdgeProcessor.java | 12 ++++++--- .../common/data/edge/EdgeEventActionType.java | 2 +- .../common/msg/edge/FromEdgeSyncResponse.java | 3 ++- .../server/common/util/ProtoUtils.java | 3 ++- common/proto/src/main/proto/queue.proto | 1 + .../server/common/util/ProtoUtilsTest.java | 2 +- .../thingsboard/rest/client/RestClient.java | 4 +++ 15 files changed, 102 insertions(+), 50 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java index 33b6045248..011f2d6614 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java @@ -490,7 +490,25 @@ public class EdgeController extends BaseController { if (fromEdgeSyncResponse.isSuccess()) { response.setResult(new ResponseEntity<>(HttpStatus.OK)); } else { - response.setErrorResult(new ThingsboardException("Edge is not connected", ThingsboardErrorCode.GENERAL)); + response.setErrorResult(new ThingsboardException(fromEdgeSyncResponse.getError(), ThingsboardErrorCode.GENERAL)); + } + } + + @ApiOperation(value = "Is edge sync process is active (isEdgeSyncProcessActive)", + notes = "Returns 'true' if edge is currently in sync process, 'false' - otherwise." + TENANT_AUTHORITY_PARAGRAPH) + @PreAuthorize("hasAuthority('TENANT_ADMIN')") + @GetMapping(value = "/edge/sync/{edgeId}/active") + public Boolean isEdgeSyncProcessActive( + @Parameter(description = EDGE_ID_PARAM_DESCRIPTION, required = true) + @PathVariable("edgeId") String strEdgeId) throws ThingsboardException { + checkParameter("edgeId", strEdgeId); + if (isEdgesEnabled() && edgeRpcServiceOpt.isPresent()) { + EdgeId edgeId = new EdgeId(toUUID(strEdgeId)); + edgeId = checkNotNull(edgeId); + Edge edge = checkEdgeId(edgeId, Operation.READ); + return edgeRpcServiceOpt.get().isEdgeSyncProcessActive(edge.getTenantId(), edge.getId()); + } else { + throw new ThingsboardException("Edges support disabled", ThingsboardErrorCode.GENERAL); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 04e6fe2185..19b70f5e87 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -292,17 +292,31 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i session.startSyncProcess(true); success = true; } - clusterService.pushEdgeSyncResponseToCore(new FromEdgeSyncResponse(requestId, tenantId, edgeId, success)); + clusterService.pushEdgeSyncResponseToCore(new FromEdgeSyncResponse(requestId, tenantId, edgeId, success, null)); } } @Override public void processSyncRequest(ToEdgeSyncRequest request, Consumer responseConsumer) { - log.trace("[{}][{}] Processing sync edge request [{}]", request.getTenantId(), request.getId(), request.getEdgeId()); UUID requestId = request.getId(); - localSyncEdgeRequests.put(requestId, responseConsumer); - clusterService.pushEdgeSyncRequestToCore(request); - scheduleSyncRequestTimeout(request, requestId); + EdgeGrpcSession session = sessions.get(request.getEdgeId()); + if (!session.isSyncCompleted()) { + responseConsumer.accept(new FromEdgeSyncResponse(requestId, request.getTenantId(), request.getEdgeId(), false, "Sync process is active at the moment")); + } else { + log.trace("[{}][{}] Processing sync edge request [{}]", request.getTenantId(), request.getId(), request.getEdgeId()); + localSyncEdgeRequests.put(requestId, responseConsumer); + clusterService.pushEdgeSyncRequestToCore(request); + scheduleSyncRequestTimeout(request, requestId); + } + } + + @Override + public Boolean isEdgeSyncProcessActive(TenantId tenantId, EdgeId edgeId) { + EdgeGrpcSession session = sessions.get(edgeId); + if (session == null) { + return false; + } + return !session.isSyncCompleted(); } private void scheduleSyncRequestTimeout(ToEdgeSyncRequest request, UUID requestId) { @@ -312,7 +326,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i Consumer consumer = localSyncEdgeRequests.remove(requestId); if (consumer != null) { log.trace("[{}] timeout for processing sync edge request.", requestId); - consumer.accept(new FromEdgeSyncResponse(requestId, request.getTenantId(), request.getEdgeId(), false)); + consumer.accept(new FromEdgeSyncResponse(requestId, request.getTenantId(), request.getEdgeId(), false, "Edge is not connected")); } }, 20, TimeUnit.SECONDS); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 55e8e051ab..1e84caeadf 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -247,6 +247,7 @@ public final class EdgeGrpcSession implements Closeable { } }, ctx.getGrpcCallbackExecutorService()); } else { + log.info("[{}][{}] sync process completed", this.tenantId, edge.getId()); DownlinkMsg syncCompleteDownlinkMsg = DownlinkMsg.newBuilder() .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) .setSyncCompletedMsg(SyncCompletedMsg.newBuilder().build()) @@ -325,7 +326,11 @@ public final class EdgeGrpcSession implements Closeable { } private void sendDownlinkMsg(ResponseMsg downlinkMsg) { - log.trace("[{}][{}] Sending downlink msg [{}]", this.tenantId, this.sessionId, downlinkMsg); + if (downlinkMsg.getDownlinkMsg().getWidgetTypeUpdateMsgCount() > 0) { + log.trace("[{}][{}] Sending downlink widgetTypeUpdateMsg, downlinkMsgId = {}", this.tenantId, this.sessionId, downlinkMsg.getDownlinkMsg().getDownlinkMsgId()); + } else { + log.trace("[{}][{}] Sending downlink msg [{}]", this.tenantId, this.sessionId, downlinkMsg); + } if (isConnected()) { downlinkMsgLock.lock(); try { @@ -337,7 +342,7 @@ public final class EdgeGrpcSession implements Closeable { } finally { downlinkMsgLock.unlock(); } - log.trace("[{}][{}] Response msg successfully sent [{}]", this.tenantId, this.sessionId, downlinkMsg); + log.trace("[{}][{}] Response msg successfully sent. downlinkMsgId = {}", this.tenantId, this.sessionId, downlinkMsg.getDownlinkMsg().getDownlinkMsgId()); } } @@ -551,9 +556,13 @@ public final class EdgeGrpcSession implements Closeable { DownlinkMsg downlinkMsg = null; try { switch (edgeEvent.getAction()) { - case UPDATED, ADDED, DELETED, ASSIGNED_TO_EDGE, UNASSIGNED_FROM_EDGE, ALARM_ACK, ALARM_CLEAR, ALARM_DELETE, CREDENTIALS_UPDATED, RELATION_ADD_OR_UPDATE, RELATION_DELETED, CREDENTIALS_REQUEST, RPC_CALL, ASSIGNED_TO_CUSTOMER, UNASSIGNED_FROM_CUSTOMER, ADDED_COMMENT, UPDATED_COMMENT, DELETED_COMMENT -> { + case UPDATED, ADDED, DELETED, ASSIGNED_TO_EDGE, UNASSIGNED_FROM_EDGE, ALARM_ACK, ALARM_CLEAR, ALARM_DELETE, CREDENTIALS_UPDATED, RELATION_ADD_OR_UPDATE, RELATION_DELETED, RPC_CALL, ASSIGNED_TO_CUSTOMER, UNASSIGNED_FROM_CUSTOMER, ADDED_COMMENT, UPDATED_COMMENT, DELETED_COMMENT -> { downlinkMsg = convertEntityEventToDownlink(edgeEvent); - log.trace("[{}][{}] entity message processed [{}]", this.tenantId, this.sessionId, downlinkMsg); + if (downlinkMsg != null && downlinkMsg.getWidgetTypeUpdateMsgCount() > 0) { + log.trace("[{}][{}] widgetTypeUpdateMsg message processed, downlinkMsgId = {}", this.tenantId, this.sessionId, downlinkMsg.getDownlinkMsgId()); + } else { + log.trace("[{}][{}] entity message processed [{}]", this.tenantId, this.sessionId, downlinkMsg); + } } case ATTRIBUTES_UPDATED, POST_ATTRIBUTES, ATTRIBUTES_DELETED, TIMESERIES_UPDATED -> downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edge, edgeEvent); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java index 12203db0c2..733d2e95cb 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java @@ -33,4 +33,6 @@ public interface EdgeRpcService { void deleteEdge(TenantId tenantId, EdgeId edgeId); void processSyncRequest(ToEdgeSyncRequest request, Consumer responseConsumer); + + Boolean isEdgeSyncProcessActive(TenantId tenantId, EdgeId edgeId); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java index 6321991586..125fd48463 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.edge.rpc; +import lombok.Getter; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.EntityId; @@ -53,6 +54,7 @@ public class EdgeSyncCursor { private final List fetchers = new LinkedList<>(); + @Getter private int currentIdx = 0; public EdgeSyncCursor(EdgeContextComponent ctx, Edge edge, boolean fullSync) { @@ -62,12 +64,12 @@ public class EdgeSyncCursor { fetchers.add(new RuleChainsEdgeEventFetcher(ctx.getRuleChainService())); fetchers.add(new AdminSettingsEdgeEventFetcher(ctx.getAdminSettingsService())); fetchers.add(new TenantAdminUsersEdgeEventFetcher(ctx.getUserService())); - Customer publicCustomer = ctx.getCustomerService().findOrCreatePublicCustomer(edge.getTenantId()); - fetchers.add(new CustomerEdgeEventFetcher(publicCustomer.getId())); - if (edge.getCustomerId() != null && !EntityId.NULL_UUID.equals(edge.getCustomerId().getId())) { - fetchers.add(new CustomerEdgeEventFetcher(edge.getCustomerId())); - fetchers.add(new CustomerUsersEdgeEventFetcher(ctx.getUserService(), edge.getCustomerId())); - } + } + Customer publicCustomer = ctx.getCustomerService().findOrCreatePublicCustomer(edge.getTenantId()); + fetchers.add(new CustomerEdgeEventFetcher(publicCustomer.getId())); + if (edge.getCustomerId() != null && !EntityId.NULL_UUID.equals(edge.getCustomerId().getId())) { + fetchers.add(new CustomerEdgeEventFetcher(edge.getCustomerId())); + fetchers.add(new CustomerUsersEdgeEventFetcher(ctx.getUserService(), edge.getCustomerId())); } fetchers.add(new DashboardsEdgeEventFetcher(ctx.getDashboardService())); fetchers.add(new DefaultProfilesEdgeEventFetcher(ctx.getDeviceProfileService(), ctx.getAssetProfileService())); @@ -102,9 +104,4 @@ public class EdgeSyncCursor { currentIdx++; return edgeEventFetcher; } - - public int getCurrentIdx() { - return currentIdx; - } - } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index f46c07ab5a..b1c1fc63c5 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -373,7 +373,7 @@ public abstract class BaseEdgeProcessor { private boolean doSaveIfEdgeIsOffline(EdgeEventType type, EdgeEventActionType action) { return switch (action) { - case TIMESERIES_UPDATED, ALARM_ACK, ALARM_CLEAR, ALARM_ASSIGNED, ALARM_UNASSIGNED, CREDENTIALS_REQUEST, ADDED_COMMENT, UPDATED_COMMENT -> + case TIMESERIES_UPDATED, ALARM_ACK, ALARM_CLEAR, ALARM_ASSIGNED, ALARM_UNASSIGNED, ADDED_COMMENT, UPDATED_COMMENT -> true; default -> switch (type) { case ALARM, ALARM_COMMENT, RULE_CHAIN, RULE_CHAIN_METADATA, USER, CUSTOMER, TENANT, TENANT_PROFILE, WIDGETS_BUNDLE, WIDGET_TYPE, diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java index 5e73034b1c..34a3a79acf 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java @@ -43,7 +43,6 @@ import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponseActorMsg; import org.thingsboard.server.dao.exception.DataValidationException; -import org.thingsboard.server.gen.edge.v1.DeviceCredentialsRequestMsg; import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg; import org.thingsboard.server.gen.edge.v1.DeviceRpcCallMsg; import org.thingsboard.server.gen.edge.v1.DeviceUpdateMsg; @@ -70,7 +69,7 @@ public abstract class DeviceEdgeProcessor extends BaseDeviceProcessor implements case ENTITY_CREATED_RPC_MESSAGE: case ENTITY_UPDATED_RPC_MESSAGE: saveOrUpdateDevice(tenantId, deviceId, deviceUpdateMsg, edge); - return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null); + return Futures.immediateFuture(null); case ENTITY_DELETED_RPC_MESSAGE: Device deviceToDelete = deviceService.findDeviceById(tenantId, deviceId); if (deviceToDelete != null) { @@ -232,6 +231,12 @@ public abstract class DeviceEdgeProcessor extends BaseDeviceProcessor implements DownlinkMsg.Builder builder = DownlinkMsg.newBuilder() .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) .addDeviceUpdateMsg(deviceUpdateMsg); + + DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(edgeEvent.getTenantId(), deviceId); + DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg = ((DeviceMsgConstructor) + deviceMsgConstructorFactory.getMsgConstructorByEdgeVersion(edgeVersion)).constructDeviceCredentialsUpdatedMsg(deviceCredentials); + builder.addDeviceCredentialsUpdateMsg(deviceCredentialsUpdateMsg).build(); + if (UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE.equals(msgType)) { DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileById(edgeEvent.getTenantId(), device.getDeviceProfileId()); deviceProfile = checkIfDeviceProfileDefaultFieldsAssignedToEdge(edgeEvent.getTenantId(), edgeId, deviceProfile, edgeVersion); @@ -269,22 +274,7 @@ public abstract class DeviceEdgeProcessor extends BaseDeviceProcessor implements deviceMsgConstructorFactory.getMsgConstructorByEdgeVersion(edgeVersion)) .constructDeviceRpcCallMsg(edgeEvent.getEntityId(), edgeEvent.getBody())) .build(); - case CREDENTIALS_REQUEST: - return convertCredentialsRequestEventToDownlink(edgeEvent); } return downlinkMsg; } - - private DownlinkMsg convertCredentialsRequestEventToDownlink(EdgeEvent edgeEvent) { - DeviceId deviceId = new DeviceId(edgeEvent.getEntityId()); - DeviceCredentialsRequestMsg deviceCredentialsRequestMsg = DeviceCredentialsRequestMsg.newBuilder() - .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) - .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) - .build(); - DownlinkMsg.Builder builder = DownlinkMsg.newBuilder() - .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) - .addDeviceCredentialsRequestMsg(deviceCredentialsRequestMsg); - return builder.build(); - } - } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/RuleChainEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/RuleChainEdgeProcessor.java index 30f74ae42d..f63aa8b829 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/RuleChainEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/RuleChainEdgeProcessor.java @@ -61,10 +61,19 @@ public class RuleChainEdgeProcessor extends BaseEdgeProcessor { RuleChainUpdateMsg ruleChainUpdateMsg = ((RuleChainMsgConstructor) ruleChainMsgConstructorFactory.getMsgConstructorByEdgeVersion(edgeVersion)) .constructRuleChainUpdatedMsg(msgType, ruleChain, isRoot); - downlinkMsg = DownlinkMsg.newBuilder() + + DownlinkMsg.Builder builder = DownlinkMsg.newBuilder() .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) - .addRuleChainUpdateMsg(ruleChainUpdateMsg) - .build(); + .addRuleChainUpdateMsg(ruleChainUpdateMsg); + + RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(edgeEvent.getTenantId(), ruleChainId); + RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = ((RuleChainMsgConstructor) + ruleChainMsgConstructorFactory.getMsgConstructorByEdgeVersion(edgeVersion)) + .constructRuleChainMetadataUpdatedMsg(edgeEvent.getTenantId(), msgType, ruleChainMetaData, edgeVersion); + if (ruleChainMetadataUpdateMsg != null) { + builder.addRuleChainMetadataUpdateMsg(ruleChainMetadataUpdateMsg); + } + downlinkMsg = builder.build(); } } case DELETED, UNASSIGNED_FROM_EDGE -> downlinkMsg = DownlinkMsg.newBuilder() diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/user/UserEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/user/UserEdgeProcessor.java index 607c5e665c..cd072b4a1e 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/user/UserEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/user/UserEdgeProcessor.java @@ -43,10 +43,16 @@ public class UserEdgeProcessor extends BaseEdgeProcessor { User user = userService.findUserById(edgeEvent.getTenantId(), userId); if (user != null) { UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction()); - downlinkMsg = DownlinkMsg.newBuilder() + DownlinkMsg.Builder builder = DownlinkMsg.newBuilder() .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) - .addUserUpdateMsg(((UserMsgConstructor) userMsgConstructorFactory.getMsgConstructorByEdgeVersion(edgeVersion)).constructUserUpdatedMsg(msgType, user)) - .build(); + .addUserUpdateMsg(((UserMsgConstructor) userMsgConstructorFactory.getMsgConstructorByEdgeVersion(edgeVersion)).constructUserUpdatedMsg(msgType, user)); + UserCredentials userCredentialsByUserId = userService.findUserCredentialsByUserId(edgeEvent.getTenantId(), userId); + if (userCredentialsByUserId != null && userCredentialsByUserId.isEnabled()) { + UserCredentialsUpdateMsg userCredentialsUpdateMsg = + ((UserMsgConstructor) userMsgConstructorFactory.getMsgConstructorByEdgeVersion(edgeVersion)).constructUserCredentialsUpdatedMsg(userCredentialsByUserId); + builder.addUserCredentialsUpdateMsg(userCredentialsUpdateMsg); + } + downlinkMsg = builder.build(); } } case DELETED -> downlinkMsg = DownlinkMsg.newBuilder() diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java index 7bee0c6094..176de62f69 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java @@ -43,7 +43,7 @@ public enum EdgeEventActionType { DELETED_COMMENT(ActionType.DELETED_COMMENT), ASSIGNED_TO_EDGE(ActionType.ASSIGNED_TO_EDGE), UNASSIGNED_FROM_EDGE(ActionType.UNASSIGNED_FROM_EDGE), - CREDENTIALS_REQUEST(null), + CREDENTIALS_REQUEST(null), // deprecated ENTITY_MERGE_REQUEST(null); // deprecated private final ActionType actionType; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/edge/FromEdgeSyncResponse.java b/common/message/src/main/java/org/thingsboard/server/common/msg/edge/FromEdgeSyncResponse.java index 733129b4de..baf483310c 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/edge/FromEdgeSyncResponse.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/edge/FromEdgeSyncResponse.java @@ -25,12 +25,13 @@ import java.util.UUID; @Data public class FromEdgeSyncResponse implements EdgeSessionMsg { - private static final long serialVersionUID = -6360890886315347486L; + private static final long serialVersionUID = -6360890556315667486L; private final UUID id; private final TenantId tenantId; private final EdgeId edgeId; private final boolean success; + private final String error; @Override public MsgType getMsgType() { diff --git a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java index 3c4aefd4f3..a5e0712936 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java @@ -167,7 +167,8 @@ public class ProtoUtils { new UUID(proto.getResponseIdMSB(), proto.getResponseIdLSB()), TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())), new EdgeId(new UUID(proto.getEdgeIdMSB(), proto.getEdgeIdLSB())), - proto.getSuccess() + proto.getSuccess(), + proto.getError() ); } diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 50a6140d8b..0ebbc6ea03 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -1148,6 +1148,7 @@ message FromEdgeSyncResponseMsgProto { int64 edgeIdMSB = 5; int64 edgeIdLSB = 6; bool success = 7; + string error = 8; } message DeviceEdgeUpdateMsgProto { diff --git a/common/proto/src/test/java/org/thingsboard/server/common/util/ProtoUtilsTest.java b/common/proto/src/test/java/org/thingsboard/server/common/util/ProtoUtilsTest.java index 904e85f702..7852457277 100644 --- a/common/proto/src/test/java/org/thingsboard/server/common/util/ProtoUtilsTest.java +++ b/common/proto/src/test/java/org/thingsboard/server/common/util/ProtoUtilsTest.java @@ -124,7 +124,7 @@ class ProtoUtilsTest { @Test void protoFromEdgeSyncResponseSerialization() { - FromEdgeSyncResponse msg = new FromEdgeSyncResponse(id, tenantId, edgeId, true); + FromEdgeSyncResponse msg = new FromEdgeSyncResponse(id, tenantId, edgeId, true, null); assertThat(ProtoUtils.fromProto(ProtoUtils.toProto(msg))).as("deserialized").isEqualTo(msg); } diff --git a/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java b/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java index 8783770be3..e650db9608 100644 --- a/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java +++ b/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java @@ -2950,6 +2950,10 @@ public class RestClient implements Closeable { return restTemplate.getForEntity(baseURL + "/api/edges/enabled", Boolean.class).getBody(); } + public Boolean isEdgeSyncProcessActive(EdgeId edgeId) { + return restTemplate.getForEntity(baseURL + "/api/edge/sync/" + edgeId.getId() + "/active", Boolean.class).getBody(); + } + public Edge saveEdge(Edge edge) { return restTemplate.postForEntity(baseURL + "/api/edge", edge, Edge.class).getBody(); } From 61b7bfd849d0dd0503ed9e096c1860f9e8ecb5d2 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Fri, 2 Aug 2024 14:41:36 +0300 Subject: [PATCH 2/5] Fixed FromEdgeSyncResponse in ProtoUtils --- .../thingsboard/server/service/edge/rpc/EdgeGrpcService.java | 2 +- .../java/org/thingsboard/server/common/util/ProtoUtils.java | 1 + .../java/org/thingsboard/server/common/util/ProtoUtilsTest.java | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 19b70f5e87..69495f19bc 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -292,7 +292,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i session.startSyncProcess(true); success = true; } - clusterService.pushEdgeSyncResponseToCore(new FromEdgeSyncResponse(requestId, tenantId, edgeId, success, null)); + clusterService.pushEdgeSyncResponseToCore(new FromEdgeSyncResponse(requestId, tenantId, edgeId, success, "")); } } diff --git a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java index a5e0712936..f9d4237fdc 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java @@ -159,6 +159,7 @@ public class ProtoUtils { .setEdgeIdMSB(response.getEdgeId().getId().getMostSignificantBits()) .setEdgeIdLSB(response.getEdgeId().getId().getLeastSignificantBits()) .setSuccess(response.isSuccess()) + .setError(response.getError()) .build(); } diff --git a/common/proto/src/test/java/org/thingsboard/server/common/util/ProtoUtilsTest.java b/common/proto/src/test/java/org/thingsboard/server/common/util/ProtoUtilsTest.java index 7852457277..a513d3e311 100644 --- a/common/proto/src/test/java/org/thingsboard/server/common/util/ProtoUtilsTest.java +++ b/common/proto/src/test/java/org/thingsboard/server/common/util/ProtoUtilsTest.java @@ -124,7 +124,7 @@ class ProtoUtilsTest { @Test void protoFromEdgeSyncResponseSerialization() { - FromEdgeSyncResponse msg = new FromEdgeSyncResponse(id, tenantId, edgeId, true, null); + FromEdgeSyncResponse msg = new FromEdgeSyncResponse(id, tenantId, edgeId, true, "Error Msg"); assertThat(ProtoUtils.fromProto(ProtoUtils.toProto(msg))).as("deserialized").isEqualTo(msg); } From 62f0ba573925f39206568148a2a65d039b2a63f1 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Fri, 2 Aug 2024 15:20:46 +0300 Subject: [PATCH 3/5] Fixed DeviceEdgeProcessorTest --- .../rpc/processor/device/AbstractDeviceProcessorTest.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/device/AbstractDeviceProcessorTest.java b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/device/AbstractDeviceProcessorTest.java index 0ec40fb908..31524e467a 100644 --- a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/device/AbstractDeviceProcessorTest.java +++ b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/device/AbstractDeviceProcessorTest.java @@ -30,6 +30,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.gen.edge.v1.DeviceProfileUpdateMsg; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessorTest; @@ -61,6 +62,9 @@ public abstract class AbstractDeviceProcessorTest extends BaseEdgeProcessorTest deviceProfile.setProfileData(deviceProfileData); deviceProfile.setTransportType(DeviceTransportType.DEFAULT); + DeviceCredentials deviceCredentials = new DeviceCredentials(); + deviceCredentials.setDeviceId(deviceId); + Device device = new Device(); device.setDeviceProfileId(deviceProfileId); device.setId(deviceId); @@ -71,9 +75,9 @@ public abstract class AbstractDeviceProcessorTest extends BaseEdgeProcessorTest edgeEvent.setTenantId(tenantId); edgeEvent.setAction(EdgeEventActionType.ADDED); - willReturn(device).given(deviceService).findDeviceById(tenantId, deviceId); willReturn(deviceProfile).given(deviceProfileService).findDeviceProfileById(tenantId, deviceProfileId); + willReturn(deviceCredentials).given(deviceCredentialsService).findDeviceCredentialsByDeviceId(tenantId, deviceId); } protected void updateDeviceProfileDefaultFields(long expectedDashboardIdMSB, long expectedDashboardIdLSB, From 8e8aa5c920f76a973e835c96f57ec31c397aaf67 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Mon, 5 Aug 2024 18:12:23 +0300 Subject: [PATCH 4/5] Remove check if edge is in sync. Fixed misc test according to new changes --- .../server/controller/EdgeController.java | 18 ----- .../service/edge/rpc/EdgeGrpcService.java | 25 +++---- .../service/edge/rpc/EdgeRpcService.java | 2 - .../processor/device/DeviceEdgeProcessor.java | 10 +-- .../server/controller/EdgeControllerTest.java | 66 +++++++++++++++++-- .../server/edge/AbstractEdgeTest.java | 10 ++- .../server/edge/DeviceEdgeTest.java | 52 ++++----------- .../server/edge/RuleChainEdgeTest.java | 4 +- .../server/edge/TelemetryEdgeTest.java | 6 +- .../thingsboard/server/edge/UserEdgeTest.java | 20 +++--- .../thingsboard/rest/client/RestClient.java | 4 -- 11 files changed, 115 insertions(+), 102 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java index 011f2d6614..26c30f85d1 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java @@ -494,24 +494,6 @@ public class EdgeController extends BaseController { } } - @ApiOperation(value = "Is edge sync process is active (isEdgeSyncProcessActive)", - notes = "Returns 'true' if edge is currently in sync process, 'false' - otherwise." + TENANT_AUTHORITY_PARAGRAPH) - @PreAuthorize("hasAuthority('TENANT_ADMIN')") - @GetMapping(value = "/edge/sync/{edgeId}/active") - public Boolean isEdgeSyncProcessActive( - @Parameter(description = EDGE_ID_PARAM_DESCRIPTION, required = true) - @PathVariable("edgeId") String strEdgeId) throws ThingsboardException { - checkParameter("edgeId", strEdgeId); - if (isEdgesEnabled() && edgeRpcServiceOpt.isPresent()) { - EdgeId edgeId = new EdgeId(toUUID(strEdgeId)); - edgeId = checkNotNull(edgeId); - Edge edge = checkEdgeId(edgeId, Operation.READ); - return edgeRpcServiceOpt.get().isEdgeSyncProcessActive(edge.getTenantId(), edge.getId()); - } else { - throw new ThingsboardException("Edges support disabled", ThingsboardErrorCode.GENERAL); - } - } - @ApiOperation(value = "Find missing rule chains (findMissingToRelatedRuleChains)", notes = "Returns list of rule chains ids that are not assigned to particular edge, but these rule chains are present in the already assigned rule chains to edge." + TENANT_AUTHORITY_PARAGRAPH) @PreAuthorize("hasAuthority('TENANT_ADMIN')") diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 69495f19bc..46bf3f69a4 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -287,12 +287,16 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private void startSyncProcess(TenantId tenantId, EdgeId edgeId, UUID requestId) { EdgeGrpcSession session = sessions.get(edgeId); if (session != null) { - boolean success = false; - if (session.isConnected()) { - session.startSyncProcess(true); - success = true; + if (!session.isSyncCompleted()) { + clusterService.pushEdgeSyncResponseToCore(new FromEdgeSyncResponse(requestId, tenantId, edgeId, false, "Sync process is active at the moment")); + } else { + boolean success = false; + if (session.isConnected()) { + session.startSyncProcess(true); + success = true; + } + clusterService.pushEdgeSyncResponseToCore(new FromEdgeSyncResponse(requestId, tenantId, edgeId, success, "")); } - clusterService.pushEdgeSyncResponseToCore(new FromEdgeSyncResponse(requestId, tenantId, edgeId, success, "")); } } @@ -300,7 +304,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i public void processSyncRequest(ToEdgeSyncRequest request, Consumer responseConsumer) { UUID requestId = request.getId(); EdgeGrpcSession session = sessions.get(request.getEdgeId()); - if (!session.isSyncCompleted()) { + if (session != null && !session.isSyncCompleted()) { responseConsumer.accept(new FromEdgeSyncResponse(requestId, request.getTenantId(), request.getEdgeId(), false, "Sync process is active at the moment")); } else { log.trace("[{}][{}] Processing sync edge request [{}]", request.getTenantId(), request.getId(), request.getEdgeId()); @@ -310,15 +314,6 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } } - @Override - public Boolean isEdgeSyncProcessActive(TenantId tenantId, EdgeId edgeId) { - EdgeGrpcSession session = sessions.get(edgeId); - if (session == null) { - return false; - } - return !session.isSyncCompleted(); - } - private void scheduleSyncRequestTimeout(ToEdgeSyncRequest request, UUID requestId) { log.trace("[{}] scheduling sync edge request", requestId); executorService.schedule(() -> { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java index 733d2e95cb..12203db0c2 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java @@ -33,6 +33,4 @@ public interface EdgeRpcService { void deleteEdge(TenantId tenantId, EdgeId edgeId); void processSyncRequest(ToEdgeSyncRequest request, Consumer responseConsumer); - - Boolean isEdgeSyncProcessActive(TenantId tenantId, EdgeId edgeId); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java index 34a3a79acf..f448766994 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java @@ -231,12 +231,12 @@ public abstract class DeviceEdgeProcessor extends BaseDeviceProcessor implements DownlinkMsg.Builder builder = DownlinkMsg.newBuilder() .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) .addDeviceUpdateMsg(deviceUpdateMsg); - DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(edgeEvent.getTenantId(), deviceId); - DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg = ((DeviceMsgConstructor) - deviceMsgConstructorFactory.getMsgConstructorByEdgeVersion(edgeVersion)).constructDeviceCredentialsUpdatedMsg(deviceCredentials); - builder.addDeviceCredentialsUpdateMsg(deviceCredentialsUpdateMsg).build(); - + if (deviceCredentials != null) { + DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg = ((DeviceMsgConstructor) + deviceMsgConstructorFactory.getMsgConstructorByEdgeVersion(edgeVersion)).constructDeviceCredentialsUpdatedMsg(deviceCredentials); + builder.addDeviceCredentialsUpdateMsg(deviceCredentialsUpdateMsg).build(); + } if (UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE.equals(msgType)) { DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileById(edgeEvent.getTenantId(), device.getDeviceProfileId()); deviceProfile = checkIfDeviceProfileDefaultFieldsAssignedToEdge(edgeEvent.getTenantId(), edgeId, deviceProfile, edgeVersion); diff --git a/application/src/test/java/org/thingsboard/server/controller/EdgeControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/EdgeControllerTest.java index 49dfa71223..de530b5268 100644 --- a/application/src/test/java/org/thingsboard/server/controller/EdgeControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/EdgeControllerTest.java @@ -55,14 +55,18 @@ import org.thingsboard.server.common.data.asset.AssetProfile; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.data.rule.RuleChain; +import org.thingsboard.server.common.data.rule.RuleChainMetaData; import org.thingsboard.server.common.data.security.Authority; +import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.model.JwtSettings; import org.thingsboard.server.dao.edge.EdgeDao; import org.thingsboard.server.dao.exception.DataValidationException; @@ -73,11 +77,13 @@ import org.thingsboard.server.gen.edge.v1.AdminSettingsUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetUpdateMsg; import org.thingsboard.server.gen.edge.v1.CustomerUpdateMsg; +import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg; import org.thingsboard.server.gen.edge.v1.DeviceProfileUpdateMsg; import org.thingsboard.server.gen.edge.v1.DeviceUpdateMsg; import org.thingsboard.server.gen.edge.v1.EdgeVersion; import org.thingsboard.server.gen.edge.v1.OAuth2UpdateMsg; import org.thingsboard.server.gen.edge.v1.QueueUpdateMsg; +import org.thingsboard.server.gen.edge.v1.RuleChainMetadataUpdateMsg; import org.thingsboard.server.gen.edge.v1.RuleChainUpdateMsg; import org.thingsboard.server.gen.edge.v1.SyncCompletedMsg; import org.thingsboard.server.gen.edge.v1.TenantProfileUpdateMsg; @@ -93,6 +99,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.containsString; @@ -887,23 +894,24 @@ public class EdgeControllerTest extends AbstractControllerTest { edgeImitator.ignoreType(UserCredentialsUpdateMsg.class); edgeImitator.ignoreType(OAuth2UpdateMsg.class); - edgeImitator.expectMessageAmount(24); + edgeImitator.expectMessageAmount(27); edgeImitator.connect(); waitForMessages(edgeImitator); - verifyFetchersMsgs(edgeImitator); + verifyFetchersMsgs(edgeImitator, savedDevice); // verify queue msgs Assert.assertTrue(popDeviceProfileMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "default")); Assert.assertTrue(popDeviceMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "Test Sync Edge Device 1")); + Assert.assertTrue(popDeviceCredentialsMsg(edgeImitator.getDownlinkMsgs(), savedDevice.getId())); Assert.assertTrue(popAssetProfileMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "test")); Assert.assertTrue(popAssetMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "Test Sync Edge Asset 1")); Assert.assertTrue(edgeImitator.getDownlinkMsgs().isEmpty()); - edgeImitator.expectMessageAmount(20); + edgeImitator.expectMessageAmount(22); doPost("/api/edge/sync/" + edge.getId()); waitForMessages(edgeImitator); - verifyFetchersMsgs(edgeImitator); + verifyFetchersMsgs(edgeImitator, savedDevice); Assert.assertTrue(edgeImitator.getDownlinkMsgs().isEmpty()); edgeImitator.allowIgnoredTypes(); @@ -920,6 +928,23 @@ public class EdgeControllerTest extends AbstractControllerTest { .andExpect(status().isOk()); } + private RuleChainId getEdgeRootRuleChainId(EdgeImitator edgeImitator) { + try { + EdgeId edgeId = new EdgeId(new UUID(edgeImitator.getConfiguration().getEdgeIdMSB(), edgeImitator.getConfiguration().getEdgeIdLSB())); + List edgeRuleChains = doGetTypedWithPageLink("/api/edge/" + edgeId.getId() + "/ruleChains?", + new TypeReference>() { + }, new PageLink(100)).getData(); + for (RuleChain edgeRuleChain : edgeRuleChains) { + if (edgeRuleChain.isRoot()) { + return edgeRuleChain.getId(); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + throw new RuntimeException("Root rule chain not found"); + } + private void simulateEdgeActivation(Edge edge) throws Exception { ObjectNode attributes = JacksonUtil.newObjectNode(); attributes.put("active", true); @@ -949,9 +974,10 @@ public class EdgeControllerTest extends AbstractControllerTest { } } - private void verifyFetchersMsgs(EdgeImitator edgeImitator) { + private void verifyFetchersMsgs(EdgeImitator edgeImitator, Device savedDevice) { Assert.assertTrue(popQueueMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "Main")); Assert.assertTrue(popRuleChainMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "Edge Root Rule Chain")); + Assert.assertTrue(popRuleChainMetadataMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, getEdgeRootRuleChainId(edgeImitator))); Assert.assertTrue(popAdminSettingsMsg(edgeImitator.getDownlinkMsgs(), "general")); Assert.assertTrue(popAdminSettingsMsg(edgeImitator.getDownlinkMsgs(), "mail")); Assert.assertTrue(popAdminSettingsMsg(edgeImitator.getDownlinkMsgs(), "connectivity")); @@ -965,6 +991,7 @@ public class EdgeControllerTest extends AbstractControllerTest { Assert.assertTrue(popCustomerMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "Public")); Assert.assertTrue(popDeviceProfileMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "default")); Assert.assertTrue(popDeviceMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "Test Sync Edge Device 1")); + Assert.assertTrue(popDeviceCredentialsMsg(edgeImitator.getDownlinkMsgs(), savedDevice.getId())); Assert.assertTrue(popAssetProfileMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "test")); Assert.assertTrue(popAssetMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "Test Sync Edge Asset 1")); Assert.assertTrue(popTenantMsg(edgeImitator.getDownlinkMsgs(), tenantId)); @@ -1002,6 +1029,21 @@ public class EdgeControllerTest extends AbstractControllerTest { return false; } + private boolean popRuleChainMetadataMsg(List messages, UpdateMsgType msgType, RuleChainId ruleChainId) { + for (AbstractMessage message : messages) { + if (message instanceof RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg) { + RuleChainMetaData ruleChainMetaData = JacksonUtil.fromString(ruleChainMetadataUpdateMsg.getEntity(), RuleChainMetaData.class, true); + Assert.assertNotNull(ruleChainMetaData); + if (msgType.equals(ruleChainMetadataUpdateMsg.getMsgType()) + && ruleChainId.equals(ruleChainMetaData.getRuleChainId())) { + messages.remove(message); + return true; + } + } + } + return false; + } + private boolean popAdminSettingsMsg(List messages, String key) { for (AbstractMessage message : messages) { if (message instanceof AdminSettingsUpdateMsg adminSettingsUpdateMsg) { @@ -1046,6 +1088,20 @@ public class EdgeControllerTest extends AbstractControllerTest { return false; } + private boolean popDeviceCredentialsMsg(List messages, DeviceId deviceId) { + for (AbstractMessage message : messages) { + if (message instanceof DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) { + DeviceCredentials deviceCredentials = JacksonUtil.fromString(deviceCredentialsUpdateMsg.getEntity(), DeviceCredentials.class, true); + Assert.assertNotNull(deviceCredentials); + if (deviceId.equals(deviceCredentials.getDeviceId())) { + messages.remove(message); + return true; + } + } + } + return false; + } + private boolean popAssetProfileMsg(List messages, UpdateMsgType msgType, String name) { for (AbstractMessage message : messages) { if (message instanceof AssetProfileUpdateMsg assetProfileUpdateMsg) { diff --git a/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java index e7f664477c..f7a066f7e0 100644 --- a/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java @@ -83,6 +83,7 @@ import org.thingsboard.server.gen.edge.v1.AdminSettingsUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetUpdateMsg; import org.thingsboard.server.gen.edge.v1.CustomerUpdateMsg; +import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg; import org.thingsboard.server.gen.edge.v1.DeviceProfileUpdateMsg; import org.thingsboard.server.gen.edge.v1.DeviceUpdateMsg; import org.thingsboard.server.gen.edge.v1.EdgeConfiguration; @@ -96,6 +97,7 @@ import org.thingsboard.server.gen.edge.v1.TenantProfileUpdateMsg; import org.thingsboard.server.gen.edge.v1.TenantUpdateMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.edge.v1.UplinkMsg; +import org.thingsboard.server.gen.edge.v1.UserCredentialsUpdateMsg; import org.thingsboard.server.gen.edge.v1.UserUpdateMsg; import java.util.ArrayList; @@ -143,7 +145,7 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { edgeImitator = new EdgeImitator("localhost", 7070, edge.getRoutingKey(), edge.getSecret()); edgeImitator.ignoreType(OAuth2UpdateMsg.class); - edgeImitator.expectMessageAmount(21); + edgeImitator.expectMessageAmount(24); edgeImitator.connect(); requestEdgeRuleChainMetadata(); @@ -250,7 +252,7 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { UUID ruleChainUUID = validateRuleChains(); // 1 from request message - validateMsgsCnt(RuleChainMetadataUpdateMsg.class, 1); + validateMsgsCnt(RuleChainMetadataUpdateMsg.class, 2); validateRuleChainMetadataUpdates(ruleChainUUID); // 4 messages ('general', 'mail', 'connectivity', 'jwt') @@ -275,6 +277,8 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { validateMsgsCnt(DeviceUpdateMsg.class, 1); validateDevices(); + validateMsgsCnt(DeviceCredentialsUpdateMsg.class, 1); + // 1 from asset fetcher validateMsgsCnt(AssetUpdateMsg.class, 1); validateAssets(); @@ -287,6 +291,8 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { validateMsgsCnt(UserUpdateMsg.class, 1); validateUsers(); + validateMsgsCnt(UserCredentialsUpdateMsg.class, 1); + // 1 from tenant fetcher validateMsgsCnt(TenantUpdateMsg.class, 1); validateTenant(); diff --git a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java index 9e09a16fd9..c5f34e4f47 100644 --- a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java @@ -150,25 +150,25 @@ public class DeviceEdgeTest extends AbstractEdgeTest { + "/edge/" + edge.getUuidId(), Edge.class); Assert.assertTrue(edgeImitator.waitForMessages()); - edgeImitator.expectMessageAmount(1); + edgeImitator.expectMessageAmount(2); doPost("/api/customer/" + savedCustomer.getUuidId() + "/device/" + savedDevice.getUuidId(), Device.class); Assert.assertTrue(edgeImitator.waitForMessages()); - latestMessage = edgeImitator.getLatestMessage(); - Assert.assertTrue(latestMessage instanceof DeviceUpdateMsg); - deviceUpdateMsg = (DeviceUpdateMsg) latestMessage; + deviceUpdateMsgOpt = edgeImitator.findMessageByType(DeviceUpdateMsg.class); + Assert.assertTrue(deviceUpdateMsgOpt.isPresent()); + deviceUpdateMsg = deviceUpdateMsgOpt.get(); deviceFromMsg = JacksonUtil.fromString(deviceUpdateMsg.getEntity(), Device.class, true); Assert.assertNotNull(deviceFromMsg); Assert.assertEquals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, deviceUpdateMsg.getMsgType()); Assert.assertEquals(savedCustomer.getId(), deviceFromMsg.getCustomerId()); // unassign device #2 from customer - edgeImitator.expectMessageAmount(1); + edgeImitator.expectMessageAmount(2); doDelete("/api/customer/device/" + savedDevice.getUuidId(), Device.class); Assert.assertTrue(edgeImitator.waitForMessages()); - latestMessage = edgeImitator.getLatestMessage(); - Assert.assertTrue(latestMessage instanceof DeviceUpdateMsg); - deviceUpdateMsg = (DeviceUpdateMsg) latestMessage; + deviceUpdateMsgOpt = edgeImitator.findMessageByType(DeviceUpdateMsg.class); + Assert.assertTrue(deviceUpdateMsgOpt.isPresent()); + deviceUpdateMsg = deviceUpdateMsgOpt.get(); deviceFromMsg = JacksonUtil.fromString(deviceUpdateMsg.getEntity(), Device.class, true); Assert.assertNotNull(deviceFromMsg); Assert.assertEquals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, deviceUpdateMsg.getMsgType()); @@ -243,7 +243,7 @@ public class DeviceEdgeTest extends AbstractEdgeTest { Assert.assertTrue(edgeImitator.waitForMessages()); // update device - edgeImitator.expectMessageAmount(1); + edgeImitator.expectMessageAmount(2); savedDevice.setFirmwareId(firmwareOtaPackageInfo.getId()); savedDevice.setSoftwareId(softwareOtaPackageInfo.getId()); @@ -256,9 +256,9 @@ public class DeviceEdgeTest extends AbstractEdgeTest { savedDevice = doPost("/api/device", savedDevice, Device.class); Assert.assertTrue(edgeImitator.waitForMessages()); - AbstractMessage latestMessage = edgeImitator.getLatestMessage(); - Assert.assertTrue(latestMessage instanceof DeviceUpdateMsg); - DeviceUpdateMsg deviceUpdateMsg = (DeviceUpdateMsg) latestMessage; + Optional deviceUpdateMsgOpt = edgeImitator.findMessageByType(DeviceUpdateMsg.class); + Assert.assertTrue(deviceUpdateMsgOpt.isPresent()); + DeviceUpdateMsg deviceUpdateMsg = deviceUpdateMsgOpt.get(); Device deviceMsg = JacksonUtil.fromString(deviceUpdateMsg.getEntity(), Device.class, true); Assert.assertNotNull(deviceMsg); Assert.assertEquals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, deviceUpdateMsg.getMsgType()); @@ -504,7 +504,7 @@ public class DeviceEdgeTest extends AbstractEdgeTest { uplinkMsgBuilder.addDeviceUpdateMsg(deviceUpdateMsgBuilder.build()); edgeImitator.expectResponsesAmount(1); - edgeImitator.expectMessageAmount(2); + edgeImitator.expectMessageAmount(1); testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder); edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); @@ -526,18 +526,6 @@ public class DeviceEdgeTest extends AbstractEdgeTest { Device device = doGet("/api/device/" + newDeviceId, Device.class); Assert.assertNotNull(device); Assert.assertNotEquals(deviceOnCloudName, device.getName()); - - Optional deviceCredentialsUpdateMsgOpt = edgeImitator.findMessageByType(DeviceCredentialsRequestMsg.class); - Assert.assertTrue(deviceCredentialsUpdateMsgOpt.isPresent()); - DeviceCredentialsRequestMsg latestDeviceCredentialsRequestMsg = deviceCredentialsUpdateMsgOpt.get(); - Assert.assertEquals(deviceMsg.getUuidId().getMostSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdMSB()); - Assert.assertEquals(device.getUuidId().getLeastSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdLSB()); - - newDeviceId = new UUID(latestDeviceCredentialsRequestMsg.getDeviceIdMSB(), latestDeviceCredentialsRequestMsg.getDeviceIdLSB()); - - device = doGet("/api/device/" + newDeviceId, Device.class); - Assert.assertNotNull(device); - Assert.assertNotEquals(deviceOnCloudName, device.getName()); } @Test @@ -553,22 +541,10 @@ public class DeviceEdgeTest extends AbstractEdgeTest { uplinkMsgBuilder.addDeviceUpdateMsg(deviceUpdateMsgBuilder.build()); edgeImitator.expectResponsesAmount(1); - edgeImitator.expectMessageAmount(1); - edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); - Assert.assertTrue(edgeImitator.waitForResponses()); - Assert.assertTrue(edgeImitator.waitForMessages()); - AbstractMessage latestMessage = edgeImitator.getLatestMessage(); - Assert.assertTrue(latestMessage instanceof DeviceCredentialsRequestMsg); - DeviceCredentialsRequestMsg latestDeviceCredentialsRequestMsg = (DeviceCredentialsRequestMsg) latestMessage; - Assert.assertEquals(deviceMsg.getUuidId().getMostSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdMSB()); - Assert.assertEquals(deviceMsg.getUuidId().getLeastSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdLSB()); - - UUID newDeviceId = new UUID(latestDeviceCredentialsRequestMsg.getDeviceIdMSB(), latestDeviceCredentialsRequestMsg.getDeviceIdLSB()); - - Device device = doGet("/api/device/" + newDeviceId, Device.class); + Device device = doGet("/api/device/" + deviceMsg.getId().getId(), Device.class); Assert.assertNotNull(device); Assert.assertEquals("Edge Device 2", device.getName()); } diff --git a/application/src/test/java/org/thingsboard/server/edge/RuleChainEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/RuleChainEdgeTest.java index 62d2c5eade..c694df84f8 100644 --- a/application/src/test/java/org/thingsboard/server/edge/RuleChainEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/RuleChainEdgeTest.java @@ -193,7 +193,7 @@ public class RuleChainEdgeTest extends AbstractEdgeTest { ruleChain.setType(RuleChainType.EDGE); RuleChain savedRuleChain = doPost("/api/ruleChain", ruleChain, RuleChain.class); - edgeImitator.expectMessageAmount(2); + edgeImitator.expectMessageAmount(4); doPost("/api/edge/" + edge.getUuidId() + "/ruleChain/" + savedRuleChain.getUuidId(), RuleChain.class); RuleChainMetaData metaData = createRuleChainMetadata(savedRuleChain); @@ -201,7 +201,7 @@ public class RuleChainEdgeTest extends AbstractEdgeTest { // set new rule chain as root RuleChainId currentRootRuleChainId = edge.getRootRuleChainId(); - edgeImitator.expectMessageAmount(1); + edgeImitator.expectMessageAmount(2); doPost("/api/edge/" + edge.getUuidId() + "/" + savedRuleChain.getUuidId() + "/root", Edge.class); Assert.assertTrue(edgeImitator.waitForMessages()); diff --git a/application/src/test/java/org/thingsboard/server/edge/TelemetryEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/TelemetryEdgeTest.java index 00b8cc4b74..b77e0b5ccc 100644 --- a/application/src/test/java/org/thingsboard/server/edge/TelemetryEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/TelemetryEdgeTest.java @@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.gen.edge.v1.AttributeDeleteMsg; +import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg; import org.thingsboard.server.gen.edge.v1.DeviceUpdateMsg; import org.thingsboard.server.gen.edge.v1.EntityDataProto; import org.thingsboard.server.gen.edge.v1.UplinkMsg; @@ -183,7 +184,7 @@ public class TelemetryEdgeTest extends AbstractEdgeTest { edgeImitator.setRandomFailuresOnTimeseriesDownlink(true); // imitator will generate failure in 100% of timeseries cases edgeImitator.setFailureProbability(100); - edgeImitator.expectMessageAmount(numberOfMsgsToSend); + edgeImitator.expectMessageAmount(numberOfMsgsToSend * 2); for (int idx = 1; idx <= numberOfMsgsToSend; idx++) { String timeseriesData = "{\"data\":{\"idx\":" + idx + "},\"ts\":" + System.currentTimeMillis() + "}"; JsonNode timeseriesEntityData = JacksonUtil.toJsonNode(timeseriesData); @@ -204,6 +205,9 @@ public class TelemetryEdgeTest extends AbstractEdgeTest { List deviceUpdateMsgs = edgeImitator.findAllMessagesByType(DeviceUpdateMsg.class); Assert.assertEquals(numberOfMsgsToSend, deviceUpdateMsgs.size()); + List deviceCredentialsUpdateMsgs = edgeImitator.findAllMessagesByType(DeviceCredentialsUpdateMsg.class); + Assert.assertEquals(numberOfMsgsToSend, deviceCredentialsUpdateMsgs.size()); + edgeImitator.setRandomFailuresOnTimeseriesDownlink(false); } diff --git a/application/src/test/java/org/thingsboard/server/edge/UserEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/UserEdgeTest.java index 6314cf53b9..503d87cf6f 100644 --- a/application/src/test/java/org/thingsboard/server/edge/UserEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/UserEdgeTest.java @@ -71,13 +71,13 @@ public class UserEdgeTest extends AbstractEdgeTest { Assert.assertTrue(userCredentialsUpdateMsgOpt.isPresent()); // update user - edgeImitator.expectMessageAmount(1); + edgeImitator.expectMessageAmount(2); savedTenantAdmin.setLastName("Borisov"); savedTenantAdmin = doPost("/api/user", savedTenantAdmin, User.class); Assert.assertTrue(edgeImitator.waitForMessages()); - AbstractMessage latestMessage = edgeImitator.getLatestMessage(); - Assert.assertTrue(latestMessage instanceof UserUpdateMsg); - userUpdateMsg = (UserUpdateMsg) latestMessage; + userUpdateMsgOpt = edgeImitator.findMessageByType(UserUpdateMsg.class); + Assert.assertTrue(userUpdateMsgOpt.isPresent()); + userUpdateMsg = userUpdateMsgOpt.get(); userMsg = JacksonUtil.fromString(userUpdateMsg.getEntity(), User.class, true); Assert.assertNotNull(userMsg); Assert.assertEquals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, userUpdateMsg.getMsgType()); @@ -92,7 +92,7 @@ public class UserEdgeTest extends AbstractEdgeTest { changePasswordRequest.setNewPassword("newTenant"); doPost("/api/auth/changePassword", changePasswordRequest); Assert.assertTrue(edgeImitator.waitForMessages()); - latestMessage = edgeImitator.getLatestMessage(); + AbstractMessage latestMessage = edgeImitator.getLatestMessage(); Assert.assertTrue(latestMessage instanceof UserCredentialsUpdateMsg); UserCredentialsUpdateMsg userCredentialsUpdateMsg = (UserCredentialsUpdateMsg) latestMessage; UserCredentials userCredentialsMsg = JacksonUtil.fromString(userCredentialsUpdateMsg.getEntity(), UserCredentials.class, true); @@ -155,13 +155,13 @@ public class UserEdgeTest extends AbstractEdgeTest { Assert.assertEquals(savedCustomerUser.getLastName(), userMsg.getLastName()); // update user - edgeImitator.expectMessageAmount(1); + edgeImitator.expectMessageAmount(2); savedCustomerUser.setLastName("Addams"); savedCustomerUser = doPost("/api/user", savedCustomerUser, User.class); Assert.assertTrue(edgeImitator.waitForMessages()); - AbstractMessage latestMessage = edgeImitator.getLatestMessage(); - Assert.assertTrue(latestMessage instanceof UserUpdateMsg); - userUpdateMsg = (UserUpdateMsg) latestMessage; + userUpdateMsgOpt = edgeImitator.findMessageByType(UserUpdateMsg.class); + Assert.assertTrue(userUpdateMsgOpt.isPresent()); + userUpdateMsg = userUpdateMsgOpt.get(); userMsg = JacksonUtil.fromString(userUpdateMsg.getEntity(), User.class, true); Assert.assertNotNull(userMsg); Assert.assertEquals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, userUpdateMsg.getMsgType()); @@ -176,7 +176,7 @@ public class UserEdgeTest extends AbstractEdgeTest { changePasswordRequest.setNewPassword("newCustomer"); doPost("/api/auth/changePassword", changePasswordRequest); Assert.assertTrue(edgeImitator.waitForMessages()); - latestMessage = edgeImitator.getLatestMessage(); + AbstractMessage latestMessage = edgeImitator.getLatestMessage(); Assert.assertTrue(latestMessage instanceof UserCredentialsUpdateMsg); UserCredentialsUpdateMsg userCredentialsUpdateMsg = (UserCredentialsUpdateMsg) latestMessage; UserCredentials userCredentialsMsg = JacksonUtil.fromString(userCredentialsUpdateMsg.getEntity(), UserCredentials.class, true); diff --git a/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java b/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java index 40aab430da..c116eddc74 100644 --- a/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java +++ b/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java @@ -2950,10 +2950,6 @@ public class RestClient implements Closeable { return restTemplate.getForEntity(baseURL + "/api/edges/enabled", Boolean.class).getBody(); } - public Boolean isEdgeSyncProcessActive(EdgeId edgeId) { - return restTemplate.getForEntity(baseURL + "/api/edge/sync/" + edgeId.getId() + "/active", Boolean.class).getBody(); - } - public Edge saveEdge(Edge edge) { return restTemplate.postForEntity(baseURL + "/api/edge", edge, Edge.class).getBody(); } From 8343c602968c5c6891ba37cf57222446bdb4fec1 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 7 Aug 2024 12:57:35 +0300 Subject: [PATCH 5/5] DefaultEdgeRequestsService - logging attributes and entityData in case failure --- .../rpc/sync/DefaultEdgeRequestsService.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java index 3f61734801..6e4117869b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java @@ -82,8 +82,6 @@ import java.util.UUID; @Slf4j public class DefaultEdgeRequestsService implements EdgeRequestsService { - private static final int DEFAULT_PAGE_SIZE = 1000; - @Autowired private EdgeEventService edgeEventService; @@ -142,8 +140,10 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { private ListenableFuture processEntityAttributesAndAddToEdgeQueue(TenantId tenantId, EntityId entityId, Edge edge, EdgeEventType entityType, String scope, List ssAttributes, AttributesRequestMsg attributesRequestMsg) { + Map entityData = null; + ObjectNode attributes = null; + ListenableFuture future; try { - ListenableFuture future; if (ssAttributes == null || ssAttributes.isEmpty()) { log.trace("[{}][{}] No attributes found for entity {} [{}]", tenantId, edge.getName(), @@ -151,8 +151,8 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { entityId.getId()); future = Futures.immediateFuture(null); } else { - Map entityData = new HashMap<>(); - ObjectNode attributes = JacksonUtil.newObjectNode(); + entityData = new HashMap<>(); + attributes = JacksonUtil.newObjectNode(); for (AttributeKvEntry attr : ssAttributes) { if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(attr.getKey()) && !DefaultDeviceStateService.INACTIVITY_TIMEOUT.equals(attr.getKey())) { @@ -170,7 +170,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { attributes.put(attr.getKey(), attr.getValueAsString()); } } - if (attributes.size() > 0) { + if (!attributes.isEmpty()) { entityData.put("kv", attributes); entityData.put("scope", scope); JsonNode body = JacksonUtil.valueToTree(entityData); @@ -182,12 +182,13 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { } return Futures.transformAsync(future, v -> processLatestTimeseriesAndAddToEdgeQueue(tenantId, entityId, edge, entityType), dbCallbackExecutorService); } catch (Exception e) { - String errMsg = String.format("[%s][%s] Failed to save attribute updates to the edge [%s]", tenantId, edge.getId(), attributesRequestMsg); + String errMsg = String.format("[%s][%s] Failed to save attribute updates to the edge [%s], scope = %s, entityData = %s, attributes = %s", + tenantId, edge.getId(), attributesRequestMsg, scope, entityData, attributes); log.error(errMsg, e); return Futures.immediateFailedFuture(new RuntimeException(errMsg, e)); } } - + private ListenableFuture processLatestTimeseriesAndAddToEdgeQueue(TenantId tenantId, EntityId entityId, Edge edge, EdgeEventType entityType) { ListenableFuture> getAllLatestFuture = timeseriesService.findAllLatest(tenantId, entityId);