From e372587926fc2fbf152df3c4f54fdf7121bca8c8 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Thu, 27 Feb 2025 13:01:21 +0200 Subject: [PATCH] Edge rule chain CRUD: fixed setting root rule chain --- .../processor/edge/EdgeEntityProcessor.java | 4 +++ .../rule/BaseRuleChainProcessor.java | 13 +++++--- .../rule/RuleChainEdgeProcessor.java | 33 +++++++++++-------- .../entitiy/EntityStateSourcingListener.java | 5 +++ .../server/edge/AbstractEdgeTest.java | 28 +++++++--------- .../server/edge/RuleChainEdgeTest.java | 1 + 6 files changed, 49 insertions(+), 35 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/edge/EdgeEntityProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/edge/EdgeEntityProcessor.java index ddbe4810df..77fa31c028 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/edge/EdgeEntityProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/edge/EdgeEntityProcessor.java @@ -49,8 +49,12 @@ public class EdgeEntityProcessor extends BaseEdgeProcessor { @Override public ListenableFuture processEntityNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { try { + EdgeId originatorEdgeId = safeGetEdgeId(edgeNotificationMsg.getOriginatorEdgeIdMSB(), edgeNotificationMsg.getOriginatorEdgeIdLSB()); EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); + if (edgeId.equals(originatorEdgeId)) { + return Futures.immediateFuture(null); + } switch (actionType) { case ASSIGNED_TO_CUSTOMER: { CustomerId customerId = JacksonUtil.fromString(edgeNotificationMsg.getBody(), CustomerId.class); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/BaseRuleChainProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/BaseRuleChainProcessor.java index 03904ce5ef..3231d7788a 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/BaseRuleChainProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/BaseRuleChainProcessor.java @@ -39,24 +39,27 @@ public class BaseRuleChainProcessor extends BaseEdgeProcessor { protected Pair saveOrUpdateRuleChain(TenantId tenantId, RuleChainId ruleChainId, RuleChainUpdateMsg ruleChainUpdateMsg, RuleChainType ruleChainType) { boolean created = false; - RuleChain ruleChain = edgeCtx.getRuleChainService().findRuleChainById(tenantId, ruleChainId); - if (ruleChain == null) { + RuleChain ruleChainFromDb = edgeCtx.getRuleChainService().findRuleChainById(tenantId, ruleChainId); + if (ruleChainFromDb == null) { created = true; } - ruleChain = JacksonUtil.fromString(ruleChainUpdateMsg.getEntity(), RuleChain.class, true); + RuleChain ruleChain = JacksonUtil.fromString(ruleChainUpdateMsg.getEntity(), RuleChain.class, true); if (ruleChain == null) { throw new RuntimeException("[{" + tenantId + "}] ruleChainUpdateMsg {" + ruleChainUpdateMsg + "} cannot be converted to rule chain"); } boolean isRoot = ruleChain.isRoot(); - ruleChain.setRoot(false); + if (RuleChainType.CORE.equals(ruleChainType)) { + ruleChain.setRoot(false); + } else { + ruleChain.setRoot(ruleChainFromDb == null ? false : ruleChainFromDb.isRoot()); + } ruleChain.setType(ruleChainType); ruleChainValidator.validate(ruleChain, RuleChain::getTenantId); if (created) { ruleChain.setId(ruleChainId); } - edgeCtx.getRuleChainService().saveRuleChain(ruleChain); return Pair.of(created, isRoot); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/RuleChainEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/RuleChainEdgeProcessor.java index 340fb09eab..06fb4c37a2 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/RuleChainEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/RuleChainEdgeProcessor.java @@ -57,8 +57,7 @@ public class RuleChainEdgeProcessor extends BaseRuleChainProcessor { switch (ruleChainUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: case ENTITY_UPDATED_RPC_MESSAGE: - saveOrUpdateRuleChain(tenantId, ruleChainId, ruleChainUpdateMsg, edge); - return Futures.immediateFuture(null); + return saveOrUpdateRuleChain(tenantId, ruleChainId, ruleChainUpdateMsg, edge); case ENTITY_DELETED_RPC_MESSAGE: RuleChain ruleChainToDelete = edgeCtx.getRuleChainService().findRuleChainById(tenantId, ruleChainId); if (ruleChainToDelete != null) { @@ -81,19 +80,25 @@ public class RuleChainEdgeProcessor extends BaseRuleChainProcessor { } } - private void saveOrUpdateRuleChain(TenantId tenantId, RuleChainId ruleChainId, RuleChainUpdateMsg ruleChainUpdateMsg, Edge edge) { - Pair resultPair = super.saveOrUpdateRuleChain(tenantId, ruleChainId, ruleChainUpdateMsg, RuleChainType.EDGE); - Boolean created = resultPair.getFirst(); - if (created) { - createRelationFromEdge(tenantId, edge.getId(), ruleChainId); - pushRuleChainCreatedEventToRuleEngine(tenantId, edge, ruleChainId, ruleChainUpdateMsg.getEntity()); - edgeCtx.getRuleChainService().assignRuleChainToEdge(tenantId, ruleChainId, edge.getId()); - } - Boolean isRoot = resultPair.getSecond(); - if (isRoot) { - edge.setRootRuleChainId(ruleChainId); - edgeCtx.getEdgeService().saveEdge(edge); + private ListenableFuture saveOrUpdateRuleChain(TenantId tenantId, RuleChainId ruleChainId, RuleChainUpdateMsg ruleChainUpdateMsg, Edge edge) { + try { + Pair resultPair = super.saveOrUpdateRuleChain(tenantId, ruleChainId, ruleChainUpdateMsg, RuleChainType.EDGE); + Boolean created = resultPair.getFirst(); + if (created) { + createRelationFromEdge(tenantId, edge.getId(), ruleChainId); + pushRuleChainCreatedEventToRuleEngine(tenantId, edge, ruleChainId, ruleChainUpdateMsg.getEntity()); + edgeCtx.getRuleChainService().assignRuleChainToEdge(tenantId, ruleChainId, edge.getId()); + } + Boolean isRoot = resultPair.getSecond(); + if (isRoot) { + edge = edgeCtx.getEdgeService().findEdgeById(tenantId, edge.getId()); + edgeCtx.getEdgeService().setEdgeRootRuleChain(tenantId, edge, ruleChainId); + } + } catch (Exception e) { + log.error("Failed to save or update rule chain", e); + return Futures.immediateFailedFuture(e); } + return Futures.immediateFuture(null); } private void pushRuleChainCreatedEventToRuleEngine(TenantId tenantId, Edge edge, RuleChainId ruleChainId, String ruleChainAsString) { diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java index 35aad36dff..e40251673d 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java @@ -50,6 +50,7 @@ import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg; +import org.thingsboard.server.dao.edge.EdgeSynchronizationManager; import org.thingsboard.server.dao.eventsourcing.ActionEntityEvent; import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent; import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent; @@ -64,6 +65,7 @@ public class EntityStateSourcingListener { private final TenantService tenantService; private final TbClusterService tbClusterService; + private final EdgeSynchronizationManager edgeSynchronizationManager; @PostConstruct public void init() { @@ -241,6 +243,9 @@ public class EntityStateSourcingListener { private void onEdgeEvent(TenantId tenantId, EntityId entityId, Object entity, ComponentLifecycleEvent lifecycleEvent) { if (entity instanceof Edge) { + if (entityId.equals(edgeSynchronizationManager.getEdgeId().get())) { + return; + } tbClusterService.onEdgeStateChangeEvent(new ComponentLifecycleMsg(tenantId, entityId, lifecycleEvent)); } else if (entity instanceof EdgeEvent edgeEvent) { tbClusterService.onEdgeEventUpdate(new EdgeEventUpdateMsg(tenantId, edgeEvent.getEdgeId())); diff --git a/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java index 44dac215e2..feac7adb04 100644 --- a/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java @@ -140,7 +140,7 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { installation(); edgeImitator = new EdgeImitator("localhost", 7070, edge.getRoutingKey(), edge.getSecret()); - edgeImitator.expectMessageAmount(27); + edgeImitator.expectMessageAmount(25); edgeImitator.ignoreType(OAuth2ClientUpdateMsg.class); edgeImitator.ignoreType(OAuth2DomainUpdateMsg.class); edgeImitator.connect(); @@ -191,8 +191,10 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { } private RuleChainId getEdgeRootRuleChainId() throws Exception { - List edgeRuleChains = doGetTypedWithPageLink("/api/edge/" + edge.getUuidId() + "/ruleChains?", - new TypeReference>() {}, new PageLink(100)).getData(); + List edgeRuleChains = doGetTypedWithPageLink("/api/ruleChains?type={type}&", + new TypeReference>() {}, + new PageLink(100, 0, "Edge Root Rule Chain"), + "EDGE").getData(); for (RuleChain edgeRuleChain : edgeRuleChains) { if (edgeRuleChain.isRoot()) { return edgeRuleChain.getId(); @@ -243,8 +245,8 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { validateMsgsCnt(RuleChainUpdateMsg.class, 1); UUID ruleChainUUID = validateRuleChains(); - // 2 messages - 1 from rule chain fetcher and 1 from rule chain controller (it goes along with RuleChainUpdateMsg) - validateMsgsCnt(RuleChainMetadataUpdateMsg.class, 2); + // 1 from rule chain fetcher + validateMsgsCnt(RuleChainMetadataUpdateMsg.class, 1); validateRuleChainMetadataUpdates(ruleChainUUID); // 4 messages ('general', 'mail', 'connectivity', 'jwt') @@ -426,17 +428,11 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { } private void validateRuleChainMetadataUpdates(UUID expectedRuleChainUUID) { - List ruleChainMetadataUpdateMsgList = edgeImitator.findAllMessagesByType(RuleChainMetadataUpdateMsg.class); - Assert.assertEquals(2, ruleChainMetadataUpdateMsgList.size()); - // metadata create msg - RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsgCreated = ruleChainMetadataUpdateMsgList.get(0); - Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, ruleChainMetadataUpdateMsgCreated.getMsgType()); - RuleChainMetaData ruleChainMetaData = JacksonUtil.fromString(ruleChainMetadataUpdateMsgCreated.getEntity(), RuleChainMetaData.class, true); - Assert.assertEquals(expectedRuleChainUUID, ruleChainMetaData.getRuleChainId().getId()); - // metadata update msg - RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsgUpdated = ruleChainMetadataUpdateMsgList.get(1); - Assert.assertEquals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, ruleChainMetadataUpdateMsgUpdated.getMsgType()); - ruleChainMetaData = JacksonUtil.fromString(ruleChainMetadataUpdateMsgUpdated.getEntity(), RuleChainMetaData.class, true); + Optional ruleChainMetadataUpdateMsgOpt = edgeImitator.findMessageByType(RuleChainMetadataUpdateMsg.class); + Assert.assertTrue(ruleChainMetadataUpdateMsgOpt.isPresent()); + RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = ruleChainMetadataUpdateMsgOpt.get(); + Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, ruleChainMetadataUpdateMsg.getMsgType()); + RuleChainMetaData ruleChainMetaData = JacksonUtil.fromString(ruleChainMetadataUpdateMsg.getEntity(), RuleChainMetaData.class, true); Assert.assertEquals(expectedRuleChainUUID, ruleChainMetaData.getRuleChainId().getId()); } diff --git a/application/src/test/java/org/thingsboard/server/edge/RuleChainEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/RuleChainEdgeTest.java index ed77c4b97a..7c029e0471 100644 --- a/application/src/test/java/org/thingsboard/server/edge/RuleChainEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/RuleChainEdgeTest.java @@ -105,6 +105,7 @@ public class RuleChainEdgeTest extends AbstractEdgeTest { // create rule chain on edge RuleChain edgeRuleChain = new RuleChain(); + edgeRuleChain.setTenantId(tenantId); edgeRuleChain.setId(new RuleChainId(uuid)); edgeRuleChain.setName(ruleChainName); UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder();