diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EdgeProcessor.java index c1fff8258f..d4f6998926 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EdgeProcessor.java @@ -69,46 +69,40 @@ public class EdgeProcessor extends BaseEdgeProcessor { try { EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); - ListenableFuture edgeFuture; switch (actionType) { case ASSIGNED_TO_CUSTOMER: CustomerId customerId = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), CustomerId.class); - edgeFuture = edgeService.findEdgeByIdAsync(tenantId, edgeId); - return Futures.transformAsync(edgeFuture, edge -> { - if (edge == null || customerId.isNullUid()) { - return Futures.immediateFuture(null); - } - List> futures = new ArrayList<>(); - futures.add(saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.ADDED, customerId, null)); - PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); - PageData pageData; - do { - pageData = userService.findCustomerUsers(tenantId, customerId, pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - log.trace("[{}] [{}] user(s) are going to be added to edge.", edge.getId(), pageData.getData().size()); - for (User user : pageData.getData()) { - futures.add(saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.USER, EdgeEventActionType.ADDED, user.getId(), null)); - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } + Edge edge = edgeService.findEdgeById(tenantId, edgeId); + if (edge == null || customerId.isNullUid()) { + return Futures.immediateFuture(null); + } + List> futures = new ArrayList<>(); + futures.add(saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.ADDED, customerId, null)); + futures.add(saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.EDGE, EdgeEventActionType.ASSIGNED_TO_CUSTOMER, edgeId, null)); + PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); + PageData pageData; + do { + pageData = userService.findCustomerUsers(tenantId, customerId, pageLink); + if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { + log.trace("[{}] [{}] user(s) are going to be added to edge.", edge.getId(), pageData.getData().size()); + for (User user : pageData.getData()) { + futures.add(saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.USER, EdgeEventActionType.ADDED, user.getId(), null)); + } + if (pageData.hasNext()) { + pageLink = pageLink.nextPageLink(); } - } while (pageData != null && pageData.hasNext()); - return Futures.transformAsync(Futures.allAsList(futures), voids -> - saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.EDGE, EdgeEventActionType.ASSIGNED_TO_CUSTOMER, edgeId, null), - dbCallbackExecutorService); - }, dbCallbackExecutorService); + } + } while (pageData != null && pageData.hasNext()); + return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); case UNASSIGNED_FROM_CUSTOMER: CustomerId customerIdToDelete = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), CustomerId.class); - edgeFuture = edgeService.findEdgeByIdAsync(tenantId, edgeId); - return Futures.transformAsync(edgeFuture, edge -> { - if (edge == null || customerIdToDelete.isNullUid()) { - return Futures.immediateFuture(null); - } - return Futures.transformAsync(saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.DELETED, customerIdToDelete, null), - voids -> saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.EDGE, EdgeEventActionType.UNASSIGNED_FROM_CUSTOMER, edgeId, null), - dbCallbackExecutorService); - }, dbCallbackExecutorService); + edge = edgeService.findEdgeById(tenantId, edgeId); + if (edge == null || customerIdToDelete.isNullUid()) { + return Futures.immediateFuture(null); + } + return Futures.transformAsync(saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.EDGE, EdgeEventActionType.UNASSIGNED_FROM_CUSTOMER, edgeId, null), + voids -> saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.DELETED, customerIdToDelete, null), + dbCallbackExecutorService); default: return Futures.immediateFuture(null); }