From ae4a71346f862b3a71201c2d17f6265cf58f6dcf Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Mon, 24 Jul 2017 15:14:01 +0300 Subject: [PATCH 1/3] TB-73: Implementation --- .../rule/RuleActorMessageProcessor.java | 55 ++++++++++--------- .../server/dao/rule/BaseRuleService.java | 7 ++- 2 files changed, 35 insertions(+), 27 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java index 7a2b0e43a4..890ef25373 100644 --- a/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java @@ -18,6 +18,7 @@ package org.thingsboard.server.actors.rule; import java.util.*; import com.fasterxml.jackson.core.JsonProcessingException; +import org.springframework.util.StringUtils; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.plugin.RuleToPluginMsgWrapper; import org.thingsboard.server.actors.shared.ComponentMsgProcessor; @@ -113,8 +114,9 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor { } private void initAction() throws Exception { - JsonNode actionMd = ruleMd.getAction(); - action = initComponent(actionMd); + if (ruleMd.getAction() != null && !ruleMd.getAction().isNull()) { + action = initComponent(ruleMd.getAction()); + } } private void initProcessor() throws Exception { @@ -131,9 +133,11 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor { } private void fetchPluginInfo() { - PluginMetaData pluginMd = systemContext.getPluginService().findPluginByApiToken(ruleMd.getPluginToken()); - pluginTenantId = pluginMd.getTenantId(); - pluginId = pluginMd.getId(); + if (!StringUtils.isEmpty(ruleMd.getPluginToken())) { + PluginMetaData pluginMd = systemContext.getPluginService().findPluginByApiToken(ruleMd.getPluginToken()); + pluginTenantId = pluginMd.getTenantId(); + pluginId = pluginMd.getId(); + } } protected void onRuleProcessingMsg(ActorContext context, RuleProcessingMsg msg) throws RuleException { @@ -162,25 +166,26 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor { inMsgMd = new RuleProcessingMetaData(); } logger.debug("[{}] Going to convert in msg: {}", entityId, inMsg); - Optional> ruleToPluginMsgOptional = action.convert(ruleCtx, inMsg, inMsgMd); - if (ruleToPluginMsgOptional.isPresent()) { - RuleToPluginMsg ruleToPluginMsg = ruleToPluginMsgOptional.get(); - logger.debug("[{}] Device msg is converter to: {}", entityId, ruleToPluginMsg); - context.parent().tell(new RuleToPluginMsgWrapper(pluginTenantId, pluginId, tenantId, entityId, ruleToPluginMsg), context.self()); - if (action.isOneWayAction()) { - pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS); - } else { - pendingMsgMap.put(ruleToPluginMsg.getUid(), msg); - scheduleMsgWithDelay(context, new RuleToPluginTimeoutMsg(ruleToPluginMsg.getUid()), systemContext.getPluginProcessingTimeout()); + if (action != null) { + Optional> ruleToPluginMsgOptional = action.convert(ruleCtx, inMsg, inMsgMd); + if (ruleToPluginMsgOptional.isPresent()) { + RuleToPluginMsg ruleToPluginMsg = ruleToPluginMsgOptional.get(); + logger.debug("[{}] Device msg is converter to: {}", entityId, ruleToPluginMsg); + context.parent().tell(new RuleToPluginMsgWrapper(pluginTenantId, pluginId, tenantId, entityId, ruleToPluginMsg), context.self()); + if (action.isOneWayAction()) { + pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS); + } else { + pendingMsgMap.put(ruleToPluginMsg.getUid(), msg); + scheduleMsgWithDelay(context, new RuleToPluginTimeoutMsg(ruleToPluginMsg.getUid()), systemContext.getPluginProcessingTimeout()); + } } } else { logger.debug("[{}] Nothing to send to plugin: {}", entityId, pluginId); - pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_REQUEST_FROM_ACTIONS); - return; + pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS); } } - public void onPluginMsg(ActorContext context, PluginToRuleMsg msg) { + void onPluginMsg(ActorContext context, PluginToRuleMsg msg) { RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getUid()); if (pendingMsg != null) { ChainProcessingContext ctx = pendingMsg.getCtx(); @@ -196,7 +201,7 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor { } } - public void onTimeoutMsg(ActorContext context, RuleToPluginTimeoutMsg msg) { + void onTimeoutMsg(ActorContext context, RuleToPluginTimeoutMsg msg) { RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getMsgId()); if (pendingMsg != null) { logger.debug("[{}] Processing timeout detected [{}]: {}", entityId, msg.getMsgId(), pendingMsg); @@ -269,18 +274,16 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor { public void onActivate(ActorContext context) throws Exception { logger.info("[{}] Going to process onActivate rule.", entityId); this.state = ComponentLifecycleState.ACTIVE; - if (action != null) { - if (filters != null) { - filters.forEach(f -> f.resume()); - } else { - initFilters(); - } + if (filters != null) { + filters.forEach(RuleLifecycleComponent::resume); if (processor != null) { processor.resume(); } else { initProcessor(); } - action.resume(); + if (action != null) { + action.resume(); + } logger.info("[{}] Rule resumed.", entityId); } else { start(); diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java index b356c90e24..77d38b9ee1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java @@ -91,7 +91,9 @@ public class BaseRuleService extends AbstractEntityService implements RuleServic if (rule.getProcessor() != null && !rule.getProcessor().isNull()) { validateComponentJson(rule.getProcessor(), ComponentType.PROCESSOR); } - validateComponentJson(rule.getAction(), ComponentType.ACTION); + if (rule.getAction() != null && !rule.getAction().isNull()) { + validateComponentJson(rule.getAction(), ComponentType.ACTION); + } validateRuleAndPluginState(rule); return ruleDao.save(rule); } @@ -129,6 +131,9 @@ public class BaseRuleService extends AbstractEntityService implements RuleServic } private void validateRuleAndPluginState(RuleMetaData rule) { + if (org.springframework.util.StringUtils.isEmpty(rule.getPluginToken())) { + return; + } PluginMetaData pluginMd = pluginService.findPluginByApiToken(rule.getPluginToken()); if (pluginMd == null) { throw new IncorrectParameterException("Rule points to non-existent plugin!"); From d9023482d906df834a6133aea0dd304f1433269c Mon Sep 17 00:00:00 2001 From: Igor Kulikov Date: Mon, 24 Jul 2017 16:24:32 +0300 Subject: [PATCH 2/3] TB-73: Make Rule plugin and action optional. Fix rule deletion. --- .../server/actors/shared/rule/RuleManager.java | 9 ++++++--- application/src/main/resources/thingsboard.yml | 2 +- .../server/common/data/rule/RuleMetaData.java | 2 ++ ui/src/app/rule/rule-fieldset.tpl.html | 4 ++-- ui/src/app/rule/rule.directive.js | 10 ++++++---- 5 files changed, 17 insertions(+), 10 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java index db61b8155e..1e00a6de1b 100644 --- a/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java +++ b/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java @@ -72,16 +72,19 @@ public abstract class RuleManager { } public Optional update(ActorContext context, RuleId ruleId, ComponentLifecycleEvent event) { - RuleMetaData rule = null; + RuleMetaData rule; if (event != ComponentLifecycleEvent.DELETED) { rule = systemContext.getRuleService().findRuleById(ruleId); - } - if (rule == null) { + } else { rule = ruleMap.keySet().stream() .filter(r -> r.getId().equals(ruleId)) .peek(r -> r.setState(ComponentLifecycleState.SUSPENDED)) .findFirst() .orElse(null); + if (rule != null) { + ruleMap.remove(rule); + ruleActors.remove(ruleId); + } } if (rule != null) { RuleActorMetaData actorMd = ruleMap.get(rule); diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index b133f03439..7e2a7735ff 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -176,7 +176,7 @@ actors: statistics: # Enable/disable actor statistics enabled: "${ACTORS_STATISTICS_ENABLED:true}" - persist_frequency: "${ACTORS_STATISTICS_PERSIST_FREQUENCY:60000}" + persist_frequency: "${ACTORS_STATISTICS_PERSIST_FREQUENCY:3600000}" # Cache parameters cache: diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleMetaData.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleMetaData.java index 1451d2c1a6..0ed44d94de 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleMetaData.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleMetaData.java @@ -17,6 +17,7 @@ package org.thingsboard.server.common.data.rule; import com.fasterxml.jackson.databind.JsonNode; import lombok.Data; +import lombok.EqualsAndHashCode; import org.thingsboard.server.common.data.HasName; import org.thingsboard.server.common.data.SearchTextBased; import org.thingsboard.server.common.data.id.RuleId; @@ -24,6 +25,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleState; @Data +@EqualsAndHashCode(callSuper = true) public class RuleMetaData extends SearchTextBased implements HasName { private static final long serialVersionUID = -5656679015122935465L; diff --git a/ui/src/app/rule/rule-fieldset.tpl.html b/ui/src/app/rule/rule-fieldset.tpl.html index 0646b3f51a..835b78344b 100644 --- a/ui/src/app/rule/rule-fieldset.tpl.html +++ b/ui/src/app/rule/rule-fieldset.tpl.html @@ -165,11 +165,11 @@
- + diff --git a/ui/src/app/rule/rule.directive.js b/ui/src/app/rule/rule.directive.js index bfd6b4c695..64502f9345 100644 --- a/ui/src/app/rule/rule.directive.js +++ b/ui/src/app/rule/rule.directive.js @@ -85,10 +85,11 @@ export default function RuleDirective($compile, $templateCache, $mdDialog, $docu if (scope.rule) { var valid = scope.rule.filters && scope.rule.filters.length > 0; scope.theForm.$setValidity('filters', valid); - valid = angular.isDefined(scope.rule.pluginToken) && scope.rule.pluginToken != null; - scope.theForm.$setValidity('plugin', valid); - valid = angular.isDefined(scope.rule.action) && scope.rule.action != null; - scope.theForm.$setValidity('action', valid); + var processorDefined = angular.isDefined(scope.rule.processor) && scope.rule.processor != null; + var pluginDefined = angular.isDefined(scope.rule.pluginToken) && scope.rule.pluginToken != null; + var pluginActionDefined = angular.isDefined(scope.rule.action) && scope.rule.action != null; + valid = processorDefined && !pluginDefined || (pluginDefined && pluginActionDefined); + scope.theForm.$setValidity('processorOrPlugin', valid); } }; @@ -160,6 +161,7 @@ export default function RuleDirective($compile, $templateCache, $mdDialog, $docu scope.$watch('rule.processor', function(newVal, prevVal) { if (scope.rule && scope.isEdit && !angular.equals(newVal, prevVal)) { scope.theForm.$setDirty(); + scope.updateValidity(); } }, true); From 2e019b52a63b15e035a199914b28363eae857c70 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Mon, 24 Jul 2017 18:47:49 +0300 Subject: [PATCH 3/3] TB-73: Implementation --- .../actors/rule/RuleActorMessageProcessor.java | 13 +++++++------ application/src/main/resources/logback.xml | 2 +- .../extensions/core/processor/AlarmProcessor.java | 4 ++-- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java index 890ef25373..5704b123cf 100644 --- a/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java @@ -174,15 +174,16 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor { context.parent().tell(new RuleToPluginMsgWrapper(pluginTenantId, pluginId, tenantId, entityId, ruleToPluginMsg), context.self()); if (action.isOneWayAction()) { pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS); + return; } else { pendingMsgMap.put(ruleToPluginMsg.getUid(), msg); scheduleMsgWithDelay(context, new RuleToPluginTimeoutMsg(ruleToPluginMsg.getUid()), systemContext.getPluginProcessingTimeout()); + return; } } - } else { - logger.debug("[{}] Nothing to send to plugin: {}", entityId, pluginId); - pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS); } + logger.debug("[{}] Nothing to send to plugin: {}", entityId, pluginId); + pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS); } void onPluginMsg(ActorContext context, PluginToRuleMsg msg) { @@ -215,13 +216,13 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor { ctx = ctx.withError(error); } if (ctx.isFailure()) { - logger.debug("[{}] Forwarding processing chain to device actor due to failure.", ctx.getInMsg().getDeviceId()); + logger.debug("[{}][{}] Forwarding processing chain to device actor due to failure.", ruleMd.getId(), ctx.getInMsg().getDeviceId()); ctx.getDeviceActor().tell(new RulesProcessedMsg(ctx), ActorRef.noSender()); } else if (!ctx.hasNext()) { - logger.debug("[{}] Forwarding processing chain to device actor due to end of chain.", ctx.getInMsg().getDeviceId()); + logger.debug("[{}][{}] Forwarding processing chain to device actor due to end of chain.", ruleMd.getId(), ctx.getInMsg().getDeviceId()); ctx.getDeviceActor().tell(new RulesProcessedMsg(ctx), ActorRef.noSender()); } else { - logger.debug("[{}] Forwarding processing chain to next rule actor.", ctx.getInMsg().getDeviceId()); + logger.debug("[{}][{}] Forwarding processing chain to next rule actor.", ruleMd.getId(), ctx.getInMsg().getDeviceId()); ChainProcessingContext nextTask = ctx.getNext(); nextTask.getCurrentActor().tell(new RuleProcessingMsg(nextTask), context.self()); } diff --git a/application/src/main/resources/logback.xml b/application/src/main/resources/logback.xml index 93ed50535c..7169894532 100644 --- a/application/src/main/resources/logback.xml +++ b/application/src/main/resources/logback.xml @@ -25,7 +25,7 @@ - + diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/processor/AlarmProcessor.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/processor/AlarmProcessor.java index 3dec45e482..e88247a2c7 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/processor/AlarmProcessor.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/processor/AlarmProcessor.java @@ -125,10 +125,10 @@ public class AlarmProcessor implements RuleProcessor