|
|
|
@ -69,46 +69,40 @@ public class EdgeProcessor extends BaseEdgeProcessor { |
|
|
|
try { |
|
|
|
EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); |
|
|
|
EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); |
|
|
|
ListenableFuture<Edge> edgeFuture; |
|
|
|
switch (actionType) { |
|
|
|
case ASSIGNED_TO_CUSTOMER: |
|
|
|
CustomerId customerId = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), CustomerId.class); |
|
|
|
edgeFuture = edgeService.findEdgeByIdAsync(tenantId, edgeId); |
|
|
|
return Futures.transformAsync(edgeFuture, edge -> { |
|
|
|
if (edge == null || customerId.isNullUid()) { |
|
|
|
return Futures.immediateFuture(null); |
|
|
|
} |
|
|
|
List<ListenableFuture<Void>> futures = new ArrayList<>(); |
|
|
|
futures.add(saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.ADDED, customerId, null)); |
|
|
|
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); |
|
|
|
PageData<User> pageData; |
|
|
|
do { |
|
|
|
pageData = userService.findCustomerUsers(tenantId, customerId, pageLink); |
|
|
|
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { |
|
|
|
log.trace("[{}] [{}] user(s) are going to be added to edge.", edge.getId(), pageData.getData().size()); |
|
|
|
for (User user : pageData.getData()) { |
|
|
|
futures.add(saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.USER, EdgeEventActionType.ADDED, user.getId(), null)); |
|
|
|
} |
|
|
|
if (pageData.hasNext()) { |
|
|
|
pageLink = pageLink.nextPageLink(); |
|
|
|
} |
|
|
|
Edge edge = edgeService.findEdgeById(tenantId, edgeId); |
|
|
|
if (edge == null || customerId.isNullUid()) { |
|
|
|
return Futures.immediateFuture(null); |
|
|
|
} |
|
|
|
List<ListenableFuture<Void>> futures = new ArrayList<>(); |
|
|
|
futures.add(saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.ADDED, customerId, null)); |
|
|
|
futures.add(saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.EDGE, EdgeEventActionType.ASSIGNED_TO_CUSTOMER, edgeId, null)); |
|
|
|
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); |
|
|
|
PageData<User> pageData; |
|
|
|
do { |
|
|
|
pageData = userService.findCustomerUsers(tenantId, customerId, pageLink); |
|
|
|
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { |
|
|
|
log.trace("[{}] [{}] user(s) are going to be added to edge.", edge.getId(), pageData.getData().size()); |
|
|
|
for (User user : pageData.getData()) { |
|
|
|
futures.add(saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.USER, EdgeEventActionType.ADDED, user.getId(), null)); |
|
|
|
} |
|
|
|
if (pageData.hasNext()) { |
|
|
|
pageLink = pageLink.nextPageLink(); |
|
|
|
} |
|
|
|
} while (pageData != null && pageData.hasNext()); |
|
|
|
return Futures.transformAsync(Futures.allAsList(futures), voids -> |
|
|
|
saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.EDGE, EdgeEventActionType.ASSIGNED_TO_CUSTOMER, edgeId, null), |
|
|
|
dbCallbackExecutorService); |
|
|
|
}, dbCallbackExecutorService); |
|
|
|
} |
|
|
|
} while (pageData != null && pageData.hasNext()); |
|
|
|
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); |
|
|
|
case UNASSIGNED_FROM_CUSTOMER: |
|
|
|
CustomerId customerIdToDelete = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), CustomerId.class); |
|
|
|
edgeFuture = edgeService.findEdgeByIdAsync(tenantId, edgeId); |
|
|
|
return Futures.transformAsync(edgeFuture, edge -> { |
|
|
|
if (edge == null || customerIdToDelete.isNullUid()) { |
|
|
|
return Futures.immediateFuture(null); |
|
|
|
} |
|
|
|
return Futures.transformAsync(saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.DELETED, customerIdToDelete, null), |
|
|
|
voids -> saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.EDGE, EdgeEventActionType.UNASSIGNED_FROM_CUSTOMER, edgeId, null), |
|
|
|
dbCallbackExecutorService); |
|
|
|
}, dbCallbackExecutorService); |
|
|
|
edge = edgeService.findEdgeById(tenantId, edgeId); |
|
|
|
if (edge == null || customerIdToDelete.isNullUid()) { |
|
|
|
return Futures.immediateFuture(null); |
|
|
|
} |
|
|
|
return Futures.transformAsync(saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.EDGE, EdgeEventActionType.UNASSIGNED_FROM_CUSTOMER, edgeId, null), |
|
|
|
voids -> saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.DELETED, customerIdToDelete, null), |
|
|
|
dbCallbackExecutorService); |
|
|
|
default: |
|
|
|
return Futures.immediateFuture(null); |
|
|
|
} |
|
|
|
|