|
|
|
@ -15,15 +15,11 @@ |
|
|
|
*/ |
|
|
|
package org.thingsboard.rule.engine.metadata; |
|
|
|
|
|
|
|
import com.fasterxml.jackson.core.JsonParser; |
|
|
|
import com.fasterxml.jackson.core.json.JsonWriteFeature; |
|
|
|
import com.fasterxml.jackson.databind.JsonNode; |
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper; |
|
|
|
import com.fasterxml.jackson.databind.node.ObjectNode; |
|
|
|
import com.google.common.util.concurrent.Futures; |
|
|
|
import com.google.common.util.concurrent.ListenableFuture; |
|
|
|
import com.google.common.util.concurrent.MoreExecutors; |
|
|
|
import com.google.gson.JsonParseException; |
|
|
|
import org.apache.commons.collections.CollectionUtils; |
|
|
|
import org.apache.commons.lang3.BooleanUtils; |
|
|
|
import org.thingsboard.common.util.JacksonUtil; |
|
|
|
@ -35,14 +31,12 @@ import org.thingsboard.rule.engine.api.util.TbNodeUtils; |
|
|
|
import org.thingsboard.server.common.data.id.EntityId; |
|
|
|
import org.thingsboard.server.common.data.kv.AttributeKvEntry; |
|
|
|
import org.thingsboard.server.common.data.kv.BasicTsKvEntry; |
|
|
|
import org.thingsboard.server.common.data.kv.DataType; |
|
|
|
import org.thingsboard.server.common.data.kv.JsonDataEntry; |
|
|
|
import org.thingsboard.server.common.data.kv.KvEntry; |
|
|
|
import org.thingsboard.server.common.data.kv.TsKvEntry; |
|
|
|
import org.thingsboard.server.common.msg.TbMsg; |
|
|
|
import org.thingsboard.server.common.msg.TbMsgMetaData; |
|
|
|
|
|
|
|
import java.io.IOException; |
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.HashMap; |
|
|
|
import java.util.List; |
|
|
|
@ -60,8 +54,6 @@ import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE; |
|
|
|
|
|
|
|
public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeConfiguration, T extends EntityId> implements TbNode { |
|
|
|
|
|
|
|
private static ObjectMapper mapper = new ObjectMapper(); |
|
|
|
|
|
|
|
private static final String VALUE = "value"; |
|
|
|
private static final String TS = "ts"; |
|
|
|
|
|
|
|
@ -73,8 +65,6 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC |
|
|
|
@Override |
|
|
|
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { |
|
|
|
this.config = loadGetAttributesNodeConfig(configuration); |
|
|
|
mapper.configure(JsonWriteFeature.QUOTE_FIELD_NAMES.mappedFeature(), false); |
|
|
|
mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); |
|
|
|
this.fetchToData = config.isFetchToData(); |
|
|
|
this.getLatestValueWithTs = config.isGetLatestValueWithTs(); |
|
|
|
this.isTellFailureIfAbsent = BooleanUtils.toBooleanDefaultIfNull(this.config.isTellFailureIfAbsent(), true); |
|
|
|
@ -101,7 +91,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC |
|
|
|
ctx.tellNext(msg, FAILURE); |
|
|
|
return; |
|
|
|
} |
|
|
|
JsonNode msgDataNode = toJsonNode(msg.getData()); |
|
|
|
JsonNode msgDataNode = JacksonUtil.toJsonNode(msg.getData(), JacksonUtil.ALLOW_UNQUOTED_FIELD_NAMES_MAPPER); |
|
|
|
if (fetchToData) { |
|
|
|
if (!msgDataNode.isObject()) { |
|
|
|
ctx.tellFailure(msg, new IllegalArgumentException("Msg body is not an object!")); |
|
|
|
@ -125,7 +115,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC |
|
|
|
String prefix = getPrefix(keyScope); |
|
|
|
kvEntryList.forEach(kvEntry -> { |
|
|
|
if (fetchToData) { |
|
|
|
addKvEntryToJson((ObjectNode) msgDataNode, kvEntry, prefix + kvEntry.getKey()); |
|
|
|
JacksonUtil.addKvEntry((ObjectNode) msgDataNode, kvEntry, prefix + kvEntry.getKey(), JacksonUtil.ALLOW_UNQUOTED_FIELD_NAMES_MAPPER); |
|
|
|
} else { |
|
|
|
msgMetaData.putValue(prefix + kvEntry.getKey(), kvEntry.getValueAsString()); |
|
|
|
} |
|
|
|
@ -180,9 +170,9 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC |
|
|
|
} |
|
|
|
|
|
|
|
private TsKvEntry getValueWithTs(TsKvEntry tsKvEntry) { |
|
|
|
ObjectNode value = mapper.createObjectNode(); |
|
|
|
ObjectNode value = JacksonUtil.newObjectNode(JacksonUtil.ALLOW_UNQUOTED_FIELD_NAMES_MAPPER); |
|
|
|
value.put(TS, tsKvEntry.getTs()); |
|
|
|
addKvEntryToJson(value, tsKvEntry, VALUE); |
|
|
|
JacksonUtil.addKvEntry(value, tsKvEntry, VALUE, JacksonUtil.ALLOW_UNQUOTED_FIELD_NAMES_MAPPER); |
|
|
|
return new BasicTsKvEntry(tsKvEntry.getTs(), new JsonDataEntry(tsKvEntry.getKey(), value.toString())); |
|
|
|
} |
|
|
|
|
|
|
|
@ -202,30 +192,6 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC |
|
|
|
return prefix; |
|
|
|
} |
|
|
|
|
|
|
|
private static void addKvEntryToJson(ObjectNode entityNode, KvEntry kvEntry, String key) { |
|
|
|
if (kvEntry.getDataType() == DataType.BOOLEAN) { |
|
|
|
kvEntry.getBooleanValue().ifPresent(value -> entityNode.put(key, value)); |
|
|
|
} else if (kvEntry.getDataType() == DataType.DOUBLE) { |
|
|
|
kvEntry.getDoubleValue().ifPresent(value -> entityNode.put(key, value)); |
|
|
|
} else if (kvEntry.getDataType() == DataType.LONG) { |
|
|
|
kvEntry.getLongValue().ifPresent(value -> entityNode.put(key, value)); |
|
|
|
} else if (kvEntry.getDataType() == DataType.JSON) { |
|
|
|
if (kvEntry.getJsonValue().isPresent()) { |
|
|
|
entityNode.set(key, toJsonNode(kvEntry.getJsonValue().get())); |
|
|
|
} |
|
|
|
} else { |
|
|
|
entityNode.put(key, kvEntry.getValueAsString()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private static JsonNode toJsonNode(String value) { |
|
|
|
try { |
|
|
|
return mapper.readTree(value); |
|
|
|
} catch (IOException e) { |
|
|
|
throw new JsonParseException("Can't parse jsonValue: " + value, e); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private List<String> getNotExistingKeys(List<AttributeKvEntry> existingAttributesKvEntry, List<String> allKeys) { |
|
|
|
List<String> existingKeys = existingAttributesKvEntry.stream().map(KvEntry::getKey).collect(Collectors.toList()); |
|
|
|
return allKeys.stream().filter(key -> !existingKeys.contains(key)).collect(Collectors.toList()); |
|
|
|
|