Browse Source

Rule node debug strategies

pull/11861/head
ShvaykaD 2 years ago
parent
commit
19fa00a5fb
  1. 18
      application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json
  2. 14
      application/src/main/data/json/tenant/rule_chains/root_rule_chain.json
  3. 21
      application/src/main/data/upgrade/3.8.0/schema_update.sql
  4. 4
      application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
  5. 82
      application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
  6. 44
      application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
  7. 6
      application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
  8. 37
      application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
  9. 17
      application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeCtx.java
  10. 10
      application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
  11. 1
      application/src/main/java/org/thingsboard/server/controller/TenantProfileController.java
  12. 4
      application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/BaseRuleChainMetadataConstructor.java
  13. 1
      application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/tenant/TenantMsgConstructorV1.java
  14. 3
      application/src/main/resources/thingsboard.yml
  15. 836
      application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java
  16. 6
      application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java
  17. 3
      application/src/test/java/org/thingsboard/server/edge/RuleChainEdgeTest.java
  18. 11
      application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
  19. 3
      application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
  20. 5
      application/src/test/java/org/thingsboard/server/service/housekeeper/HousekeeperServiceTest.java
  21. 9
      application/src/test/java/org/thingsboard/server/service/sync/ie/ExportImportServiceSqlTest.java
  22. 9
      application/src/test/java/org/thingsboard/server/service/sync/vc/VersionControlTest.java
  23. 4
      common/data/src/main/java/org/thingsboard/server/common/data/msg/TbNodeConnectionType.java
  24. 72
      common/data/src/main/java/org/thingsboard/server/common/data/rule/DebugStrategy.java
  25. 24
      common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java
  26. 10
      common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java
  27. 3
      common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/TenantProfileConfiguration.java
  28. 10
      common/edge-api/src/main/proto/edge.proto
  29. 1
      dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
  30. 17
      dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeEntity.java
  31. 5
      dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java
  32. 3
      dao/src/main/resources/sql/schema-entities.sql

18
application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json

@ -20,7 +20,7 @@
},
"type": "org.thingsboard.rule.engine.profile.TbDeviceProfileNode",
"name": "Device Profile Node",
"debugMode": false,
"debugStrategy": "DISABLED",
"configuration": {
"persistAlarmRulesState": false,
"fetchAlarmRulesStateOnStart": false
@ -34,7 +34,7 @@
},
"type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode",
"name": "Save Timeseries",
"debugMode": false,
"debugStrategy": "DISABLED",
"configuration": {
"defaultTTL": 0
},
@ -47,7 +47,7 @@
},
"type": "org.thingsboard.rule.engine.telemetry.TbMsgAttributesNode",
"name": "Save Client Attributes",
"debugMode": false,
"debugStrategy": "DISABLED",
"configurationVersion": 2,
"configuration": {
"scope": "CLIENT_SCOPE",
@ -64,7 +64,7 @@
},
"type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode",
"name": "Message Type Switch",
"debugMode": false,
"debugStrategy": "DISABLED",
"configuration": {
"version": 0
},
@ -77,7 +77,7 @@
},
"type": "org.thingsboard.rule.engine.action.TbLogNode",
"name": "Log RPC from Device",
"debugMode": false,
"debugStrategy": "DISABLED",
"configuration": {
"scriptLang": "TBEL",
"jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);",
@ -92,7 +92,7 @@
},
"type": "org.thingsboard.rule.engine.action.TbLogNode",
"name": "Log Other",
"debugMode": false,
"debugStrategy": "DISABLED",
"configuration": {
"scriptLang": "TBEL",
"jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);",
@ -107,7 +107,7 @@
},
"type": "org.thingsboard.rule.engine.rpc.TbSendRPCRequestNode",
"name": "RPC Call Request",
"debugMode": false,
"debugStrategy": "DISABLED",
"configuration": {
"timeoutInSeconds": 60
},
@ -120,7 +120,7 @@
},
"type": "org.thingsboard.rule.engine.edge.TbMsgPushToCloudNode",
"name": "Push to cloud",
"debugMode": false,
"debugStrategy": "DISABLED",
"configuration": {
"scope": "SERVER_SCOPE"
},
@ -133,7 +133,7 @@
},
"type": "org.thingsboard.rule.engine.edge.TbMsgPushToCloudNode",
"name": "Push to cloud",
"debugMode": false,
"debugStrategy": "DISABLED",
"configuration": {
"scope": "SERVER_SCOPE"
},

14
application/src/main/data/json/tenant/rule_chains/root_rule_chain.json

@ -18,7 +18,7 @@
},
"type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode",
"name": "Save Timeseries",
"debugMode": false,
"debugStrategy": "DISABLED",
"configuration": {
"defaultTTL": 0
}
@ -30,7 +30,7 @@
},
"type": "org.thingsboard.rule.engine.telemetry.TbMsgAttributesNode",
"name": "Save Client Attributes",
"debugMode": false,
"debugStrategy": "DISABLED",
"configurationVersion": 2,
"configuration": {
"scope": "CLIENT_SCOPE",
@ -46,7 +46,7 @@
},
"type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode",
"name": "Message Type Switch",
"debugMode": false,
"debugStrategy": "DISABLED",
"configuration": {
"version": 0
}
@ -58,7 +58,7 @@
},
"type": "org.thingsboard.rule.engine.action.TbLogNode",
"name": "Log RPC from Device",
"debugMode": false,
"debugStrategy": "DISABLED",
"configuration": {
"scriptLang": "TBEL",
"jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);",
@ -72,7 +72,7 @@
},
"type": "org.thingsboard.rule.engine.action.TbLogNode",
"name": "Log Other",
"debugMode": false,
"debugStrategy": "DISABLED",
"configuration": {
"scriptLang": "TBEL",
"jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);",
@ -86,7 +86,7 @@
},
"type": "org.thingsboard.rule.engine.rpc.TbSendRPCRequestNode",
"name": "RPC Call Request",
"debugMode": false,
"debugStrategy": "DISABLED",
"configuration": {
"timeoutInSeconds": 60
}
@ -99,7 +99,7 @@
},
"type": "org.thingsboard.rule.engine.profile.TbDeviceProfileNode",
"name": "Device Profile Node",
"debugMode": false,
"debugStrategy": "DISABLED",
"configuration": {
"persistAlarmRulesState": false,
"fetchAlarmRulesStateOnStart": false

21
application/src/main/data/upgrade/3.8.0/schema_update.sql

@ -14,3 +14,24 @@
-- limitations under the License.
--
-- UPDATE RULE NODE DEBUG MODE TO DEBUG STRATEGY START
ALTER TABLE rule_node
ADD COLUMN IF NOT EXISTS debug_strategy varchar(32) DEFAULT 'DISABLED';
ALTER TABLE rule_node
ADD COLUMN IF NOT EXISTS last_update_ts bigint NOT NULL DEFAULT extract(epoch from now()) * 1000;
DO
$$
BEGIN
IF EXISTS (SELECT 1
FROM information_schema.columns
WHERE table_name = 'rule_node' AND column_name = 'debug_mode') THEN
UPDATE rule_node
SET debug_strategy = CASE WHEN debug_mode = true THEN 'ALL_EVENTS' ELSE 'DISABLED' END;
ALTER TABLE rule_node
DROP COLUMN debug_mode;
END IF;
END
$$;
-- UPDATE RULE NODE DEBUG MODE TO DEBUG STRATEGY END

4
application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java

@ -577,6 +577,10 @@ public class ActorSystemContext {
@Getter
private boolean externalNodeForceAck;
@Value("${actors.rule.node.max_debug_mode_duration:60}")
@Getter
private int maxRuleNodeDebugModeDurationMinutes;
@Value("${state.rule.node.deviceState.rateLimit:1:1,30:60,60:3600}")
@Getter
private String deviceStateNodeRateLimitConfig;

82
application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java

@ -63,6 +63,7 @@ import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rule.DebugStrategy;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.data.rule.RuleNodeState;
import org.thingsboard.server.common.data.script.ScriptLanguage;
@ -129,7 +130,7 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.ENTITY_CREATED;
* Created by ashvayka on 19.03.18.
*/
@Slf4j
class DefaultTbContext implements TbContext {
public class DefaultTbContext implements TbContext {
private final ActorSystemContext mainCtx;
private final String ruleChainName;
@ -143,25 +144,25 @@ class DefaultTbContext implements TbContext {
@Override
public void tellSuccess(TbMsg msg) {
tellNext(msg, Collections.singleton(TbNodeConnectionType.SUCCESS), null);
tellNext(msg, Collections.singleton(TbNodeConnectionType.SUCCESS));
}
@Override
public void tellNext(TbMsg msg, String relationType) {
tellNext(msg, Collections.singleton(relationType), null);
tellNext(msg, Collections.singleton(relationType));
}
@Override
public void tellNext(TbMsg msg, Set<String> relationTypes) {
tellNext(msg, relationTypes, null);
}
private void tellNext(TbMsg msg, Set<String> relationTypes, Throwable th) {
if (nodeCtx.getSelf().isDebugMode()) {
relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th));
RuleNode ruleNode = nodeCtx.getSelf();
DebugStrategy debugStrategy = ruleNode.getDebugStrategy();
if (debugStrategy.shouldPersistDebugOutputForAllEvents(ruleNode.getLastUpdateTs(), msg.getTs(), getMaxRuleNodeDebugDurationMinutes())) {
relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), ruleNode.getId(), msg, relationType));
} else if (debugStrategy.shouldPersistDebugForFailureEventOnly(relationTypes)) {
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), ruleNode.getId(), msg, TbNodeConnectionType.FAILURE);
}
msg.getCallback().onProcessingEnd(nodeCtx.getSelf().getId());
nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null));
msg.getCallback().onProcessingEnd(ruleNode.getId());
nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(ruleNode.getRuleChainId(), ruleNode.getId(), relationTypes, msg, null));
}
@Override
@ -182,8 +183,12 @@ class DefaultTbContext implements TbContext {
if (item == null) {
ack(msg);
} else {
if (nodeCtx.getSelf().isDebugMode()) {
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType);
RuleNode ruleNode = nodeCtx.getSelf();
DebugStrategy debugStrategy = ruleNode.getDebugStrategy();
if (debugStrategy.shouldPersistDebugOutputForAllEvents(ruleNode.getLastUpdateTs(), msg.getTs(), getMaxRuleNodeDebugDurationMinutes())) {
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), ruleNode.getId(), msg, relationType);
} else if (debugStrategy.shouldPersistDebugForFailureEventOnly(relationType)) {
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), ruleNode.getId(), msg, relationType);
}
nodeCtx.getChainActor().tell(new RuleChainOutputMsg(item.getRuleChainId(), item.getRuleNodeId(), relationType, msg));
}
@ -213,11 +218,13 @@ class DefaultTbContext implements TbContext {
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(tbMsg)).build();
if (nodeCtx.getSelf().isDebugMode()) {
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "To Root Rule Chain");
}
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, new SimpleTbQueueCallback(
metadata -> {
RuleNode ruleNode = nodeCtx.getSelf();
DebugStrategy debugStrategy = ruleNode.getDebugStrategy();
if (debugStrategy.shouldPersistDebugOutputForAllEvents(ruleNode.getLastUpdateTs(), tbMsg.getTs(), getMaxRuleNodeDebugDurationMinutes())) {
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), ruleNode.getId(), tbMsg, TbNodeConnectionType.TO_ROOT_RULE_CHAIN);
}
if (onSuccess != null) {
onSuccess.run();
}
@ -299,8 +306,9 @@ class DefaultTbContext implements TbContext {
}
return;
}
RuleChainId ruleChainId = nodeCtx.getSelf().getRuleChainId();
RuleNodeId ruleNodeId = nodeCtx.getSelf().getId();
RuleNode ruleNode = nodeCtx.getSelf();
RuleChainId ruleChainId = ruleNode.getRuleChainId();
RuleNodeId ruleNodeId = ruleNode.getId();
TbMsg tbMsg = TbMsg.newMsg(source, queueName, ruleChainId, ruleNodeId);
TransportProtos.ToRuleEngineMsg.Builder msg = TransportProtos.ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
@ -310,12 +318,15 @@ class DefaultTbContext implements TbContext {
if (failureMessage != null) {
msg.setFailureMessage(failureMessage);
}
if (nodeCtx.getSelf().isDebugMode()) {
relationTypes.forEach(relationType ->
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, relationType, null, failureMessage));
}
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg.build(), new SimpleTbQueueCallback(
metadata -> {
DebugStrategy debugStrategy = ruleNode.getDebugStrategy();
if (debugStrategy.shouldPersistDebugOutputForAllEvents(ruleNode.getLastUpdateTs(), tbMsg.getTs(), getMaxRuleNodeDebugDurationMinutes())) {
relationTypes.forEach(relationType ->
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), ruleNode.getId(), tbMsg, relationType, null, failureMessage));
} else if (debugStrategy.shouldPersistDebugForFailureEventOnly(relationTypes)) {
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), ruleNode.getId(), tbMsg, TbNodeConnectionType.FAILURE, null, failureMessage);
}
if (onSuccess != null) {
onSuccess.run();
}
@ -331,10 +342,12 @@ class DefaultTbContext implements TbContext {
@Override
public void ack(TbMsg tbMsg) {
if (nodeCtx.getSelf().isDebugMode()) {
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "ACK", null);
RuleNode ruleNode = nodeCtx.getSelf();
DebugStrategy debugStrategy = ruleNode.getDebugStrategy();
if (debugStrategy.shouldPersistDebugOutputForAllEvents(ruleNode.getLastUpdateTs(), tbMsg.getTs(), getMaxRuleNodeDebugDurationMinutes())) {
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), ruleNode.getId(), tbMsg, TbNodeConnectionType.ACK);
}
tbMsg.getCallback().onProcessingEnd(nodeCtx.getSelf().getId());
tbMsg.getCallback().onProcessingEnd(ruleNode.getId());
tbMsg.getCallback().onSuccess();
}
@ -349,12 +362,13 @@ class DefaultTbContext implements TbContext {
@Override
public void tellFailure(TbMsg msg, Throwable th) {
if (nodeCtx.getSelf().isDebugMode()) {
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, TbNodeConnectionType.FAILURE, th);
RuleNode ruleNode = nodeCtx.getSelf();
if (ruleNode.getDebugStrategy().shouldPersistDebugForFailureEventOnly(ruleNode.getLastUpdateTs(), msg.getTs(), getMaxRuleNodeDebugDurationMinutes())) {
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), ruleNode.getId(), msg, TbNodeConnectionType.FAILURE, th);
}
String failureMessage = getFailureMessage(th);
nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getRuleChainId(),
nodeCtx.getSelf().getId(), Collections.singleton(TbNodeConnectionType.FAILURE),
nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(ruleNode.getRuleChainId(),
ruleNode.getId(), Collections.singleton(TbNodeConnectionType.FAILURE),
msg, failureMessage));
}
@ -1004,4 +1018,14 @@ class DefaultTbContext implements TbContext {
return failureMessage;
}
private int getMaxRuleNodeDebugDurationMinutes() {
if (!DebugStrategy.ALL_EVENTS.equals(nodeCtx.getSelf().getDebugStrategy())) {
return 0;
}
var configuration = mainCtx.getTenantProfileCache()
.get(getTenantId()).getProfileData().getConfiguration();
int systemMaxRuleNodeDebugModeDurationMinutes = mainCtx.getMaxRuleNodeDebugModeDurationMinutes();
return configuration.getMaxRuleNodeDebugModeDurationMinutes(systemMaxRuleNodeDebugModeDurationMinutes);
}
}

