|
|
|
@ -42,6 +42,8 @@ import java.util.Collections; |
|
|
|
import java.util.Map; |
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
|
|
|
|
import static org.thingsboard.common.util.DonAsynchron.*; |
|
|
|
|
|
|
|
@Slf4j |
|
|
|
@RuleNode(type = ComponentType.ENRICHMENT, |
|
|
|
name = "calculate delta", relationTypes = {"Success", "Failure", "Other"}, |
|
|
|
@ -75,53 +77,53 @@ public class CalculateDeltaNode implements TbNode { |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onMsg(TbContext ctx, TbMsg msg) { |
|
|
|
if (msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) { |
|
|
|
JsonNode json = JacksonUtil.toJsonNode(msg.getData()); |
|
|
|
if (!json.isObject()) { |
|
|
|
throw new IllegalArgumentException("Message body is not an object!"); |
|
|
|
} |
|
|
|
String inputKey = config.getInputValueKey(); |
|
|
|
if (json.has(inputKey)) { |
|
|
|
DonAsynchron.withCallback(getLastValue(msg.getOriginator()), |
|
|
|
previousData -> { |
|
|
|
double currentValue = json.get(inputKey).asDouble(); |
|
|
|
long currentTs = msg.getMetaDataTs(); |
|
|
|
|
|
|
|
if (useCache) { |
|
|
|
cache.put(msg.getOriginator(), new ValueWithTs(currentTs, currentValue)); |
|
|
|
} |
|
|
|
|
|
|
|
BigDecimal delta = BigDecimal.valueOf(previousData != null ? currentValue - previousData.value : 0.0); |
|
|
|
|
|
|
|
if (config.isTellFailureIfDeltaIsNegative() && delta.doubleValue() < 0) { |
|
|
|
ctx.tellNext(msg, TbRelationTypes.FAILURE); |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
if (config.getRound() != null) { |
|
|
|
delta = delta.setScale(config.getRound(), RoundingMode.HALF_UP); |
|
|
|
} |
|
|
|
|
|
|
|
ObjectNode result = (ObjectNode) json; |
|
|
|
if (delta.stripTrailingZeros().scale() > 0) { |
|
|
|
result.put(config.getOutputValueKey(), delta.doubleValue()); |
|
|
|
} else { |
|
|
|
result.put(config.getOutputValueKey(), delta.longValueExact()); |
|
|
|
} |
|
|
|
|
|
|
|
if (config.isAddPeriodBetweenMsgs()) { |
|
|
|
long period = previousData != null ? currentTs - previousData.ts : 0; |
|
|
|
result.put(config.getPeriodValueKey(), period); |
|
|
|
} |
|
|
|
ctx.tellSuccess(TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), JacksonUtil.toString(result))); |
|
|
|
}, |
|
|
|
t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor()); |
|
|
|
} else { |
|
|
|
ctx.tellNext(msg, "Other"); |
|
|
|
} |
|
|
|
} else { |
|
|
|
if (!msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) { |
|
|
|
ctx.tellNext(msg, "Other"); |
|
|
|
return; |
|
|
|
} |
|
|
|
JsonNode json = JacksonUtil.toJsonNode(msg.getData()); |
|
|
|
if (!json.isObject()) { |
|
|
|
throw new IllegalArgumentException("Message body is not an object!"); |
|
|
|
} |
|
|
|
String inputKey = config.getInputValueKey(); |
|
|
|
if (!json.has(inputKey)) { |
|
|
|
ctx.tellNext(msg, "Other"); |
|
|
|
return; |
|
|
|
} |
|
|
|
withCallback(getLastValue(msg.getOriginator()), |
|
|
|
previousData -> { |
|
|
|
double currentValue = json.get(inputKey).asDouble(); |
|
|
|
long currentTs = msg.getMetaDataTs(); |
|
|
|
|
|
|
|
if (useCache) { |
|
|
|
cache.put(msg.getOriginator(), new ValueWithTs(currentTs, currentValue)); |
|
|
|
} |
|
|
|
|
|
|
|
BigDecimal delta = BigDecimal.valueOf(previousData != null ? currentValue - previousData.value : 0.0); |
|
|
|
|
|
|
|
if (config.isTellFailureIfDeltaIsNegative() && delta.doubleValue() < 0) { |
|
|
|
ctx.tellNext(msg, TbRelationTypes.FAILURE); |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
if (config.getRound() != null) { |
|
|
|
delta = delta.setScale(config.getRound(), RoundingMode.HALF_UP); |
|
|
|
} |
|
|
|
|
|
|
|
ObjectNode result = (ObjectNode) json; |
|
|
|
if (delta.stripTrailingZeros().scale() > 0) { |
|
|
|
result.put(config.getOutputValueKey(), delta.doubleValue()); |
|
|
|
} else { |
|
|
|
result.put(config.getOutputValueKey(), delta.longValueExact()); |
|
|
|
} |
|
|
|
|
|
|
|
if (config.isAddPeriodBetweenMsgs()) { |
|
|
|
long period = previousData != null ? currentTs - previousData.ts : 0; |
|
|
|
result.put(config.getPeriodValueKey(), period); |
|
|
|
} |
|
|
|
ctx.tellSuccess(TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), JacksonUtil.toString(result))); |
|
|
|
}, |
|
|
|
t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor()); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
|