From 4eedea076633f4e1a6aef0245564bdf1510267ae Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Wed, 15 Jan 2025 16:11:54 +0200 Subject: [PATCH] EntityDataProto: add ts to correctly handle PostAttributeMsg update from/to Edge --- .../telemetry/EntityDataMsgConstructor.java | 6 +++ .../telemetry/BaseTelemetryProcessor.java | 46 ++++++++++++---- .../server/edge/DeviceEdgeTest.java | 53 +++++++++++++++++++ common/edge-api/src/main/proto/edge.proto | 1 + .../server/common/adaptor/JsonConverter.java | 10 ++-- .../common/transport/util/JsonUtils.java | 4 +- .../engine/edge/AbstractTbMsgPushNode.java | 1 + 7 files changed, 107 insertions(+), 14 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/telemetry/EntityDataMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/telemetry/EntityDataMsgConstructor.java index 6493f97f57..f7f3cc31dd 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/telemetry/EntityDataMsgConstructor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/telemetry/EntityDataMsgConstructor.java @@ -67,6 +67,9 @@ public class EntityDataMsgConstructor { TransportProtos.PostAttributeMsg attributesUpdatedMsg = JsonConverter.convertToAttributesProto(data.getAsJsonObject("kv")); builder.setAttributesUpdatedMsg(attributesUpdatedMsg); builder.setPostAttributeScope(getScopeOfDefault(data)); + if (data.get("ts") != null && !data.get("ts").isJsonNull()) { + builder.setAttributeTs(data.getAsJsonPrimitive("ts").getAsLong()); + } } catch (Exception e) { log.warn("[{}][{}] Can't convert to AttributesUpdatedMsg proto, entityData [{}]", tenantId, entityId, entityData, e); } @@ -77,6 +80,9 @@ public class EntityDataMsgConstructor { TransportProtos.PostAttributeMsg postAttributesMsg = JsonConverter.convertToAttributesProto(data.getAsJsonObject("kv")); builder.setPostAttributesMsg(postAttributesMsg); builder.setPostAttributeScope(getScopeOfDefault(data)); + if (data.get("ts") != null && !data.get("ts").isJsonNull()) { + builder.setAttributeTs(data.getAsJsonPrimitive("ts").getAsLong()); + } } catch (Exception e) { log.warn("[{}][{}] Can't convert to PostAttributesMsg, entityData [{}]", tenantId, entityId, entityData, e); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java index 488fc3d8b9..520c7ef5ff 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java @@ -54,6 +54,7 @@ import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -79,7 +80,9 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; @Slf4j public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { @@ -114,7 +117,7 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { abstract protected String getMsgSourceKey(); - public List> processTelemetryMsg(TenantId tenantId, EntityDataProto entityData) { + public List> processTelemetryMsg(TenantId tenantId, EntityDataProto entityData) throws Exception { log.trace("[{}] processTelemetryMsg [{}]", tenantId, entityData); List> result = new ArrayList<>(); EntityId entityId = constructEntityId(entityData.getEntityType(), entityData.getEntityIdMSB(), entityData.getEntityIdLSB()); @@ -125,11 +128,13 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { CustomerId customerId = pair.getValue(); metaData.putValue(DataConstants.MSG_SOURCE_KEY, getMsgSourceKey()); if (entityData.hasPostAttributesMsg()) { - result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData)); + long ts = entityData.hasAttributeTs() ? entityData.getAttributeTs() : System.currentTimeMillis(); + result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData, ts)); } if (entityData.hasAttributesUpdatedMsg()) { metaData.putValue("scope", entityData.getPostAttributeScope()); - result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData)); + long ts = entityData.hasAttributeTs() ? entityData.getAttributeTs() : System.currentTimeMillis(); + result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData, ts)); } if (entityData.hasPostTelemetryMsg()) { result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData)); @@ -257,9 +262,13 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { return new ImmutablePair<>(queueName, ruleChainId); } - private ListenableFuture processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { + private ListenableFuture processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData, long ts) throws Exception { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); + + List attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts)); + filterAttributesByTs(tenantId, entityId, AttributeScope.CLIENT_SCOPE, attributes, json); + var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); TbMsg tbMsg = TbMsg.newMsg() .queueName(defaultQueueAndRuleChain.getKey()) @@ -289,16 +298,18 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, - TbMsgMetaData metaData) { + TbMsgMetaData metaData, + long ts) throws Exception { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); - List attributes = new ArrayList<>(JsonConverter.convertToAttributes(json)); - String scope = metaData.getValue("scope"); + AttributeScope scope = AttributeScope.valueOf(metaData.getValue("scope")); + List attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts)); + List attributesToSave = filterAttributesByTs(tenantId, entityId, scope, attributes, json); tsSubService.saveAttributes(AttributesSaveRequest.builder() .tenantId(tenantId) .entityId(entityId) - .scope(AttributeScope.valueOf(scope)) - .entries(attributes) + .scope(scope) + .entries(attributesToSave) .callback(new FutureCallback<>() { @Override public void onSuccess(@Nullable Void tmp) { @@ -390,4 +401,21 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { entityDataMsgConstructor.constructEntityDataMsg(tenantId, entityId, actionType, JsonParser.parseString(bodyJackson)); } + private List filterAttributesByTs(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributes, JsonObject jsonObject) throws Exception { + List keys = attributes.stream().map(KvEntry::getKey).toList(); + Map existingAttributesTs = edgeCtx.getAttributesService().find(tenantId, entityId, scope, keys).get() + .stream().collect(Collectors.toMap(KvEntry::getKey, AttributeKvEntry::getLastUpdateTs)); + return attributes.stream() + .filter(attribute -> { + String key = attribute.getKey(); + long incomingTs = attribute.getLastUpdateTs(); + if (incomingTs > existingAttributesTs.getOrDefault(key, 0L)) { + return true; + } else { + jsonObject.remove(key); + return false; + } + }).toList(); + } + } diff --git a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java index 387011717a..11bc053bb5 100644 --- a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java @@ -488,6 +488,59 @@ public class DeviceEdgeTest extends AbstractEdgeTest { doDelete("/api/plugins/telemetry/DEVICE/" + device.getId().getId() + "/SERVER_SCOPE?keys=" + attributesKey, String.class); } + @Test + public void testSendOutdatedAttributeToCloud() throws Exception { + long ts = System.currentTimeMillis(); + Device device = saveDeviceOnCloudAndVerifyDeliveryToEdge(); + + edgeImitator.expectResponsesAmount(1); + + ObjectNode attributesNode = JacksonUtil.newObjectNode(); + String originalValue = "original_value"; + attributesNode.put("test_attr", originalValue); + doPost("/api/plugins/telemetry/DEVICE/" + device.getId() + "/attributes/SERVER_SCOPE", attributesNode); + + JsonObject attributesData = new JsonObject(); + // incorrect msg, will not be saved, because of ts is lower than for already existing + String attributesKey = "test_attr"; + String attributeValueIncorrect = "test_value"; + // correct msg, will be saved, no ts issue + String attributeKey2 = "test_attr2"; + String attributeValue2Correct = "test_value2"; + attributesData.addProperty(attributesKey, attributeValueIncorrect); + attributesData.addProperty(attributeKey2, attributeValue2Correct); + UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder(); + EntityDataProto.Builder entityDataBuilder = EntityDataProto.newBuilder(); + entityDataBuilder.setEntityType(device.getId().getEntityType().name()); + entityDataBuilder.setEntityIdMSB(device.getId().getId().getMostSignificantBits()); + entityDataBuilder.setEntityIdLSB(device.getId().getId().getLeastSignificantBits()); + entityDataBuilder.setAttributesUpdatedMsg(JsonConverter.convertToAttributesProto(attributesData)); + entityDataBuilder.setPostAttributeScope(DataConstants.SERVER_SCOPE); + entityDataBuilder.setAttributeTs(ts); + + uplinkMsgBuilder.addEntityData(entityDataBuilder.build()); + + edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); + Assert.assertTrue(edgeImitator.waitForResponses()); + + String attributeValuesUrl = "/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/attributes/" + DataConstants.SERVER_SCOPE; + List> attributes = doGetAsyncTyped(attributeValuesUrl, new TypeReference<>() { + }); + + Optional> customAttributeOpt = getAttributeByKey(attributesKey, attributes); + Assert.assertTrue(customAttributeOpt.isPresent()); + Map customAttribute = customAttributeOpt.get(); + Assert.assertNotEquals(attributeValueIncorrect, customAttribute.get("value")); + Assert.assertEquals(originalValue, customAttribute.get("value")); + + customAttributeOpt = getAttributeByKey(attributeKey2, attributes); + Assert.assertTrue(customAttributeOpt.isPresent()); + customAttribute = customAttributeOpt.get(); + Assert.assertEquals(attributeValue2Correct, customAttribute.get("value")); + + doDelete("/api/plugins/telemetry/DEVICE/" + device.getId().getId() + "/SERVER_SCOPE?keys=" + attributesKey, String.class); + } + @Test public void testSendDeviceToCloudWithNameThatAlreadyExistsOnCloud() throws Exception { String deviceOnCloudName = StringUtils.randomAlphanumeric(15); diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index 4dc762e14c..0557c5f5d4 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -132,6 +132,7 @@ message EntityDataProto { transport.PostAttributeMsg attributesUpdatedMsg = 6; string postAttributeScope = 7; AttributeDeleteMsg attributeDeleteMsg = 8; + optional int64 attributeTs = 9; } message AttributeDeleteMsg { diff --git a/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java index f4abf5f3a0..9226395289 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java @@ -63,7 +63,6 @@ import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; import java.util.function.Consumer; -import java.util.stream.Collectors; public class JsonConverter { @@ -540,10 +539,12 @@ public class JsonConverter { } public static Set convertToAttributes(JsonElement element) { - Set result = new HashSet<>(); long ts = System.currentTimeMillis(); - result.addAll(parseValues(element.getAsJsonObject()).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList())); - return result; + return new HashSet<>(parseValues(element.getAsJsonObject()).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).toList()); + } + + public static Set convertToAttributes(JsonElement element, long ts) { + return new HashSet<>(parseValues(element.getAsJsonObject()).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).toList()); } private static List parseValues(JsonObject valuesObject) { @@ -702,4 +703,5 @@ public class JsonConverter { return ""; } } + } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/util/JsonUtils.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/util/JsonUtils.java index ed0b425c76..a97dc411b4 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/util/JsonUtils.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/util/JsonUtils.java @@ -49,6 +49,7 @@ public class JsonUtils { } return json; } + public static JsonElement parse(Object value) { if (value instanceof Integer) { return new JsonPrimitive((Integer) value); @@ -67,7 +68,7 @@ public class JsonUtils { } } - public static JsonObject convertToJsonObject(Map map) { + public static JsonObject convertToJsonObject(Map map) { JsonObject jsonObject = new JsonObject(); for (Map.Entry entry : map.entrySet()) { jsonObject.add(entry.getKey(), parse(entry.getValue())); @@ -75,4 +76,5 @@ public class JsonUtils { return jsonObject; } + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java index 73368d40f5..f1fe1affc6 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java @@ -89,6 +89,7 @@ public abstract class AbstractTbMsgPushNode { entityBody.put("kv", dataJson); + entityBody.put("ts", msg.getMetaDataTs()); entityBody.put(SCOPE, getScope(metadata)); if (EdgeEventActionType.POST_ATTRIBUTES.equals(actionType)) { entityBody.put("isPostAttributes", true);