diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java index 733755bfbd..f597354fc0 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java @@ -19,7 +19,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.gson.Gson; import com.google.gson.JsonObject; @@ -127,11 +126,12 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { CustomerId customerId = pair.getValue(); metaData.putValue(DataConstants.MSG_SOURCE_KEY, getMsgSourceKey()); if (entityData.hasPostAttributesMsg()) { + metaData.putValue(DataConstants.SCOPE, entityData.getPostAttributeScope()); long ts = entityData.hasAttributeTs() ? entityData.getAttributeTs() : System.currentTimeMillis(); result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData, ts)); } if (entityData.hasAttributesUpdatedMsg()) { - metaData.putValue("scope", entityData.getPostAttributeScope()); + metaData.putValue(DataConstants.SCOPE, entityData.getPostAttributeScope()); long ts = entityData.hasAttributeTs() ? entityData.getAttributeTs() : System.currentTimeMillis(); result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData, ts)); } @@ -265,8 +265,9 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData, long ts) throws Exception { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); + AttributeScope scope = AttributeScope.valueOf(metaData.getValue(DataConstants.SCOPE)); List attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts)); - ListenableFuture> future = filterAttributesByTs(tenantId, entityId, AttributeScope.CLIENT_SCOPE, attributes); + ListenableFuture> future = filterAttributesByTs(tenantId, entityId, scope, attributes); Futures.addCallback(future, new FutureCallback<>() { @Override public void onSuccess(List attributesToSave) { @@ -312,7 +313,7 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { long ts) { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); - AttributeScope scope = AttributeScope.valueOf(metaData.getValue("scope")); + AttributeScope scope = AttributeScope.valueOf(metaData.getValue(DataConstants.SCOPE)); List attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts)); ListenableFuture> future = filterAttributesByTs(tenantId, entityId, scope, attributes); Futures.addCallback(future, new FutureCallback<>() { diff --git a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java index 060e641b93..80e7d2aed4 100644 --- a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java @@ -500,6 +500,15 @@ public class DeviceEdgeTest extends AbstractEdgeTest { attributesNode.put("test_attr", originalValue); doPost("/api/plugins/telemetry/DEVICE/" + device.getId() + "/attributes/SERVER_SCOPE", attributesNode); + // Wait before device attributes saved to database + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .until(() -> { + String urlTemplate = "/api/plugins/telemetry/DEVICE/" + device.getId() + "/keys/attributes/" + DataConstants.SERVER_SCOPE; + List actualKeys = doGetAsyncTyped(urlTemplate, new TypeReference<>() {}); + return actualKeys != null && !actualKeys.isEmpty() && actualKeys.contains("test_attr"); + }); + JsonObject attributesData = new JsonObject(); // incorrect msg, will not be saved, because of ts is lower than for already existing String attributesKey = "test_attr"; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java index f1fe1affc6..911bfac43f 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java @@ -54,8 +54,6 @@ public abstract class AbstractTbMsgPushNode { entityBody.put("kv", dataJson); entityBody.put("ts", msg.getMetaDataTs()); - entityBody.put(SCOPE, getScope(metadata)); + entityBody.put(DataConstants.SCOPE, getScope(metadata)); if (EdgeEventActionType.POST_ATTRIBUTES.equals(actionType)) { entityBody.put("isPostAttributes", true); } @@ -99,7 +97,7 @@ public abstract class AbstractTbMsgPushNode keys = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() { }); entityBody.put("keys", keys); - entityBody.put(SCOPE, getScope(metadata)); + entityBody.put(DataConstants.SCOPE, getScope(metadata)); } case TIMESERIES_UPDATED -> { entityBody.put("data", dataJson); @@ -146,7 +144,7 @@ public abstract class AbstractTbMsgPushNode metadata) { - String scope = metadata.get(SCOPE); + String scope = metadata.get(DataConstants.SCOPE); if (StringUtils.isEmpty(scope)) { scope = config.getScope(); } @@ -164,7 +162,7 @@ public abstract class AbstractTbMsgPushNode