From 375792da193acbe838cd533fbd2f036e0d114424 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Mon, 14 Apr 2025 13:30:46 +0300 Subject: [PATCH] save edge event -Do not do validation for active edges --- .../edge/rpc/processor/BaseEdgeProcessor.java | 52 ++++++++++++------- 1 file changed, 33 insertions(+), 19 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index d622963737..bb4bd19980 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -95,28 +95,42 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor { EdgeEventActionType action, EntityId entityId, JsonNode body) { - ListenableFuture> future = - edgeCtx.getAttributesService().find(tenantId, edgeId, AttributeScope.SERVER_SCOPE, DefaultDeviceStateService.ACTIVITY_STATE); - return Futures.transformAsync(future, activeOpt -> { - if (activeOpt.isEmpty()) { - log.trace("Edge is not activated. Skipping event. tenantId [{}], edgeId [{}], type[{}], " + - "action [{}], entityId [{}], body [{}]", - tenantId, edgeId, type, action, entityId, body); - return Futures.immediateFuture(null); - } - if (activeOpt.get().getBooleanValue().isPresent() && activeOpt.get().getBooleanValue().get()) { - return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body); - } else { - if (doSaveIfEdgeIsOffline(type, action)) { - return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body); - } else { - log.trace("Edge is not active at the moment. Skipping event. tenantId [{}], edgeId [{}], type[{}], " + + return saveEdgeEvent(tenantId, edgeId, type, action, entityId, body, true); + } + + protected ListenableFuture saveEdgeEvent(TenantId tenantId, + EdgeId edgeId, + EdgeEventType type, + EdgeEventActionType action, + EntityId entityId, + JsonNode body, + boolean doValidate) { + if (doValidate) { + ListenableFuture> future = + edgeCtx.getAttributesService().find(tenantId, edgeId, AttributeScope.SERVER_SCOPE, DefaultDeviceStateService.ACTIVITY_STATE); + return Futures.transformAsync(future, activeOpt -> { + if (activeOpt.isEmpty()) { + log.trace("Edge is not activated. Skipping event. tenantId [{}], edgeId [{}], type[{}], " + "action [{}], entityId [{}], body [{}]", tenantId, edgeId, type, action, entityId, body); return Futures.immediateFuture(null); } - } - }, dbCallbackExecutorService); + if (activeOpt.get().getBooleanValue().isPresent() && activeOpt.get().getBooleanValue().get()) { + return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body); + } else { + if (doSaveIfEdgeIsOffline(type, action)) { + return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body); + } else { + log.trace("Edge is not active at the moment. Skipping event. tenantId [{}], edgeId [{}], type[{}], " + + "action [{}], entityId [{}], body [{}]", + tenantId, edgeId, type, action, entityId, body); + return Futures.immediateFuture(null); + } + } + }, dbCallbackExecutorService); + } else { + return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body); + } } private boolean doSaveIfEdgeIsOffline(EdgeEventType type, EdgeEventActionType action) { @@ -147,7 +161,7 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor { if (TenantId.SYS_TENANT_ID.equals(tenantId)) { PageDataIterable edges = new PageDataIterable<>(link -> edgeCtx.getEdgeService().findActiveEdges(link), 1024); for (Edge edge : edges) { - futures.add(saveEdgeEvent(edge.getTenantId(), edge.getId(), type, actionType, entityId, body)); + futures.add(saveEdgeEvent(edge.getTenantId(), edge.getId(), type, actionType, entityId, body, false)); } return Futures.immediateFuture(null); } else {