From 05a3366efd55d1d10edea258956e6549c4c8defd Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 10 Jan 2023 16:06:45 +0200 Subject: [PATCH] Relation Edge processor - do not ignore result of saveRelationAsync --- .../edge/rpc/processor/BaseEdgeProcessor.java | 6 ++++++ .../edge/rpc/processor/DeviceEdgeProcessor.java | 3 +-- .../rpc/processor/RelationEdgeProcessor.java | 16 +++++++++------- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 3b39520606..9776cf3a4d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -306,6 +306,12 @@ public abstract class BaseEdgeProcessor { return futures; } + protected ListenableFuture handleUnsupportedMsgType(UpdateMsgType msgType) { + String errMsg = String.format("Unsupported msg type %s", msgType); + log.error(errMsg); + return Futures.immediateFailedFuture(new RuntimeException(errMsg)); + } + protected UpdateMsgType getUpdateMsgType(EdgeEventActionType actionType) { switch (actionType) { case UPDATED: diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java index 02aca5165c..39dfa28aec 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java @@ -132,8 +132,7 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { return Futures.immediateFuture(null); case UNRECOGNIZED: default: - log.error("Unsupported msg type {}", deviceUpdateMsg.getMsgType()); - return Futures.immediateFailedFuture(new RuntimeException("Unsupported msg type " + deviceUpdateMsg.getMsgType())); + return handleUnsupportedMsgType(deviceUpdateMsg.getMsgType()); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RelationEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RelationEdgeProcessor.java index 8b07d771d6..b2bec3559d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RelationEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RelationEdgeProcessor.java @@ -80,23 +80,25 @@ public class RelationEdgeProcessor extends BaseEdgeProcessor { case ENTITY_UPDATED_RPC_MESSAGE: if (isEntityExists(tenantId, entityRelation.getTo()) && isEntityExists(tenantId, entityRelation.getFrom())) { - relationService.saveRelationAsync(tenantId, entityRelation); + return Futures.transform(relationService.saveRelationAsync(tenantId, entityRelation), + (result) -> null, dbCallbackExecutorService); + } else { + log.warn("Skipping relating update msg because from/to entity doesn't exists on edge, {}", relationUpdateMsg); + return Futures.immediateFuture(null); } - break; case ENTITY_DELETED_RPC_MESSAGE: - relationService.deleteRelation(tenantId, entityRelation); - break; + return Futures.transform(relationService.deleteRelationAsync(tenantId, entityRelation), + (result) -> null, dbCallbackExecutorService); case UNRECOGNIZED: - log.error("Unsupported msg type"); + default: + return handleUnsupportedMsgType(relationUpdateMsg.getMsgType()); } - return Futures.immediateFuture(null); } catch (Exception e) { log.error("Failed to process relation update msg [{}]", relationUpdateMsg, e); return Futures.immediateFailedFuture(new RuntimeException("Failed to process relation update msg", e)); } } - private boolean isEntityExists(TenantId tenantId, EntityId entityId) throws ThingsboardException { switch (entityId.getEntityType()) { case DEVICE: