diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java index 7518291e38..d7971e78be 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java @@ -141,8 +141,8 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { if (type != null) { SettableFuture futureToSet = SettableFuture.create(); String scope = attributesRequestMsg.getScope(); - ListenableFuture> ssAttrFuture = attributesService.findAll(tenantId, entityId, scope); - Futures.addCallback(ssAttrFuture, new FutureCallback>() { + ListenableFuture> findAttrFuture = attributesService.findAll(tenantId, entityId, scope); + Futures.addCallback(findAttrFuture, new FutureCallback>() { @Override public void onSuccess(@Nullable List ssAttributes) { if (ssAttributes != null && !ssAttributes.isEmpty()) { @@ -171,8 +171,9 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { entityId, body); } catch (Exception e) { - log.error("[{}] Failed to send attribute updates to the edge", edge.getName(), e); - throw new RuntimeException("[" + edge.getName() + "] Failed to send attribute updates to the edge", e); + log.error("[{}] Failed to save attribute updates to the edge", edge.getName(), e); + futureToSet.setException(new RuntimeException("[" + edge.getName() + "] Failed to send attribute updates to the edge", e)); + return; } } else { log.trace("[{}][{}] No attributes found for entity {} [{}]", tenantId, @@ -185,7 +186,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { @Override public void onFailure(Throwable t) { - log.error("Can't save attributes [{}]", attributesRequestMsg, t); + log.error("Can't find attributes [{}]", attributesRequestMsg, t); futureToSet.setException(t); } }, dbCallbackExecutorService); @@ -345,25 +346,37 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { public void onSuccess(@Nullable List entityViews) { try { if (entityViews != null && !entityViews.isEmpty()) { + List> futures = new ArrayList<>(); for (EntityView entityView : entityViews) { - Futures.addCallback(relationService.checkRelation(tenantId, edge.getId(), entityView.getId(), - EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE), new FutureCallback<>() { + ListenableFuture future = relationService.checkRelation(tenantId, edge.getId(), entityView.getId(), + EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE); + futures.add(future); + Futures.addCallback(future, new FutureCallback<>() { @Override public void onSuccess(@Nullable Boolean result) { if (Boolean.TRUE.equals(result)) { saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ENTITY_VIEW, EdgeEventActionType.ADDED, entityView.getId(), null); } - futureToSet.set(null); } - @Override public void onFailure(Throwable t) { - log.error("Exception during loading relation [{}] to edge on sync!", t, t); - futureToSet.setException(t); + // Do nothing - error handles in allAsList } }, dbCallbackExecutorService); } + Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() { + @Override + public void onSuccess(@Nullable List result) { + futureToSet.set(null); + } + + @Override + public void onFailure(Throwable t) { + log.error("Exception during loading relation [{}] to edge on sync!", t, t); + futureToSet.setException(t); + } + }, dbCallbackExecutorService); } else { futureToSet.set(null); }