Browse Source

Relation Edge processor - do not ignore result of saveRelationAsync

pull/7914/head
Volodymyr Babak 3 years ago
parent
commit
05a3366efd
  1. 6
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java
  2. 3
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java
  3. 16
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RelationEdgeProcessor.java

6
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java

@ -306,6 +306,12 @@ public abstract class BaseEdgeProcessor {
return futures;
}
protected ListenableFuture<Void> handleUnsupportedMsgType(UpdateMsgType msgType) {
String errMsg = String.format("Unsupported msg type %s", msgType);
log.error(errMsg);
return Futures.immediateFailedFuture(new RuntimeException(errMsg));
}
protected UpdateMsgType getUpdateMsgType(EdgeEventActionType actionType) {
switch (actionType) {
case UPDATED:

3
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java

@ -132,8 +132,7 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
return Futures.immediateFuture(null);
case UNRECOGNIZED:
default:
log.error("Unsupported msg type {}", deviceUpdateMsg.getMsgType());
return Futures.immediateFailedFuture(new RuntimeException("Unsupported msg type " + deviceUpdateMsg.getMsgType()));
return handleUnsupportedMsgType(deviceUpdateMsg.getMsgType());
}
}

16
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RelationEdgeProcessor.java

@ -80,23 +80,25 @@ public class RelationEdgeProcessor extends BaseEdgeProcessor {
case ENTITY_UPDATED_RPC_MESSAGE:
if (isEntityExists(tenantId, entityRelation.getTo())
&& isEntityExists(tenantId, entityRelation.getFrom())) {
relationService.saveRelationAsync(tenantId, entityRelation);
return Futures.transform(relationService.saveRelationAsync(tenantId, entityRelation),
(result) -> null, dbCallbackExecutorService);
} else {
log.warn("Skipping relating update msg because from/to entity doesn't exists on edge, {}", relationUpdateMsg);
return Futures.immediateFuture(null);
}
break;
case ENTITY_DELETED_RPC_MESSAGE:
relationService.deleteRelation(tenantId, entityRelation);
break;
return Futures.transform(relationService.deleteRelationAsync(tenantId, entityRelation),
(result) -> null, dbCallbackExecutorService);
case UNRECOGNIZED:
log.error("Unsupported msg type");
default:
return handleUnsupportedMsgType(relationUpdateMsg.getMsgType());
}
return Futures.immediateFuture(null);
} catch (Exception e) {
log.error("Failed to process relation update msg [{}]", relationUpdateMsg, e);
return Futures.immediateFailedFuture(new RuntimeException("Failed to process relation update msg", e));
}
}
private boolean isEntityExists(TenantId tenantId, EntityId entityId) throws ThingsboardException {
switch (entityId.getEntityType()) {
case DEVICE:

Loading…
Cancel
Save