diff --git a/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java index f1575f8a1c..a504c287b8 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java @@ -60,6 +60,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.function.Consumer; @@ -590,8 +591,7 @@ public class JsonConverter { public static Map> convertToSortedTelemetry(JsonElement jsonElement, long systemTs) throws JsonSyntaxException { - JsonElement timeseriesElement = jsonElement.isJsonObject() ? jsonElement.getAsJsonObject().get("timeseries") : null; - return convertToTelemetry(timeseriesElement != null ? timeseriesElement : jsonElement, systemTs, true); + return convertToTelemetry(jsonElement, systemTs, true); } public static Map> convertToTelemetry(JsonElement jsonElement, long systemTs, boolean sorted) throws diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java index bbce5e5dfe..193c179df2 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java @@ -15,6 +15,7 @@ */ package org.thingsboard.rule.engine.profile; +import com.google.gson.JsonElement; import com.google.gson.JsonParser; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.JacksonUtil; @@ -49,6 +50,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -149,8 +151,10 @@ class DeviceState { latestValues = fetchLatestValues(ctx, deviceId); } boolean stateChanged = false; - if (msg.isTypeOf(POST_TELEMETRY_REQUEST) || msg.isTypeOf(TIMESERIES_UPDATED)) { - stateChanged = processTelemetry(ctx, msg); + if (msg.isTypeOf(POST_TELEMETRY_REQUEST)) { + stateChanged = processTelemetryRequest(ctx, msg); + } else if (msg.isTypeOf(TIMESERIES_UPDATED)) { + stateChanged = processTelemetryUpdatedNotification(ctx, msg); } else if (msg.isTypeOf(POST_ATTRIBUTES_REQUEST)) { stateChanged = processAttributesUpdateRequest(ctx, msg); } else if (msg.isTypeOneOf(ACTIVITY_EVENT, INACTIVITY_EVENT)) { @@ -180,7 +184,7 @@ class DeviceState { private boolean processDeviceActivityEvent(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { String scope = msg.getMetaData().getValue(DataConstants.SCOPE); if (StringUtils.isEmpty(scope)) { - return processTelemetry(ctx, msg); + return processTelemetryRequest(ctx, msg); } else { return processAttributes(ctx, msg, scope); } @@ -267,9 +271,22 @@ class DeviceState { return stateChanged; } - protected boolean processTelemetry(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { + protected boolean processTelemetryRequest(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { + return processTelemetryUpdate(ctx, msg, JsonParser.parseString(msg.getData())); + } + + protected boolean processTelemetryUpdatedNotification(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { + JsonElement msgData = JsonParser.parseString(msg.getData()); + JsonElement telemetryData = Optional.ofNullable(JsonParser.parseString(msg.getData())) + .filter(JsonElement::isJsonObject) + .map(e -> e.getAsJsonObject().get("timeseries")) + .orElse(msgData); + return processTelemetryUpdate(ctx, msg, telemetryData); + } + + private boolean processTelemetryUpdate(TbContext ctx, TbMsg msg, JsonElement telemetryData) throws ExecutionException, InterruptedException { boolean stateChanged = false; - Map> tsKvMap = JsonConverter.convertToSortedTelemetry(JsonParser.parseString(msg.getData()), msg.getMetaDataTs()); + Map> tsKvMap = JsonConverter.convertToSortedTelemetry(telemetryData, msg.getMetaDataTs()); // iterate over data by ts (ASC order). for (Map.Entry> entry : tsKvMap.entrySet()) { Long ts = entry.getKey();