44
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java

@ -43,7 +43,6 @@ import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.common.msg.queue.RuleNodeException;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.stats.TbApiUsageReportClient;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.queue.TbQueueCallback;
@ -72,7 +71,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
private final Map<RuleNodeId, List<RuleNodeRelation>> nodeRoutes;
private final RuleChainService service;
private final TbClusterService clusterService;
private final TbApiUsageReportClient apiUsageClient;
private String ruleChainName;
private RuleNodeId firstId;
@ -81,7 +79,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
RuleChainActorMessageProcessor(TenantId tenantId, RuleChain ruleChain, ActorSystemContext systemContext, TbActorRef parent, TbActorRef self) {
super(systemContext, tenantId, ruleChain.getId());
this.apiUsageClient = systemContext.getApiUsageClient();
this.ruleChainName = ruleChain.getName();
this.parent = parent;
this.self = self;
@ -138,7 +135,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
}
Set<RuleNodeId> existingNodes = ruleNodeList.stream().map(RuleNode::getId).collect(Collectors.toSet());
List<RuleNodeId> removedRules = nodeActors.keySet().stream().filter(node -> !existingNodes.contains(node)).collect(Collectors.toList());
List<RuleNodeId> removedRules = nodeActors.keySet().stream().filter(node -> !existingNodes.contains(node)).toList();
removedRules.forEach(ruleNodeId -> {
log.trace("[{}][{}] Removing rule node [{}]", tenantId, entityId, ruleNodeId);
RuleNodeCtx removed = nodeActors.remove(ruleNodeId);
@ -177,7 +174,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
for (RuleNode ruleNode : ruleNodeList) {
List<EntityRelation> relations = service.getRuleNodeRelations(TenantId.SYS_TENANT_ID, ruleNode.getId());
log.trace("[{}][{}][{}] Processing rule node relations [{}]", tenantId, entityId, ruleNode.getId(), relations.size());
if (relations.size() == 0) {
if (relations.isEmpty()) {
nodeRoutes.put(ruleNode.getId(), Collections.emptyList());
} else {
for (EntityRelation relation : relations) {
@ -238,45 +235,53 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
}
public void onRuleChainInputMsg(RuleChainInputMsg envelope) {
var msg = envelope.getMsg();
if (!checkMsgValid(msg)) {
var tbMsg = envelope.getMsg();
if (!checkMsgValid(tbMsg)) {
return;
}
if (entityId.equals(envelope.getRuleChainId())) {
onTellNext(envelope.getMsg(), false);
onTellNext(tbMsg, false);
} else {
parent.tell(envelope);
}
}
public void onRuleChainOutputMsg(RuleChainOutputMsg envelope) {
var msg = envelope.getMsg();
if (!checkMsgValid(msg)) {
var tbMsg = envelope.getMsg();
if (!checkMsgValid(tbMsg)) {
return;
}
if (entityId.equals(envelope.getRuleChainId())) {
var originatorNodeId = envelope.getTargetRuleNodeId();
RuleNodeCtx ruleNodeCtx = nodeActors.get(originatorNodeId);
if (ruleNodeCtx != null && ruleNodeCtx.getSelf().isDebugMode()) {
systemContext.persistDebugOutput(tenantId, originatorNodeId, envelope.getMsg(), envelope.getRelationType());
if (ruleNodeCtx != null) {
var ruleNode = ruleNodeCtx.getSelf();
var debugStrategy = ruleNode.getDebugStrategy();
int maxRuleNodeDebugModeDurationMinutes = getTenantProfileConfiguration()
.getMaxRuleNodeDebugModeDurationMinutes(systemContext.getMaxRuleNodeDebugModeDurationMinutes());
boolean shouldPersistDebugOutput = debugStrategy.shouldPersistDebugOutputForAllEvents(ruleNode.getLastUpdateTs(), tbMsg.getTs(), maxRuleNodeDebugModeDurationMinutes) ||
debugStrategy.shouldPersistDebugForFailureEventOnly(envelope.getRelationType());
if (shouldPersistDebugOutput) {
systemContext.persistDebugOutput(tenantId, originatorNodeId, tbMsg, envelope.getRelationType());
}
}
onTellNext(envelope.getMsg(), originatorNodeId, Collections.singleton(envelope.getRelationType()), RuleNodeException.UNKNOWN);
onTellNext(tbMsg, originatorNodeId, Collections.singleton(envelope.getRelationType()), RuleNodeException.UNKNOWN);
} else {
parent.tell(envelope);
}
}
void onRuleChainToRuleChainMsg(RuleChainToRuleChainMsg envelope) {
var msg = envelope.getMsg();
if (!checkMsgValid(msg)) {
var tbMsg = envelope.getMsg();
if (!checkMsgValid(tbMsg)) {
return;
}
try {
checkComponentStateActive(envelope.getMsg());
checkComponentStateActive(tbMsg);
if (firstNode != null) {
pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType());
pushMsgToNode(firstNode, tbMsg, envelope.getFromRelationType());
} else {
envelope.getMsg().getCallback().onSuccess();
tbMsg.getCallback().onSuccess();
}
} catch (RuleNodeException e) {
log.debug("Rule Chain is not active. Current state [{}] for processor [{}][{}] tenant [{}]", state, entityId.getEntityType(), entityId, tenantId);
@ -389,7 +394,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
private void pushMsgToNode(RuleNodeCtx nodeCtx, TbMsg msg, String fromRelationType) {
if (nodeCtx != null) {
nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, ruleChainName, nodeCtx), msg, fromRelationType));
var tbCtx = new DefaultTbContext(systemContext, ruleChainName, nodeCtx);
nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(tbCtx, msg, fromRelationType));
} else {
log.error("[{}][{}] RuleNodeCtx is empty", entityId, ruleChainName);
msg.getCallback().onFailure(new RuleEngineException("Rule Node CTX is empty"));

6
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java

@ -37,7 +37,8 @@ public class RuleNodeActor extends RuleEngineComponentActor<RuleNodeId, RuleNode
private final RuleChainId ruleChainId;
private final RuleNodeId ruleNodeId;
private RuleNodeActor(ActorSystemContext systemContext, TenantId tenantId, RuleChainId ruleChainId, String ruleChainName, RuleNodeId ruleNodeId) {
private RuleNodeActor(ActorSystemContext systemContext, TenantId tenantId,
RuleChainId ruleChainId, String ruleChainName, RuleNodeId ruleNodeId) {
super(systemContext, tenantId, ruleNodeId);
this.ruleChainName = ruleChainName;
this.ruleChainId = ruleChainId;
@ -46,7 +47,7 @@ public class RuleNodeActor extends RuleEngineComponentActor<RuleNodeId, RuleNode
@Override
protected RuleNodeActorMessageProcessor createProcessor(TbActorCtx ctx) {
return new RuleNodeActorMessageProcessor(tenantId, this.ruleChainName, ruleNodeId, systemContext, ctx.getParentRef(), ctx);
return new RuleNodeActorMessageProcessor(tenantId, this.ruleChainName, ruleNodeId, systemContext, ctx);
}
@Override
@ -118,7 +119,6 @@ public class RuleNodeActor extends RuleEngineComponentActor<RuleNodeId, RuleNode
this.ruleChainId = ruleChainId;
this.ruleChainName = ruleChainName;
this.ruleNodeId = ruleNodeId;
}
@Override

37
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java

@ -20,7 +20,6 @@ import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.actors.TbRuleNodeUpdateException;
import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
@ -51,13 +50,13 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
private TbNode tbNode;
private RuleNodeInfo info;
RuleNodeActorMessageProcessor(TenantId tenantId, String ruleChainName, RuleNodeId ruleNodeId, ActorSystemContext systemContext
, TbActorRef parent, TbActorRef self) {
RuleNodeActorMessageProcessor(TenantId tenantId, String ruleChainName,
RuleNodeId ruleNodeId, ActorSystemContext systemContext, TbActorCtx selfActor) {
super(systemContext, tenantId, ruleNodeId);
this.apiUsageClient = systemContext.getApiUsageClient();
this.ruleChainName = ruleChainName;
this.ruleNode = systemContext.getRuleChainService().findRuleNodeById(tenantId, entityId);
this.defaultCtx = new DefaultTbContext(systemContext, ruleChainName, new RuleNodeCtx(tenantId, parent, self, ruleNode));
this.defaultCtx = new DefaultTbContext(systemContext, ruleChainName, new RuleNodeCtx(tenantId, selfActor, ruleNode));
this.info = new RuleNodeInfo(ruleNodeId, ruleChainName, getName(ruleNode));
}
@ -78,7 +77,8 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
if (isMyNodePartition(newRuleNode)) {
this.info = new RuleNodeInfo(entityId, ruleChainName, getName(newRuleNode));
boolean restartRequired = state != ComponentLifecycleState.ACTIVE ||
!(ruleNode.getType().equals(newRuleNode.getType()) && ruleNode.getConfiguration().equals(newRuleNode.getConfiguration()));
!(ruleNode.getType().equals(newRuleNode.getType()) &&
ruleNode.getConfiguration().equals(newRuleNode.getConfiguration()));
this.ruleNode = newRuleNode;
this.defaultCtx.updateSelf(newRuleNode);
if (restartRequired) {
@ -125,12 +125,12 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
checkComponentStateActive(msg.getMsg());
TbMsg tbMsg = msg.getMsg();
int ruleNodeCount = tbMsg.getAndIncrementRuleNodeCounter();
int maxRuleNodeExecutionsPerMessage = getTenantProfileConfiguration().getMaxRuleNodeExecsPerMessage();
var tenantProfileConfiguration = getTenantProfileConfiguration();
int maxRuleNodeExecutionsPerMessage = tenantProfileConfiguration.getMaxRuleNodeExecsPerMessage();
if (maxRuleNodeExecutionsPerMessage == 0 || ruleNodeCount < maxRuleNodeExecutionsPerMessage) {
apiUsageClient.report(tenantId, tbMsg.getCustomerId(), ApiUsageRecordKey.RE_EXEC_COUNT);
if (ruleNode.isDebugMode()) {
systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), "Self");
}
persistDebugInputIfAllowed(msg.getMsg(), "Self", tenantProfileConfiguration
.getMaxRuleNodeDebugModeDurationMinutes(systemContext.getMaxRuleNodeDebugModeDurationMinutes()));
try {
tbNode.onMsg(defaultCtx, msg.getMsg());
} catch (Exception e) {
@ -149,12 +149,12 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
checkComponentStateActive(msg.getMsg());
TbMsg tbMsg = msg.getMsg();
int ruleNodeCount = tbMsg.getAndIncrementRuleNodeCounter();
int maxRuleNodeExecutionsPerMessage = getTenantProfileConfiguration().getMaxRuleNodeExecsPerMessage();
var tenantProfileConfiguration = getTenantProfileConfiguration();
int maxRuleNodeExecutionsPerMessage = tenantProfileConfiguration.getMaxRuleNodeExecsPerMessage();
if (maxRuleNodeExecutionsPerMessage == 0 || ruleNodeCount < maxRuleNodeExecutionsPerMessage) {
apiUsageClient.report(tenantId, tbMsg.getCustomerId(), ApiUsageRecordKey.RE_EXEC_COUNT);
if (ruleNode.isDebugMode()) {
systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), msg.getFromRelationType());
}
persistDebugInputIfAllowed(msg.getMsg(), msg.getFromRelationType(), tenantProfileConfiguration
.getMaxRuleNodeDebugModeDurationMinutes(systemContext.getMaxRuleNodeDebugModeDurationMinutes()));
try {
tbNode.onMsg(msg.getCtx(), msg.getMsg());
} catch (Exception e) {
@ -196,8 +196,8 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
private boolean isMyNodePartition(RuleNode ruleNode) {
boolean result = ruleNode == null || !ruleNode.isSingletonMode()
|| systemContext.getDiscoveryService().isMonolith()
|| defaultCtx.isLocalEntity(ruleNode.getId());
|| systemContext.getDiscoveryService().isMonolith()
|| defaultCtx.isLocalEntity(ruleNode.getId());
if (!result) {
log.trace("[{}][{}] Is not my node partition", tenantId, entityId);
}
@ -216,4 +216,11 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
systemContext.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), toQueueMsg, null);
defaultCtx.ack(source);
}
private void persistDebugInputIfAllowed(TbMsg msg, String fromNodeConnectionType, int debugModeDurationMinutes) {
if (ruleNode.getDebugStrategy().shouldPersistDebugInput(ruleNode.getLastUpdateTs(), msg.getTs(), debugModeDurationMinutes)) {
systemContext.persistDebugInput(tenantId, entityId, msg, fromNodeConnectionType);
}
}
}

17
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeCtx.java

@ -15,8 +15,8 @@
*/
package org.thingsboard.server.actors.ruleChain;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rule.RuleNode;
@ -25,10 +25,21 @@ import org.thingsboard.server.common.data.rule.RuleNode;
* Created by ashvayka on 19.03.18.
*/
@Data
@AllArgsConstructor
final class RuleNodeCtx {
public final class RuleNodeCtx {
private final TenantId tenantId;
private final TbActorRef chainActor;
private final TbActorRef selfActor;
private RuleNode self;
RuleNodeCtx(TenantId tenantId, TbActorCtx selfActor, RuleNode self) {
this(tenantId, selfActor.getParentRef(), selfActor, self);
}
RuleNodeCtx(TenantId tenantId, TbActorRef chainActor, TbActorRef selfActor, RuleNode self) {
this.tenantId = tenantId;
this.chainActor = chainActor;
this.selfActor = selfActor;
this.self = self;
}
}

10
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java

@ -26,24 +26,24 @@ import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbRuleEngineActorMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import java.io.Serial;
import java.io.Serializable;
import java.util.Set;
/**
* Created by ashvayka on 19.03.18.
*/
@Getter
@EqualsAndHashCode(callSuper = true)
@ToString
class RuleNodeToRuleChainTellNextMsg extends TbRuleEngineActorMsg implements Serializable {
public class RuleNodeToRuleChainTellNextMsg extends TbRuleEngineActorMsg implements Serializable {
@Serial
private static final long serialVersionUID = 4577026446412871820L;
@Getter
private final RuleChainId ruleChainId;
@Getter
private final RuleNodeId originator;
@Getter
private final Set<String> relationTypes;
@Getter
private final String failureMessage;
public RuleNodeToRuleChainTellNextMsg(RuleChainId ruleChainId, RuleNodeId originator, Set<String> relationTypes, TbMsg tbMsg, String failureMessage) {

1
application/src/main/java/org/thingsboard/server/controller/TenantProfileController.java

@ -151,6 +151,7 @@ public class TenantProfileController extends BaseController {
" \"maxJSExecutions\": 5000000,\n" +
" \"maxDPStorageDays\": 0,\n" +
" \"maxRuleNodeExecutionsPerMessage\": 50,\n" +
" \"maxRuleNodeDebugDurationMinutes\": 15,\n" +
" \"maxEmails\": 0,\n" +
" \"maxSms\": 0,\n" +
" \"maxCreatedAlarms\": 1000,\n" +

4
application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/BaseRuleChainMetadataConstructor.java

@ -24,6 +24,7 @@ import org.thingsboard.server.common.data.rule.NodeConnectionInfo;
import org.thingsboard.server.common.data.rule.RuleChainConnectionInfo;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.gen.edge.v1.DebugStrategy;
import org.thingsboard.server.gen.edge.v1.EdgeVersion;
import org.thingsboard.server.gen.edge.v1.NodeConnectionInfoProto;
import org.thingsboard.server.gen.edge.v1.RuleChainConnectionInfoProto;
@ -88,7 +89,8 @@ public abstract class BaseRuleChainMetadataConstructor implements RuleChainMetad
.setIdLSB(node.getId().getId().getLeastSignificantBits())
.setType(node.getType())
.setName(node.getName())
.setDebugMode(node.isDebugMode())
.setLastUpdateTs(node.getLastUpdateTs())
.setDebugStrategy(DebugStrategy.forNumber(node.getDebugStrategy().getProtoNumber()))
.setConfiguration(JacksonUtil.toString(node.getConfiguration()))
.setAdditionalInfo(JacksonUtil.toString(node.getAdditionalInfo()))
.setSingletonMode(node.isSingletonMode())

1
application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/tenant/TenantMsgConstructorV1.java

@ -88,6 +88,7 @@ public class TenantMsgConstructorV1 implements TenantMsgConstructor {
configuration.setMaxTransportDataPoints(0);
configuration.setRuleEngineExceptionsTtlDays(0);
configuration.setMaxRuleNodeExecutionsPerMessage(0);
configuration.setMaxRuleNodeDebugDurationMinutes(0);
tenantProfileData.setConfiguration(configuration);
tenantProfile.setProfileData(tenantProfileData);

3
application/src/main/resources/thingsboard.yml

@ -468,6 +468,9 @@ actors:
node:
# Errors for particular actor are persisted once per specified amount of milliseconds
error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}"
# The maximum allowed duration (in minutes) for the debug mode to be enabled if the debug strategy is set to ALL_EVENTS.
# If a specific value is set in the tenant profile, the minimum between value from profile and this setting will be used.
max_debug_mode_duration: "${ACTORS_RULE_NODE_MAX_DEBUG_MODE_DURATION_MINUTES:60}"
transaction:
# Size of queues that store messages for transaction rule nodes
queue_size: "${ACTORS_RULE_TRANSACTION_QUEUE_SIZE:15000}"

836
application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java

@ -0,0 +1,836 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.rule;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.actors.ruleChain.DefaultTbContext;
import org.thingsboard.server.actors.ruleChain.RuleChainOutputMsg;
import org.thingsboard.server.actors.ruleChain.RuleNodeCtx;
import org.thingsboard.server.actors.ruleChain.RuleNodeToRuleChainTellNextMsg;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.data.rule.DebugStrategy;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileConfiguration;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.TbMsgProcessingStackItem;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.queue.common.SimpleTbQueueCallback;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.ArgumentMatchers.notNull;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@SuppressWarnings("ResultOfMethodCallIgnored")
@ExtendWith(MockitoExtension.class)
class DefaultTbContextTest {
private final String EXCEPTION_MSG = "Some runtime exception!";
private final RuntimeException EXCEPTION = new RuntimeException(EXCEPTION_MSG);
private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("c7bf4c85-923c-4688-a4b5-0f8a0feb7cd5"));
private final RuleNodeId RULE_NODE_ID = new RuleNodeId(UUID.fromString("1ca5e2ef-1309-41d9-bafa-709e9df0e2a6"));
private final RuleChainId RULE_CHAIN_ID = new RuleChainId(UUID.fromString("b87c4123-f9f2-41a6-9a09-e3a5b6580b11"));
@Mock
private ActorSystemContext mainCtxMock;
@Mock
private RuleNodeCtx nodeCtxMock;
@Mock
private TbActorRef chainActorMock;
private DefaultTbContext defaultTbContext;
@BeforeEach
public void setUp() {
defaultTbContext = new DefaultTbContext(mainCtxMock, "Test rule chain name", nodeCtxMock);
}
@Test
public void givenDebugStrategyOnlyFailureEvents_whenTellSuccess_thenVerifyDebugOutputNotPersisted() {
// GIVEN
var callbackMock = mock(TbMsgCallback.class);
var msg = getTbMsgWithCallback(callbackMock);
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setDebugStrategy(DebugStrategy.ONLY_FAILURE_EVENTS);
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(nodeCtxMock.getChainActor()).willReturn(chainActorMock);
// WHEN
defaultTbContext.tellSuccess(msg);
// THEN
then(nodeCtxMock).should().getChainActor();
then(nodeCtxMock).shouldHaveNoMoreInteractions();
then(mainCtxMock).shouldHaveNoInteractions();
checkTellNextCommonLogic(callbackMock, TbNodeConnectionType.SUCCESS, msg);
}
@Test
public void givenDebugStrategyOnlyFailureEventsAndSuccessConnection_whenTellNext_thenVerifyDebugOutputNotPersisted() {
// GIVEN
var callbackMock = mock(TbMsgCallback.class);
var msg = getTbMsgWithCallback(callbackMock);
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setDebugStrategy(DebugStrategy.ONLY_FAILURE_EVENTS);
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(nodeCtxMock.getChainActor()).willReturn(chainActorMock);
// WHEN
defaultTbContext.tellNext(msg, TbNodeConnectionType.SUCCESS);
// THEN
then(nodeCtxMock).should().getChainActor();
then(nodeCtxMock).shouldHaveNoMoreInteractions();
then(mainCtxMock).shouldHaveNoInteractions();
checkTellNextCommonLogic(callbackMock, TbNodeConnectionType.SUCCESS, msg);
}
@MethodSource
@ParameterizedTest
void givenDebugStrategyOnlyFailureEventsAndConnections_whenTellNext_thenVerifyDebugOutputPersisted(Set<String> connections) {
// GIVEN
var callbackMock = mock(TbMsgCallback.class);
var msg = getTbMsgWithCallback(callbackMock);
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setDebugStrategy(DebugStrategy.ONLY_FAILURE_EVENTS);
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(nodeCtxMock.getChainActor()).willReturn(chainActorMock);
// WHEN
defaultTbContext.tellNext(msg, connections);
// THEN
then(nodeCtxMock).should().getChainActor();
then(nodeCtxMock).shouldHaveNoMoreInteractions();
then(mainCtxMock).should().persistDebugOutput(TENANT_ID, RULE_NODE_ID, msg, TbNodeConnectionType.FAILURE);
then(mainCtxMock).shouldHaveNoMoreInteractions();
checkTellNextCommonLogic(callbackMock, connections, msg);
}
private static Stream<Set<String>> givenDebugStrategyOnlyFailureEventsAndConnections_whenTellNext_thenVerifyDebugOutputPersisted() {
return Stream.of(
Collections.singleton(TbNodeConnectionType.FAILURE),
Set.of(TbNodeConnectionType.FAILURE, TbNodeConnectionType.SUCCESS)
);
}
@MethodSource
@ParameterizedTest
void givenDebugStrategyDisabledAndConnections_whenTellNext_thenVerifyDebugOutputNotPersisted(Set<String> connections) {
// GIVEN
var callbackMock = mock(TbMsgCallback.class);
var msg = getTbMsgWithCallback(callbackMock);
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setDebugStrategy(DebugStrategy.DISABLED);
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(nodeCtxMock.getChainActor()).willReturn(chainActorMock);
// WHEN
defaultTbContext.tellNext(msg, connections);
// THEN
then(nodeCtxMock).should().getChainActor();
then(nodeCtxMock).shouldHaveNoMoreInteractions();
then(mainCtxMock).shouldHaveNoInteractions();
checkTellNextCommonLogic(callbackMock, connections, msg);
}
private static Stream<Set<String>> givenDebugStrategyDisabledAndConnections_whenTellNext_thenVerifyDebugOutputNotPersisted() {
return Stream.of(
Collections.singleton(TbNodeConnectionType.FAILURE),
Collections.singleton(TbNodeConnectionType.SUCCESS),
Set.of(TbNodeConnectionType.FAILURE, TbNodeConnectionType.SUCCESS)
);
}
@MethodSource
@ParameterizedTest
void givenDebugStrategyAllEventsAndConnection_whenTellNext_thenVerifyDebugOutputPersisted(String connection) {
// GIVEN
var callbackMock = mock(TbMsgCallback.class);
var msg = getTbMsgWithCallback(callbackMock);
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setLastUpdateTs(System.currentTimeMillis());
ruleNode.setDebugStrategy(DebugStrategy.ALL_EVENTS);
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(nodeCtxMock.getChainActor()).willReturn(chainActorMock);
mockGetMaxRuleNodeDebugModeDurationMinutes();
// WHEN
defaultTbContext.tellNext(msg, connection);
// THEN
then(nodeCtxMock).should().getChainActor();
then(nodeCtxMock).shouldHaveNoMoreInteractions();
then(mainCtxMock).should().getTenantProfileCache();
then(mainCtxMock).should().getMaxRuleNodeDebugModeDurationMinutes();
then(mainCtxMock).should().persistDebugOutput(TENANT_ID, RULE_NODE_ID, msg, connection);
then(mainCtxMock).shouldHaveNoMoreInteractions();
checkTellNextCommonLogic(callbackMock, connection, msg);
}
private static Stream<String> givenDebugStrategyAllEventsAndConnection_whenTellNext_thenVerifyDebugOutputPersisted() {
return failureAndSuccessConnection();
}
@Test
public void givenDebugStrategyAllEventsAndFailureAndSuccessConnection_whenTellNext_thenVerifyDebugOutputPersistedForAllEvents() {
// GIVEN
var callbackMock = mock(TbMsgCallback.class);
var msg = getTbMsgWithCallback(callbackMock);
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setLastUpdateTs(System.currentTimeMillis());
ruleNode.setDebugStrategy(DebugStrategy.ALL_EVENTS);
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(nodeCtxMock.getChainActor()).willReturn(chainActorMock);
mockGetMaxRuleNodeDebugModeDurationMinutes();
// WHEN
Set<String> connections = failureAndSuccessConnection().collect(Collectors.toSet());
defaultTbContext.tellNext(msg, connections);
// THEN
then(nodeCtxMock).should().getChainActor();
then(nodeCtxMock).shouldHaveNoMoreInteractions();
then(mainCtxMock).should().getTenantProfileCache();
then(mainCtxMock).should().getMaxRuleNodeDebugModeDurationMinutes();
var nodeConnectionsCaptor = ArgumentCaptor.forClass(String.class);
int wantedNumberOfInvocations = connections.size();
then(mainCtxMock).should(times(wantedNumberOfInvocations)).persistDebugOutput(eq(TENANT_ID), eq(RULE_NODE_ID), eq(msg), nodeConnectionsCaptor.capture());
then(mainCtxMock).shouldHaveNoMoreInteractions();
assertThat(nodeConnectionsCaptor.getAllValues()).hasSize(wantedNumberOfInvocations);
assertThat(nodeConnectionsCaptor.getAllValues()).containsExactlyInAnyOrderElementsOf(connections);
checkTellNextCommonLogic(callbackMock, connections, msg);
}
private static Stream<String> failureAndSuccessConnection() {
return Stream.of(TbNodeConnectionType.FAILURE, TbNodeConnectionType.SUCCESS);
}
@Test
public void givenDebugStrategyOnlyFailureEventsAndFailureConnection_whenOutput_thenVerifyDebugOutputPersisted() {
// GIVEN
var msgMock = mock(TbMsg.class);
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setDebugStrategy(DebugStrategy.ONLY_FAILURE_EVENTS);
given(msgMock.popFormStack()).willReturn(new TbMsgProcessingStackItem(RULE_CHAIN_ID, RULE_NODE_ID));
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(nodeCtxMock.getChainActor()).willReturn(chainActorMock);
// WHEN
defaultTbContext.output(msgMock, TbNodeConnectionType.FAILURE);
// THEN
checkOutputCommonLogic(msgMock, TbNodeConnectionType.FAILURE);
then(mainCtxMock).should().persistDebugOutput(TENANT_ID, RULE_NODE_ID, msgMock, TbNodeConnectionType.FAILURE);
then(mainCtxMock).shouldHaveNoMoreInteractions();
then(nodeCtxMock).shouldHaveNoMoreInteractions();
}
@Test
public void givenDebugStrategyOnlyFailureEventsAndSuccessConnection_whenOutput_thenVerifyDebugOutputNotPersisted() {
// GIVEN
var msgMock = mock(TbMsg.class);
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setDebugStrategy(DebugStrategy.ONLY_FAILURE_EVENTS);
given(msgMock.popFormStack()).willReturn(new TbMsgProcessingStackItem(RULE_CHAIN_ID, RULE_NODE_ID));
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(nodeCtxMock.getChainActor()).willReturn(chainActorMock);
// WHEN
defaultTbContext.output(msgMock, TbNodeConnectionType.SUCCESS);
// THEN
checkOutputCommonLogic(msgMock, TbNodeConnectionType.SUCCESS);
then(mainCtxMock).shouldHaveNoMoreInteractions();
then(nodeCtxMock).shouldHaveNoMoreInteractions();
}
@ParameterizedTest
@ValueSource(strings = {TbNodeConnectionType.SUCCESS, TbNodeConnectionType.FAILURE})
void givenDebugStrategyDisabled_whenOutput_thenVerifyDebugOutputNotPersisted(String nodeConnection) {
// GIVEN
var msgMock = mock(TbMsg.class);
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setDebugStrategy(DebugStrategy.DISABLED);
given(msgMock.popFormStack()).willReturn(new TbMsgProcessingStackItem(RULE_CHAIN_ID, RULE_NODE_ID));
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(nodeCtxMock.getChainActor()).willReturn(chainActorMock);
// WHEN
defaultTbContext.output(msgMock, nodeConnection);
// THEN
checkOutputCommonLogic(msgMock, nodeConnection);
then(mainCtxMock).shouldHaveNoMoreInteractions();
then(nodeCtxMock).shouldHaveNoMoreInteractions();
}
@ParameterizedTest
@ValueSource(strings = {TbNodeConnectionType.SUCCESS, TbNodeConnectionType.FAILURE})
void givenDebugStrategyAllEvents_whenOutput_thenVerifyDebugOutputPersisted(String nodeConnection) {
// GIVEN
var msgMock = mock(TbMsg.class);
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setDebugStrategy(DebugStrategy.ALL_EVENTS);
given(msgMock.popFormStack()).willReturn(new TbMsgProcessingStackItem(RULE_CHAIN_ID, RULE_NODE_ID));
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(nodeCtxMock.getChainActor()).willReturn(chainActorMock);
mockGetMaxRuleNodeDebugModeDurationMinutes();
// WHEN
defaultTbContext.output(msgMock, nodeConnection);
// THEN
checkOutputCommonLogic(msgMock, nodeConnection);
then(mainCtxMock).should().getTenantProfileCache();
then(mainCtxMock).should().getMaxRuleNodeDebugModeDurationMinutes();
then(mainCtxMock).should().persistDebugOutput(TENANT_ID, RULE_NODE_ID, msgMock, nodeConnection);
then(mainCtxMock).shouldHaveNoMoreInteractions();
then(nodeCtxMock).shouldHaveNoMoreInteractions();
}
@Test
public void givenEmptyStack_whenOutput_thenVerifyMsgAck() {
// GIVEN
var msgMock = mock(TbMsg.class);
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setDebugStrategy(DebugStrategy.DISABLED);
given(msgMock.popFormStack()).willReturn(null);
TbMsgCallback callbackMock = mock(TbMsgCallback.class);
given(msgMock.getCallback()).willReturn(callbackMock);
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
// WHEN
defaultTbContext.output(msgMock, TbNodeConnectionType.SUCCESS);
// THEN
then(msgMock).should().popFormStack();
then(callbackMock).should().onProcessingEnd(RULE_NODE_ID);
then(callbackMock).should().onSuccess();
then(nodeCtxMock).should(never()).getChainActor();
}
@Test
public void givenEmptyStackAndDebugStrategyAllEvents_whenOutput_thenVerifyMsgAckAndDebugOutputPersisted() {
// GIVEN
var msgMock = mock(TbMsg.class);
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setDebugStrategy(DebugStrategy.ALL_EVENTS);
ruleNode.setLastUpdateTs(System.currentTimeMillis());
given(msgMock.popFormStack()).willReturn(null);
TbMsgCallback callbackMock = mock(TbMsgCallback.class);
given(msgMock.getCallback()).willReturn(callbackMock);
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
mockGetMaxRuleNodeDebugModeDurationMinutes();
// WHEN
defaultTbContext.output(msgMock, TbNodeConnectionType.SUCCESS);
// THEN
then(msgMock).should().popFormStack();
then(callbackMock).should().onProcessingEnd(RULE_NODE_ID);
then(callbackMock).should().onSuccess();
then(nodeCtxMock).should(never()).getChainActor();
then(mainCtxMock).should().persistDebugOutput(TENANT_ID, RULE_NODE_ID, msgMock, TbNodeConnectionType.ACK);
}
@Test
public void givenDebugStrategyOnlyFailureEvents_whenEnqueueForTellFailure_thenVerifyDebugOutputPersisted() {
// GIVEN
var msg = getTbMsgWithQueueName();
var tpi = new TopicPartitionInfo(DataConstants.MAIN_QUEUE_TOPIC, TENANT_ID, 0, true);
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setDebugStrategy(DebugStrategy.ONLY_FAILURE_EVENTS);
var tbClusterServiceMock = mock(TbClusterService.class);
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(mainCtxMock.resolve(any(ServiceType.class), anyString(), any(TenantId.class), any(EntityId.class))).willReturn(tpi);
given(mainCtxMock.getClusterService()).willReturn(tbClusterServiceMock);
// WHEN
defaultTbContext.enqueueForTellFailure(msg, EXCEPTION);
// THEN
then(mainCtxMock).should().resolve(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, TENANT_ID, TENANT_ID);
TbMsg expectedTbMsg = TbMsg.newMsg(msg, msg.getQueueName(), RULE_CHAIN_ID, RULE_NODE_ID);
checkEnqueueForTellFailurePushMsgToRuleEngine(tbClusterServiceMock, tpi, expectedTbMsg);
ArgumentCaptor<TbMsg> tbMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
then(mainCtxMock).should().persistDebugOutput(eq(TENANT_ID), eq(RULE_NODE_ID), tbMsgCaptor.capture(), eq(TbNodeConnectionType.FAILURE), isNull(), eq(EXCEPTION_MSG));
TbMsg actualTbMsg = tbMsgCaptor.getValue();
assertThat(actualTbMsg).usingRecursiveComparison()
.ignoringFields("id", "ctx")
.isEqualTo(expectedTbMsg);
then(mainCtxMock).should().getClusterService();
then(mainCtxMock).shouldHaveNoMoreInteractions();
then(tbClusterServiceMock).shouldHaveNoMoreInteractions();
}
@Test
public void givenDebugStrategyDisabled_whenEnqueueForTellFailure_thenVerifyDebugOutputNotPersisted() {
// GIVEN
var msg = getTbMsgWithQueueName();
var tpi = new TopicPartitionInfo(DataConstants.MAIN_QUEUE_TOPIC, TENANT_ID, 0, true);
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setDebugStrategy(DebugStrategy.DISABLED);
var tbClusterServiceMock = mock(TbClusterService.class);
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(mainCtxMock.resolve(any(ServiceType.class), anyString(), any(TenantId.class), any(EntityId.class))).willReturn(tpi);
given(mainCtxMock.getClusterService()).willReturn(tbClusterServiceMock);
// WHEN
defaultTbContext.enqueueForTellFailure(msg, EXCEPTION);
// THEN
then(mainCtxMock).should().resolve(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, TENANT_ID, TENANT_ID);
TbMsg expectedTbMsg = TbMsg.newMsg(msg, msg.getQueueName(), RULE_CHAIN_ID, RULE_NODE_ID);
checkEnqueueForTellFailurePushMsgToRuleEngine(tbClusterServiceMock, tpi, expectedTbMsg);
then(mainCtxMock).should().getClusterService();
then(mainCtxMock).shouldHaveNoMoreInteractions();
then(tbClusterServiceMock).shouldHaveNoMoreInteractions();
}
@Test
public void givenDebugStrategyAllEvents_whenEnqueueForTellFailure_thenVerifyDebugOutputPersisted() {
// GIVEN
var msg = getTbMsgWithQueueName();
var tpi = new TopicPartitionInfo(DataConstants.MAIN_QUEUE_TOPIC, TENANT_ID, 0, true);
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setDebugStrategy(DebugStrategy.ALL_EVENTS);
ruleNode.setLastUpdateTs(System.currentTimeMillis());
var tbClusterServiceMock = mock(TbClusterService.class);
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(mainCtxMock.resolve(any(ServiceType.class), anyString(), any(TenantId.class), any(EntityId.class))).willReturn(tpi);
given(mainCtxMock.getClusterService()).willReturn(tbClusterServiceMock);
mockGetMaxRuleNodeDebugModeDurationMinutes();
// WHEN
defaultTbContext.enqueueForTellFailure(msg, EXCEPTION);
// THEN
then(mainCtxMock).should().resolve(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, TENANT_ID, TENANT_ID);
TbMsg expectedTbMsg = TbMsg.newMsg(msg, msg.getQueueName(), RULE_CHAIN_ID, RULE_NODE_ID);
checkEnqueueForTellFailurePushMsgToRuleEngine(tbClusterServiceMock, tpi, expectedTbMsg);
ArgumentCaptor<TbMsg> tbMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
then(mainCtxMock).should().persistDebugOutput(eq(TENANT_ID), eq(RULE_NODE_ID), tbMsgCaptor.capture(), eq(TbNodeConnectionType.FAILURE), isNull(), eq(EXCEPTION_MSG));
TbMsg actualTbMsg = tbMsgCaptor.getValue();
assertThat(actualTbMsg).usingRecursiveComparison()
.ignoringFields("id", "ctx")
.isEqualTo(expectedTbMsg);
then(mainCtxMock).should().getClusterService();
then(mainCtxMock).should().getTenantProfileCache();
then(mainCtxMock).should().getMaxRuleNodeDebugModeDurationMinutes();
then(mainCtxMock).shouldHaveNoMoreInteractions();
then(tbClusterServiceMock).shouldHaveNoMoreInteractions();
}
@Test
public void givenInvalidMsg_whenEnqueueForTellFailure_thenDoNothing() {
// GIVEN
var msgMock = mock(TbMsg.class);
var tpi = new TopicPartitionInfo(DataConstants.MAIN_QUEUE_TOPIC, TENANT_ID, 0, true);
given(msgMock.getOriginator()).willReturn(TENANT_ID);
given(msgMock.getQueueName()).willReturn(DataConstants.MAIN_QUEUE_NAME);
given(msgMock.isValid()).willReturn(false);
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
given(mainCtxMock.resolve(any(ServiceType.class), anyString(), any(TenantId.class), any(EntityId.class))).willReturn(tpi);
// WHEN
defaultTbContext.enqueueForTellFailure(msgMock, EXCEPTION);
// THEN
then(msgMock).should(times(2)).getQueueName();
then(msgMock).should().getOriginator();
then(msgMock).should().isValid();
then(msgMock).shouldHaveNoMoreInteractions();
then(mainCtxMock).should().resolve(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, TENANT_ID, TENANT_ID);
then(mainCtxMock).shouldHaveNoMoreInteractions();
then(nodeCtxMock).should(times(2)).getTenantId();
then(nodeCtxMock).shouldHaveNoMoreInteractions();
then(chainActorMock).shouldHaveNoInteractions();
}
@MethodSource
@ParameterizedTest
void givenDebugStrategyOptions_whenEnqueueForTellNext_thenVerifyDebugOutputPersistedOnlyForAllEventsDebugStrategy(DebugStrategy debugStrategy, String connectionType) {
// GIVEN
var msg = getTbMsgWithQueueName();
var tpi = new TopicPartitionInfo(DataConstants.MAIN_QUEUE_TOPIC, TENANT_ID, 0, true);
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setDebugStrategy(debugStrategy);
ruleNode.setLastUpdateTs(System.currentTimeMillis());
var tbClusterServiceMock = mock(TbClusterService.class);
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(mainCtxMock.resolve(any(ServiceType.class), anyString(), any(TenantId.class), any(EntityId.class))).willReturn(tpi);
given(mainCtxMock.getClusterService()).willReturn(tbClusterServiceMock);
if (DebugStrategy.ALL_EVENTS.equals(debugStrategy)) {
mockGetMaxRuleNodeDebugModeDurationMinutes();
}
// WHEN
defaultTbContext.enqueueForTellNext(msg, connectionType);
// THEN
then(mainCtxMock).should().resolve(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, TENANT_ID, TENANT_ID);
TbMsg expectedTbMsg = TbMsg.newMsg(msg, msg.getQueueName(), RULE_CHAIN_ID, RULE_NODE_ID);
ArgumentCaptor<ToRuleEngineMsg> toRuleEngineMsgCaptor = ArgumentCaptor.forClass(ToRuleEngineMsg.class);
ArgumentCaptor<SimpleTbQueueCallback> simpleTbQueueCallbackCaptor = ArgumentCaptor.forClass(SimpleTbQueueCallback.class);
then(tbClusterServiceMock).should().pushMsgToRuleEngine(eq(tpi), notNull(UUID.class), toRuleEngineMsgCaptor.capture(), simpleTbQueueCallbackCaptor.capture());
ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue();
assertThat(actualToRuleEngineMsg).usingRecursiveComparison()
.ignoringFields("tbMsg_")
.isEqualTo(ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(expectedTbMsg))
.addAllRelationTypes(List.of(connectionType)).build());
var simpleTbQueueCallback = simpleTbQueueCallbackCaptor.getValue();
assertThat(simpleTbQueueCallback).isNotNull();
simpleTbQueueCallback.onSuccess(null);
if (DebugStrategy.ALL_EVENTS.equals(debugStrategy)) {
then(mainCtxMock).should().getTenantProfileCache();
then(mainCtxMock).should().getMaxRuleNodeDebugModeDurationMinutes();
ArgumentCaptor<TbMsg> tbMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
then(mainCtxMock).should().persistDebugOutput(eq(TENANT_ID), eq(RULE_NODE_ID), tbMsgCaptor.capture(), eq(connectionType), isNull(), isNull());
TbMsg actualTbMsg = tbMsgCaptor.getValue();
assertThat(actualTbMsg).usingRecursiveComparison()
.ignoringFields("id", "ctx")
.isEqualTo(expectedTbMsg);
}
then(mainCtxMock).should().getClusterService();
then(mainCtxMock).shouldHaveNoMoreInteractions();
then(tbClusterServiceMock).shouldHaveNoMoreInteractions();
}
@MethodSource
@ParameterizedTest
void givenDebugStrategyOptions_whenEnqueue_thenVerifyDebugOutputPersistedOnlyForAllEventsDebugStrategy(DebugStrategy debugStrategy) {
// GIVEN
var msg = getTbMsgWithQueueName();
var tpi = new TopicPartitionInfo(DataConstants.MAIN_QUEUE_TOPIC, TENANT_ID, 0, true);
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setQueueName(DataConstants.MAIN_QUEUE_NAME);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setDebugStrategy(debugStrategy);
ruleNode.setLastUpdateTs(System.currentTimeMillis());
var tbClusterServiceMock = mock(TbClusterService.class);
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(mainCtxMock.resolve(any(ServiceType.class), anyString(), any(TenantId.class), any(EntityId.class))).willReturn(tpi);
given(mainCtxMock.getClusterService()).willReturn(tbClusterServiceMock);
if (DebugStrategy.ALL_EVENTS.equals(debugStrategy)) {
mockGetMaxRuleNodeDebugModeDurationMinutes();
}
Consumer<Throwable> onFailure = mock(Consumer.class);
Runnable onSuccess = mock(Runnable.class);
// WHEN
defaultTbContext.enqueue(msg, onSuccess, onFailure);
// THEN
then(mainCtxMock).should().resolve(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, TENANT_ID, TENANT_ID);
TbMsg expectedTbMsg = TbMsg.newMsg(msg, msg.getQueueName(), RULE_CHAIN_ID, RULE_NODE_ID);
ArgumentCaptor<ToRuleEngineMsg> toRuleEngineMsgCaptor = ArgumentCaptor.forClass(ToRuleEngineMsg.class);
ArgumentCaptor<SimpleTbQueueCallback> simpleTbQueueCallbackCaptor = ArgumentCaptor.forClass(SimpleTbQueueCallback.class);
then(tbClusterServiceMock).should().pushMsgToRuleEngine(eq(tpi), notNull(UUID.class), toRuleEngineMsgCaptor.capture(), simpleTbQueueCallbackCaptor.capture());
ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue();
assertThat(actualToRuleEngineMsg).usingRecursiveComparison()
.ignoringFields("tbMsg_")
.isEqualTo(ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(expectedTbMsg))
.build());
var simpleTbQueueCallback = simpleTbQueueCallbackCaptor.getValue();
assertThat(simpleTbQueueCallback).isNotNull();
simpleTbQueueCallback.onSuccess(null);
if (DebugStrategy.ALL_EVENTS.equals(debugStrategy)) {
then(mainCtxMock).should().getTenantProfileCache();
then(mainCtxMock).should().getMaxRuleNodeDebugModeDurationMinutes();
then(mainCtxMock).should().persistDebugOutput(eq(TENANT_ID), eq(RULE_NODE_ID), eq(msg), eq(TbNodeConnectionType.TO_ROOT_RULE_CHAIN));
}
then(mainCtxMock).should().getClusterService();
then(mainCtxMock).shouldHaveNoMoreInteractions();
then(tbClusterServiceMock).shouldHaveNoMoreInteractions();
}
@Test
public void givenDebugStrategyOnlyFailures_whenTellFailure_thenVerifyDebugOutputPersisted() {
// GIVEN
var msg = getTbMsg();
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setDebugStrategy(DebugStrategy.ONLY_FAILURE_EVENTS);
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(nodeCtxMock.getChainActor()).willReturn(chainActorMock);
// WHEN
defaultTbContext.tellFailure(msg, EXCEPTION);
// THEN
var expectedRuleNodeToRuleChainTellNextMsg = new RuleNodeToRuleChainTellNextMsg(
RULE_CHAIN_ID,
RULE_NODE_ID,
Collections.singleton(TbNodeConnectionType.FAILURE),
msg,
EXCEPTION_MSG
);
then(chainActorMock).should().tell(expectedRuleNodeToRuleChainTellNextMsg);
then(chainActorMock).shouldHaveNoMoreInteractions();
then(nodeCtxMock).should().getChainActor();
then(mainCtxMock).should().persistDebugOutput(TENANT_ID, RULE_NODE_ID, msg, TbNodeConnectionType.FAILURE, EXCEPTION);
then(mainCtxMock).shouldHaveNoMoreInteractions();
then(nodeCtxMock).shouldHaveNoMoreInteractions();
}
@Test
public void givenDebugStrategyDisabled_whenTellFailure_thenVerifyDebugOutputNotPersisted() {
// GIVEN
var msg = getTbMsg();
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setDebugStrategy(DebugStrategy.DISABLED);
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(nodeCtxMock.getChainActor()).willReturn(chainActorMock);
// WHEN
defaultTbContext.tellFailure(msg, EXCEPTION);
// THEN
var expectedRuleNodeToRuleChainTellNextMsg = new RuleNodeToRuleChainTellNextMsg(
RULE_CHAIN_ID,
RULE_NODE_ID,
Collections.singleton(TbNodeConnectionType.FAILURE),
msg,
EXCEPTION_MSG
);
then(chainActorMock).should().tell(expectedRuleNodeToRuleChainTellNextMsg);
then(chainActorMock).shouldHaveNoMoreInteractions();
then(nodeCtxMock).should().getChainActor();
then(mainCtxMock).shouldHaveNoInteractions();
then(nodeCtxMock).shouldHaveNoMoreInteractions();
}
@Test
public void givenDebugStrategyAllEvents_whenTellFailure_thenVerifyDebugOutputPersisted() {
// GIVEN
var msg = getTbMsg();
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setDebugStrategy(DebugStrategy.ALL_EVENTS);
ruleNode.setLastUpdateTs(System.currentTimeMillis());
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(nodeCtxMock.getChainActor()).willReturn(chainActorMock);
mockGetMaxRuleNodeDebugModeDurationMinutes();
// WHEN
defaultTbContext.tellFailure(msg, EXCEPTION);
// THEN
var expectedRuleNodeToRuleChainTellNextMsg = new RuleNodeToRuleChainTellNextMsg(
RULE_CHAIN_ID,
RULE_NODE_ID,
Collections.singleton(TbNodeConnectionType.FAILURE),
msg,
EXCEPTION_MSG
);
then(chainActorMock).should().tell(expectedRuleNodeToRuleChainTellNextMsg);
then(chainActorMock).shouldHaveNoMoreInteractions();
then(nodeCtxMock).should().getChainActor();
then(mainCtxMock).should().persistDebugOutput(TENANT_ID, RULE_NODE_ID, msg, TbNodeConnectionType.FAILURE, EXCEPTION);
then(mainCtxMock).should().getTenantProfileCache();
then(mainCtxMock).should().getMaxRuleNodeDebugModeDurationMinutes();
then(mainCtxMock).shouldHaveNoMoreInteractions();
then(nodeCtxMock).shouldHaveNoMoreInteractions();
}
private void checkTellNextCommonLogic(TbMsgCallback callbackMock, String nodeConnection, TbMsg msg) {
checkTellNextCommonLogic(callbackMock, Collections.singleton(nodeConnection), msg);
}
private void checkTellNextCommonLogic(TbMsgCallback callbackMock, Set<String> nodeConnections, TbMsg msg) {
then(callbackMock).should().onProcessingEnd(RULE_NODE_ID);
then(callbackMock).shouldHaveNoMoreInteractions();
var expectedRuleNodeToRuleChainTellNextMsg = new RuleNodeToRuleChainTellNextMsg(
RULE_CHAIN_ID,
RULE_NODE_ID,
nodeConnections,
msg,
null);
then(chainActorMock).should().tell(expectedRuleNodeToRuleChainTellNextMsg);
then(chainActorMock).shouldHaveNoMoreInteractions();
}
private void checkOutputCommonLogic(TbMsg msg, String nodeConnection) {
then(msg).should().popFormStack();
var expectedRuleChainOutputMsg = new RuleChainOutputMsg(
RULE_CHAIN_ID,
RULE_NODE_ID,
nodeConnection,
msg);
then(chainActorMock).should().tell(expectedRuleChainOutputMsg);
then(chainActorMock).shouldHaveNoMoreInteractions();
then(nodeCtxMock).should().getChainActor();
}
private void checkEnqueueForTellFailurePushMsgToRuleEngine(TbClusterService tbClusterService, TopicPartitionInfo tpi, TbMsg expectedTbMsg) {
ArgumentCaptor<ToRuleEngineMsg> toRuleEngineMsgCaptor = ArgumentCaptor.forClass(ToRuleEngineMsg.class);
ArgumentCaptor<SimpleTbQueueCallback> simpleTbQueueCallbackCaptor = ArgumentCaptor.forClass(SimpleTbQueueCallback.class);
then(tbClusterService).should().pushMsgToRuleEngine(eq(tpi), notNull(UUID.class), toRuleEngineMsgCaptor.capture(), simpleTbQueueCallbackCaptor.capture());
ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue();
assertThat(actualToRuleEngineMsg).usingRecursiveComparison()
.ignoringFields("tbMsg_")
.isEqualTo(ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(expectedTbMsg))
.setFailureMessage(EXCEPTION_MSG)
.addAllRelationTypes(List.of(TbNodeConnectionType.FAILURE)).build());
var simpleTbQueueCallback = simpleTbQueueCallbackCaptor.getValue();
assertThat(simpleTbQueueCallback).isNotNull();
simpleTbQueueCallback.onSuccess(null);
}
private static Stream<Arguments> givenDebugStrategyOptions_whenEnqueueForTellNext_thenVerifyDebugOutputPersistedOnlyForAllEventsDebugStrategy() {
return Stream.of(
Arguments.of(DebugStrategy.ALL_EVENTS, TbNodeConnectionType.OTHER),
Arguments.of(DebugStrategy.ONLY_FAILURE_EVENTS, TbNodeConnectionType.TRUE),
Arguments.of(DebugStrategy.DISABLED, TbNodeConnectionType.FALSE)
);
}
private static Stream<Arguments> givenDebugStrategyOptions_whenEnqueue_thenVerifyDebugOutputPersistedOnlyForAllEventsDebugStrategy() {
return Stream.of(
Arguments.of(DebugStrategy.ALL_EVENTS),
Arguments.of(DebugStrategy.ONLY_FAILURE_EVENTS),
Arguments.of(DebugStrategy.DISABLED)
);
}
private TbMsg getTbMsgWithCallback(TbMsgCallback callback) {
return TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, TENANT_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING, callback);
}
private TbMsg getTbMsgWithQueueName() {
return TbMsg.newMsg(DataConstants.MAIN_QUEUE_NAME, TbMsgType.POST_TELEMETRY_REQUEST, TENANT_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING);
}
private TbMsg getTbMsg() {
return TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, TENANT_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING);
}
private void mockGetMaxRuleNodeDebugModeDurationMinutes() {
var tbTenantProfileCacheMock = mock(TbTenantProfileCache.class);
var tenantProfileMock = mock(TenantProfile.class);
var tenantProfileDataMock = mock(TenantProfileData.class);
var tenantProfileConfigurationMock = mock(TenantProfileConfiguration.class);
given(mainCtxMock.getTenantProfileCache()).willReturn(tbTenantProfileCacheMock);
given(tbTenantProfileCacheMock.get(TENANT_ID)).willReturn(tenantProfileMock);
given(tenantProfileMock.getProfileData()).willReturn(tenantProfileDataMock);
given(tenantProfileDataMock.getConfiguration()).willReturn(tenantProfileConfigurationMock);
given(tenantProfileConfigurationMock.getMaxRuleNodeDebugModeDurationMinutes(anyInt())).willReturn(15);
}
}

6
application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java

@ -71,6 +71,7 @@ import org.thingsboard.server.common.data.query.EntityKeyValueType;
import org.thingsboard.server.common.data.query.FilterPredicateValue;
import org.thingsboard.server.common.data.query.NumericFilterPredicate;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.data.rule.DebugStrategy;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleChainType;
@ -103,7 +104,6 @@ import org.thingsboard.server.gen.edge.v1.UserUpdateMsg;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@ -125,8 +125,6 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
protected EdgeImitator edgeImitator;
protected Edge edge;
private Random random = new Random();
@Autowired
protected EdgeEventService edgeEventService;
@ -210,7 +208,7 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
protected void updateRootRuleChainMetadata() throws Exception {
RuleChainId rootRuleChainId = getEdgeRootRuleChainId();
RuleChainMetaData rootRuleChainMetadata = doGet("/api/ruleChain/" + rootRuleChainId.getId().toString() + "/metadata", RuleChainMetaData.class);
rootRuleChainMetadata.getNodes().forEach(n -> n.setDebugMode(random.nextBoolean()));
rootRuleChainMetadata.getNodes().forEach(n -> n.setDebugStrategy(DebugStrategy.ALL_EVENTS));
doPost("/api/ruleChain/metadata", rootRuleChainMetadata, RuleChainMetaData.class);
}

3
application/src/test/java/org/thingsboard/server/edge/RuleChainEdgeTest.java

@ -23,6 +23,7 @@ import org.thingsboard.rule.engine.metadata.TbGetAttributesNodeConfiguration;
import org.thingsboard.rule.engine.util.TbMsgSource;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.rule.DebugStrategy;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleChainType;
@ -228,7 +229,7 @@ public class RuleChainEdgeTest extends AbstractEdgeTest {
// update metadata for root rule chain
edgeImitator.expectMessageAmount(1);
metaData.getNodes().forEach(n -> n.setDebugMode(true));
metaData.getNodes().forEach(n -> n.setDebugStrategy(DebugStrategy.ALL_EVENTS));
doPost("/api/ruleChain/metadata", metaData, RuleChainMetaData.class);
Assert.assertTrue(edgeImitator.waitForMessages());
ruleChainUpdateMsgOpt = edgeImitator.findMessageByType(RuleChainUpdateMsg.class);

11
application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java

@ -43,6 +43,7 @@ import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.rule.DebugStrategy;
import org.thingsboard.server.common.data.rule.NodeConnectionInfo;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
@ -142,7 +143,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
ruleNode1.setName("Simple Rule Node 1");
ruleNode1.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
ruleNode1.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version());
ruleNode1.setDebugMode(true);
ruleNode1.setDebugStrategy(DebugStrategy.ALL_EVENTS);
TbGetAttributesNodeConfiguration configuration1 = new TbGetAttributesNodeConfiguration();
configuration1.setFetchTo(TbMsgSource.METADATA);
configuration1.setServerAttributeNames(Collections.singletonList("serverAttributeKey1"));
@ -152,7 +153,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
ruleNode2.setName("Simple Rule Node 2");
ruleNode2.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
ruleNode2.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version());
ruleNode2.setDebugMode(true);
ruleNode2.setDebugStrategy(DebugStrategy.ALL_EVENTS);
TbGetAttributesNodeConfiguration configuration2 = new TbGetAttributesNodeConfiguration();
configuration2.setFetchTo(TbMsgSource.METADATA);
configuration2.setServerAttributeNames(Collections.singletonList("serverAttributeKey2"));
@ -248,7 +249,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
ruleNode1.setName("Simple Rule Node 1");
ruleNode1.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
ruleNode1.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version());
ruleNode1.setDebugMode(true);
ruleNode1.setDebugStrategy(DebugStrategy.ALL_EVENTS);
TbGetAttributesNodeConfiguration configuration1 = new TbGetAttributesNodeConfiguration();
configuration1.setFetchTo(TbMsgSource.METADATA);
configuration1.setServerAttributeNames(Collections.singletonList("serverAttributeKey1"));
@ -257,7 +258,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
RuleNode ruleNode12 = new RuleNode();
ruleNode12.setName("Simple Rule Node 1");
ruleNode12.setType(org.thingsboard.rule.engine.flow.TbRuleChainInputNode.class.getName());
ruleNode12.setDebugMode(true);
ruleNode12.setDebugStrategy(DebugStrategy.ALL_EVENTS);
TbRuleChainInputNodeConfiguration configuration12 = new TbRuleChainInputNodeConfiguration();
configuration12.setRuleChainId(secondaryRuleChain.getId().getId().toString());
ruleNode12.setConfiguration(JacksonUtil.valueToTree(configuration12));
@ -282,7 +283,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
ruleNode2.setName("Simple Rule Node 2");
ruleNode2.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
ruleNode2.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version());
ruleNode2.setDebugMode(true);
ruleNode2.setDebugStrategy(DebugStrategy.ALL_EVENTS);
TbGetAttributesNodeConfiguration configuration2 = new TbGetAttributesNodeConfiguration();
configuration2.setFetchTo(TbMsgSource.METADATA);
configuration2.setServerAttributeNames(Collections.singletonList("serverAttributeKey2"));

3
application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java

@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.event.EventType;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.rule.DebugStrategy;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleNode;
@ -97,7 +98,7 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
ruleNode.setName("Simple Rule Node");
ruleNode.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
ruleNode.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version());
ruleNode.setDebugMode(true);
ruleNode.setDebugStrategy(DebugStrategy.ALL_EVENTS);
TbGetAttributesNodeConfiguration configuration = new TbGetAttributesNodeConfiguration();
configuration.setFetchTo(TbMsgSource.METADATA);
configuration.setServerAttributeNames(Collections.singletonList("serverAttributeKey"));

5
application/src/test/java/org/thingsboard/server/service/housekeeper/HousekeeperServiceTest.java

@ -58,6 +58,7 @@ import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.rule.DebugStrategy;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleChainType;
@ -462,7 +463,7 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
ruleNode1.setName("Simple Rule Node 1");
ruleNode1.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
ruleNode1.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version());
ruleNode1.setDebugMode(true);
ruleNode1.setDebugStrategy(DebugStrategy.ALL_EVENTS);
TbGetAttributesNodeConfiguration configuration1 = new TbGetAttributesNodeConfiguration();
configuration1.setServerAttributeNames(Collections.singletonList("serverAttributeKey1"));
ruleNode1.setConfiguration(JacksonUtil.valueToTree(configuration1));
@ -471,7 +472,7 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
ruleNode2.setName("Simple Rule Node 2");
ruleNode2.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
ruleNode2.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version());
ruleNode2.setDebugMode(true);
ruleNode2.setDebugStrategy(DebugStrategy.ALL_EVENTS);
TbGetAttributesNodeConfiguration configuration2 = new TbGetAttributesNodeConfiguration();
configuration2.setServerAttributeNames(Collections.singletonList("serverAttributeKey2"));
ruleNode2.setConfiguration(JacksonUtil.valueToTree(configuration2));

9
application/src/test/java/org/thingsboard/server/service/sync/ie/ExportImportServiceSqlTest.java

@ -65,6 +65,7 @@ import org.thingsboard.server.common.data.ota.OtaPackageType;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.rule.DebugStrategy;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleChainType;
@ -471,7 +472,7 @@ public class ExportImportServiceSqlTest extends AbstractControllerTest {
RuleNode ruleNode1 = new RuleNode();
ruleNode1.setName("Generator 1");
ruleNode1.setType(TbMsgGeneratorNode.class.getName());
ruleNode1.setDebugMode(true);
ruleNode1.setDebugStrategy(DebugStrategy.ALL_EVENTS);
TbMsgGeneratorNodeConfiguration configuration1 = new TbMsgGeneratorNodeConfiguration();
configuration1.setOriginatorType(originatorId.getEntityType());
configuration1.setOriginatorId(originatorId.getId().toString());
@ -481,7 +482,7 @@ public class ExportImportServiceSqlTest extends AbstractControllerTest {
ruleNode2.setName("Simple Rule Node 2");
ruleNode2.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
ruleNode2.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version());
ruleNode2.setDebugMode(true);
ruleNode2.setDebugStrategy(DebugStrategy.ALL_EVENTS);
TbGetAttributesNodeConfiguration configuration2 = new TbGetAttributesNodeConfiguration();
configuration2.setServerAttributeNames(Collections.singletonList("serverAttributeKey2"));
ruleNode2.setConfiguration(JacksonUtil.valueToTree(configuration2));
@ -510,7 +511,7 @@ public class ExportImportServiceSqlTest extends AbstractControllerTest {
ruleNode1.setName("Simple Rule Node 1");
ruleNode1.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
ruleNode1.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version());
ruleNode1.setDebugMode(true);
ruleNode1.setDebugStrategy(DebugStrategy.ALL_EVENTS);
TbGetAttributesNodeConfiguration configuration1 = new TbGetAttributesNodeConfiguration();
configuration1.setServerAttributeNames(Collections.singletonList("serverAttributeKey1"));
ruleNode1.setConfiguration(JacksonUtil.valueToTree(configuration1));
@ -519,7 +520,7 @@ public class ExportImportServiceSqlTest extends AbstractControllerTest {
ruleNode2.setName("Simple Rule Node 2");
ruleNode2.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
ruleNode2.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version());
ruleNode2.setDebugMode(true);
ruleNode2.setDebugStrategy(DebugStrategy.ALL_EVENTS);
TbGetAttributesNodeConfiguration configuration2 = new TbGetAttributesNodeConfiguration();
configuration2.setServerAttributeNames(Collections.singletonList("serverAttributeKey2"));
ruleNode2.setConfiguration(JacksonUtil.valueToTree(configuration2));

9
application/src/test/java/org/thingsboard/server/service/sync/vc/VersionControlTest.java

@ -65,6 +65,7 @@ import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.rule.DebugStrategy;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleChainType;
@ -870,7 +871,7 @@ public class VersionControlTest extends AbstractControllerTest {
RuleNode ruleNode1 = new RuleNode();
ruleNode1.setName("Generator 1");
ruleNode1.setType(TbMsgGeneratorNode.class.getName());
ruleNode1.setDebugMode(true);
ruleNode1.setDebugStrategy(DebugStrategy.ALL_EVENTS);
TbMsgGeneratorNodeConfiguration configuration1 = new TbMsgGeneratorNodeConfiguration();
configuration1.setOriginatorType(originatorId.getEntityType());
configuration1.setOriginatorId(originatorId.getId().toString());
@ -880,7 +881,7 @@ public class VersionControlTest extends AbstractControllerTest {
ruleNode2.setName("Simple Rule Node 2");
ruleNode2.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
ruleNode2.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version());
ruleNode2.setDebugMode(true);
ruleNode2.setDebugStrategy(DebugStrategy.ALL_EVENTS);
TbGetAttributesNodeConfiguration configuration2 = new TbGetAttributesNodeConfiguration();
configuration2.setServerAttributeNames(Collections.singletonList("serverAttributeKey2"));
ruleNode2.setConfiguration(JacksonUtil.valueToTree(configuration2));
@ -908,7 +909,7 @@ public class VersionControlTest extends AbstractControllerTest {
ruleNode1.setName("Simple Rule Node 1");
ruleNode1.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
ruleNode1.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version());
ruleNode1.setDebugMode(true);
ruleNode1.setDebugStrategy(DebugStrategy.ALL_EVENTS);
TbGetAttributesNodeConfiguration configuration1 = new TbGetAttributesNodeConfiguration();
configuration1.setServerAttributeNames(Collections.singletonList("serverAttributeKey1"));
ruleNode1.setConfiguration(JacksonUtil.valueToTree(configuration1));
@ -917,7 +918,7 @@ public class VersionControlTest extends AbstractControllerTest {
ruleNode2.setName("Simple Rule Node 2");
ruleNode2.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
ruleNode2.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version());
ruleNode2.setDebugMode(true);
ruleNode2.setDebugStrategy(DebugStrategy.ALL_EVENTS);
TbGetAttributesNodeConfiguration configuration2 = new TbGetAttributesNodeConfiguration();
configuration2.setServerAttributeNames(Collections.singletonList("serverAttributeKey2"));
ruleNode2.setConfiguration(JacksonUtil.valueToTree(configuration2));

4
common/data/src/main/java/org/thingsboard/server/common/data/msg/TbNodeConnectionType.java

@ -23,9 +23,13 @@ public final class TbNodeConnectionType {
public static final String SUCCESS = "Success";
public static final String FAILURE = "Failure";
public static final String ACK = "ACK";
public static final String TRUE = "True";
public static final String FALSE = "False";
public static final String OTHER = "Other";
public static final String TO_ROOT_RULE_CHAIN = "To Root Rule Chain";
}

72
common/data/src/main/java/org/thingsboard/server/common/data/rule/DebugStrategy.java

@ -0,0 +1,72 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.rule;
import lombok.Getter;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@Getter
public enum DebugStrategy {
DISABLED(0), ALL_EVENTS(1), ONLY_FAILURE_EVENTS(2);
private final int protoNumber;
DebugStrategy(int protoNumber) {
this.protoNumber = protoNumber;
}
public boolean shouldPersistDebugInput(long lastUpdateTs, long msgTs, int debugModeDurationMinutes) {
return isAllEventsStrategyAndMsgTsWithinDebugDuration(lastUpdateTs, msgTs, debugModeDurationMinutes);
}
public boolean shouldPersistDebugOutputForAllEvents(long lastUpdateTs, long msgTs, int debugModeDurationMinutes) {
return isAllEventsStrategyAndMsgTsWithinDebugDuration(lastUpdateTs, msgTs, debugModeDurationMinutes);
}
public boolean shouldPersistDebugForFailureEventOnly(long lastUpdateTs, long msgTs, int debugModeDurationMinutes) {
return shouldPersistDebugOutputForAllEvents(lastUpdateTs, msgTs, debugModeDurationMinutes) || isFailureOnlyStrategy();
}
public boolean shouldPersistDebugForFailureEventOnly(Set<String> nodeConnections) {
return isFailureOnlyStrategy() && nodeConnections.contains(TbNodeConnectionType.FAILURE);
}
public boolean shouldPersistDebugForFailureEventOnly(String nodeConnection) {
return isFailureOnlyStrategy() && TbNodeConnectionType.FAILURE.equals(nodeConnection);
}
private boolean isFailureOnlyStrategy() {
return DebugStrategy.ONLY_FAILURE_EVENTS.equals(this);
}
private boolean isAllEventsStrategyAndMsgTsWithinDebugDuration(long lastUpdateTs, long msgTs, int debugModeDurationMinutes) {
if (!DebugStrategy.ALL_EVENTS.equals(this)) {
return false;
}
return isMsgTsWithinDebugDuration(lastUpdateTs, msgTs, debugModeDurationMinutes);
}
private boolean isMsgTsWithinDebugDuration(long lastUpdateTs, long msgCreationTs, int debugModeDurationMinutes) {
if (debugModeDurationMinutes <= 0) {
return true;
}
return msgCreationTs < lastUpdateTs + TimeUnit.MINUTES.toMillis(debugModeDurationMinutes);
}
}

24
common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java

@ -45,15 +45,17 @@ public class RuleNode extends BaseDataWithAdditionalInfo<RuleNodeId> implements
@Length(fieldName = "name")
@Schema(description = "User defined name of the rule node. Used on UI and for logging. ", example = "Process sensor reading")
private String name;
@Schema(description = "Enable/disable debug. ", example = "false")
private boolean debugMode;
@Schema(description = "Timestamp of the last rule node update.")
private long lastUpdateTs;
@Schema(description = "Debug strategy. ", example = "ALL_EVENTS")
private DebugStrategy debugStrategy;
@Schema(description = "Enable/disable singleton mode. ", example = "false")
private boolean singletonMode;
@Schema(description = "Queue name. ", example = "Main")
private String queueName;
@Schema(description = "Version of rule node configuration. ", example = "0")
private int configurationVersion;
@Schema(description = "JSON with the rule node configuration. Structure depends on the rule node implementation.", implementation = com.fasterxml.jackson.databind.JsonNode.class)
@Schema(description = "JSON with the rule node configuration. Structure depends on the rule node implementation.", implementation = JsonNode.class)
private transient JsonNode configuration;
@JsonIgnore
private byte[] configurationBytes;
@ -73,7 +75,8 @@ public class RuleNode extends BaseDataWithAdditionalInfo<RuleNodeId> implements
this.ruleChainId = ruleNode.getRuleChainId();
this.type = ruleNode.getType();
this.name = ruleNode.getName();
this.debugMode = ruleNode.isDebugMode();
this.lastUpdateTs = ruleNode.getLastUpdateTs();
this.debugStrategy = ruleNode.getDebugStrategy();
this.singletonMode = ruleNode.isSingletonMode();
this.setConfiguration(ruleNode.getConfiguration());
this.externalId = ruleNode.getExternalId();
@ -84,6 +87,10 @@ public class RuleNode extends BaseDataWithAdditionalInfo<RuleNodeId> implements
return name;
}
public DebugStrategy getDebugStrategy() {
return debugStrategy == null ? DebugStrategy.DISABLED : debugStrategy;
}
public JsonNode getConfiguration() {
return BaseDataWithAdditionalInfo.getJson(() -> configuration, () -> configurationBytes);
}
@ -93,9 +100,9 @@ public class RuleNode extends BaseDataWithAdditionalInfo<RuleNodeId> implements
}
@Schema(description = "JSON object with the Rule Node Id. " +
"Specify this field to update the Rule Node. " +
"Referencing non-existing Rule Node Id will cause error. " +
"Omit this field to create new rule node.")
"Specify this field to update the Rule Node. " +
"Referencing non-existing Rule Node Id will cause error. " +
"Omit this field to create new rule node.")
@Override
public RuleNodeId getId() {
return super.getId();
@ -107,10 +114,9 @@ public class RuleNode extends BaseDataWithAdditionalInfo<RuleNodeId> implements
return super.getCreatedTime();
}
@Schema(description = "Additional parameters of the rule node. Contains 'layoutX' and 'layoutY' properties for visualization.", implementation = com.fasterxml.jackson.databind.JsonNode.class)
@Schema(description = "Additional parameters of the rule node. Contains 'layoutX' and 'layoutY' properties for visualization.", implementation = JsonNode.class)
@Override
public JsonNode getAdditionalInfo() {
return super.getAdditionalInfo();
}
}

