Browse Source

refactored due to comments

pull/8414/head
YevhenBondarenko 3 years ago
parent
commit
b5641a3c6c
  1. 19
      application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
  2. 66
      application/src/test/java/org/thingsboard/server/service/RuleChainDataValidatorTest.java
  3. 29
      dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java
  4. 17
      dao/src/main/java/org/thingsboard/server/dao/service/validator/RuleChainDataValidator.java

19
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java

@ -60,7 +60,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
@Override
public void start(TbActorCtx context) throws Exception {
if (isMySingletonNode()) {
if (isMyNodePartition()) {
tbNode = initComponent(ruleNode);
if (tbNode != null) {
state = ComponentLifecycleState.ACTIVE;
@ -70,7 +70,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
@Override
public void onUpdate(TbActorCtx context) throws Exception {
if (isMySingletonNode()) {
if (isMyNodePartition()) {
RuleNode newRuleNode = systemContext.getRuleChainService().findRuleNodeById(tenantId, entityId);
this.info = new RuleNodeInfo(entityId, ruleChainName, newRuleNode != null ? newRuleNode.getName() : "Unknown");
boolean restartRequired = state != ComponentLifecycleState.ACTIVE ||
@ -104,13 +104,13 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
@Override
public void onPartitionChangeMsg(PartitionChangeMsg msg) throws Exception {
if (tbNode != null) {
if (!isMySingletonNode()) {
if (!isMyNodePartition()) {
stop(null);
tbNode = null;
} else {
tbNode.onPartitionChangeMsg(defaultCtx, msg);
}
} else if (isMySingletonNode()) {
} else if (isMyNodePartition()) {
start(null);
}
}
@ -136,8 +136,8 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
}
void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception {
if (!isMySingletonNode()) {
putToQueue(msg.getMsg());
if (!isMyNodePartition()) {
putToNodePartition(msg.getMsg());
} else {
msg.getMsg().getCallback().onProcessingStart(info);
checkComponentStateActive(msg.getMsg());
@ -180,15 +180,16 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
return new RuleNodeException("Rule Node is not active! Failed to initialize.", ruleChainName, ruleNode);
}
private boolean isMySingletonNode() {
private boolean isMyNodePartition() {
return !ruleNode.isSingletonMode()
|| systemContext.getDiscoveryService().isMonolith()
|| defaultCtx.isLocalEntity(ruleNode.getId());
}
private void putToQueue(TbMsg source) {
//Message will return after processing. See RuleChainActorMessageProcessor.pushToTarget.
private void putToNodePartition(TbMsg source) {
TbMsg tbMsg = TbMsg.newMsg(source, source.getQueueName(), source.getRuleChainId(), entityId);
TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, entityId);
TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, ruleNode.getId());
TransportProtos.ToRuleEngineMsg toQueueMsg = TransportProtos.ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())

66
application/src/test/java/org/thingsboard/server/service/RuleChainDataValidatorTest.java

@ -1,66 +0,0 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.service.validator.RuleChainDataValidator;
public class RuleChainDataValidatorTest {
@Test
public void testSingletonSupport() {
String node = "org.thingsboard.rule.engine.mqtt.TbMqttNode";
RuleNode ruleNode = createRuleNode(node, false);
RuleChainDataValidator.validateRuleNode(ruleNode);
ruleNode.setSingletonMode(true);
RuleChainDataValidator.validateRuleNode(ruleNode);
}
@Test
public void testSingletonNotSupport() {
String node = "org.thingsboard.rule.engine.flow.TbAckNode";
RuleNode ruleNode = createRuleNode(node, false);
RuleChainDataValidator.validateRuleNode(ruleNode);
ruleNode.setSingletonMode(true);
Assertions.assertThrows(DataValidationException.class,
() -> RuleChainDataValidator.validateRuleNode(ruleNode),
String.format("Singleton mode not supported for [%s].", ruleNode.getType()));
}
@Test
public void testSingletonOnly() {
String node = "org.thingsboard.rule.engine.mqtt.azure.TbAzureIotHubNode";
RuleNode ruleNode = createRuleNode(node, true);
RuleChainDataValidator.validateRuleNode(ruleNode);
ruleNode.setSingletonMode(false);
Assertions.assertThrows(DataValidationException.class,
() -> RuleChainDataValidator.validateRuleNode(ruleNode),
String.format("Supported only singleton mode for [%s].", ruleNode.getType()));
}
private RuleNode createRuleNode(String type, boolean singletonMode) {
RuleNode ruleNode = new RuleNode();
ruleNode.setName("test node");
ruleNode.setType(type);
ruleNode.setSingletonMode(singletonMode);
ruleNode.setConfiguration(JacksonUtil.newObjectNode());
return ruleNode;
}
}

29
dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java

@ -38,6 +38,7 @@ import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.plugin.ComponentClusteringMode;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.rule.NodeConnectionInfo;
@ -50,6 +51,7 @@ import org.thingsboard.server.common.data.rule.RuleChainType;
import org.thingsboard.server.common.data.rule.RuleChainUpdateResult;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.data.rule.RuleNodeUpdateResult;
import org.thingsboard.server.common.data.util.ReflectionUtils;
import org.thingsboard.server.dao.entity.AbstractEntityService;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.service.DataValidator;
@ -146,6 +148,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
Map<RuleNodeId, Integer> ruleNodeIndexMap = new HashMap<>();
if (nodes != null) {
for (RuleNode node : nodes) {
setSingletonMode(node);
if (node.getId() != null) {
ruleNodeIndexMap.put(node.getId(), nodes.indexOf(node));
} else {
@ -769,4 +772,30 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
}
};
private void setSingletonMode(RuleNode ruleNode) {
boolean singletonMode;
try {
ComponentClusteringMode nodeConfigType = ReflectionUtils.getAnnotationProperty(ruleNode.getType(),
"org.thingsboard.rule.engine.api.RuleNode", "clusteringMode");
switch (nodeConfigType) {
case ENABLED:
singletonMode = false;
break;
case SINGLETON:
singletonMode = true;
break;
case USER_PREFERENCE:
default:
singletonMode = ruleNode.isSingletonMode();
break;
}
} catch (Exception e) {
log.warn("Failed to get clustering mode: {}", ExceptionUtils.getRootCauseMessage(e));
singletonMode = false;
}
ruleNode.setSingletonMode(singletonMode);
}
}

17
dao/src/main/java/org/thingsboard/server/dao/service/validator/RuleChainDataValidator.java

@ -25,7 +25,6 @@ import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentClusteringMode;
import org.thingsboard.server.common.data.rule.NodeConnectionInfo;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
@ -113,22 +112,6 @@ public class RuleChainDataValidator extends DataValidator<RuleChain> {
return;
}
ConstraintValidator.validateFields(nodeConfig, errorPrefix);
ComponentClusteringMode nodeConfigType = null;
try {
nodeConfigType = ReflectionUtils.getAnnotationProperty(ruleNode.getType(),
"org.thingsboard.rule.engine.api.RuleNode", "clusteringMode");
} catch (Exception e) {
log.warn("Failed to validate singleton mode: {}", ExceptionUtils.getRootCauseMessage(e));
return;
}
if (ComponentClusteringMode.ENABLED.equals(nodeConfigType) && ruleNode.isSingletonMode()) {
throw new DataValidationException(String.format("Singleton mode not supported for [%s].", ruleNode.getType()));
}
if (ComponentClusteringMode.SINGLETON.equals(nodeConfigType) && !ruleNode.isSingletonMode()) {
throw new DataValidationException(String.format("Supported only singleton mode for [%s].", ruleNode.getType()));
}
}
private static void validateCircles(List<NodeConnectionInfo> connectionInfos) {

Loading…
Cancel
Save