From d045ef23e23bbd287d1748eac8e1cef1be36a7dd Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Mon, 14 Apr 2025 11:41:24 +0300 Subject: [PATCH 1/5] Edge - slow down processing of edge events for SYS_TENANT_ID --- .../edge/EdgeEventSourcingListener.java | 5 +++++ .../service/edge/rpc/EdgeGrpcSession.java | 2 +- .../edge/rpc/processor/BaseEdgeProcessor.java | 22 +++++++++++++------ 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java index c62a551310..8ec42b4a10 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java @@ -167,6 +167,11 @@ public class EdgeEventSourcingListener { @TransactionalEventListener(fallbackExecution = true) public void handleEvent(RelationActionEvent event) { try { + TenantId tenantId = event.getTenantId(); + if (!tenantId.isSysTenantId() && !tenantService.tenantExists(tenantId)) { + log.debug("[{}] Ignoring RelationActionEvent because tenant does not exist: {}", tenantId, event); + return; + } EntityRelation relation = event.getRelation(); if (relation == null) { log.trace("[{}] skipping RelationActionEvent event in case relation is null: {}", event.getTenantId(), event); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 9ce5973639..7861885989 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -528,7 +528,7 @@ public abstract class EdgeGrpcSession implements Closeable { sessionState.getPendingMsgsMap().remove(msg.getDownlinkMsgId()); log.debug("[{}][{}][{}] Msg has been processed successfully! Msg Id: [{}], Msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg); } else { - log.error("[{}][{}][{}] Msg processing failed! Msg Id: [{}], Error msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg.getErrorMsg()); + log.debug("[{}][{}][{}] Msg processing failed! Msg Id: [{}], Error msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg.getErrorMsg()); DownlinkMsg downlinkMsg = sessionState.getPendingMsgsMap().get(msg.getDownlinkMsgId()); // if NOT timeseries or attributes failures - ack failed downlink if (downlinkMsg.getEntityDataCount() == 0) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 9f30d70fb2..7ce1159397 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -65,6 +65,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -142,17 +143,24 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor { protected ListenableFuture processActionForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId, - JsonNode body, EdgeId sourceEdgeId) { - List> futures = new ArrayList<>(); + EdgeId sourceEdgeId) { if (TenantId.SYS_TENANT_ID.equals(tenantId)) { - PageDataIterable tenantIds = new PageDataIterable<>(link -> edgeCtx.getTenantService().findTenantsIds(link), 1024); + PageDataIterable tenantIds = new PageDataIterable<>(link -> edgeCtx.getTenantService().findTenantsIds(link), 500); for (TenantId tenantId1 : tenantIds) { - futures.addAll(processActionForAllEdgesByTenantId(tenantId1, type, actionType, entityId, body, sourceEdgeId)); + try { + List> sysTenantFutures = processActionForAllEdgesByTenantId(tenantId1, type, actionType, entityId, null, sourceEdgeId); + for (ListenableFuture future : sysTenantFutures) { + future.get(10, TimeUnit.SECONDS); + } + } catch (Exception e) { + log.error("Failed to process action for all edges by SYS_TENANT_ID. Failed tenantId = [{}]", tenantId1, e); + } } + return Futures.immediateFuture(null); } else { - futures = processActionForAllEdgesByTenantId(tenantId, type, actionType, entityId, null, sourceEdgeId); + List> tenantFutures = processActionForAllEdgesByTenantId(tenantId, type, actionType, entityId, null, sourceEdgeId); + return Futures.transform(Futures.allAsList(tenantFutures), voids -> null, dbCallbackExecutorService); } - return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); } private List> processActionForAllEdgesByTenantId(TenantId tenantId, @@ -284,7 +292,7 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor { private ListenableFuture processEntityNotificationForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId, EdgeId sourceEdgeId) { return switch (actionType) { case ADDED, UPDATED, DELETED, CREDENTIALS_UPDATED -> // used by USER entity - processActionForAllEdges(tenantId, type, actionType, entityId, null, sourceEdgeId); + processActionForAllEdges(tenantId, type, actionType, entityId, sourceEdgeId); default -> Futures.immediateFuture(null); }; } From 69bfe737b75f757cc1ba998b4aecc998ecfedf9d Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Mon, 14 Apr 2025 11:50:48 +0300 Subject: [PATCH 2/5] Fixed to be compatible with PE --- .../service/edge/rpc/processor/BaseEdgeProcessor.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 7ce1159397..4356a1d60c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -143,12 +143,12 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor { protected ListenableFuture processActionForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId, - EdgeId sourceEdgeId) { + JsonNode body, EdgeId sourceEdgeId) { if (TenantId.SYS_TENANT_ID.equals(tenantId)) { PageDataIterable tenantIds = new PageDataIterable<>(link -> edgeCtx.getTenantService().findTenantsIds(link), 500); for (TenantId tenantId1 : tenantIds) { try { - List> sysTenantFutures = processActionForAllEdgesByTenantId(tenantId1, type, actionType, entityId, null, sourceEdgeId); + List> sysTenantFutures = processActionForAllEdgesByTenantId(tenantId1, type, actionType, entityId, body, sourceEdgeId); for (ListenableFuture future : sysTenantFutures) { future.get(10, TimeUnit.SECONDS); } @@ -292,7 +292,7 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor { private ListenableFuture processEntityNotificationForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId, EdgeId sourceEdgeId) { return switch (actionType) { case ADDED, UPDATED, DELETED, CREDENTIALS_UPDATED -> // used by USER entity - processActionForAllEdges(tenantId, type, actionType, entityId, sourceEdgeId); + processActionForAllEdges(tenantId, type, actionType, entityId, null, sourceEdgeId); default -> Futures.immediateFuture(null); }; } From 2478acc42d31c62f90fa511c47c79c03eb241a23 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Mon, 14 Apr 2025 13:25:32 +0300 Subject: [PATCH 3/5] Refactoring - introduced new method - findActiveEdges for all tenants --- .../edge/rpc/processor/BaseEdgeProcessor.java | 19 ++++++------------- .../server/dao/edge/EdgeService.java | 2 ++ .../thingsboard/server/dao/edge/EdgeDao.java | 2 ++ .../server/dao/edge/EdgeServiceImpl.java | 7 +++++++ .../server/dao/sql/edge/EdgeRepository.java | 12 ++++++++++++ .../server/dao/sql/edge/JpaEdgeDao.java | 8 ++++++++ 6 files changed, 37 insertions(+), 13 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 4356a1d60c..d622963737 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -65,7 +65,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -144,23 +143,17 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor { protected ListenableFuture processActionForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId, JsonNode body, EdgeId sourceEdgeId) { + List> futures = new ArrayList<>(); if (TenantId.SYS_TENANT_ID.equals(tenantId)) { - PageDataIterable tenantIds = new PageDataIterable<>(link -> edgeCtx.getTenantService().findTenantsIds(link), 500); - for (TenantId tenantId1 : tenantIds) { - try { - List> sysTenantFutures = processActionForAllEdgesByTenantId(tenantId1, type, actionType, entityId, body, sourceEdgeId); - for (ListenableFuture future : sysTenantFutures) { - future.get(10, TimeUnit.SECONDS); - } - } catch (Exception e) { - log.error("Failed to process action for all edges by SYS_TENANT_ID. Failed tenantId = [{}]", tenantId1, e); - } + PageDataIterable edges = new PageDataIterable<>(link -> edgeCtx.getEdgeService().findActiveEdges(link), 1024); + for (Edge edge : edges) { + futures.add(saveEdgeEvent(edge.getTenantId(), edge.getId(), type, actionType, entityId, body)); } return Futures.immediateFuture(null); } else { - List> tenantFutures = processActionForAllEdgesByTenantId(tenantId, type, actionType, entityId, null, sourceEdgeId); - return Futures.transform(Futures.allAsList(tenantFutures), voids -> null, dbCallbackExecutorService); + futures = processActionForAllEdgesByTenantId(tenantId, type, actionType, entityId, null, sourceEdgeId); } + return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); } private List> processActionForAllEdgesByTenantId(TenantId tenantId, diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java index 59ad6f64b3..a8993a4a37 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java @@ -47,6 +47,8 @@ public interface EdgeService extends EntityDaoService { Optional findEdgeByRoutingKey(TenantId tenantId, String routingKey); + PageData findActiveEdges(PageLink pageLink); + Edge saveEdge(Edge edge); Edge assignEdgeToCustomer(TenantId tenantId, EdgeId edgeId, CustomerId customerId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeDao.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeDao.java index fdb9144ab2..405b2446b0 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeDao.java @@ -41,6 +41,8 @@ public interface EdgeDao extends Dao, TenantEntityDao { EdgeInfo findEdgeInfoById(TenantId tenantId, UUID edgeId); + PageData findActiveEdges(PageLink pageLink); + PageData findEdgeIdsByTenantId(UUID tenantId, PageLink pageLink); PageData findEdgesByTenantId(UUID tenantId, PageLink pageLink); diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java index 8163109686..0655d05572 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java @@ -192,6 +192,13 @@ public class EdgeServiceImpl extends AbstractCachedEntityService findActiveEdges(PageLink pageLink) { + log.trace("Executing findActiveEdges [{}]", pageLink); + Validator.validatePageLink(pageLink); + return edgeDao.findActiveEdges(pageLink); + } + @Override public Edge saveEdge(Edge edge) { log.trace("Executing saveEdge [{}]", edge); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/EdgeRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/EdgeRepository.java index c11db0a348..bd1dda54b8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/EdgeRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/EdgeRepository.java @@ -44,6 +44,18 @@ public interface EdgeRepository extends JpaRepository { "WHERE d.id = :edgeId") EdgeInfoEntity findEdgeInfoById(@Param("edgeId") UUID edgeId); + @Query(value = "SELECT ee.id, ee.created_time, ee.additional_info, ee.customer_id, " + + "ee.root_rule_chain_id, ee.type, ee.name, ee.label, ee.routing_key, " + + "ee.secret, ee.tenant_id, ee.version " + + "FROM edge ee " + + "JOIN attribute_kv ON ee.id = attribute_kv.entity_id " + + "JOIN key_dictionary ON attribute_kv.attribute_key = key_dictionary.key_id " + + "WHERE attribute_kv.bool_v = true AND key_dictionary.key = 'active' " + + "AND (:textSearch IS NULL OR ee.name ILIKE CONCAT('%', :textSearch, '%')) " + + "ORDER BY ee.id", nativeQuery = true) + Page findActiveEdges(@Param("textSearch") String textSearch, + Pageable pageable); + @Query("SELECT d.id FROM EdgeEntity d WHERE d.tenantId = :tenantId " + "AND (:textSearch IS NULL OR ilike(d.name, CONCAT('%', :textSearch, '%')) = true)") Page findIdsByTenantId(@Param("tenantId") UUID tenantId, diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaEdgeDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaEdgeDao.java index 3f45b1ca1a..50b8092731 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaEdgeDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaEdgeDao.java @@ -66,6 +66,14 @@ public class JpaEdgeDao extends JpaAbstractDao implements Edge return DaoUtil.getData(edgeRepository.findEdgeInfoById(edgeId)); } + @Override + public PageData findActiveEdges(PageLink pageLink) { + return DaoUtil.toPageData( + edgeRepository.findActiveEdges( + pageLink.getTextSearch(), + DaoUtil.toPageable(pageLink))); + } + @Override public PageData findEdgeIdsByTenantId(UUID tenantId, PageLink pageLink) { return DaoUtil.pageToPageData( From 375792da193acbe838cd533fbd2f036e0d114424 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Mon, 14 Apr 2025 13:30:46 +0300 Subject: [PATCH 4/5] save edge event -Do not do validation for active edges --- .../edge/rpc/processor/BaseEdgeProcessor.java | 52 ++++++++++++------- 1 file changed, 33 insertions(+), 19 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index d622963737..bb4bd19980 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -95,28 +95,42 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor { EdgeEventActionType action, EntityId entityId, JsonNode body) { - ListenableFuture> future = - edgeCtx.getAttributesService().find(tenantId, edgeId, AttributeScope.SERVER_SCOPE, DefaultDeviceStateService.ACTIVITY_STATE); - return Futures.transformAsync(future, activeOpt -> { - if (activeOpt.isEmpty()) { - log.trace("Edge is not activated. Skipping event. tenantId [{}], edgeId [{}], type[{}], " + - "action [{}], entityId [{}], body [{}]", - tenantId, edgeId, type, action, entityId, body); - return Futures.immediateFuture(null); - } - if (activeOpt.get().getBooleanValue().isPresent() && activeOpt.get().getBooleanValue().get()) { - return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body); - } else { - if (doSaveIfEdgeIsOffline(type, action)) { - return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body); - } else { - log.trace("Edge is not active at the moment. Skipping event. tenantId [{}], edgeId [{}], type[{}], " + + return saveEdgeEvent(tenantId, edgeId, type, action, entityId, body, true); + } + + protected ListenableFuture saveEdgeEvent(TenantId tenantId, + EdgeId edgeId, + EdgeEventType type, + EdgeEventActionType action, + EntityId entityId, + JsonNode body, + boolean doValidate) { + if (doValidate) { + ListenableFuture> future = + edgeCtx.getAttributesService().find(tenantId, edgeId, AttributeScope.SERVER_SCOPE, DefaultDeviceStateService.ACTIVITY_STATE); + return Futures.transformAsync(future, activeOpt -> { + if (activeOpt.isEmpty()) { + log.trace("Edge is not activated. Skipping event. tenantId [{}], edgeId [{}], type[{}], " + "action [{}], entityId [{}], body [{}]", tenantId, edgeId, type, action, entityId, body); return Futures.immediateFuture(null); } - } - }, dbCallbackExecutorService); + if (activeOpt.get().getBooleanValue().isPresent() && activeOpt.get().getBooleanValue().get()) { + return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body); + } else { + if (doSaveIfEdgeIsOffline(type, action)) { + return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body); + } else { + log.trace("Edge is not active at the moment. Skipping event. tenantId [{}], edgeId [{}], type[{}], " + + "action [{}], entityId [{}], body [{}]", + tenantId, edgeId, type, action, entityId, body); + return Futures.immediateFuture(null); + } + } + }, dbCallbackExecutorService); + } else { + return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body); + } } private boolean doSaveIfEdgeIsOffline(EdgeEventType type, EdgeEventActionType action) { @@ -147,7 +161,7 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor { if (TenantId.SYS_TENANT_ID.equals(tenantId)) { PageDataIterable edges = new PageDataIterable<>(link -> edgeCtx.getEdgeService().findActiveEdges(link), 1024); for (Edge edge : edges) { - futures.add(saveEdgeEvent(edge.getTenantId(), edge.getId(), type, actionType, entityId, body)); + futures.add(saveEdgeEvent(edge.getTenantId(), edge.getId(), type, actionType, entityId, body, false)); } return Futures.immediateFuture(null); } else { From 5653ac7f2f496f64e1247064d271a5b860c68e62 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Mon, 14 Apr 2025 13:37:55 +0300 Subject: [PATCH 5/5] RelationActionEvent - check for existing tenant in case RELATION_DELETED --- .../server/service/edge/EdgeEventSourcingListener.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java index 8ec42b4a10..7bafd53644 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java @@ -168,7 +168,7 @@ public class EdgeEventSourcingListener { public void handleEvent(RelationActionEvent event) { try { TenantId tenantId = event.getTenantId(); - if (!tenantId.isSysTenantId() && !tenantService.tenantExists(tenantId)) { + if (ActionType.RELATION_DELETED.equals(event.getActionType()) && !tenantService.tenantExists(tenantId)) { log.debug("[{}] Ignoring RelationActionEvent because tenant does not exist: {}", tenantId, event); return; }