diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNode.java index 3e2618de98..e554ad31e3 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNode.java @@ -42,6 +42,8 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import static org.thingsboard.common.util.DonAsynchron.*; + @Slf4j @RuleNode(type = ComponentType.ENRICHMENT, name = "calculate delta", relationTypes = {"Success", "Failure", "Other"}, @@ -75,53 +77,53 @@ public class CalculateDeltaNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { - if (msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) { - JsonNode json = JacksonUtil.toJsonNode(msg.getData()); - if (!json.isObject()) { - throw new IllegalArgumentException("Message body is not an object!"); - } - String inputKey = config.getInputValueKey(); - if (json.has(inputKey)) { - DonAsynchron.withCallback(getLastValue(msg.getOriginator()), - previousData -> { - double currentValue = json.get(inputKey).asDouble(); - long currentTs = msg.getMetaDataTs(); - - if (useCache) { - cache.put(msg.getOriginator(), new ValueWithTs(currentTs, currentValue)); - } - - BigDecimal delta = BigDecimal.valueOf(previousData != null ? currentValue - previousData.value : 0.0); - - if (config.isTellFailureIfDeltaIsNegative() && delta.doubleValue() < 0) { - ctx.tellNext(msg, TbRelationTypes.FAILURE); - return; - } - - if (config.getRound() != null) { - delta = delta.setScale(config.getRound(), RoundingMode.HALF_UP); - } - - ObjectNode result = (ObjectNode) json; - if (delta.stripTrailingZeros().scale() > 0) { - result.put(config.getOutputValueKey(), delta.doubleValue()); - } else { - result.put(config.getOutputValueKey(), delta.longValueExact()); - } - - if (config.isAddPeriodBetweenMsgs()) { - long period = previousData != null ? currentTs - previousData.ts : 0; - result.put(config.getPeriodValueKey(), period); - } - ctx.tellSuccess(TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), JacksonUtil.toString(result))); - }, - t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor()); - } else { - ctx.tellNext(msg, "Other"); - } - } else { + if (!msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) { + ctx.tellNext(msg, "Other"); + return; + } + JsonNode json = JacksonUtil.toJsonNode(msg.getData()); + if (!json.isObject()) { + throw new IllegalArgumentException("Message body is not an object!"); + } + String inputKey = config.getInputValueKey(); + if (!json.has(inputKey)) { ctx.tellNext(msg, "Other"); + return; } + withCallback(getLastValue(msg.getOriginator()), + previousData -> { + double currentValue = json.get(inputKey).asDouble(); + long currentTs = msg.getMetaDataTs(); + + if (useCache) { + cache.put(msg.getOriginator(), new ValueWithTs(currentTs, currentValue)); + } + + BigDecimal delta = BigDecimal.valueOf(previousData != null ? currentValue - previousData.value : 0.0); + + if (config.isTellFailureIfDeltaIsNegative() && delta.doubleValue() < 0) { + ctx.tellNext(msg, TbRelationTypes.FAILURE); + return; + } + + if (config.getRound() != null) { + delta = delta.setScale(config.getRound(), RoundingMode.HALF_UP); + } + + ObjectNode result = (ObjectNode) json; + if (delta.stripTrailingZeros().scale() > 0) { + result.put(config.getOutputValueKey(), delta.doubleValue()); + } else { + result.put(config.getOutputValueKey(), delta.longValueExact()); + } + + if (config.isAddPeriodBetweenMsgs()) { + long period = previousData != null ? currentTs - previousData.ts : 0; + result.put(config.getPeriodValueKey(), period); + } + ctx.tellSuccess(TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), JacksonUtil.toString(result))); + }, + t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor()); } @Override