10
common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java

@ -24,6 +24,8 @@ import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.TenantProfileType;
import java.io.Serial;
@Schema
@AllArgsConstructor
@NoArgsConstructor
@ -31,6 +33,7 @@ import org.thingsboard.server.common.data.TenantProfileType;
@Data
public class DefaultTenantProfileConfiguration implements TenantProfileConfiguration {
@Serial
private static final long serialVersionUID = -7134932690332578595L;
private long maxDevices;
@ -91,6 +94,8 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura
private long maxDPStorageDays;
@Schema(example = "50")
private int maxRuleNodeExecutionsPerMessage;
@Schema(example = "15")
private int maxRuleNodeDebugDurationMinutes;
@Schema(example = "0")
private long maxEmails;
@Schema(example = "true")
@ -197,4 +202,9 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura
public int getMaxRuleNodeExecsPerMessage() {
return maxRuleNodeExecutionsPerMessage;
}
@Override
public int getMaxRuleNodeDebugModeDurationMinutes(int systemMaxRuleNodeDebugModeDurationMinutes) {
return Math.min(systemMaxRuleNodeDebugModeDurationMinutes, maxRuleNodeDebugDurationMinutes);
}
}

3
common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/TenantProfileConfiguration.java

@ -51,4 +51,7 @@ public interface TenantProfileConfiguration extends Serializable {
@JsonIgnore
int getMaxRuleNodeExecsPerMessage();
@JsonIgnore
int getMaxRuleNodeDebugModeDurationMinutes(int systemMaxRuleNodeDebugModeDurationMinutes);
}

