|
|
|
@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.metadata; |
|
|
|
|
|
|
|
import com.fasterxml.jackson.core.JsonParser; |
|
|
|
import com.fasterxml.jackson.core.json.JsonWriteFeature; |
|
|
|
import com.fasterxml.jackson.databind.JsonNode; |
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper; |
|
|
|
import com.fasterxml.jackson.databind.node.ObjectNode; |
|
|
|
import com.google.common.util.concurrent.Futures; |
|
|
|
@ -25,6 +26,7 @@ import com.google.common.util.concurrent.MoreExecutors; |
|
|
|
import com.google.gson.JsonParseException; |
|
|
|
import org.apache.commons.collections.CollectionUtils; |
|
|
|
import org.apache.commons.lang3.BooleanUtils; |
|
|
|
import org.thingsboard.common.util.JacksonUtil; |
|
|
|
import org.thingsboard.rule.engine.api.TbContext; |
|
|
|
import org.thingsboard.rule.engine.api.TbNode; |
|
|
|
import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
|
|
|
@ -57,10 +59,12 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC |
|
|
|
private static final String TS = "ts"; |
|
|
|
|
|
|
|
protected C config; |
|
|
|
private boolean fetchToData; |
|
|
|
|
|
|
|
@Override |
|
|
|
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { |
|
|
|
this.config = loadGetAttributesNodeConfig(configuration); |
|
|
|
this.fetchToData = config.isFetchToData(); |
|
|
|
mapper.configure(JsonWriteFeature.QUOTE_FIELD_NAMES.mappedFeature(), false); |
|
|
|
mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); |
|
|
|
} |
|
|
|
@ -86,22 +90,36 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC |
|
|
|
ctx.tellNext(msg, FAILURE); |
|
|
|
return; |
|
|
|
} |
|
|
|
ObjectNode msgNewData = JacksonUtil.newObjectNode(); |
|
|
|
if (fetchToData) { |
|
|
|
JsonNode msgDataNode = JacksonUtil.toJsonNode(msg.getData()); |
|
|
|
if (!msgDataNode.isObject()) { |
|
|
|
ctx.tellFailure(msg, new IllegalArgumentException("Msg body is not object!")); |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
ConcurrentHashMap<String, List<String>> failuresMap = new ConcurrentHashMap<>(); |
|
|
|
ListenableFuture<List<Void>> allFutures = Futures.allAsList( |
|
|
|
putLatestTelemetry(ctx, entityId, msg, LATEST_TS, TbNodeUtils.processPatterns(config.getLatestTsKeyNames(), msg), failuresMap), |
|
|
|
putAttrAsync(ctx, entityId, msg, CLIENT_SCOPE, TbNodeUtils.processPatterns(config.getClientAttributeNames(), msg), failuresMap, "cs_"), |
|
|
|
putAttrAsync(ctx, entityId, msg, SHARED_SCOPE, TbNodeUtils.processPatterns(config.getSharedAttributeNames(), msg), failuresMap, "shared_"), |
|
|
|
putAttrAsync(ctx, entityId, msg, SERVER_SCOPE, TbNodeUtils.processPatterns(config.getServerAttributeNames(), msg), failuresMap, "ss_") |
|
|
|
putLatestTelemetry(ctx, entityId, msg, LATEST_TS, TbNodeUtils.processPatterns(config.getLatestTsKeyNames(), msg), failuresMap, msgNewData), |
|
|
|
putAttrAsync(ctx, entityId, msg, CLIENT_SCOPE, TbNodeUtils.processPatterns(config.getClientAttributeNames(), msg), failuresMap, "cs_", msgNewData), |
|
|
|
putAttrAsync(ctx, entityId, msg, SHARED_SCOPE, TbNodeUtils.processPatterns(config.getSharedAttributeNames(), msg), failuresMap, "shared_", msgNewData), |
|
|
|
putAttrAsync(ctx, entityId, msg, SERVER_SCOPE, TbNodeUtils.processPatterns(config.getServerAttributeNames(), msg), failuresMap, "ss_", msgNewData) |
|
|
|
); |
|
|
|
withCallback(allFutures, i -> { |
|
|
|
if (!failuresMap.isEmpty()) { |
|
|
|
throw reportFailures(failuresMap); |
|
|
|
} |
|
|
|
ctx.tellSuccess(msg); |
|
|
|
if (fetchToData) { |
|
|
|
ObjectNode msgDataNode = (ObjectNode) JacksonUtil.toJsonNode(msg.getData()); |
|
|
|
msgDataNode.setAll(msgNewData); |
|
|
|
ctx.tellSuccess(TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), JacksonUtil.toString(msgDataNode))); |
|
|
|
} else { |
|
|
|
ctx.tellSuccess(msg); |
|
|
|
} |
|
|
|
}, t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor()); |
|
|
|
} |
|
|
|
|
|
|
|
private ListenableFuture<Void> putAttrAsync(TbContext ctx, EntityId entityId, TbMsg msg, String scope, List<String> keys, ConcurrentHashMap<String, List<String>> failuresMap, String prefix) { |
|
|
|
private ListenableFuture<Void> putAttrAsync(TbContext ctx, EntityId entityId, TbMsg msg, String scope, List<String> keys, ConcurrentHashMap<String, List<String>> failuresMap, String prefix, ObjectNode msgData) { |
|
|
|
if (CollectionUtils.isEmpty(keys)) { |
|
|
|
return Futures.immediateFuture(null); |
|
|
|
} |
|
|
|
@ -109,7 +127,13 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC |
|
|
|
return Futures.transform(attributeKvEntryListFuture, attributeKvEntryList -> { |
|
|
|
if (!CollectionUtils.isEmpty(attributeKvEntryList)) { |
|
|
|
List<AttributeKvEntry> existingAttributesKvEntry = attributeKvEntryList.stream().filter(attributeKvEntry -> keys.contains(attributeKvEntry.getKey())).collect(Collectors.toList()); |
|
|
|
existingAttributesKvEntry.forEach(kvEntry -> msg.getMetaData().putValue(prefix + kvEntry.getKey(), kvEntry.getValueAsString())); |
|
|
|
existingAttributesKvEntry.forEach(kvEntry -> { |
|
|
|
if (fetchToData) { |
|
|
|
msgData.put(prefix + kvEntry.getKey(), kvEntry.getValueAsString()); |
|
|
|
} else { |
|
|
|
msg.getMetaData().putValue(prefix + kvEntry.getKey(), kvEntry.getValueAsString()); |
|
|
|
} |
|
|
|
}); |
|
|
|
if (existingAttributesKvEntry.size() != keys.size() && BooleanUtils.toBooleanDefaultIfNull(this.config.isTellFailureIfAbsent(), true)) { |
|
|
|
getNotExistingKeys(existingAttributesKvEntry, keys).forEach(key -> computeFailuresMap(scope, failuresMap, key)); |
|
|
|
} |
|
|
|
@ -122,7 +146,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC |
|
|
|
}, MoreExecutors.directExecutor()); |
|
|
|
} |
|
|
|
|
|
|
|
private ListenableFuture<Void> putLatestTelemetry(TbContext ctx, EntityId entityId, TbMsg msg, String scope, List<String> keys, ConcurrentHashMap<String, List<String>> failuresMap) { |
|
|
|
private ListenableFuture<Void> putLatestTelemetry(TbContext ctx, EntityId entityId, TbMsg msg, String scope, List<String> keys, ConcurrentHashMap<String, List<String>> failuresMap, ObjectNode msgData) { |
|
|
|
if (CollectionUtils.isEmpty(keys)) { |
|
|
|
return Futures.immediateFuture(null); |
|
|
|
} |
|
|
|
@ -134,16 +158,24 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC |
|
|
|
if (r.getValue() == null) { |
|
|
|
computeFailuresMap(scope, failuresMap, r.getKey()); |
|
|
|
} else if (getLatestValueWithTs) { |
|
|
|
putValueWithTs(msg, r); |
|
|
|
putValueWithTs(msg, r, msgData); |
|
|
|
} else { |
|
|
|
msg.getMetaData().putValue(r.getKey(), r.getValueAsString()); |
|
|
|
if (fetchToData) { |
|
|
|
msgData.put(r.getKey(), r.getValueAsString()); |
|
|
|
} else { |
|
|
|
msg.getMetaData().putValue(r.getKey(), r.getValueAsString()); |
|
|
|
} |
|
|
|
} |
|
|
|
} else { |
|
|
|
if (r.getValue() != null) { |
|
|
|
if (getLatestValueWithTs) { |
|
|
|
putValueWithTs(msg, r); |
|
|
|
putValueWithTs(msg, r, msgData); |
|
|
|
} else { |
|
|
|
msg.getMetaData().putValue(r.getKey(), r.getValueAsString()); |
|
|
|
if (fetchToData) { |
|
|
|
msgData.put(r.getKey(), r.getValueAsString()); |
|
|
|
} else { |
|
|
|
msg.getMetaData().putValue(r.getKey(), r.getValueAsString()); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -152,7 +184,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC |
|
|
|
}, MoreExecutors.directExecutor()); |
|
|
|
} |
|
|
|
|
|
|
|
private void putValueWithTs(TbMsg msg, TsKvEntry r) { |
|
|
|
private void putValueWithTs(TbMsg msg, TsKvEntry r, ObjectNode msgData) { |
|
|
|
ObjectNode value = mapper.createObjectNode(); |
|
|
|
value.put(TS, r.getTs()); |
|
|
|
switch (r.getDataType()) { |
|
|
|
@ -176,7 +208,11 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC |
|
|
|
} |
|
|
|
break; |
|
|
|
} |
|
|
|
msg.getMetaData().putValue(r.getKey(), value.toString()); |
|
|
|
if (fetchToData) { |
|
|
|
msgData.putIfAbsent(r.getKey(), value); |
|
|
|
} else { |
|
|
|
msg.getMetaData().putValue(r.getKey(), value.toString()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private List<String> getNotExistingKeys(List<AttributeKvEntry> existingAttributesKvEntry, List<String> allKeys) { |
|
|
|
|