diff --git a/application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json b/application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json index 6b7603c026..04a5f52f4e 100644 --- a/application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json +++ b/application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json @@ -20,7 +20,7 @@ }, "type": "org.thingsboard.rule.engine.profile.TbDeviceProfileNode", "name": "Device Profile Node", - "debugMode": false, + "debugStrategy": "DISABLED", "configuration": { "persistAlarmRulesState": false, "fetchAlarmRulesStateOnStart": false @@ -34,7 +34,7 @@ }, "type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", "name": "Save Timeseries", - "debugMode": false, + "debugStrategy": "DISABLED", "configuration": { "defaultTTL": 0 }, @@ -47,7 +47,7 @@ }, "type": "org.thingsboard.rule.engine.telemetry.TbMsgAttributesNode", "name": "Save Client Attributes", - "debugMode": false, + "debugStrategy": "DISABLED", "configurationVersion": 2, "configuration": { "scope": "CLIENT_SCOPE", @@ -64,7 +64,7 @@ }, "type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode", "name": "Message Type Switch", - "debugMode": false, + "debugStrategy": "DISABLED", "configuration": { "version": 0 }, @@ -77,7 +77,7 @@ }, "type": "org.thingsboard.rule.engine.action.TbLogNode", "name": "Log RPC from Device", - "debugMode": false, + "debugStrategy": "DISABLED", "configuration": { "scriptLang": "TBEL", "jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);", @@ -92,7 +92,7 @@ }, "type": "org.thingsboard.rule.engine.action.TbLogNode", "name": "Log Other", - "debugMode": false, + "debugStrategy": "DISABLED", "configuration": { "scriptLang": "TBEL", "jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);", @@ -107,7 +107,7 @@ }, "type": "org.thingsboard.rule.engine.rpc.TbSendRPCRequestNode", "name": "RPC Call Request", - "debugMode": false, + "debugStrategy": "DISABLED", "configuration": { "timeoutInSeconds": 60 }, @@ -120,7 +120,7 @@ }, "type": "org.thingsboard.rule.engine.edge.TbMsgPushToCloudNode", "name": "Push to cloud", - "debugMode": false, + "debugStrategy": "DISABLED", "configuration": { "scope": "SERVER_SCOPE" }, @@ -133,7 +133,7 @@ }, "type": "org.thingsboard.rule.engine.edge.TbMsgPushToCloudNode", "name": "Push to cloud", - "debugMode": false, + "debugStrategy": "DISABLED", "configuration": { "scope": "SERVER_SCOPE" }, diff --git a/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json b/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json index 0b70d087e7..3b9898d0b1 100644 --- a/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json +++ b/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json @@ -18,7 +18,7 @@ }, "type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", "name": "Save Timeseries", - "debugMode": false, + "debugStrategy": "DISABLED", "configuration": { "defaultTTL": 0 } @@ -30,7 +30,7 @@ }, "type": "org.thingsboard.rule.engine.telemetry.TbMsgAttributesNode", "name": "Save Client Attributes", - "debugMode": false, + "debugStrategy": "DISABLED", "configurationVersion": 2, "configuration": { "scope": "CLIENT_SCOPE", @@ -46,7 +46,7 @@ }, "type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode", "name": "Message Type Switch", - "debugMode": false, + "debugStrategy": "DISABLED", "configuration": { "version": 0 } @@ -58,7 +58,7 @@ }, "type": "org.thingsboard.rule.engine.action.TbLogNode", "name": "Log RPC from Device", - "debugMode": false, + "debugStrategy": "DISABLED", "configuration": { "scriptLang": "TBEL", "jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);", @@ -72,7 +72,7 @@ }, "type": "org.thingsboard.rule.engine.action.TbLogNode", "name": "Log Other", - "debugMode": false, + "debugStrategy": "DISABLED", "configuration": { "scriptLang": "TBEL", "jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);", @@ -86,7 +86,7 @@ }, "type": "org.thingsboard.rule.engine.rpc.TbSendRPCRequestNode", "name": "RPC Call Request", - "debugMode": false, + "debugStrategy": "DISABLED", "configuration": { "timeoutInSeconds": 60 } @@ -99,7 +99,7 @@ }, "type": "org.thingsboard.rule.engine.profile.TbDeviceProfileNode", "name": "Device Profile Node", - "debugMode": false, + "debugStrategy": "DISABLED", "configuration": { "persistAlarmRulesState": false, "fetchAlarmRulesStateOnStart": false diff --git a/application/src/main/data/upgrade/3.8.0/schema_update.sql b/application/src/main/data/upgrade/3.8.0/schema_update.sql index 6b87dc6dde..dc3bf2386e 100644 --- a/application/src/main/data/upgrade/3.8.0/schema_update.sql +++ b/application/src/main/data/upgrade/3.8.0/schema_update.sql @@ -14,3 +14,24 @@ -- limitations under the License. -- +-- UPDATE RULE NODE DEBUG MODE TO DEBUG STRATEGY START + +ALTER TABLE rule_node + ADD COLUMN IF NOT EXISTS debug_strategy varchar(32) DEFAULT 'DISABLED'; +ALTER TABLE rule_node + ADD COLUMN IF NOT EXISTS last_update_ts bigint NOT NULL DEFAULT extract(epoch from now()) * 1000; +DO +$$ + BEGIN + IF EXISTS (SELECT 1 + FROM information_schema.columns + WHERE table_name = 'rule_node' AND column_name = 'debug_mode') THEN + UPDATE rule_node + SET debug_strategy = CASE WHEN debug_mode = true THEN 'ALL_EVENTS' ELSE 'DISABLED' END; + ALTER TABLE rule_node + DROP COLUMN debug_mode; + END IF; + END +$$; + +-- UPDATE RULE NODE DEBUG MODE TO DEBUG STRATEGY END diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 5966716ed8..2920fa400a 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -577,6 +577,10 @@ public class ActorSystemContext { @Getter private boolean externalNodeForceAck; + @Value("${actors.rule.node.max_debug_mode_duration:60}") + @Getter + private int maxRuleNodeDebugModeDurationMinutes; + @Value("${state.rule.node.deviceState.rateLimit:1:1,30:60,60:3600}") @Getter private String deviceStateNodeRateLimitConfig; diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index ec10402821..3be7524c85 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -63,6 +63,7 @@ import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.rule.DebugStrategy; import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.data.rule.RuleNodeState; import org.thingsboard.server.common.data.script.ScriptLanguage; @@ -129,7 +130,7 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.ENTITY_CREATED; * Created by ashvayka on 19.03.18. */ @Slf4j -class DefaultTbContext implements TbContext { +public class DefaultTbContext implements TbContext { private final ActorSystemContext mainCtx; private final String ruleChainName; @@ -143,25 +144,25 @@ class DefaultTbContext implements TbContext { @Override public void tellSuccess(TbMsg msg) { - tellNext(msg, Collections.singleton(TbNodeConnectionType.SUCCESS), null); + tellNext(msg, Collections.singleton(TbNodeConnectionType.SUCCESS)); } @Override public void tellNext(TbMsg msg, String relationType) { - tellNext(msg, Collections.singleton(relationType), null); + tellNext(msg, Collections.singleton(relationType)); } @Override public void tellNext(TbMsg msg, Set relationTypes) { - tellNext(msg, relationTypes, null); - } - - private void tellNext(TbMsg msg, Set relationTypes, Throwable th) { - if (nodeCtx.getSelf().isDebugMode()) { - relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th)); + RuleNode ruleNode = nodeCtx.getSelf(); + DebugStrategy debugStrategy = ruleNode.getDebugStrategy(); + if (debugStrategy.shouldPersistDebugOutputForAllEvents(ruleNode.getLastUpdateTs(), msg.getTs(), getMaxRuleNodeDebugDurationMinutes())) { + relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), ruleNode.getId(), msg, relationType)); + } else if (debugStrategy.shouldPersistDebugForFailureEventOnly(relationTypes)) { + mainCtx.persistDebugOutput(nodeCtx.getTenantId(), ruleNode.getId(), msg, TbNodeConnectionType.FAILURE); } - msg.getCallback().onProcessingEnd(nodeCtx.getSelf().getId()); - nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null)); + msg.getCallback().onProcessingEnd(ruleNode.getId()); + nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(ruleNode.getRuleChainId(), ruleNode.getId(), relationTypes, msg, null)); } @Override @@ -182,8 +183,12 @@ class DefaultTbContext implements TbContext { if (item == null) { ack(msg); } else { - if (nodeCtx.getSelf().isDebugMode()) { - mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType); + RuleNode ruleNode = nodeCtx.getSelf(); + DebugStrategy debugStrategy = ruleNode.getDebugStrategy(); + if (debugStrategy.shouldPersistDebugOutputForAllEvents(ruleNode.getLastUpdateTs(), msg.getTs(), getMaxRuleNodeDebugDurationMinutes())) { + mainCtx.persistDebugOutput(nodeCtx.getTenantId(), ruleNode.getId(), msg, relationType); + } else if (debugStrategy.shouldPersistDebugForFailureEventOnly(relationType)) { + mainCtx.persistDebugOutput(nodeCtx.getTenantId(), ruleNode.getId(), msg, relationType); } nodeCtx.getChainActor().tell(new RuleChainOutputMsg(item.getRuleChainId(), item.getRuleNodeId(), relationType, msg)); } @@ -213,11 +218,13 @@ class DefaultTbContext implements TbContext { .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) .setTbMsg(TbMsg.toByteString(tbMsg)).build(); - if (nodeCtx.getSelf().isDebugMode()) { - mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "To Root Rule Chain"); - } mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, new SimpleTbQueueCallback( metadata -> { + RuleNode ruleNode = nodeCtx.getSelf(); + DebugStrategy debugStrategy = ruleNode.getDebugStrategy(); + if (debugStrategy.shouldPersistDebugOutputForAllEvents(ruleNode.getLastUpdateTs(), tbMsg.getTs(), getMaxRuleNodeDebugDurationMinutes())) { + mainCtx.persistDebugOutput(nodeCtx.getTenantId(), ruleNode.getId(), tbMsg, TbNodeConnectionType.TO_ROOT_RULE_CHAIN); + } if (onSuccess != null) { onSuccess.run(); } @@ -299,8 +306,9 @@ class DefaultTbContext implements TbContext { } return; } - RuleChainId ruleChainId = nodeCtx.getSelf().getRuleChainId(); - RuleNodeId ruleNodeId = nodeCtx.getSelf().getId(); + RuleNode ruleNode = nodeCtx.getSelf(); + RuleChainId ruleChainId = ruleNode.getRuleChainId(); + RuleNodeId ruleNodeId = ruleNode.getId(); TbMsg tbMsg = TbMsg.newMsg(source, queueName, ruleChainId, ruleNodeId); TransportProtos.ToRuleEngineMsg.Builder msg = TransportProtos.ToRuleEngineMsg.newBuilder() .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) @@ -310,12 +318,15 @@ class DefaultTbContext implements TbContext { if (failureMessage != null) { msg.setFailureMessage(failureMessage); } - if (nodeCtx.getSelf().isDebugMode()) { - relationTypes.forEach(relationType -> - mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, relationType, null, failureMessage)); - } mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg.build(), new SimpleTbQueueCallback( metadata -> { + DebugStrategy debugStrategy = ruleNode.getDebugStrategy(); + if (debugStrategy.shouldPersistDebugOutputForAllEvents(ruleNode.getLastUpdateTs(), tbMsg.getTs(), getMaxRuleNodeDebugDurationMinutes())) { + relationTypes.forEach(relationType -> + mainCtx.persistDebugOutput(nodeCtx.getTenantId(), ruleNode.getId(), tbMsg, relationType, null, failureMessage)); + } else if (debugStrategy.shouldPersistDebugForFailureEventOnly(relationTypes)) { + mainCtx.persistDebugOutput(nodeCtx.getTenantId(), ruleNode.getId(), tbMsg, TbNodeConnectionType.FAILURE, null, failureMessage); + } if (onSuccess != null) { onSuccess.run(); } @@ -331,10 +342,12 @@ class DefaultTbContext implements TbContext { @Override public void ack(TbMsg tbMsg) { - if (nodeCtx.getSelf().isDebugMode()) { - mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "ACK", null); + RuleNode ruleNode = nodeCtx.getSelf(); + DebugStrategy debugStrategy = ruleNode.getDebugStrategy(); + if (debugStrategy.shouldPersistDebugOutputForAllEvents(ruleNode.getLastUpdateTs(), tbMsg.getTs(), getMaxRuleNodeDebugDurationMinutes())) { + mainCtx.persistDebugOutput(nodeCtx.getTenantId(), ruleNode.getId(), tbMsg, TbNodeConnectionType.ACK); } - tbMsg.getCallback().onProcessingEnd(nodeCtx.getSelf().getId()); + tbMsg.getCallback().onProcessingEnd(ruleNode.getId()); tbMsg.getCallback().onSuccess(); } @@ -349,12 +362,13 @@ class DefaultTbContext implements TbContext { @Override public void tellFailure(TbMsg msg, Throwable th) { - if (nodeCtx.getSelf().isDebugMode()) { - mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, TbNodeConnectionType.FAILURE, th); + RuleNode ruleNode = nodeCtx.getSelf(); + if (ruleNode.getDebugStrategy().shouldPersistDebugForFailureEventOnly(ruleNode.getLastUpdateTs(), msg.getTs(), getMaxRuleNodeDebugDurationMinutes())) { + mainCtx.persistDebugOutput(nodeCtx.getTenantId(), ruleNode.getId(), msg, TbNodeConnectionType.FAILURE, th); } String failureMessage = getFailureMessage(th); - nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getRuleChainId(), - nodeCtx.getSelf().getId(), Collections.singleton(TbNodeConnectionType.FAILURE), + nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(ruleNode.getRuleChainId(), + ruleNode.getId(), Collections.singleton(TbNodeConnectionType.FAILURE), msg, failureMessage)); } @@ -1004,4 +1018,14 @@ class DefaultTbContext implements TbContext { return failureMessage; } + private int getMaxRuleNodeDebugDurationMinutes() { + if (!DebugStrategy.ALL_EVENTS.equals(nodeCtx.getSelf().getDebugStrategy())) { + return 0; + } + var configuration = mainCtx.getTenantProfileCache() + .get(getTenantId()).getProfileData().getConfiguration(); + int systemMaxRuleNodeDebugModeDurationMinutes = mainCtx.getMaxRuleNodeDebugModeDurationMinutes(); + return configuration.getMaxRuleNodeDebugModeDurationMinutes(systemMaxRuleNodeDebugModeDurationMinutes); + } + } diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index eaa2218116..2213daa770 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -43,7 +43,6 @@ import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; import org.thingsboard.server.common.msg.queue.RuleEngineException; import org.thingsboard.server.common.msg.queue.RuleNodeException; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; -import org.thingsboard.server.common.stats.TbApiUsageReportClient; import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.queue.TbQueueCallback; @@ -72,7 +71,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor> nodeRoutes; private final RuleChainService service; private final TbClusterService clusterService; - private final TbApiUsageReportClient apiUsageClient; private String ruleChainName; private RuleNodeId firstId; @@ -81,7 +79,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor existingNodes = ruleNodeList.stream().map(RuleNode::getId).collect(Collectors.toSet()); - List removedRules = nodeActors.keySet().stream().filter(node -> !existingNodes.contains(node)).collect(Collectors.toList()); + List removedRules = nodeActors.keySet().stream().filter(node -> !existingNodes.contains(node)).toList(); removedRules.forEach(ruleNodeId -> { log.trace("[{}][{}] Removing rule node [{}]", tenantId, entityId, ruleNodeId); RuleNodeCtx removed = nodeActors.remove(ruleNodeId); @@ -177,7 +174,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor relations = service.getRuleNodeRelations(TenantId.SYS_TENANT_ID, ruleNode.getId()); log.trace("[{}][{}][{}] Processing rule node relations [{}]", tenantId, entityId, ruleNode.getId(), relations.size()); - if (relations.size() == 0) { + if (relations.isEmpty()) { nodeRoutes.put(ruleNode.getId(), Collections.emptyList()); } else { for (EntityRelation relation : relations) { @@ -238,45 +235,53 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor relationTypes; - @Getter private final String failureMessage; public RuleNodeToRuleChainTellNextMsg(RuleChainId ruleChainId, RuleNodeId originator, Set relationTypes, TbMsg tbMsg, String failureMessage) { diff --git a/application/src/main/java/org/thingsboard/server/controller/TenantProfileController.java b/application/src/main/java/org/thingsboard/server/controller/TenantProfileController.java index 007f77aa20..85660b5914 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TenantProfileController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TenantProfileController.java @@ -151,6 +151,7 @@ public class TenantProfileController extends BaseController { " \"maxJSExecutions\": 5000000,\n" + " \"maxDPStorageDays\": 0,\n" + " \"maxRuleNodeExecutionsPerMessage\": 50,\n" + + " \"maxRuleNodeDebugDurationMinutes\": 15,\n" + " \"maxEmails\": 0,\n" + " \"maxSms\": 0,\n" + " \"maxCreatedAlarms\": 1000,\n" + diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/BaseRuleChainMetadataConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/BaseRuleChainMetadataConstructor.java index cb66992565..38b2d959ea 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/BaseRuleChainMetadataConstructor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/BaseRuleChainMetadataConstructor.java @@ -24,6 +24,7 @@ import org.thingsboard.server.common.data.rule.NodeConnectionInfo; import org.thingsboard.server.common.data.rule.RuleChainConnectionInfo; import org.thingsboard.server.common.data.rule.RuleChainMetaData; import org.thingsboard.server.common.data.rule.RuleNode; +import org.thingsboard.server.gen.edge.v1.DebugStrategy; import org.thingsboard.server.gen.edge.v1.EdgeVersion; import org.thingsboard.server.gen.edge.v1.NodeConnectionInfoProto; import org.thingsboard.server.gen.edge.v1.RuleChainConnectionInfoProto; @@ -88,7 +89,8 @@ public abstract class BaseRuleChainMetadataConstructor implements RuleChainMetad .setIdLSB(node.getId().getId().getLeastSignificantBits()) .setType(node.getType()) .setName(node.getName()) - .setDebugMode(node.isDebugMode()) + .setLastUpdateTs(node.getLastUpdateTs()) + .setDebugStrategy(DebugStrategy.forNumber(node.getDebugStrategy().getProtoNumber())) .setConfiguration(JacksonUtil.toString(node.getConfiguration())) .setAdditionalInfo(JacksonUtil.toString(node.getAdditionalInfo())) .setSingletonMode(node.isSingletonMode()) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/tenant/TenantMsgConstructorV1.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/tenant/TenantMsgConstructorV1.java index fc244417bc..5d60c69903 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/tenant/TenantMsgConstructorV1.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/tenant/TenantMsgConstructorV1.java @@ -88,6 +88,7 @@ public class TenantMsgConstructorV1 implements TenantMsgConstructor { configuration.setMaxTransportDataPoints(0); configuration.setRuleEngineExceptionsTtlDays(0); configuration.setMaxRuleNodeExecutionsPerMessage(0); + configuration.setMaxRuleNodeDebugDurationMinutes(0); tenantProfileData.setConfiguration(configuration); tenantProfile.setProfileData(tenantProfileData); diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index e9c8668958..581a59fa36 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -468,6 +468,9 @@ actors: node: # Errors for particular actor are persisted once per specified amount of milliseconds error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}" + # The maximum allowed duration (in minutes) for the debug mode to be enabled if the debug strategy is set to ALL_EVENTS. + # If a specific value is set in the tenant profile, the minimum between value from profile and this setting will be used. + max_debug_mode_duration: "${ACTORS_RULE_NODE_MAX_DEBUG_MODE_DURATION_MINUTES:60}" transaction: # Size of queues that store messages for transaction rule nodes queue_size: "${ACTORS_RULE_TRANSACTION_QUEUE_SIZE:15000}" diff --git a/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java b/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java new file mode 100644 index 0000000000..e9b344866f --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/actors/rule/DefaultTbContextTest.java @@ -0,0 +1,836 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.rule; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.TbActorRef; +import org.thingsboard.server.actors.ruleChain.DefaultTbContext; +import org.thingsboard.server.actors.ruleChain.RuleChainOutputMsg; +import org.thingsboard.server.actors.ruleChain.RuleNodeCtx; +import org.thingsboard.server.actors.ruleChain.RuleNodeToRuleChainTellNextMsg; +import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.data.id.RuleNodeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.data.msg.TbNodeConnectionType; +import org.thingsboard.server.common.data.rule.DebugStrategy; +import org.thingsboard.server.common.data.rule.RuleNode; +import org.thingsboard.server.common.data.tenant.profile.TenantProfileConfiguration; +import org.thingsboard.server.common.data.tenant.profile.TenantProfileData; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.TbMsgProcessingStackItem; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.common.msg.queue.TbMsgCallback; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.dao.tenant.TbTenantProfileCache; +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; +import org.thingsboard.server.queue.common.SimpleTbQueueCallback; + +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.ArgumentMatchers.notNull; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; + +@SuppressWarnings("ResultOfMethodCallIgnored") +@ExtendWith(MockitoExtension.class) +class DefaultTbContextTest { + + private final String EXCEPTION_MSG = "Some runtime exception!"; + private final RuntimeException EXCEPTION = new RuntimeException(EXCEPTION_MSG); + + private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("c7bf4c85-923c-4688-a4b5-0f8a0feb7cd5")); + private final RuleNodeId RULE_NODE_ID = new RuleNodeId(UUID.fromString("1ca5e2ef-1309-41d9-bafa-709e9df0e2a6")); + private final RuleChainId RULE_CHAIN_ID = new RuleChainId(UUID.fromString("b87c4123-f9f2-41a6-9a09-e3a5b6580b11")); + + @Mock + private ActorSystemContext mainCtxMock; + @Mock + private RuleNodeCtx nodeCtxMock; + @Mock + private TbActorRef chainActorMock; + + private DefaultTbContext defaultTbContext; + + @BeforeEach + public void setUp() { + defaultTbContext = new DefaultTbContext(mainCtxMock, "Test rule chain name", nodeCtxMock); + } + + @Test + public void givenDebugStrategyOnlyFailureEvents_whenTellSuccess_thenVerifyDebugOutputNotPersisted() { + // GIVEN + var callbackMock = mock(TbMsgCallback.class); + var msg = getTbMsgWithCallback(callbackMock); + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugStrategy(DebugStrategy.ONLY_FAILURE_EVENTS); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(nodeCtxMock.getChainActor()).willReturn(chainActorMock); + + // WHEN + defaultTbContext.tellSuccess(msg); + + // THEN + then(nodeCtxMock).should().getChainActor(); + then(nodeCtxMock).shouldHaveNoMoreInteractions(); + then(mainCtxMock).shouldHaveNoInteractions(); + checkTellNextCommonLogic(callbackMock, TbNodeConnectionType.SUCCESS, msg); + } + + @Test + public void givenDebugStrategyOnlyFailureEventsAndSuccessConnection_whenTellNext_thenVerifyDebugOutputNotPersisted() { + // GIVEN + var callbackMock = mock(TbMsgCallback.class); + var msg = getTbMsgWithCallback(callbackMock); + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugStrategy(DebugStrategy.ONLY_FAILURE_EVENTS); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(nodeCtxMock.getChainActor()).willReturn(chainActorMock); + + // WHEN + defaultTbContext.tellNext(msg, TbNodeConnectionType.SUCCESS); + + // THEN + then(nodeCtxMock).should().getChainActor(); + then(nodeCtxMock).shouldHaveNoMoreInteractions(); + then(mainCtxMock).shouldHaveNoInteractions(); + checkTellNextCommonLogic(callbackMock, TbNodeConnectionType.SUCCESS, msg); + } + + @MethodSource + @ParameterizedTest + void givenDebugStrategyOnlyFailureEventsAndConnections_whenTellNext_thenVerifyDebugOutputPersisted(Set connections) { + // GIVEN + var callbackMock = mock(TbMsgCallback.class); + var msg = getTbMsgWithCallback(callbackMock); + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugStrategy(DebugStrategy.ONLY_FAILURE_EVENTS); + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(nodeCtxMock.getChainActor()).willReturn(chainActorMock); + + // WHEN + defaultTbContext.tellNext(msg, connections); + + // THEN + then(nodeCtxMock).should().getChainActor(); + then(nodeCtxMock).shouldHaveNoMoreInteractions(); + then(mainCtxMock).should().persistDebugOutput(TENANT_ID, RULE_NODE_ID, msg, TbNodeConnectionType.FAILURE); + then(mainCtxMock).shouldHaveNoMoreInteractions(); + checkTellNextCommonLogic(callbackMock, connections, msg); + } + + private static Stream> givenDebugStrategyOnlyFailureEventsAndConnections_whenTellNext_thenVerifyDebugOutputPersisted() { + return Stream.of( + Collections.singleton(TbNodeConnectionType.FAILURE), + Set.of(TbNodeConnectionType.FAILURE, TbNodeConnectionType.SUCCESS) + ); + } + + @MethodSource + @ParameterizedTest + void givenDebugStrategyDisabledAndConnections_whenTellNext_thenVerifyDebugOutputNotPersisted(Set connections) { + // GIVEN + var callbackMock = mock(TbMsgCallback.class); + var msg = getTbMsgWithCallback(callbackMock); + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugStrategy(DebugStrategy.DISABLED); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(nodeCtxMock.getChainActor()).willReturn(chainActorMock); + + // WHEN + defaultTbContext.tellNext(msg, connections); + + // THEN + then(nodeCtxMock).should().getChainActor(); + then(nodeCtxMock).shouldHaveNoMoreInteractions(); + then(mainCtxMock).shouldHaveNoInteractions(); + checkTellNextCommonLogic(callbackMock, connections, msg); + } + + private static Stream> givenDebugStrategyDisabledAndConnections_whenTellNext_thenVerifyDebugOutputNotPersisted() { + return Stream.of( + Collections.singleton(TbNodeConnectionType.FAILURE), + Collections.singleton(TbNodeConnectionType.SUCCESS), + Set.of(TbNodeConnectionType.FAILURE, TbNodeConnectionType.SUCCESS) + ); + } + + @MethodSource + @ParameterizedTest + void givenDebugStrategyAllEventsAndConnection_whenTellNext_thenVerifyDebugOutputPersisted(String connection) { + // GIVEN + var callbackMock = mock(TbMsgCallback.class); + var msg = getTbMsgWithCallback(callbackMock); + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setLastUpdateTs(System.currentTimeMillis()); + ruleNode.setDebugStrategy(DebugStrategy.ALL_EVENTS); + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(nodeCtxMock.getChainActor()).willReturn(chainActorMock); + mockGetMaxRuleNodeDebugModeDurationMinutes(); + + // WHEN + defaultTbContext.tellNext(msg, connection); + + // THEN + then(nodeCtxMock).should().getChainActor(); + then(nodeCtxMock).shouldHaveNoMoreInteractions(); + then(mainCtxMock).should().getTenantProfileCache(); + then(mainCtxMock).should().getMaxRuleNodeDebugModeDurationMinutes(); + then(mainCtxMock).should().persistDebugOutput(TENANT_ID, RULE_NODE_ID, msg, connection); + then(mainCtxMock).shouldHaveNoMoreInteractions(); + checkTellNextCommonLogic(callbackMock, connection, msg); + } + + private static Stream givenDebugStrategyAllEventsAndConnection_whenTellNext_thenVerifyDebugOutputPersisted() { + return failureAndSuccessConnection(); + } + + @Test + public void givenDebugStrategyAllEventsAndFailureAndSuccessConnection_whenTellNext_thenVerifyDebugOutputPersistedForAllEvents() { + // GIVEN + var callbackMock = mock(TbMsgCallback.class); + var msg = getTbMsgWithCallback(callbackMock); + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setLastUpdateTs(System.currentTimeMillis()); + ruleNode.setDebugStrategy(DebugStrategy.ALL_EVENTS); + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(nodeCtxMock.getChainActor()).willReturn(chainActorMock); + mockGetMaxRuleNodeDebugModeDurationMinutes(); + + // WHEN + Set connections = failureAndSuccessConnection().collect(Collectors.toSet()); + defaultTbContext.tellNext(msg, connections); + + // THEN + then(nodeCtxMock).should().getChainActor(); + then(nodeCtxMock).shouldHaveNoMoreInteractions(); + then(mainCtxMock).should().getTenantProfileCache(); + then(mainCtxMock).should().getMaxRuleNodeDebugModeDurationMinutes(); + var nodeConnectionsCaptor = ArgumentCaptor.forClass(String.class); + int wantedNumberOfInvocations = connections.size(); + then(mainCtxMock).should(times(wantedNumberOfInvocations)).persistDebugOutput(eq(TENANT_ID), eq(RULE_NODE_ID), eq(msg), nodeConnectionsCaptor.capture()); + then(mainCtxMock).shouldHaveNoMoreInteractions(); + assertThat(nodeConnectionsCaptor.getAllValues()).hasSize(wantedNumberOfInvocations); + assertThat(nodeConnectionsCaptor.getAllValues()).containsExactlyInAnyOrderElementsOf(connections); + checkTellNextCommonLogic(callbackMock, connections, msg); + } + + private static Stream failureAndSuccessConnection() { + return Stream.of(TbNodeConnectionType.FAILURE, TbNodeConnectionType.SUCCESS); + } + + @Test + public void givenDebugStrategyOnlyFailureEventsAndFailureConnection_whenOutput_thenVerifyDebugOutputPersisted() { + // GIVEN + var msgMock = mock(TbMsg.class); + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugStrategy(DebugStrategy.ONLY_FAILURE_EVENTS); + given(msgMock.popFormStack()).willReturn(new TbMsgProcessingStackItem(RULE_CHAIN_ID, RULE_NODE_ID)); + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(nodeCtxMock.getChainActor()).willReturn(chainActorMock); + + // WHEN + defaultTbContext.output(msgMock, TbNodeConnectionType.FAILURE); + + // THEN + checkOutputCommonLogic(msgMock, TbNodeConnectionType.FAILURE); + then(mainCtxMock).should().persistDebugOutput(TENANT_ID, RULE_NODE_ID, msgMock, TbNodeConnectionType.FAILURE); + then(mainCtxMock).shouldHaveNoMoreInteractions(); + then(nodeCtxMock).shouldHaveNoMoreInteractions(); + } + + @Test + public void givenDebugStrategyOnlyFailureEventsAndSuccessConnection_whenOutput_thenVerifyDebugOutputNotPersisted() { + // GIVEN + var msgMock = mock(TbMsg.class); + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugStrategy(DebugStrategy.ONLY_FAILURE_EVENTS); + given(msgMock.popFormStack()).willReturn(new TbMsgProcessingStackItem(RULE_CHAIN_ID, RULE_NODE_ID)); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(nodeCtxMock.getChainActor()).willReturn(chainActorMock); + + // WHEN + defaultTbContext.output(msgMock, TbNodeConnectionType.SUCCESS); + + // THEN + checkOutputCommonLogic(msgMock, TbNodeConnectionType.SUCCESS); + then(mainCtxMock).shouldHaveNoMoreInteractions(); + then(nodeCtxMock).shouldHaveNoMoreInteractions(); + } + + @ParameterizedTest + @ValueSource(strings = {TbNodeConnectionType.SUCCESS, TbNodeConnectionType.FAILURE}) + void givenDebugStrategyDisabled_whenOutput_thenVerifyDebugOutputNotPersisted(String nodeConnection) { + // GIVEN + var msgMock = mock(TbMsg.class); + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugStrategy(DebugStrategy.DISABLED); + given(msgMock.popFormStack()).willReturn(new TbMsgProcessingStackItem(RULE_CHAIN_ID, RULE_NODE_ID)); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(nodeCtxMock.getChainActor()).willReturn(chainActorMock); + + // WHEN + defaultTbContext.output(msgMock, nodeConnection); + + // THEN + checkOutputCommonLogic(msgMock, nodeConnection); + then(mainCtxMock).shouldHaveNoMoreInteractions(); + then(nodeCtxMock).shouldHaveNoMoreInteractions(); + } + + @ParameterizedTest + @ValueSource(strings = {TbNodeConnectionType.SUCCESS, TbNodeConnectionType.FAILURE}) + void givenDebugStrategyAllEvents_whenOutput_thenVerifyDebugOutputPersisted(String nodeConnection) { + // GIVEN + var msgMock = mock(TbMsg.class); + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugStrategy(DebugStrategy.ALL_EVENTS); + given(msgMock.popFormStack()).willReturn(new TbMsgProcessingStackItem(RULE_CHAIN_ID, RULE_NODE_ID)); + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(nodeCtxMock.getChainActor()).willReturn(chainActorMock); + mockGetMaxRuleNodeDebugModeDurationMinutes(); + + // WHEN + defaultTbContext.output(msgMock, nodeConnection); + + // THEN + checkOutputCommonLogic(msgMock, nodeConnection); + then(mainCtxMock).should().getTenantProfileCache(); + then(mainCtxMock).should().getMaxRuleNodeDebugModeDurationMinutes(); + then(mainCtxMock).should().persistDebugOutput(TENANT_ID, RULE_NODE_ID, msgMock, nodeConnection); + then(mainCtxMock).shouldHaveNoMoreInteractions(); + then(nodeCtxMock).shouldHaveNoMoreInteractions(); + } + + @Test + public void givenEmptyStack_whenOutput_thenVerifyMsgAck() { + // GIVEN + var msgMock = mock(TbMsg.class); + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugStrategy(DebugStrategy.DISABLED); + given(msgMock.popFormStack()).willReturn(null); + TbMsgCallback callbackMock = mock(TbMsgCallback.class); + given(msgMock.getCallback()).willReturn(callbackMock); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + + // WHEN + defaultTbContext.output(msgMock, TbNodeConnectionType.SUCCESS); + + // THEN + then(msgMock).should().popFormStack(); + then(callbackMock).should().onProcessingEnd(RULE_NODE_ID); + then(callbackMock).should().onSuccess(); + then(nodeCtxMock).should(never()).getChainActor(); + } + + @Test + public void givenEmptyStackAndDebugStrategyAllEvents_whenOutput_thenVerifyMsgAckAndDebugOutputPersisted() { + // GIVEN + var msgMock = mock(TbMsg.class); + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugStrategy(DebugStrategy.ALL_EVENTS); + ruleNode.setLastUpdateTs(System.currentTimeMillis()); + given(msgMock.popFormStack()).willReturn(null); + TbMsgCallback callbackMock = mock(TbMsgCallback.class); + given(msgMock.getCallback()).willReturn(callbackMock); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + mockGetMaxRuleNodeDebugModeDurationMinutes(); + + // WHEN + defaultTbContext.output(msgMock, TbNodeConnectionType.SUCCESS); + + // THEN + then(msgMock).should().popFormStack(); + then(callbackMock).should().onProcessingEnd(RULE_NODE_ID); + then(callbackMock).should().onSuccess(); + then(nodeCtxMock).should(never()).getChainActor(); + then(mainCtxMock).should().persistDebugOutput(TENANT_ID, RULE_NODE_ID, msgMock, TbNodeConnectionType.ACK); + } + + @Test + public void givenDebugStrategyOnlyFailureEvents_whenEnqueueForTellFailure_thenVerifyDebugOutputPersisted() { + // GIVEN + var msg = getTbMsgWithQueueName(); + var tpi = new TopicPartitionInfo(DataConstants.MAIN_QUEUE_TOPIC, TENANT_ID, 0, true); + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugStrategy(DebugStrategy.ONLY_FAILURE_EVENTS); + var tbClusterServiceMock = mock(TbClusterService.class); + + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(mainCtxMock.resolve(any(ServiceType.class), anyString(), any(TenantId.class), any(EntityId.class))).willReturn(tpi); + given(mainCtxMock.getClusterService()).willReturn(tbClusterServiceMock); + + // WHEN + defaultTbContext.enqueueForTellFailure(msg, EXCEPTION); + + // THEN + then(mainCtxMock).should().resolve(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, TENANT_ID, TENANT_ID); + TbMsg expectedTbMsg = TbMsg.newMsg(msg, msg.getQueueName(), RULE_CHAIN_ID, RULE_NODE_ID); + checkEnqueueForTellFailurePushMsgToRuleEngine(tbClusterServiceMock, tpi, expectedTbMsg); + ArgumentCaptor tbMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); + then(mainCtxMock).should().persistDebugOutput(eq(TENANT_ID), eq(RULE_NODE_ID), tbMsgCaptor.capture(), eq(TbNodeConnectionType.FAILURE), isNull(), eq(EXCEPTION_MSG)); + TbMsg actualTbMsg = tbMsgCaptor.getValue(); + assertThat(actualTbMsg).usingRecursiveComparison() + .ignoringFields("id", "ctx") + .isEqualTo(expectedTbMsg); + then(mainCtxMock).should().getClusterService(); + then(mainCtxMock).shouldHaveNoMoreInteractions(); + then(tbClusterServiceMock).shouldHaveNoMoreInteractions(); + } + + @Test + public void givenDebugStrategyDisabled_whenEnqueueForTellFailure_thenVerifyDebugOutputNotPersisted() { + // GIVEN + var msg = getTbMsgWithQueueName(); + var tpi = new TopicPartitionInfo(DataConstants.MAIN_QUEUE_TOPIC, TENANT_ID, 0, true); + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugStrategy(DebugStrategy.DISABLED); + var tbClusterServiceMock = mock(TbClusterService.class); + + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(mainCtxMock.resolve(any(ServiceType.class), anyString(), any(TenantId.class), any(EntityId.class))).willReturn(tpi); + given(mainCtxMock.getClusterService()).willReturn(tbClusterServiceMock); + + // WHEN + defaultTbContext.enqueueForTellFailure(msg, EXCEPTION); + + // THEN + then(mainCtxMock).should().resolve(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, TENANT_ID, TENANT_ID); + TbMsg expectedTbMsg = TbMsg.newMsg(msg, msg.getQueueName(), RULE_CHAIN_ID, RULE_NODE_ID); + checkEnqueueForTellFailurePushMsgToRuleEngine(tbClusterServiceMock, tpi, expectedTbMsg); + then(mainCtxMock).should().getClusterService(); + then(mainCtxMock).shouldHaveNoMoreInteractions(); + then(tbClusterServiceMock).shouldHaveNoMoreInteractions(); + } + + @Test + public void givenDebugStrategyAllEvents_whenEnqueueForTellFailure_thenVerifyDebugOutputPersisted() { + // GIVEN + var msg = getTbMsgWithQueueName(); + var tpi = new TopicPartitionInfo(DataConstants.MAIN_QUEUE_TOPIC, TENANT_ID, 0, true); + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugStrategy(DebugStrategy.ALL_EVENTS); + ruleNode.setLastUpdateTs(System.currentTimeMillis()); + var tbClusterServiceMock = mock(TbClusterService.class); + + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(mainCtxMock.resolve(any(ServiceType.class), anyString(), any(TenantId.class), any(EntityId.class))).willReturn(tpi); + given(mainCtxMock.getClusterService()).willReturn(tbClusterServiceMock); + mockGetMaxRuleNodeDebugModeDurationMinutes(); + + // WHEN + defaultTbContext.enqueueForTellFailure(msg, EXCEPTION); + + // THEN + then(mainCtxMock).should().resolve(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, TENANT_ID, TENANT_ID); + TbMsg expectedTbMsg = TbMsg.newMsg(msg, msg.getQueueName(), RULE_CHAIN_ID, RULE_NODE_ID); + checkEnqueueForTellFailurePushMsgToRuleEngine(tbClusterServiceMock, tpi, expectedTbMsg); + ArgumentCaptor tbMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); + then(mainCtxMock).should().persistDebugOutput(eq(TENANT_ID), eq(RULE_NODE_ID), tbMsgCaptor.capture(), eq(TbNodeConnectionType.FAILURE), isNull(), eq(EXCEPTION_MSG)); + TbMsg actualTbMsg = tbMsgCaptor.getValue(); + assertThat(actualTbMsg).usingRecursiveComparison() + .ignoringFields("id", "ctx") + .isEqualTo(expectedTbMsg); + then(mainCtxMock).should().getClusterService(); + then(mainCtxMock).should().getTenantProfileCache(); + then(mainCtxMock).should().getMaxRuleNodeDebugModeDurationMinutes(); + then(mainCtxMock).shouldHaveNoMoreInteractions(); + then(tbClusterServiceMock).shouldHaveNoMoreInteractions(); + } + + @Test + public void givenInvalidMsg_whenEnqueueForTellFailure_thenDoNothing() { + // GIVEN + var msgMock = mock(TbMsg.class); + var tpi = new TopicPartitionInfo(DataConstants.MAIN_QUEUE_TOPIC, TENANT_ID, 0, true); + + given(msgMock.getOriginator()).willReturn(TENANT_ID); + given(msgMock.getQueueName()).willReturn(DataConstants.MAIN_QUEUE_NAME); + given(msgMock.isValid()).willReturn(false); + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + given(mainCtxMock.resolve(any(ServiceType.class), anyString(), any(TenantId.class), any(EntityId.class))).willReturn(tpi); + + // WHEN + defaultTbContext.enqueueForTellFailure(msgMock, EXCEPTION); + + // THEN + then(msgMock).should(times(2)).getQueueName(); + then(msgMock).should().getOriginator(); + then(msgMock).should().isValid(); + then(msgMock).shouldHaveNoMoreInteractions(); + + then(mainCtxMock).should().resolve(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, TENANT_ID, TENANT_ID); + then(mainCtxMock).shouldHaveNoMoreInteractions(); + + then(nodeCtxMock).should(times(2)).getTenantId(); + then(nodeCtxMock).shouldHaveNoMoreInteractions(); + then(chainActorMock).shouldHaveNoInteractions(); + } + + @MethodSource + @ParameterizedTest + void givenDebugStrategyOptions_whenEnqueueForTellNext_thenVerifyDebugOutputPersistedOnlyForAllEventsDebugStrategy(DebugStrategy debugStrategy, String connectionType) { + // GIVEN + var msg = getTbMsgWithQueueName(); + var tpi = new TopicPartitionInfo(DataConstants.MAIN_QUEUE_TOPIC, TENANT_ID, 0, true); + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugStrategy(debugStrategy); + ruleNode.setLastUpdateTs(System.currentTimeMillis()); + var tbClusterServiceMock = mock(TbClusterService.class); + + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(mainCtxMock.resolve(any(ServiceType.class), anyString(), any(TenantId.class), any(EntityId.class))).willReturn(tpi); + given(mainCtxMock.getClusterService()).willReturn(tbClusterServiceMock); + if (DebugStrategy.ALL_EVENTS.equals(debugStrategy)) { + mockGetMaxRuleNodeDebugModeDurationMinutes(); + } + + // WHEN + defaultTbContext.enqueueForTellNext(msg, connectionType); + + // THEN + then(mainCtxMock).should().resolve(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, TENANT_ID, TENANT_ID); + TbMsg expectedTbMsg = TbMsg.newMsg(msg, msg.getQueueName(), RULE_CHAIN_ID, RULE_NODE_ID); + + ArgumentCaptor toRuleEngineMsgCaptor = ArgumentCaptor.forClass(ToRuleEngineMsg.class); + ArgumentCaptor simpleTbQueueCallbackCaptor = ArgumentCaptor.forClass(SimpleTbQueueCallback.class); + then(tbClusterServiceMock).should().pushMsgToRuleEngine(eq(tpi), notNull(UUID.class), toRuleEngineMsgCaptor.capture(), simpleTbQueueCallbackCaptor.capture()); + + ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue(); + assertThat(actualToRuleEngineMsg).usingRecursiveComparison() + .ignoringFields("tbMsg_") + .isEqualTo(ToRuleEngineMsg.newBuilder() + .setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits()) + .setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits()) + .setTbMsg(TbMsg.toByteString(expectedTbMsg)) + .addAllRelationTypes(List.of(connectionType)).build()); + + var simpleTbQueueCallback = simpleTbQueueCallbackCaptor.getValue(); + assertThat(simpleTbQueueCallback).isNotNull(); + simpleTbQueueCallback.onSuccess(null); + + if (DebugStrategy.ALL_EVENTS.equals(debugStrategy)) { + then(mainCtxMock).should().getTenantProfileCache(); + then(mainCtxMock).should().getMaxRuleNodeDebugModeDurationMinutes(); + ArgumentCaptor tbMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); + then(mainCtxMock).should().persistDebugOutput(eq(TENANT_ID), eq(RULE_NODE_ID), tbMsgCaptor.capture(), eq(connectionType), isNull(), isNull()); + TbMsg actualTbMsg = tbMsgCaptor.getValue(); + assertThat(actualTbMsg).usingRecursiveComparison() + .ignoringFields("id", "ctx") + .isEqualTo(expectedTbMsg); + } + then(mainCtxMock).should().getClusterService(); + then(mainCtxMock).shouldHaveNoMoreInteractions(); + then(tbClusterServiceMock).shouldHaveNoMoreInteractions(); + } + + @MethodSource + @ParameterizedTest + void givenDebugStrategyOptions_whenEnqueue_thenVerifyDebugOutputPersistedOnlyForAllEventsDebugStrategy(DebugStrategy debugStrategy) { + // GIVEN + var msg = getTbMsgWithQueueName(); + var tpi = new TopicPartitionInfo(DataConstants.MAIN_QUEUE_TOPIC, TENANT_ID, 0, true); + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setQueueName(DataConstants.MAIN_QUEUE_NAME); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugStrategy(debugStrategy); + ruleNode.setLastUpdateTs(System.currentTimeMillis()); + var tbClusterServiceMock = mock(TbClusterService.class); + + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(mainCtxMock.resolve(any(ServiceType.class), anyString(), any(TenantId.class), any(EntityId.class))).willReturn(tpi); + given(mainCtxMock.getClusterService()).willReturn(tbClusterServiceMock); + if (DebugStrategy.ALL_EVENTS.equals(debugStrategy)) { + mockGetMaxRuleNodeDebugModeDurationMinutes(); + } + + Consumer onFailure = mock(Consumer.class); + Runnable onSuccess = mock(Runnable.class); + + // WHEN + defaultTbContext.enqueue(msg, onSuccess, onFailure); + + // THEN + then(mainCtxMock).should().resolve(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, TENANT_ID, TENANT_ID); + TbMsg expectedTbMsg = TbMsg.newMsg(msg, msg.getQueueName(), RULE_CHAIN_ID, RULE_NODE_ID); + + ArgumentCaptor toRuleEngineMsgCaptor = ArgumentCaptor.forClass(ToRuleEngineMsg.class); + ArgumentCaptor simpleTbQueueCallbackCaptor = ArgumentCaptor.forClass(SimpleTbQueueCallback.class); + then(tbClusterServiceMock).should().pushMsgToRuleEngine(eq(tpi), notNull(UUID.class), toRuleEngineMsgCaptor.capture(), simpleTbQueueCallbackCaptor.capture()); + + ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue(); + assertThat(actualToRuleEngineMsg).usingRecursiveComparison() + .ignoringFields("tbMsg_") + .isEqualTo(ToRuleEngineMsg.newBuilder() + .setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits()) + .setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits()) + .setTbMsg(TbMsg.toByteString(expectedTbMsg)) + .build()); + + var simpleTbQueueCallback = simpleTbQueueCallbackCaptor.getValue(); + assertThat(simpleTbQueueCallback).isNotNull(); + simpleTbQueueCallback.onSuccess(null); + + if (DebugStrategy.ALL_EVENTS.equals(debugStrategy)) { + then(mainCtxMock).should().getTenantProfileCache(); + then(mainCtxMock).should().getMaxRuleNodeDebugModeDurationMinutes(); + then(mainCtxMock).should().persistDebugOutput(eq(TENANT_ID), eq(RULE_NODE_ID), eq(msg), eq(TbNodeConnectionType.TO_ROOT_RULE_CHAIN)); + } + then(mainCtxMock).should().getClusterService(); + then(mainCtxMock).shouldHaveNoMoreInteractions(); + then(tbClusterServiceMock).shouldHaveNoMoreInteractions(); + } + + @Test + public void givenDebugStrategyOnlyFailures_whenTellFailure_thenVerifyDebugOutputPersisted() { + // GIVEN + var msg = getTbMsg(); + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugStrategy(DebugStrategy.ONLY_FAILURE_EVENTS); + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(nodeCtxMock.getChainActor()).willReturn(chainActorMock); + + // WHEN + defaultTbContext.tellFailure(msg, EXCEPTION); + + // THEN + var expectedRuleNodeToRuleChainTellNextMsg = new RuleNodeToRuleChainTellNextMsg( + RULE_CHAIN_ID, + RULE_NODE_ID, + Collections.singleton(TbNodeConnectionType.FAILURE), + msg, + EXCEPTION_MSG + ); + then(chainActorMock).should().tell(expectedRuleNodeToRuleChainTellNextMsg); + then(chainActorMock).shouldHaveNoMoreInteractions(); + then(nodeCtxMock).should().getChainActor(); + then(mainCtxMock).should().persistDebugOutput(TENANT_ID, RULE_NODE_ID, msg, TbNodeConnectionType.FAILURE, EXCEPTION); + then(mainCtxMock).shouldHaveNoMoreInteractions(); + then(nodeCtxMock).shouldHaveNoMoreInteractions(); + } + + @Test + public void givenDebugStrategyDisabled_whenTellFailure_thenVerifyDebugOutputNotPersisted() { + // GIVEN + var msg = getTbMsg(); + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugStrategy(DebugStrategy.DISABLED); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(nodeCtxMock.getChainActor()).willReturn(chainActorMock); + + // WHEN + defaultTbContext.tellFailure(msg, EXCEPTION); + + // THEN + var expectedRuleNodeToRuleChainTellNextMsg = new RuleNodeToRuleChainTellNextMsg( + RULE_CHAIN_ID, + RULE_NODE_ID, + Collections.singleton(TbNodeConnectionType.FAILURE), + msg, + EXCEPTION_MSG + ); + then(chainActorMock).should().tell(expectedRuleNodeToRuleChainTellNextMsg); + then(chainActorMock).shouldHaveNoMoreInteractions(); + then(nodeCtxMock).should().getChainActor(); + then(mainCtxMock).shouldHaveNoInteractions(); + then(nodeCtxMock).shouldHaveNoMoreInteractions(); + } + + @Test + public void givenDebugStrategyAllEvents_whenTellFailure_thenVerifyDebugOutputPersisted() { + // GIVEN + var msg = getTbMsg(); + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugStrategy(DebugStrategy.ALL_EVENTS); + ruleNode.setLastUpdateTs(System.currentTimeMillis()); + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(nodeCtxMock.getChainActor()).willReturn(chainActorMock); + mockGetMaxRuleNodeDebugModeDurationMinutes(); + + // WHEN + defaultTbContext.tellFailure(msg, EXCEPTION); + + // THEN + var expectedRuleNodeToRuleChainTellNextMsg = new RuleNodeToRuleChainTellNextMsg( + RULE_CHAIN_ID, + RULE_NODE_ID, + Collections.singleton(TbNodeConnectionType.FAILURE), + msg, + EXCEPTION_MSG + ); + then(chainActorMock).should().tell(expectedRuleNodeToRuleChainTellNextMsg); + then(chainActorMock).shouldHaveNoMoreInteractions(); + then(nodeCtxMock).should().getChainActor(); + then(mainCtxMock).should().persistDebugOutput(TENANT_ID, RULE_NODE_ID, msg, TbNodeConnectionType.FAILURE, EXCEPTION); + then(mainCtxMock).should().getTenantProfileCache(); + then(mainCtxMock).should().getMaxRuleNodeDebugModeDurationMinutes(); + then(mainCtxMock).shouldHaveNoMoreInteractions(); + then(nodeCtxMock).shouldHaveNoMoreInteractions(); + } + + private void checkTellNextCommonLogic(TbMsgCallback callbackMock, String nodeConnection, TbMsg msg) { + checkTellNextCommonLogic(callbackMock, Collections.singleton(nodeConnection), msg); + } + + private void checkTellNextCommonLogic(TbMsgCallback callbackMock, Set nodeConnections, TbMsg msg) { + then(callbackMock).should().onProcessingEnd(RULE_NODE_ID); + then(callbackMock).shouldHaveNoMoreInteractions(); + var expectedRuleNodeToRuleChainTellNextMsg = new RuleNodeToRuleChainTellNextMsg( + RULE_CHAIN_ID, + RULE_NODE_ID, + nodeConnections, + msg, + null); + then(chainActorMock).should().tell(expectedRuleNodeToRuleChainTellNextMsg); + then(chainActorMock).shouldHaveNoMoreInteractions(); + } + + private void checkOutputCommonLogic(TbMsg msg, String nodeConnection) { + then(msg).should().popFormStack(); + var expectedRuleChainOutputMsg = new RuleChainOutputMsg( + RULE_CHAIN_ID, + RULE_NODE_ID, + nodeConnection, + msg); + then(chainActorMock).should().tell(expectedRuleChainOutputMsg); + then(chainActorMock).shouldHaveNoMoreInteractions(); + then(nodeCtxMock).should().getChainActor(); + } + + private void checkEnqueueForTellFailurePushMsgToRuleEngine(TbClusterService tbClusterService, TopicPartitionInfo tpi, TbMsg expectedTbMsg) { + ArgumentCaptor toRuleEngineMsgCaptor = ArgumentCaptor.forClass(ToRuleEngineMsg.class); + ArgumentCaptor simpleTbQueueCallbackCaptor = ArgumentCaptor.forClass(SimpleTbQueueCallback.class); + then(tbClusterService).should().pushMsgToRuleEngine(eq(tpi), notNull(UUID.class), toRuleEngineMsgCaptor.capture(), simpleTbQueueCallbackCaptor.capture()); + + ToRuleEngineMsg actualToRuleEngineMsg = toRuleEngineMsgCaptor.getValue(); + assertThat(actualToRuleEngineMsg).usingRecursiveComparison() + .ignoringFields("tbMsg_") + .isEqualTo(ToRuleEngineMsg.newBuilder() + .setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits()) + .setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits()) + .setTbMsg(TbMsg.toByteString(expectedTbMsg)) + .setFailureMessage(EXCEPTION_MSG) + .addAllRelationTypes(List.of(TbNodeConnectionType.FAILURE)).build()); + + var simpleTbQueueCallback = simpleTbQueueCallbackCaptor.getValue(); + assertThat(simpleTbQueueCallback).isNotNull(); + simpleTbQueueCallback.onSuccess(null); + } + + private static Stream givenDebugStrategyOptions_whenEnqueueForTellNext_thenVerifyDebugOutputPersistedOnlyForAllEventsDebugStrategy() { + return Stream.of( + Arguments.of(DebugStrategy.ALL_EVENTS, TbNodeConnectionType.OTHER), + Arguments.of(DebugStrategy.ONLY_FAILURE_EVENTS, TbNodeConnectionType.TRUE), + Arguments.of(DebugStrategy.DISABLED, TbNodeConnectionType.FALSE) + ); + } + + private static Stream givenDebugStrategyOptions_whenEnqueue_thenVerifyDebugOutputPersistedOnlyForAllEventsDebugStrategy() { + return Stream.of( + Arguments.of(DebugStrategy.ALL_EVENTS), + Arguments.of(DebugStrategy.ONLY_FAILURE_EVENTS), + Arguments.of(DebugStrategy.DISABLED) + ); + } + + private TbMsg getTbMsgWithCallback(TbMsgCallback callback) { + return TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, TENANT_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING, callback); + } + + private TbMsg getTbMsgWithQueueName() { + return TbMsg.newMsg(DataConstants.MAIN_QUEUE_NAME, TbMsgType.POST_TELEMETRY_REQUEST, TENANT_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING); + } + + private TbMsg getTbMsg() { + return TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, TENANT_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING); + } + + private void mockGetMaxRuleNodeDebugModeDurationMinutes() { + var tbTenantProfileCacheMock = mock(TbTenantProfileCache.class); + var tenantProfileMock = mock(TenantProfile.class); + var tenantProfileDataMock = mock(TenantProfileData.class); + var tenantProfileConfigurationMock = mock(TenantProfileConfiguration.class); + + given(mainCtxMock.getTenantProfileCache()).willReturn(tbTenantProfileCacheMock); + given(tbTenantProfileCacheMock.get(TENANT_ID)).willReturn(tenantProfileMock); + given(tenantProfileMock.getProfileData()).willReturn(tenantProfileDataMock); + given(tenantProfileDataMock.getConfiguration()).willReturn(tenantProfileConfigurationMock); + given(tenantProfileConfigurationMock.getMaxRuleNodeDebugModeDurationMinutes(anyInt())).willReturn(15); + } + +} diff --git a/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java index 5484c29305..a09ca1d148 100644 --- a/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java @@ -71,6 +71,7 @@ import org.thingsboard.server.common.data.query.EntityKeyValueType; import org.thingsboard.server.common.data.query.FilterPredicateValue; import org.thingsboard.server.common.data.query.NumericFilterPredicate; import org.thingsboard.server.common.data.queue.Queue; +import org.thingsboard.server.common.data.rule.DebugStrategy; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainMetaData; import org.thingsboard.server.common.data.rule.RuleChainType; @@ -103,7 +104,6 @@ import org.thingsboard.server.gen.edge.v1.UserUpdateMsg; import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.Random; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -125,8 +125,6 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { protected EdgeImitator edgeImitator; protected Edge edge; - private Random random = new Random(); - @Autowired protected EdgeEventService edgeEventService; @@ -210,7 +208,7 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { protected void updateRootRuleChainMetadata() throws Exception { RuleChainId rootRuleChainId = getEdgeRootRuleChainId(); RuleChainMetaData rootRuleChainMetadata = doGet("/api/ruleChain/" + rootRuleChainId.getId().toString() + "/metadata", RuleChainMetaData.class); - rootRuleChainMetadata.getNodes().forEach(n -> n.setDebugMode(random.nextBoolean())); + rootRuleChainMetadata.getNodes().forEach(n -> n.setDebugStrategy(DebugStrategy.ALL_EVENTS)); doPost("/api/ruleChain/metadata", rootRuleChainMetadata, RuleChainMetaData.class); } diff --git a/application/src/test/java/org/thingsboard/server/edge/RuleChainEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/RuleChainEdgeTest.java index eaf4468156..2dc2bd42f3 100644 --- a/application/src/test/java/org/thingsboard/server/edge/RuleChainEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/RuleChainEdgeTest.java @@ -23,6 +23,7 @@ import org.thingsboard.rule.engine.metadata.TbGetAttributesNodeConfiguration; import org.thingsboard.rule.engine.util.TbMsgSource; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.data.rule.DebugStrategy; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainMetaData; import org.thingsboard.server.common.data.rule.RuleChainType; @@ -228,7 +229,7 @@ public class RuleChainEdgeTest extends AbstractEdgeTest { // update metadata for root rule chain edgeImitator.expectMessageAmount(1); - metaData.getNodes().forEach(n -> n.setDebugMode(true)); + metaData.getNodes().forEach(n -> n.setDebugStrategy(DebugStrategy.ALL_EVENTS)); doPost("/api/ruleChain/metadata", metaData, RuleChainMetaData.class); Assert.assertTrue(edgeImitator.waitForMessages()); ruleChainUpdateMsgOpt = edgeImitator.findMessageByType(RuleChainUpdateMsg.class); diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java index 9e36fed677..3af4c012e0 100644 --- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java @@ -43,6 +43,7 @@ import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.rule.DebugStrategy; import org.thingsboard.server.common.data.rule.NodeConnectionInfo; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainMetaData; @@ -142,7 +143,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule ruleNode1.setName("Simple Rule Node 1"); ruleNode1.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName()); ruleNode1.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version()); - ruleNode1.setDebugMode(true); + ruleNode1.setDebugStrategy(DebugStrategy.ALL_EVENTS); TbGetAttributesNodeConfiguration configuration1 = new TbGetAttributesNodeConfiguration(); configuration1.setFetchTo(TbMsgSource.METADATA); configuration1.setServerAttributeNames(Collections.singletonList("serverAttributeKey1")); @@ -152,7 +153,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule ruleNode2.setName("Simple Rule Node 2"); ruleNode2.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName()); ruleNode2.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version()); - ruleNode2.setDebugMode(true); + ruleNode2.setDebugStrategy(DebugStrategy.ALL_EVENTS); TbGetAttributesNodeConfiguration configuration2 = new TbGetAttributesNodeConfiguration(); configuration2.setFetchTo(TbMsgSource.METADATA); configuration2.setServerAttributeNames(Collections.singletonList("serverAttributeKey2")); @@ -248,7 +249,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule ruleNode1.setName("Simple Rule Node 1"); ruleNode1.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName()); ruleNode1.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version()); - ruleNode1.setDebugMode(true); + ruleNode1.setDebugStrategy(DebugStrategy.ALL_EVENTS); TbGetAttributesNodeConfiguration configuration1 = new TbGetAttributesNodeConfiguration(); configuration1.setFetchTo(TbMsgSource.METADATA); configuration1.setServerAttributeNames(Collections.singletonList("serverAttributeKey1")); @@ -257,7 +258,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule RuleNode ruleNode12 = new RuleNode(); ruleNode12.setName("Simple Rule Node 1"); ruleNode12.setType(org.thingsboard.rule.engine.flow.TbRuleChainInputNode.class.getName()); - ruleNode12.setDebugMode(true); + ruleNode12.setDebugStrategy(DebugStrategy.ALL_EVENTS); TbRuleChainInputNodeConfiguration configuration12 = new TbRuleChainInputNodeConfiguration(); configuration12.setRuleChainId(secondaryRuleChain.getId().getId().toString()); ruleNode12.setConfiguration(JacksonUtil.valueToTree(configuration12)); @@ -282,7 +283,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule ruleNode2.setName("Simple Rule Node 2"); ruleNode2.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName()); ruleNode2.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version()); - ruleNode2.setDebugMode(true); + ruleNode2.setDebugStrategy(DebugStrategy.ALL_EVENTS); TbGetAttributesNodeConfiguration configuration2 = new TbGetAttributesNodeConfiguration(); configuration2.setFetchTo(TbMsgSource.METADATA); configuration2.setServerAttributeNames(Collections.singletonList("serverAttributeKey2")); diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java index 4c4e887644..dd148748ca 100644 --- a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java @@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.event.EventType; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.data.rule.DebugStrategy; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainMetaData; import org.thingsboard.server.common.data.rule.RuleNode; @@ -97,7 +98,7 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac ruleNode.setName("Simple Rule Node"); ruleNode.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName()); ruleNode.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version()); - ruleNode.setDebugMode(true); + ruleNode.setDebugStrategy(DebugStrategy.ALL_EVENTS); TbGetAttributesNodeConfiguration configuration = new TbGetAttributesNodeConfiguration(); configuration.setFetchTo(TbMsgSource.METADATA); configuration.setServerAttributeNames(Collections.singletonList("serverAttributeKey")); diff --git a/application/src/test/java/org/thingsboard/server/service/housekeeper/HousekeeperServiceTest.java b/application/src/test/java/org/thingsboard/server/service/housekeeper/HousekeeperServiceTest.java index d3b1916def..752624fa51 100644 --- a/application/src/test/java/org/thingsboard/server/service/housekeeper/HousekeeperServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/housekeeper/HousekeeperServiceTest.java @@ -58,6 +58,7 @@ import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; +import org.thingsboard.server.common.data.rule.DebugStrategy; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainMetaData; import org.thingsboard.server.common.data.rule.RuleChainType; @@ -462,7 +463,7 @@ public class HousekeeperServiceTest extends AbstractControllerTest { ruleNode1.setName("Simple Rule Node 1"); ruleNode1.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName()); ruleNode1.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version()); - ruleNode1.setDebugMode(true); + ruleNode1.setDebugStrategy(DebugStrategy.ALL_EVENTS); TbGetAttributesNodeConfiguration configuration1 = new TbGetAttributesNodeConfiguration(); configuration1.setServerAttributeNames(Collections.singletonList("serverAttributeKey1")); ruleNode1.setConfiguration(JacksonUtil.valueToTree(configuration1)); @@ -471,7 +472,7 @@ public class HousekeeperServiceTest extends AbstractControllerTest { ruleNode2.setName("Simple Rule Node 2"); ruleNode2.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName()); ruleNode2.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version()); - ruleNode2.setDebugMode(true); + ruleNode2.setDebugStrategy(DebugStrategy.ALL_EVENTS); TbGetAttributesNodeConfiguration configuration2 = new TbGetAttributesNodeConfiguration(); configuration2.setServerAttributeNames(Collections.singletonList("serverAttributeKey2")); ruleNode2.setConfiguration(JacksonUtil.valueToTree(configuration2)); diff --git a/application/src/test/java/org/thingsboard/server/service/sync/ie/ExportImportServiceSqlTest.java b/application/src/test/java/org/thingsboard/server/service/sync/ie/ExportImportServiceSqlTest.java index 55f15516f4..06bc670804 100644 --- a/application/src/test/java/org/thingsboard/server/service/sync/ie/ExportImportServiceSqlTest.java +++ b/application/src/test/java/org/thingsboard/server/service/sync/ie/ExportImportServiceSqlTest.java @@ -65,6 +65,7 @@ import org.thingsboard.server.common.data.ota.OtaPackageType; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; +import org.thingsboard.server.common.data.rule.DebugStrategy; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainMetaData; import org.thingsboard.server.common.data.rule.RuleChainType; @@ -471,7 +472,7 @@ public class ExportImportServiceSqlTest extends AbstractControllerTest { RuleNode ruleNode1 = new RuleNode(); ruleNode1.setName("Generator 1"); ruleNode1.setType(TbMsgGeneratorNode.class.getName()); - ruleNode1.setDebugMode(true); + ruleNode1.setDebugStrategy(DebugStrategy.ALL_EVENTS); TbMsgGeneratorNodeConfiguration configuration1 = new TbMsgGeneratorNodeConfiguration(); configuration1.setOriginatorType(originatorId.getEntityType()); configuration1.setOriginatorId(originatorId.getId().toString()); @@ -481,7 +482,7 @@ public class ExportImportServiceSqlTest extends AbstractControllerTest { ruleNode2.setName("Simple Rule Node 2"); ruleNode2.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName()); ruleNode2.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version()); - ruleNode2.setDebugMode(true); + ruleNode2.setDebugStrategy(DebugStrategy.ALL_EVENTS); TbGetAttributesNodeConfiguration configuration2 = new TbGetAttributesNodeConfiguration(); configuration2.setServerAttributeNames(Collections.singletonList("serverAttributeKey2")); ruleNode2.setConfiguration(JacksonUtil.valueToTree(configuration2)); @@ -510,7 +511,7 @@ public class ExportImportServiceSqlTest extends AbstractControllerTest { ruleNode1.setName("Simple Rule Node 1"); ruleNode1.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName()); ruleNode1.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version()); - ruleNode1.setDebugMode(true); + ruleNode1.setDebugStrategy(DebugStrategy.ALL_EVENTS); TbGetAttributesNodeConfiguration configuration1 = new TbGetAttributesNodeConfiguration(); configuration1.setServerAttributeNames(Collections.singletonList("serverAttributeKey1")); ruleNode1.setConfiguration(JacksonUtil.valueToTree(configuration1)); @@ -519,7 +520,7 @@ public class ExportImportServiceSqlTest extends AbstractControllerTest { ruleNode2.setName("Simple Rule Node 2"); ruleNode2.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName()); ruleNode2.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version()); - ruleNode2.setDebugMode(true); + ruleNode2.setDebugStrategy(DebugStrategy.ALL_EVENTS); TbGetAttributesNodeConfiguration configuration2 = new TbGetAttributesNodeConfiguration(); configuration2.setServerAttributeNames(Collections.singletonList("serverAttributeKey2")); ruleNode2.setConfiguration(JacksonUtil.valueToTree(configuration2)); diff --git a/application/src/test/java/org/thingsboard/server/service/sync/vc/VersionControlTest.java b/application/src/test/java/org/thingsboard/server/service/sync/vc/VersionControlTest.java index 2e01d700a9..21f6363f5f 100644 --- a/application/src/test/java/org/thingsboard/server/service/sync/vc/VersionControlTest.java +++ b/application/src/test/java/org/thingsboard/server/service/sync/vc/VersionControlTest.java @@ -65,6 +65,7 @@ import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; +import org.thingsboard.server.common.data.rule.DebugStrategy; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainMetaData; import org.thingsboard.server.common.data.rule.RuleChainType; @@ -870,7 +871,7 @@ public class VersionControlTest extends AbstractControllerTest { RuleNode ruleNode1 = new RuleNode(); ruleNode1.setName("Generator 1"); ruleNode1.setType(TbMsgGeneratorNode.class.getName()); - ruleNode1.setDebugMode(true); + ruleNode1.setDebugStrategy(DebugStrategy.ALL_EVENTS); TbMsgGeneratorNodeConfiguration configuration1 = new TbMsgGeneratorNodeConfiguration(); configuration1.setOriginatorType(originatorId.getEntityType()); configuration1.setOriginatorId(originatorId.getId().toString()); @@ -880,7 +881,7 @@ public class VersionControlTest extends AbstractControllerTest { ruleNode2.setName("Simple Rule Node 2"); ruleNode2.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName()); ruleNode2.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version()); - ruleNode2.setDebugMode(true); + ruleNode2.setDebugStrategy(DebugStrategy.ALL_EVENTS); TbGetAttributesNodeConfiguration configuration2 = new TbGetAttributesNodeConfiguration(); configuration2.setServerAttributeNames(Collections.singletonList("serverAttributeKey2")); ruleNode2.setConfiguration(JacksonUtil.valueToTree(configuration2)); @@ -908,7 +909,7 @@ public class VersionControlTest extends AbstractControllerTest { ruleNode1.setName("Simple Rule Node 1"); ruleNode1.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName()); ruleNode1.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version()); - ruleNode1.setDebugMode(true); + ruleNode1.setDebugStrategy(DebugStrategy.ALL_EVENTS); TbGetAttributesNodeConfiguration configuration1 = new TbGetAttributesNodeConfiguration(); configuration1.setServerAttributeNames(Collections.singletonList("serverAttributeKey1")); ruleNode1.setConfiguration(JacksonUtil.valueToTree(configuration1)); @@ -917,7 +918,7 @@ public class VersionControlTest extends AbstractControllerTest { ruleNode2.setName("Simple Rule Node 2"); ruleNode2.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName()); ruleNode2.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version()); - ruleNode2.setDebugMode(true); + ruleNode2.setDebugStrategy(DebugStrategy.ALL_EVENTS); TbGetAttributesNodeConfiguration configuration2 = new TbGetAttributesNodeConfiguration(); configuration2.setServerAttributeNames(Collections.singletonList("serverAttributeKey2")); ruleNode2.setConfiguration(JacksonUtil.valueToTree(configuration2)); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/msg/TbNodeConnectionType.java b/common/data/src/main/java/org/thingsboard/server/common/data/msg/TbNodeConnectionType.java index 2f466c76b0..a6d56ec221 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/msg/TbNodeConnectionType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/msg/TbNodeConnectionType.java @@ -23,9 +23,13 @@ public final class TbNodeConnectionType { public static final String SUCCESS = "Success"; public static final String FAILURE = "Failure"; + public static final String ACK = "ACK"; + public static final String TRUE = "True"; public static final String FALSE = "False"; public static final String OTHER = "Other"; + public static final String TO_ROOT_RULE_CHAIN = "To Root Rule Chain"; + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/DebugStrategy.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/DebugStrategy.java new file mode 100644 index 0000000000..333ee9c670 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/DebugStrategy.java @@ -0,0 +1,72 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.rule; + +import lombok.Getter; +import org.thingsboard.server.common.data.msg.TbNodeConnectionType; + +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@Getter +public enum DebugStrategy { + DISABLED(0), ALL_EVENTS(1), ONLY_FAILURE_EVENTS(2); + + private final int protoNumber; + + DebugStrategy(int protoNumber) { + this.protoNumber = protoNumber; + } + + public boolean shouldPersistDebugInput(long lastUpdateTs, long msgTs, int debugModeDurationMinutes) { + return isAllEventsStrategyAndMsgTsWithinDebugDuration(lastUpdateTs, msgTs, debugModeDurationMinutes); + } + + public boolean shouldPersistDebugOutputForAllEvents(long lastUpdateTs, long msgTs, int debugModeDurationMinutes) { + return isAllEventsStrategyAndMsgTsWithinDebugDuration(lastUpdateTs, msgTs, debugModeDurationMinutes); + } + + public boolean shouldPersistDebugForFailureEventOnly(long lastUpdateTs, long msgTs, int debugModeDurationMinutes) { + return shouldPersistDebugOutputForAllEvents(lastUpdateTs, msgTs, debugModeDurationMinutes) || isFailureOnlyStrategy(); + } + + public boolean shouldPersistDebugForFailureEventOnly(Set nodeConnections) { + return isFailureOnlyStrategy() && nodeConnections.contains(TbNodeConnectionType.FAILURE); + } + + public boolean shouldPersistDebugForFailureEventOnly(String nodeConnection) { + return isFailureOnlyStrategy() && TbNodeConnectionType.FAILURE.equals(nodeConnection); + } + + private boolean isFailureOnlyStrategy() { + return DebugStrategy.ONLY_FAILURE_EVENTS.equals(this); + } + + private boolean isAllEventsStrategyAndMsgTsWithinDebugDuration(long lastUpdateTs, long msgTs, int debugModeDurationMinutes) { + if (!DebugStrategy.ALL_EVENTS.equals(this)) { + return false; + } + return isMsgTsWithinDebugDuration(lastUpdateTs, msgTs, debugModeDurationMinutes); + } + + private boolean isMsgTsWithinDebugDuration(long lastUpdateTs, long msgCreationTs, int debugModeDurationMinutes) { + if (debugModeDurationMinutes <= 0) { + return true; + } + return msgCreationTs < lastUpdateTs + TimeUnit.MINUTES.toMillis(debugModeDurationMinutes); + } + +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java index 542debcfdd..c147da1359 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java @@ -45,15 +45,17 @@ public class RuleNode extends BaseDataWithAdditionalInfo implements @Length(fieldName = "name") @Schema(description = "User defined name of the rule node. Used on UI and for logging. ", example = "Process sensor reading") private String name; - @Schema(description = "Enable/disable debug. ", example = "false") - private boolean debugMode; + @Schema(description = "Timestamp of the last rule node update.") + private long lastUpdateTs; + @Schema(description = "Debug strategy. ", example = "ALL_EVENTS") + private DebugStrategy debugStrategy; @Schema(description = "Enable/disable singleton mode. ", example = "false") private boolean singletonMode; @Schema(description = "Queue name. ", example = "Main") private String queueName; @Schema(description = "Version of rule node configuration. ", example = "0") private int configurationVersion; - @Schema(description = "JSON with the rule node configuration. Structure depends on the rule node implementation.", implementation = com.fasterxml.jackson.databind.JsonNode.class) + @Schema(description = "JSON with the rule node configuration. Structure depends on the rule node implementation.", implementation = JsonNode.class) private transient JsonNode configuration; @JsonIgnore private byte[] configurationBytes; @@ -73,7 +75,8 @@ public class RuleNode extends BaseDataWithAdditionalInfo implements this.ruleChainId = ruleNode.getRuleChainId(); this.type = ruleNode.getType(); this.name = ruleNode.getName(); - this.debugMode = ruleNode.isDebugMode(); + this.lastUpdateTs = ruleNode.getLastUpdateTs(); + this.debugStrategy = ruleNode.getDebugStrategy(); this.singletonMode = ruleNode.isSingletonMode(); this.setConfiguration(ruleNode.getConfiguration()); this.externalId = ruleNode.getExternalId(); @@ -84,6 +87,10 @@ public class RuleNode extends BaseDataWithAdditionalInfo implements return name; } + public DebugStrategy getDebugStrategy() { + return debugStrategy == null ? DebugStrategy.DISABLED : debugStrategy; + } + public JsonNode getConfiguration() { return BaseDataWithAdditionalInfo.getJson(() -> configuration, () -> configurationBytes); } @@ -93,9 +100,9 @@ public class RuleNode extends BaseDataWithAdditionalInfo implements } @Schema(description = "JSON object with the Rule Node Id. " + - "Specify this field to update the Rule Node. " + - "Referencing non-existing Rule Node Id will cause error. " + - "Omit this field to create new rule node.") + "Specify this field to update the Rule Node. " + + "Referencing non-existing Rule Node Id will cause error. " + + "Omit this field to create new rule node.") @Override public RuleNodeId getId() { return super.getId(); @@ -107,10 +114,9 @@ public class RuleNode extends BaseDataWithAdditionalInfo implements return super.getCreatedTime(); } - @Schema(description = "Additional parameters of the rule node. Contains 'layoutX' and 'layoutY' properties for visualization.", implementation = com.fasterxml.jackson.databind.JsonNode.class) + @Schema(description = "Additional parameters of the rule node. Contains 'layoutX' and 'layoutY' properties for visualization.", implementation = JsonNode.class) @Override public JsonNode getAdditionalInfo() { return super.getAdditionalInfo(); } - } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java index 51738c75be..ff5c0ea426 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java @@ -24,6 +24,8 @@ import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.TenantProfileType; +import java.io.Serial; + @Schema @AllArgsConstructor @NoArgsConstructor @@ -31,6 +33,7 @@ import org.thingsboard.server.common.data.TenantProfileType; @Data public class DefaultTenantProfileConfiguration implements TenantProfileConfiguration { + @Serial private static final long serialVersionUID = -7134932690332578595L; private long maxDevices; @@ -91,6 +94,8 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura private long maxDPStorageDays; @Schema(example = "50") private int maxRuleNodeExecutionsPerMessage; + @Schema(example = "15") + private int maxRuleNodeDebugDurationMinutes; @Schema(example = "0") private long maxEmails; @Schema(example = "true") @@ -197,4 +202,9 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura public int getMaxRuleNodeExecsPerMessage() { return maxRuleNodeExecutionsPerMessage; } + + @Override + public int getMaxRuleNodeDebugModeDurationMinutes(int systemMaxRuleNodeDebugModeDurationMinutes) { + return Math.min(systemMaxRuleNodeDebugModeDurationMinutes, maxRuleNodeDebugDurationMinutes); + } } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/TenantProfileConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/TenantProfileConfiguration.java index 0662765455..af36a15c4f 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/TenantProfileConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/TenantProfileConfiguration.java @@ -51,4 +51,7 @@ public interface TenantProfileConfiguration extends Serializable { @JsonIgnore int getMaxRuleNodeExecsPerMessage(); + @JsonIgnore + int getMaxRuleNodeDebugModeDurationMinutes(int systemMaxRuleNodeDebugModeDurationMinutes); + } diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index 5fb51de2ab..f5765f3103 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -162,17 +162,25 @@ message RuleChainMetadataUpdateMsg { string entity = 8; } +enum DebugStrategy { + DISABLED = 0; + ALL_EVENTS = 1; + ONLY_FAILURE_EVENTS = 2; +} + message RuleNodeProto { option deprecated = true; int64 idMSB = 1; int64 idLSB = 2; string type = 3; string name = 4; - bool debugMode = 5; + bool debugMode = 5 [deprecated = true]; string configuration = 6; string additionalInfo = 7; bool singletonMode = 8; int32 configurationVersion = 9; + int64 lastUpdateTs = 10; + DebugStrategy debugStrategy = 11; } message NodeConnectionInfoProto { diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java index b920160cc0..a7da9bf516 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java @@ -397,6 +397,7 @@ public class ModelConstants { public static final String EVENT_MESSAGE_COLUMN_NAME = "e_message"; public static final String DEBUG_MODE = "debug_mode"; + public static final String DEBUG_STRATEGY = "debug_strategy"; public static final String SINGLETON_MODE = "singleton_mode"; public static final String QUEUE_NAME = "queue_name"; diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeEntity.java index bdba4549bb..278fde0d6d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeEntity.java @@ -19,11 +19,14 @@ import com.fasterxml.jackson.databind.JsonNode; import jakarta.persistence.Column; import jakarta.persistence.Convert; import jakarta.persistence.Entity; +import jakarta.persistence.EnumType; +import jakarta.persistence.Enumerated; import jakarta.persistence.Table; import lombok.Data; import lombok.EqualsAndHashCode; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; +import org.thingsboard.server.common.data.rule.DebugStrategy; import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.model.BaseSqlEntity; @@ -58,8 +61,12 @@ public class RuleNodeEntity extends BaseSqlEntity { @Column(name = ModelConstants.ADDITIONAL_INFO_PROPERTY) private JsonNode additionalInfo; - @Column(name = ModelConstants.DEBUG_MODE) - private boolean debugMode; + @Column(name = ModelConstants.LAST_UPDATE_TS_COLUMN) + private long lastUpdateTs; + + @Enumerated(EnumType.STRING) + @Column(name = ModelConstants.DEBUG_STRATEGY) + private DebugStrategy debugStrategy; @Column(name = ModelConstants.SINGLETON_MODE) private boolean singletonMode; @@ -83,7 +90,8 @@ public class RuleNodeEntity extends BaseSqlEntity { } this.type = ruleNode.getType(); this.name = ruleNode.getName(); - this.debugMode = ruleNode.isDebugMode(); + this.lastUpdateTs = ruleNode.getLastUpdateTs(); + this.debugStrategy = ruleNode.getDebugStrategy(); this.singletonMode = ruleNode.isSingletonMode(); this.queueName = ruleNode.getQueueName(); this.configurationVersion = ruleNode.getConfigurationVersion(); @@ -103,7 +111,8 @@ public class RuleNodeEntity extends BaseSqlEntity { } ruleNode.setType(type); ruleNode.setName(name); - ruleNode.setDebugMode(debugMode); + ruleNode.setLastUpdateTs(lastUpdateTs); + ruleNode.setDebugStrategy(debugStrategy); ruleNode.setSingletonMode(singletonMode); ruleNode.setQueueName(queueName); ruleNode.setConfigurationVersion(configurationVersion); diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java index 6f34437e0d..f141ef627e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java @@ -44,6 +44,7 @@ import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.plugin.ComponentClusteringMode; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; +import org.thingsboard.server.common.data.rule.DebugStrategy; import org.thingsboard.server.common.data.rule.NodeConnectionInfo; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainConnectionInfo; @@ -214,9 +215,11 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC } RuleChainId ruleChainId = ruleChain.getId(); if (nodes != null) { + long lastUpdateTs = System.currentTimeMillis(); for (RuleNode node : toAddOrUpdate) { node.setRuleChainId(ruleChainId); node = ruleNodeUpdater.apply(node); + node.setLastUpdateTs(lastUpdateTs); RuleChainDataValidator.validateRuleNode(node); RuleNode savedNode = ruleNodeDao.save(tenantId, node); relations.add(new EntityRelation(ruleChainMetaData.getRuleChainId(), savedNode.getId(), @@ -261,7 +264,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC layout.remove("description"); layout.remove("ruleChainNodeId"); targetNode.setAdditionalInfo(layout); - targetNode.setDebugMode(false); + targetNode.setDebugStrategy(DebugStrategy.DISABLED); targetNode = ruleNodeDao.save(tenantId, targetNode); EntityRelation sourceRuleChainToRuleNode = new EntityRelation(); diff --git a/dao/src/main/resources/sql/schema-entities.sql b/dao/src/main/resources/sql/schema-entities.sql index 9c95f385f8..9772736191 100644 --- a/dao/src/main/resources/sql/schema-entities.sql +++ b/dao/src/main/resources/sql/schema-entities.sql @@ -193,7 +193,8 @@ CREATE TABLE IF NOT EXISTS rule_node ( configuration varchar(10000000), type varchar(255), name varchar(255), - debug_mode boolean, + last_update_ts bigint NOT NULL, + debug_strategy varchar(32) DEFAULT 'DISABLED', singleton_mode boolean, queue_name varchar(255), external_id uuid