Browse Source

updated rule node upgrade logic

pull/8661/head
ShvaykaD 3 years ago
parent
commit
490ad63ab6
  1. 64
      application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java
  2. 31
      dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java

64
application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java

@ -27,7 +27,6 @@ import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TbVersionedNode;
import org.thingsboard.rule.engine.flow.TbRuleChainInputNode;
import org.thingsboard.rule.engine.flow.TbRuleChainInputNodeConfiguration;
@ -86,10 +85,11 @@ import org.thingsboard.server.service.bean.BeanDiscoveryService;
import org.thingsboard.server.service.install.InstallScripts;
import org.thingsboard.server.service.install.SystemDataLoaderService;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@ -221,35 +221,26 @@ public class DefaultDataUpdateService implements DataUpdateService {
private void upgradeRuleNodes() {
try {
log.info("Lookup rule nodes to upgrade ...");
ArrayList<TbVersionedNode> tbVersionedNodes = getTbVersionedNodes();
log.info("Found {} versioned nodes to check for upgrade!", tbVersionedNodes.size());
for (TbVersionedNode tbVersionedNode : tbVersionedNodes) {
String ruleNodeType = tbVersionedNode.getClass().getName();
String ruleNodeTypeForLogs = tbVersionedNode.getClass().getSimpleName();
int toVersion = tbVersionedNode.getCurrentVersion();
var nodeClassToVersionMap = getNodeClassToVersionMap();
log.info("Found {} versioned nodes to check for upgrade!", nodeClassToVersionMap.size());
nodeClassToVersionMap.forEach((clazz, toVersion) -> {
var ruleNodeType = clazz.getName();
var ruleNodeTypeForLogs = clazz.getSimpleName();
log.info("Going to check for nodes with type: {} to upgrade to version: {}.", ruleNodeTypeForLogs, toVersion);
var ruleNodesToUpdate = new PageDataIterable<>(
pageLink ->
ruleChainService.findAllRuleNodesByTypeAndVersionLessThan(
ruleNodeType,
toVersion,
pageLink
),
1024
pageLink -> ruleChainService.findAllRuleNodesByTypeAndVersionLessThan(ruleNodeType, toVersion, pageLink), 1024
);
if (Iterables.isEmpty(ruleNodesToUpdate)) {
log.info("There are no active nodes with type: {}, or all nodes with this type already set to latest version!", ruleNodeTypeForLogs);
} else {
for (RuleNode ruleNode : ruleNodesToUpdate) {
RuleNodeId ruleNodeId = ruleNode.getId();
for (var ruleNode : ruleNodesToUpdate) {
var ruleNodeId = ruleNode.getId();
var oldConfiguration = ruleNode.getConfiguration();
int fromVersion = ruleNode.getConfigurationVersion();
log.info("Going to upgrade rule node with id: {} type: {} fromVersion: {} toVersion: {}",
ruleNodeId,
ruleNodeTypeForLogs,
fromVersion,
toVersion);
ruleNodeId, ruleNodeTypeForLogs, fromVersion, toVersion);
try {
var tbVersionedNode = (TbVersionedNode) clazz.getDeclaredConstructor().newInstance();
TbPair<Boolean, JsonNode> upgradeRuleNodeConfigurationResult = tbVersionedNode.upgrade(fromVersion, oldConfiguration);
if (upgradeRuleNodeConfigurationResult.getFirst()) {
ruleNode.setConfiguration(upgradeRuleNodeConfigurationResult.getSecond());
@ -257,45 +248,34 @@ public class DefaultDataUpdateService implements DataUpdateService {
ruleNode.setConfigurationVersion(toVersion);
ruleChainService.saveRuleNode(TenantId.SYS_TENANT_ID, ruleNode);
log.info("Successfully upgrade rule node with id: {} type: {} fromVersion: {} toVersion: {}",
ruleNodeId,
ruleNodeTypeForLogs,
fromVersion,
toVersion);
} catch (TbNodeException e) {
ruleNodeId, ruleNodeTypeForLogs, fromVersion, toVersion);
} catch (Exception e) {
log.warn("Failed to upgrade rule node with id: {} type: {} fromVersion: {} toVersion: {} due to: ",
ruleNodeId,
ruleNodeTypeForLogs,
fromVersion,
toVersion,
e);
ruleNodeId, ruleNodeTypeForLogs, fromVersion, toVersion, e);
}
}
}
}
});
log.info("Finished rule nodes upgrade!");
} catch (Exception e) {
log.error("Unexpected error during rule nodes upgrade: ", e);
}
}
private ArrayList<TbVersionedNode> getTbVersionedNodes() {
private Map<Class<?>, Integer> getNodeClassToVersionMap() {
var ruleNodeDefinitions = beanDiscoveryService.discoverBeansByAnnotationType(
org.thingsboard.rule.engine.api.RuleNode.class
);
var tbVersionedNodes = new ArrayList<TbVersionedNode>();
var tbVersionedNodes = new HashMap<Class<?>, Integer>();
for (var def : ruleNodeDefinitions) {
String clazzName = def.getBeanClassName();
try {
var clazz = Class.forName(clazzName);
if (TbVersionedNode.class.isAssignableFrom(clazz)) {
tbVersionedNodes.add((TbVersionedNode) clazz.getDeclaredConstructor().newInstance());
}
} catch (NoSuchMethodException |
InstantiationException |
IllegalAccessException |
InvocationTargetException |
ClassNotFoundException e
) {
TbVersionedNode tbVersionedNode = (TbVersionedNode) clazz.getDeclaredConstructor().newInstance();
tbVersionedNodes.put(clazz, tbVersionedNode.getCurrentVersion());
}
} catch (Exception e) {
log.warn("Failed to create instance of rule node type: {} due to: ", clazzName, e);
}
}

31
dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java

@ -199,10 +199,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
int toVersion = tbVersionedNode.getCurrentVersion();
if (fromVersion < toVersion) {
log.debug("Going to upgrade rule node with id: {} type: {} fromVersion: {} toVersion: {}",
ruleNodeId,
ruleNodeType,
fromVersion,
toVersion);
ruleNodeId, ruleNodeType, fromVersion, toVersion);
try {
TbPair<Boolean, JsonNode> upgradeResult = tbVersionedNode.upgrade(fromVersion, node.getConfiguration());
if (upgradeResult.getFirst()) {
@ -210,33 +207,19 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
}
node.setConfigurationVersion(toVersion);
log.debug("Successfully upgrade rule node with id: {} type: {}, rule chain id: {} fromVersion: {} toVersion: {}",
ruleNodeId,
ruleNodeType,
ruleChainId,
fromVersion,
toVersion);
ruleNodeId, ruleNodeType, ruleChainId, fromVersion, toVersion);
} catch (TbNodeException e) {
log.warn("Failed to upgrade rule node with id: {} type: {} rule chain id: {} fromVersion: {} toVersion: {} due to: ",
ruleNodeId,
ruleNodeType,
ruleChainId,
fromVersion,
toVersion,
e);
ruleNodeId, ruleNodeType, ruleChainId, fromVersion, toVersion, e);
}
} else {
log.debug("Rule node with id: {} type: {} ruleChainId: {} already set to latest version!",
ruleNodeId,
ruleChainId,
ruleNodeType);
ruleNodeId, ruleChainId, ruleNodeType);
}
}
} catch (ClassNotFoundException |
InvocationTargetException |
InstantiationException |
IllegalAccessException |
NoSuchMethodException e) {
log.error("Failed to create instance of rule node with id: {} type: {}, rule chain id: {}", ruleNodeId, ruleNodeType, ruleChainId);
} catch (Exception e) {
log.error("Failed to create instance of rule node with id: {} type: {}, rule chain id: {}",
ruleNodeId, ruleNodeType, ruleChainId);
}
RuleNode savedNode = ruleNodeDao.save(tenantId, node);
relations.add(new EntityRelation(ruleChainMetaData.getRuleChainId(), savedNode.getId(),

Loading…
Cancel
Save