Browse Source

Edge rule chain CRUD: fixed setting root rule chain

pull/9195/head
Volodymyr Babak 1 year ago
parent
commit
e372587926
  1. 4
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/edge/EdgeEntityProcessor.java
  2. 13
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/BaseRuleChainProcessor.java
  3. 33
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/RuleChainEdgeProcessor.java
  4. 5
      application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java
  5. 28
      application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java
  6. 1
      application/src/test/java/org/thingsboard/server/edge/RuleChainEdgeTest.java

4
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/edge/EdgeEntityProcessor.java

@ -49,8 +49,12 @@ public class EdgeEntityProcessor extends BaseEdgeProcessor {
@Override
public ListenableFuture<Void> processEntityNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
try {
EdgeId originatorEdgeId = safeGetEdgeId(edgeNotificationMsg.getOriginatorEdgeIdMSB(), edgeNotificationMsg.getOriginatorEdgeIdLSB());
EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction());
EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
if (edgeId.equals(originatorEdgeId)) {
return Futures.immediateFuture(null);
}
switch (actionType) {
case ASSIGNED_TO_CUSTOMER: {
CustomerId customerId = JacksonUtil.fromString(edgeNotificationMsg.getBody(), CustomerId.class);

13
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/BaseRuleChainProcessor.java

@ -39,24 +39,27 @@ public class BaseRuleChainProcessor extends BaseEdgeProcessor {
protected Pair<Boolean, Boolean> saveOrUpdateRuleChain(TenantId tenantId, RuleChainId ruleChainId, RuleChainUpdateMsg ruleChainUpdateMsg, RuleChainType ruleChainType) {
boolean created = false;
RuleChain ruleChain = edgeCtx.getRuleChainService().findRuleChainById(tenantId, ruleChainId);
if (ruleChain == null) {
RuleChain ruleChainFromDb = edgeCtx.getRuleChainService().findRuleChainById(tenantId, ruleChainId);
if (ruleChainFromDb == null) {
created = true;
}
ruleChain = JacksonUtil.fromString(ruleChainUpdateMsg.getEntity(), RuleChain.class, true);
RuleChain ruleChain = JacksonUtil.fromString(ruleChainUpdateMsg.getEntity(), RuleChain.class, true);
if (ruleChain == null) {
throw new RuntimeException("[{" + tenantId + "}] ruleChainUpdateMsg {" + ruleChainUpdateMsg + "} cannot be converted to rule chain");
}
boolean isRoot = ruleChain.isRoot();
ruleChain.setRoot(false);
if (RuleChainType.CORE.equals(ruleChainType)) {
ruleChain.setRoot(false);
} else {
ruleChain.setRoot(ruleChainFromDb == null ? false : ruleChainFromDb.isRoot());
}
ruleChain.setType(ruleChainType);
ruleChainValidator.validate(ruleChain, RuleChain::getTenantId);
if (created) {
ruleChain.setId(ruleChainId);
}
edgeCtx.getRuleChainService().saveRuleChain(ruleChain);
return Pair.of(created, isRoot);
}

33
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/RuleChainEdgeProcessor.java

@ -57,8 +57,7 @@ public class RuleChainEdgeProcessor extends BaseRuleChainProcessor {
switch (ruleChainUpdateMsg.getMsgType()) {
case ENTITY_CREATED_RPC_MESSAGE:
case ENTITY_UPDATED_RPC_MESSAGE:
saveOrUpdateRuleChain(tenantId, ruleChainId, ruleChainUpdateMsg, edge);
return Futures.immediateFuture(null);
return saveOrUpdateRuleChain(tenantId, ruleChainId, ruleChainUpdateMsg, edge);
case ENTITY_DELETED_RPC_MESSAGE:
RuleChain ruleChainToDelete = edgeCtx.getRuleChainService().findRuleChainById(tenantId, ruleChainId);
if (ruleChainToDelete != null) {
@ -81,19 +80,25 @@ public class RuleChainEdgeProcessor extends BaseRuleChainProcessor {
}
}
private void saveOrUpdateRuleChain(TenantId tenantId, RuleChainId ruleChainId, RuleChainUpdateMsg ruleChainUpdateMsg, Edge edge) {
Pair<Boolean, Boolean> resultPair = super.saveOrUpdateRuleChain(tenantId, ruleChainId, ruleChainUpdateMsg, RuleChainType.EDGE);
Boolean created = resultPair.getFirst();
if (created) {
createRelationFromEdge(tenantId, edge.getId(), ruleChainId);
pushRuleChainCreatedEventToRuleEngine(tenantId, edge, ruleChainId, ruleChainUpdateMsg.getEntity());
edgeCtx.getRuleChainService().assignRuleChainToEdge(tenantId, ruleChainId, edge.getId());
}
Boolean isRoot = resultPair.getSecond();
if (isRoot) {
edge.setRootRuleChainId(ruleChainId);
edgeCtx.getEdgeService().saveEdge(edge);
private ListenableFuture<Void> saveOrUpdateRuleChain(TenantId tenantId, RuleChainId ruleChainId, RuleChainUpdateMsg ruleChainUpdateMsg, Edge edge) {
try {
Pair<Boolean, Boolean> resultPair = super.saveOrUpdateRuleChain(tenantId, ruleChainId, ruleChainUpdateMsg, RuleChainType.EDGE);
Boolean created = resultPair.getFirst();
if (created) {
createRelationFromEdge(tenantId, edge.getId(), ruleChainId);
pushRuleChainCreatedEventToRuleEngine(tenantId, edge, ruleChainId, ruleChainUpdateMsg.getEntity());
edgeCtx.getRuleChainService().assignRuleChainToEdge(tenantId, ruleChainId, edge.getId());
}
Boolean isRoot = resultPair.getSecond();
if (isRoot) {
edge = edgeCtx.getEdgeService().findEdgeById(tenantId, edge.getId());
edgeCtx.getEdgeService().setEdgeRootRuleChain(tenantId, edge, ruleChainId);
}
} catch (Exception e) {
log.error("Failed to save or update rule chain", e);
return Futures.immediateFailedFuture(e);
}
return Futures.immediateFuture(null);
}
private void pushRuleChainCreatedEventToRuleEngine(TenantId tenantId, Edge edge, RuleChainId ruleChainId, String ruleChainAsString) {

5
application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java

@ -50,6 +50,7 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg;
import org.thingsboard.server.dao.edge.EdgeSynchronizationManager;
import org.thingsboard.server.dao.eventsourcing.ActionEntityEvent;
import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent;
import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent;
@ -64,6 +65,7 @@ public class EntityStateSourcingListener {
private final TenantService tenantService;
private final TbClusterService tbClusterService;
private final EdgeSynchronizationManager edgeSynchronizationManager;
@PostConstruct
public void init() {
@ -241,6 +243,9 @@ public class EntityStateSourcingListener {
private void onEdgeEvent(TenantId tenantId, EntityId entityId, Object entity, ComponentLifecycleEvent lifecycleEvent) {
if (entity instanceof Edge) {
if (entityId.equals(edgeSynchronizationManager.getEdgeId().get())) {
return;
}
tbClusterService.onEdgeStateChangeEvent(new ComponentLifecycleMsg(tenantId, entityId, lifecycleEvent));
} else if (entity instanceof EdgeEvent edgeEvent) {
tbClusterService.onEdgeEventUpdate(new EdgeEventUpdateMsg(tenantId, edgeEvent.getEdgeId()));

28
application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java

@ -140,7 +140,7 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
installation();
edgeImitator = new EdgeImitator("localhost", 7070, edge.getRoutingKey(), edge.getSecret());
edgeImitator.expectMessageAmount(27);
edgeImitator.expectMessageAmount(25);
edgeImitator.ignoreType(OAuth2ClientUpdateMsg.class);
edgeImitator.ignoreType(OAuth2DomainUpdateMsg.class);
edgeImitator.connect();
@ -191,8 +191,10 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
}
private RuleChainId getEdgeRootRuleChainId() throws Exception {
List<RuleChain> edgeRuleChains = doGetTypedWithPageLink("/api/edge/" + edge.getUuidId() + "/ruleChains?",
new TypeReference<PageData<RuleChain>>() {}, new PageLink(100)).getData();
List<RuleChain> edgeRuleChains = doGetTypedWithPageLink("/api/ruleChains?type={type}&",
new TypeReference<PageData<RuleChain>>() {},
new PageLink(100, 0, "Edge Root Rule Chain"),
"EDGE").getData();
for (RuleChain edgeRuleChain : edgeRuleChains) {
if (edgeRuleChain.isRoot()) {
return edgeRuleChain.getId();
@ -243,8 +245,8 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
validateMsgsCnt(RuleChainUpdateMsg.class, 1);
UUID ruleChainUUID = validateRuleChains();
// 2 messages - 1 from rule chain fetcher and 1 from rule chain controller (it goes along with RuleChainUpdateMsg)
validateMsgsCnt(RuleChainMetadataUpdateMsg.class, 2);
// 1 from rule chain fetcher
validateMsgsCnt(RuleChainMetadataUpdateMsg.class, 1);
validateRuleChainMetadataUpdates(ruleChainUUID);
// 4 messages ('general', 'mail', 'connectivity', 'jwt')
@ -426,17 +428,11 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
}
private void validateRuleChainMetadataUpdates(UUID expectedRuleChainUUID) {
List<RuleChainMetadataUpdateMsg> ruleChainMetadataUpdateMsgList = edgeImitator.findAllMessagesByType(RuleChainMetadataUpdateMsg.class);
Assert.assertEquals(2, ruleChainMetadataUpdateMsgList.size());
// metadata create msg
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsgCreated = ruleChainMetadataUpdateMsgList.get(0);
Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, ruleChainMetadataUpdateMsgCreated.getMsgType());
RuleChainMetaData ruleChainMetaData = JacksonUtil.fromString(ruleChainMetadataUpdateMsgCreated.getEntity(), RuleChainMetaData.class, true);
Assert.assertEquals(expectedRuleChainUUID, ruleChainMetaData.getRuleChainId().getId());
// metadata update msg
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsgUpdated = ruleChainMetadataUpdateMsgList.get(1);
Assert.assertEquals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, ruleChainMetadataUpdateMsgUpdated.getMsgType());
ruleChainMetaData = JacksonUtil.fromString(ruleChainMetadataUpdateMsgUpdated.getEntity(), RuleChainMetaData.class, true);
Optional<RuleChainMetadataUpdateMsg> ruleChainMetadataUpdateMsgOpt = edgeImitator.findMessageByType(RuleChainMetadataUpdateMsg.class);
Assert.assertTrue(ruleChainMetadataUpdateMsgOpt.isPresent());
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = ruleChainMetadataUpdateMsgOpt.get();
Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, ruleChainMetadataUpdateMsg.getMsgType());
RuleChainMetaData ruleChainMetaData = JacksonUtil.fromString(ruleChainMetadataUpdateMsg.getEntity(), RuleChainMetaData.class, true);
Assert.assertEquals(expectedRuleChainUUID, ruleChainMetaData.getRuleChainId().getId());
}

1
application/src/test/java/org/thingsboard/server/edge/RuleChainEdgeTest.java

@ -105,6 +105,7 @@ public class RuleChainEdgeTest extends AbstractEdgeTest {
// create rule chain on edge
RuleChain edgeRuleChain = new RuleChain();
edgeRuleChain.setTenantId(tenantId);
edgeRuleChain.setId(new RuleChainId(uuid));
edgeRuleChain.setName(ruleChainName);
UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder();

Loading…
Cancel
Save