|
|
|
@ -15,6 +15,7 @@ |
|
|
|
*/ |
|
|
|
package org.thingsboard.rule.engine.profile; |
|
|
|
|
|
|
|
import com.google.gson.JsonElement; |
|
|
|
import com.google.gson.JsonParser; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.thingsboard.common.util.JacksonUtil; |
|
|
|
@ -49,6 +50,7 @@ import java.util.HashMap; |
|
|
|
import java.util.HashSet; |
|
|
|
import java.util.List; |
|
|
|
import java.util.Map; |
|
|
|
import java.util.Optional; |
|
|
|
import java.util.Set; |
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
import java.util.concurrent.ConcurrentMap; |
|
|
|
@ -149,8 +151,10 @@ class DeviceState { |
|
|
|
latestValues = fetchLatestValues(ctx, deviceId); |
|
|
|
} |
|
|
|
boolean stateChanged = false; |
|
|
|
if (msg.isTypeOf(POST_TELEMETRY_REQUEST) || msg.isTypeOf(TIMESERIES_UPDATED)) { |
|
|
|
stateChanged = processTelemetry(ctx, msg); |
|
|
|
if (msg.isTypeOf(POST_TELEMETRY_REQUEST)) { |
|
|
|
stateChanged = processTelemetryRequest(ctx, msg); |
|
|
|
} else if (msg.isTypeOf(TIMESERIES_UPDATED)) { |
|
|
|
stateChanged = processTelemetryUpdatedNotification(ctx, msg); |
|
|
|
} else if (msg.isTypeOf(POST_ATTRIBUTES_REQUEST)) { |
|
|
|
stateChanged = processAttributesUpdateRequest(ctx, msg); |
|
|
|
} else if (msg.isTypeOneOf(ACTIVITY_EVENT, INACTIVITY_EVENT)) { |
|
|
|
@ -180,7 +184,7 @@ class DeviceState { |
|
|
|
private boolean processDeviceActivityEvent(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { |
|
|
|
String scope = msg.getMetaData().getValue(DataConstants.SCOPE); |
|
|
|
if (StringUtils.isEmpty(scope)) { |
|
|
|
return processTelemetry(ctx, msg); |
|
|
|
return processTelemetryRequest(ctx, msg); |
|
|
|
} else { |
|
|
|
return processAttributes(ctx, msg, scope); |
|
|
|
} |
|
|
|
@ -267,9 +271,22 @@ class DeviceState { |
|
|
|
return stateChanged; |
|
|
|
} |
|
|
|
|
|
|
|
protected boolean processTelemetry(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { |
|
|
|
protected boolean processTelemetryRequest(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { |
|
|
|
return processTelemetryUpdate(ctx, msg, JsonParser.parseString(msg.getData())); |
|
|
|
} |
|
|
|
|
|
|
|
protected boolean processTelemetryUpdatedNotification(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { |
|
|
|
JsonElement msgData = JsonParser.parseString(msg.getData()); |
|
|
|
JsonElement telemetryData = Optional.ofNullable(JsonParser.parseString(msg.getData())) |
|
|
|
.filter(JsonElement::isJsonObject) |
|
|
|
.map(e -> e.getAsJsonObject().get("timeseries")) |
|
|
|
.orElse(msgData); |
|
|
|
return processTelemetryUpdate(ctx, msg, telemetryData); |
|
|
|
} |
|
|
|
|
|
|
|
private boolean processTelemetryUpdate(TbContext ctx, TbMsg msg, JsonElement telemetryData) throws ExecutionException, InterruptedException { |
|
|
|
boolean stateChanged = false; |
|
|
|
Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToSortedTelemetry(JsonParser.parseString(msg.getData()), msg.getMetaDataTs()); |
|
|
|
Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToSortedTelemetry(telemetryData, msg.getMetaDataTs()); |
|
|
|
// iterate over data by ts (ASC order).
|
|
|
|
for (Map.Entry<Long, List<KvEntry>> entry : tsKvMap.entrySet()) { |
|
|
|
Long ts = entry.getKey(); |
|
|
|
|