diff --git a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java index 3b9af2f0f2..39692922c1 100644 --- a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java +++ b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java @@ -400,7 +400,7 @@ public class RuleChainController extends BaseController { output = Boolean.toString(result); break; case "switch": - Set states = engine.executeSwitch(inMsg); + Set states = engine.executeSwitchAsync(inMsg).get(TIMEOUT, TimeUnit.SECONDS); output = objectMapper.writeValueAsString(states); break; case "json": diff --git a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java index c688d6978d..e9cf8d621f 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java @@ -195,9 +195,7 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S }, MoreExecutors.directExecutor()); } - @Override - public Set executeSwitch(TbMsg msg) throws ScriptException { - JsonNode result = executeScript(msg); + Set executeSwitchPostProcessFunction(JsonNode result) throws ScriptException { if (result.isTextual()) { return Collections.singleton(result.asText()); } else if (result.isArray()) { @@ -217,6 +215,14 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S } } + @Override + public ListenableFuture> executeSwitchAsync(TbMsg msg) { + log.trace("execute switch async, msg {}", msg); + return Futures.transformAsync(executeScriptAsync(msg), + result -> Futures.immediateFuture(executeSwitchPostProcessFunction(result)), + MoreExecutors.directExecutor()); //usually runs in a callbackExecutor + } + private JsonNode executeScript(TbMsg msg) throws ScriptException { try { String[] inArgs = prepareArgs(msg); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ScriptEngine.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ScriptEngine.java index a5dcf535e4..da9bd93f2b 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ScriptEngine.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ScriptEngine.java @@ -35,7 +35,7 @@ public interface ScriptEngine { ListenableFuture executeFilterAsync(TbMsg msg); - Set executeSwitch(TbMsg msg) throws ScriptException; + ListenableFuture> executeSwitchAsync(TbMsg msg); JsonNode executeJson(TbMsg msg) throws ScriptException; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java index f677b90158..5af4ccb29d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java @@ -15,7 +15,11 @@ */ package org.thingsboard.rule.engine.filter; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; +import org.checkerframework.checker.nullness.qual.Nullable; import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.ScriptEngine; @@ -58,17 +62,20 @@ public class TbJsSwitchNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { - ListeningExecutor jsExecutor = ctx.getJsExecutor(); ctx.logJsEvalRequest(); - withCallback(jsExecutor.executeAsync(() -> jsEngine.executeSwitch(msg)), - result -> { - ctx.logJsEvalResponse(); - processSwitch(ctx, msg, result); - }, - t -> { - ctx.logJsEvalFailure(); - ctx.tellFailure(msg, t); - }, ctx.getDbCallbackExecutor()); + Futures.addCallback(jsEngine.executeSwitchAsync(msg), new FutureCallback>() { + @Override + public void onSuccess(@Nullable Set result) { + ctx.logJsEvalResponse(); + processSwitch(ctx, msg, result); + } + + @Override + public void onFailure(Throwable t) { + ctx.logJsEvalFailure(); + ctx.tellFailure(msg, t); + } + }, MoreExecutors.directExecutor()); //usually runs in a callbackExecutor } private void processSwitch(TbContext ctx, TbMsg msg, Set nextRelations) { diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java index cfacf4cb85..fc2e5aa041 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java @@ -73,7 +73,7 @@ public class TbJsSwitchNodeTest { TbMsg msg = TbMsg.newMsg( "USER", null, metaData, TbMsgDataType.JSON, rawJson, ruleChainId, ruleNodeId); mockJsExecutor(); - when(scriptEngine.executeSwitch(msg)).thenReturn(Sets.newHashSet("one", "three")); + when(scriptEngine.executeSwitchAsync(msg)).thenReturn(Futures.immediateFuture(Sets.newHashSet("one", "three"))); node.onMsg(ctx, msg); verify(ctx).getJsExecutor();