Browse Source

save edge event -Do not do validation for active edges

pull/13201/head
Volodymyr Babak 1 year ago
parent
commit
375792da19
  1. 52
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java

52
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java

@ -95,28 +95,42 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor {
EdgeEventActionType action,
EntityId entityId,
JsonNode body) {
ListenableFuture<Optional<AttributeKvEntry>> future =
edgeCtx.getAttributesService().find(tenantId, edgeId, AttributeScope.SERVER_SCOPE, DefaultDeviceStateService.ACTIVITY_STATE);
return Futures.transformAsync(future, activeOpt -> {
if (activeOpt.isEmpty()) {
log.trace("Edge is not activated. Skipping event. tenantId [{}], edgeId [{}], type[{}], " +
"action [{}], entityId [{}], body [{}]",
tenantId, edgeId, type, action, entityId, body);
return Futures.immediateFuture(null);
}
if (activeOpt.get().getBooleanValue().isPresent() && activeOpt.get().getBooleanValue().get()) {
return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body);
} else {
if (doSaveIfEdgeIsOffline(type, action)) {
return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body);
} else {
log.trace("Edge is not active at the moment. Skipping event. tenantId [{}], edgeId [{}], type[{}], " +
return saveEdgeEvent(tenantId, edgeId, type, action, entityId, body, true);
}
protected ListenableFuture<Void> saveEdgeEvent(TenantId tenantId,
EdgeId edgeId,
EdgeEventType type,
EdgeEventActionType action,
EntityId entityId,
JsonNode body,
boolean doValidate) {
if (doValidate) {
ListenableFuture<Optional<AttributeKvEntry>> future =
edgeCtx.getAttributesService().find(tenantId, edgeId, AttributeScope.SERVER_SCOPE, DefaultDeviceStateService.ACTIVITY_STATE);
return Futures.transformAsync(future, activeOpt -> {
if (activeOpt.isEmpty()) {
log.trace("Edge is not activated. Skipping event. tenantId [{}], edgeId [{}], type[{}], " +
"action [{}], entityId [{}], body [{}]",
tenantId, edgeId, type, action, entityId, body);
return Futures.immediateFuture(null);
}
}
}, dbCallbackExecutorService);
if (activeOpt.get().getBooleanValue().isPresent() && activeOpt.get().getBooleanValue().get()) {
return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body);
} else {
if (doSaveIfEdgeIsOffline(type, action)) {
return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body);
} else {
log.trace("Edge is not active at the moment. Skipping event. tenantId [{}], edgeId [{}], type[{}], " +
"action [{}], entityId [{}], body [{}]",
tenantId, edgeId, type, action, entityId, body);
return Futures.immediateFuture(null);
}
}
}, dbCallbackExecutorService);
} else {
return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body);
}
}
private boolean doSaveIfEdgeIsOffline(EdgeEventType type, EdgeEventActionType action) {
@ -147,7 +161,7 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor {
if (TenantId.SYS_TENANT_ID.equals(tenantId)) {
PageDataIterable<Edge> edges = new PageDataIterable<>(link -> edgeCtx.getEdgeService().findActiveEdges(link), 1024);
for (Edge edge : edges) {
futures.add(saveEdgeEvent(edge.getTenantId(), edge.getId(), type, actionType, entityId, body));
futures.add(saveEdgeEvent(edge.getTenantId(), edge.getId(), type, actionType, entityId, body, false));
}
return Futures.immediateFuture(null);
} else {

Loading…
Cancel
Save