10
common/edge-api/src/main/proto/edge.proto

@ -162,17 +162,25 @@ message RuleChainMetadataUpdateMsg {
string entity = 8;
}
enum DebugStrategy {
DISABLED = 0;
ALL_EVENTS = 1;
ONLY_FAILURE_EVENTS = 2;
}
message RuleNodeProto {
option deprecated = true;
int64 idMSB = 1;
int64 idLSB = 2;
string type = 3;
string name = 4;
bool debugMode = 5;
bool debugMode = 5 [deprecated = true];
string configuration = 6;
string additionalInfo = 7;
bool singletonMode = 8;
int32 configurationVersion = 9;
int64 lastUpdateTs = 10;
DebugStrategy debugStrategy = 11;
}
message NodeConnectionInfoProto {

1
dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java

@ -397,6 +397,7 @@ public class ModelConstants {
public static final String EVENT_MESSAGE_COLUMN_NAME = "e_message";
public static final String DEBUG_MODE = "debug_mode";
public static final String DEBUG_STRATEGY = "debug_strategy";
public static final String SINGLETON_MODE = "singleton_mode";
public static final String QUEUE_NAME = "queue_name";

17
dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeEntity.java

@ -19,11 +19,14 @@ import com.fasterxml.jackson.databind.JsonNode;
import jakarta.persistence.Column;
import jakarta.persistence.Convert;
import jakarta.persistence.Entity;
import jakarta.persistence.EnumType;
import jakarta.persistence.Enumerated;
import jakarta.persistence.Table;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.rule.DebugStrategy;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.model.BaseSqlEntity;
@ -58,8 +61,12 @@ public class RuleNodeEntity extends BaseSqlEntity<RuleNode> {
@Column(name = ModelConstants.ADDITIONAL_INFO_PROPERTY)
private JsonNode additionalInfo;
@Column(name = ModelConstants.DEBUG_MODE)
private boolean debugMode;
@Column(name = ModelConstants.LAST_UPDATE_TS_COLUMN)
private long lastUpdateTs;
@Enumerated(EnumType.STRING)
@Column(name = ModelConstants.DEBUG_STRATEGY)
private DebugStrategy debugStrategy;
@Column(name = ModelConstants.SINGLETON_MODE)
private boolean singletonMode;
@ -83,7 +90,8 @@ public class RuleNodeEntity extends BaseSqlEntity<RuleNode> {
}
this.type = ruleNode.getType();
this.name = ruleNode.getName();
this.debugMode = ruleNode.isDebugMode();
this.lastUpdateTs = ruleNode.getLastUpdateTs();
this.debugStrategy = ruleNode.getDebugStrategy();
this.singletonMode = ruleNode.isSingletonMode();
this.queueName = ruleNode.getQueueName();
this.configurationVersion = ruleNode.getConfigurationVersion();
@ -103,7 +111,8 @@ public class RuleNodeEntity extends BaseSqlEntity<RuleNode> {
}
ruleNode.setType(type);
ruleNode.setName(name);
ruleNode.setDebugMode(debugMode);
ruleNode.setLastUpdateTs(lastUpdateTs);
ruleNode.setDebugStrategy(debugStrategy);
ruleNode.setSingletonMode(singletonMode);
ruleNode.setQueueName(queueName);
ruleNode.setConfigurationVersion(configurationVersion);

5
dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java

@ -44,6 +44,7 @@ import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.plugin.ComponentClusteringMode;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.rule.DebugStrategy;
import org.thingsboard.server.common.data.rule.NodeConnectionInfo;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainConnectionInfo;
@ -214,9 +215,11 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
}
RuleChainId ruleChainId = ruleChain.getId();
if (nodes != null) {
long lastUpdateTs = System.currentTimeMillis();
for (RuleNode node : toAddOrUpdate) {
node.setRuleChainId(ruleChainId);
node = ruleNodeUpdater.apply(node);
node.setLastUpdateTs(lastUpdateTs);
RuleChainDataValidator.validateRuleNode(node);
RuleNode savedNode = ruleNodeDao.save(tenantId, node);
relations.add(new EntityRelation(ruleChainMetaData.getRuleChainId(), savedNode.getId(),
@ -261,7 +264,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
layout.remove("description");
layout.remove("ruleChainNodeId");
targetNode.setAdditionalInfo(layout);
targetNode.setDebugMode(false);
targetNode.setDebugStrategy(DebugStrategy.DISABLED);
targetNode = ruleNodeDao.save(tenantId, targetNode);
EntityRelation sourceRuleChainToRuleNode = new EntityRelation();

3
dao/src/main/resources/sql/schema-entities.sql

@ -193,7 +193,8 @@ CREATE TABLE IF NOT EXISTS rule_node (
configuration varchar(10000000),
type varchar(255),
name varchar(255),
debug_mode boolean,
last_update_ts bigint NOT NULL,
debug_strategy varchar(32) DEFAULT 'DISABLED',
singleton_mode boolean,
queue_name varchar(255),
external_id uuid

Loading…
Cancel
Save