|
|
|
@ -16,6 +16,7 @@ |
|
|
|
package org.thingsboard.server.actors.ruleChain; |
|
|
|
|
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.thingsboard.common.util.DebugModeUtil; |
|
|
|
import org.thingsboard.rule.engine.api.TbNode; |
|
|
|
import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
|
|
|
import org.thingsboard.server.actors.ActorSystemContext; |
|
|
|
@ -27,7 +28,6 @@ import org.thingsboard.server.common.data.id.RuleNodeId; |
|
|
|
import org.thingsboard.server.common.data.id.TenantId; |
|
|
|
import org.thingsboard.server.common.data.plugin.ComponentLifecycleState; |
|
|
|
import org.thingsboard.server.common.data.rule.RuleNode; |
|
|
|
import org.thingsboard.common.util.DebugModeUtil; |
|
|
|
import org.thingsboard.server.common.msg.TbMsg; |
|
|
|
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; |
|
|
|
import org.thingsboard.server.common.msg.queue.RuleNodeException; |
|
|
|
@ -78,8 +78,8 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod |
|
|
|
if (isMyNodePartition(newRuleNode)) { |
|
|
|
this.info = new RuleNodeInfo(entityId, ruleChainName, getName(newRuleNode)); |
|
|
|
boolean restartRequired = state != ComponentLifecycleState.ACTIVE || |
|
|
|
!(ruleNode.getType().equals(newRuleNode.getType()) && |
|
|
|
ruleNode.getConfiguration().equals(newRuleNode.getConfiguration())); |
|
|
|
!(ruleNode.getType().equals(newRuleNode.getType()) && |
|
|
|
ruleNode.getConfiguration().equals(newRuleNode.getConfiguration())); |
|
|
|
this.ruleNode = newRuleNode; |
|
|
|
this.defaultCtx.updateSelf(newRuleNode); |
|
|
|
if (restartRequired) { |
|
|
|
@ -130,8 +130,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod |
|
|
|
int maxRuleNodeExecutionsPerMessage = tenantProfileConfiguration.getMaxRuleNodeExecsPerMessage(); |
|
|
|
if (maxRuleNodeExecutionsPerMessage == 0 || ruleNodeCount < maxRuleNodeExecutionsPerMessage) { |
|
|
|
apiUsageClient.report(tenantId, tbMsg.getCustomerId(), ApiUsageRecordKey.RE_EXEC_COUNT); |
|
|
|
persistDebugInputIfAllowed(msg.getMsg(), "Self", tenantProfileConfiguration |
|
|
|
.getMaxDebugModeDurationMinutes(systemContext.getMaxDebugModeDurationMinutes())); |
|
|
|
persistDebugInputIfAllowed(msg.getMsg(), "Self"); |
|
|
|
try { |
|
|
|
tbNode.onMsg(defaultCtx, msg.getMsg()); |
|
|
|
} catch (Exception e) { |
|
|
|
@ -154,8 +153,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod |
|
|
|
int maxRuleNodeExecutionsPerMessage = tenantProfileConfiguration.getMaxRuleNodeExecsPerMessage(); |
|
|
|
if (maxRuleNodeExecutionsPerMessage == 0 || ruleNodeCount < maxRuleNodeExecutionsPerMessage) { |
|
|
|
apiUsageClient.report(tenantId, tbMsg.getCustomerId(), ApiUsageRecordKey.RE_EXEC_COUNT); |
|
|
|
persistDebugInputIfAllowed(msg.getMsg(), msg.getFromRelationType(), tenantProfileConfiguration |
|
|
|
.getMaxDebugModeDurationMinutes(systemContext.getMaxDebugModeDurationMinutes())); |
|
|
|
persistDebugInputIfAllowed(msg.getMsg(), msg.getFromRelationType()); |
|
|
|
try { |
|
|
|
tbNode.onMsg(msg.getCtx(), msg.getMsg()); |
|
|
|
} catch (Exception e) { |
|
|
|
@ -197,8 +195,8 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod |
|
|
|
|
|
|
|
private boolean isMyNodePartition(RuleNode ruleNode) { |
|
|
|
boolean result = ruleNode == null || !ruleNode.isSingletonMode() |
|
|
|
|| systemContext.getDiscoveryService().isMonolith() |
|
|
|
|| defaultCtx.isLocalEntity(ruleNode.getId()); |
|
|
|
|| systemContext.getDiscoveryService().isMonolith() |
|
|
|
|| defaultCtx.isLocalEntity(ruleNode.getId()); |
|
|
|
if (!result) { |
|
|
|
log.trace("[{}][{}] Is not my node partition", tenantId, entityId); |
|
|
|
} |
|
|
|
@ -218,7 +216,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod |
|
|
|
defaultCtx.ack(source); |
|
|
|
} |
|
|
|
|
|
|
|
private void persistDebugInputIfAllowed(TbMsg msg, String fromNodeConnectionType, int debugModeDurationMinutes) { |
|
|
|
private void persistDebugInputIfAllowed(TbMsg msg, String fromNodeConnectionType) { |
|
|
|
if (DebugModeUtil.isDebugAllAvailable(ruleNode)) { |
|
|
|
systemContext.persistDebugInput(tenantId, entityId, msg, fromNodeConnectionType); |
|
|
|
} |
|
|
|
|