Browse Source

Improved stability of handling errors in processEntityViewsRequestMsg function

pull/4957/head
Volodymyr Babak 5 years ago
parent
commit
c7386df829
  1. 35
      application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java

35
application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java

@ -141,8 +141,8 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
if (type != null) {
SettableFuture<Void> futureToSet = SettableFuture.create();
String scope = attributesRequestMsg.getScope();
ListenableFuture<List<AttributeKvEntry>> ssAttrFuture = attributesService.findAll(tenantId, entityId, scope);
Futures.addCallback(ssAttrFuture, new FutureCallback<List<AttributeKvEntry>>() {
ListenableFuture<List<AttributeKvEntry>> findAttrFuture = attributesService.findAll(tenantId, entityId, scope);
Futures.addCallback(findAttrFuture, new FutureCallback<List<AttributeKvEntry>>() {
@Override
public void onSuccess(@Nullable List<AttributeKvEntry> ssAttributes) {
if (ssAttributes != null && !ssAttributes.isEmpty()) {
@ -171,8 +171,9 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
entityId,
body);
} catch (Exception e) {
log.error("[{}] Failed to send attribute updates to the edge", edge.getName(), e);
throw new RuntimeException("[" + edge.getName() + "] Failed to send attribute updates to the edge", e);
log.error("[{}] Failed to save attribute updates to the edge", edge.getName(), e);
futureToSet.setException(new RuntimeException("[" + edge.getName() + "] Failed to send attribute updates to the edge", e));
return;
}
} else {
log.trace("[{}][{}] No attributes found for entity {} [{}]", tenantId,
@ -185,7 +186,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
@Override
public void onFailure(Throwable t) {
log.error("Can't save attributes [{}]", attributesRequestMsg, t);
log.error("Can't find attributes [{}]", attributesRequestMsg, t);
futureToSet.setException(t);
}
}, dbCallbackExecutorService);
@ -345,25 +346,37 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
public void onSuccess(@Nullable List<EntityView> entityViews) {
try {
if (entityViews != null && !entityViews.isEmpty()) {
List<ListenableFuture<Boolean>> futures = new ArrayList<>();
for (EntityView entityView : entityViews) {
Futures.addCallback(relationService.checkRelation(tenantId, edge.getId(), entityView.getId(),
EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE), new FutureCallback<>() {
ListenableFuture<Boolean> future = relationService.checkRelation(tenantId, edge.getId(), entityView.getId(),
EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE);
futures.add(future);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Boolean result) {
if (Boolean.TRUE.equals(result)) {
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ENTITY_VIEW,
EdgeEventActionType.ADDED, entityView.getId(), null);
}
futureToSet.set(null);
}
@Override
public void onFailure(Throwable t) {
log.error("Exception during loading relation [{}] to edge on sync!", t, t);
futureToSet.setException(t);
// Do nothing - error handles in allAsList
}
}, dbCallbackExecutorService);
}
Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<Boolean> result) {
futureToSet.set(null);
}
@Override
public void onFailure(Throwable t) {
log.error("Exception during loading relation [{}] to edge on sync!", t, t);
futureToSet.setException(t);
}
}, dbCallbackExecutorService);
} else {
futureToSet.set(null);
}

Loading…
Cancel
Save