diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java index 79b2edf856..2143639a2c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java @@ -30,12 +30,14 @@ import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.msg.TbMsg; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import static org.thingsboard.common.util.DonAsynchron.withCallback; import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE; -import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE; @Slf4j @@ -84,10 +86,29 @@ public abstract class TbEntityGetAttrNode implements TbNode private void putAttributesAndTell(TbContext ctx, TbMsg msg, List attributes) { + log.info("attr " + attributes.toString()); + log.info("conf attr " + config.getAttrMapping().toString()); + List attrProcessPattern = new ArrayList<>(); + log.info("msg {}", msg); + log.info("result process {}", attrProcessPattern); + Map updConf = new HashMap<>(); + config.getAttrMapping().forEach((key, value) -> { + String processPattern = TbNodeUtils.processPattern(key, msg); + updConf.put(processPattern, value); + attrProcessPattern.add(processPattern); + }); + attributes.forEach(r -> { - String attrName = config.getAttrMapping().get(r.getKey()); + log.info("r {}", r); + log.info("rkey {}", r.getKey()); + log.info("index {}", attrProcessPattern.indexOf(r.getKey())); + log.info("getByKey {}", updConf.get(r.getKey())); + String attrName = updConf.get(r.getKey()); + log.info("attrName {}", attrName); msg.getMetaData().putValue(attrName, r.getValueAsString()); + log.info(msg.getMetaData().toString()); }); + log.info(msg.toString()); ctx.tellSuccess(msg); } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java index ca88290808..6720be90de 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java @@ -86,20 +86,24 @@ public class TbGetCustomerAttributeNodeTest { private DeviceService deviceService; private TbMsg msg; + private Map metaData; - private RuleChainId ruleChainId = new RuleChainId(Uuids.timeBased()); - private RuleNodeId ruleNodeId = new RuleNodeId(Uuids.timeBased()); + private final RuleChainId ruleChainId = new RuleChainId(Uuids.timeBased()); + private final RuleNodeId ruleNodeId = new RuleNodeId(Uuids.timeBased()); @Before public void init() throws TbNodeException { TbGetEntityAttrNodeConfiguration config = new TbGetEntityAttrNodeConfiguration(); - Map attrMapping = new HashMap<>(); - attrMapping.putIfAbsent("temperature", "tempo"); - config.setAttrMapping(attrMapping); + Map conf = new HashMap<>(); + conf.put("${word}", "result"); + config.setAttrMapping(conf); config.setTelemetry(false); ObjectMapper mapper = new ObjectMapper(); TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config)); + metaData = new HashMap<>(); + metaData.putIfAbsent("word", "temperature"); + node = new TbGetCustomerAttributeNode(); node.init(null, nodeConfiguration); } @@ -111,13 +115,13 @@ public class TbGetCustomerAttributeNodeTest { User user = new User(); user.setCustomerId(customerId); - msg = TbMsg.newMsg( "USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); + msg = TbMsg.newMsg("USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); when(ctx.getUserService()).thenReturn(userService); when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(user)); when(ctx.getAttributesService()).thenReturn(attributesService); - when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("temperature")))) + when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}")))) .thenThrow(new IllegalStateException("something wrong")); node.onMsg(ctx, msg); @@ -136,13 +140,13 @@ public class TbGetCustomerAttributeNodeTest { User user = new User(); user.setCustomerId(customerId); - msg = TbMsg.newMsg( "USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); + msg = TbMsg.newMsg("USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); when(ctx.getUserService()).thenReturn(userService); when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(user)); when(ctx.getAttributesService()).thenReturn(attributesService); - when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("temperature")))) + when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}")))) .thenReturn(Futures.immediateFailedFuture(new IllegalStateException("something wrong"))); node.onMsg(ctx, msg); @@ -161,7 +165,7 @@ public class TbGetCustomerAttributeNodeTest { User user = new User(); user.setCustomerId(customerId); - msg = TbMsg.newMsg( "USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON,"{}", ruleChainId, ruleNodeId); + msg = TbMsg.newMsg("USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); when(ctx.getUserService()).thenReturn(userService); when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(null)); @@ -175,7 +179,7 @@ public class TbGetCustomerAttributeNodeTest { @Test public void customerAttributeAddedInMetadata() { CustomerId customerId = new CustomerId(Uuids.timeBased()); - msg = TbMsg.newMsg( "CUSTOMER", customerId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); + msg = TbMsg.newMsg("CUSTOMER", customerId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); entityAttributeFetched(customerId); } @@ -186,7 +190,7 @@ public class TbGetCustomerAttributeNodeTest { User user = new User(); user.setCustomerId(customerId); - msg = TbMsg.newMsg( "USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); + msg = TbMsg.newMsg("USER", userId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); when(ctx.getUserService()).thenReturn(userService); when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(user)); @@ -201,7 +205,7 @@ public class TbGetCustomerAttributeNodeTest { Asset asset = new Asset(); asset.setCustomerId(customerId); - msg = TbMsg.newMsg( "USER", assetId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); + msg = TbMsg.newMsg("USER", assetId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); when(ctx.getAssetService()).thenReturn(assetService); when(assetService.findAssetByIdAsync(any(), eq(assetId))).thenReturn(Futures.immediateFuture(asset)); @@ -216,7 +220,7 @@ public class TbGetCustomerAttributeNodeTest { Device device = new Device(); device.setCustomerId(customerId); - msg = TbMsg.newMsg( "USER", deviceId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); + msg = TbMsg.newMsg("USER", deviceId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); when(ctx.getDeviceService()).thenReturn(deviceService); when(deviceService.findDeviceByIdAsync(any(), eq(deviceId))).thenReturn(Futures.immediateFuture(device)); @@ -227,9 +231,10 @@ public class TbGetCustomerAttributeNodeTest { @Test public void deviceCustomerTelemetryFetched() throws TbNodeException { TbGetEntityAttrNodeConfiguration config = new TbGetEntityAttrNodeConfiguration(); - Map attrMapping = new HashMap<>(); - attrMapping.putIfAbsent("temperature", "tempo"); - config.setAttrMapping(attrMapping); + + Map conf = new HashMap<>(); + conf.put("${word}", "result"); + config.setAttrMapping(conf); config.setTelemetry(true); ObjectMapper mapper = new ObjectMapper(); TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config)); @@ -243,7 +248,7 @@ public class TbGetCustomerAttributeNodeTest { Device device = new Device(); device.setCustomerId(customerId); - msg = TbMsg.newMsg( "USER", deviceId, new TbMsgMetaData(), TbMsgDataType.JSON,"{}", ruleChainId, ruleNodeId); + msg = TbMsg.newMsg("USER", deviceId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); when(ctx.getDeviceService()).thenReturn(deviceService); when(deviceService.findDeviceByIdAsync(any(), eq(deviceId))).thenReturn(Futures.immediateFuture(device)); @@ -251,23 +256,23 @@ public class TbGetCustomerAttributeNodeTest { List timeseries = Lists.newArrayList(new BasicTsKvEntry(1L, new StringDataEntry("temperature", "highest"))); when(ctx.getTimeseriesService()).thenReturn(timeseriesService); - when(timeseriesService.findLatest(any(), eq(customerId), eq(Collections.singleton("temperature")))) + when(timeseriesService.findLatest(any(), eq(customerId), eq(Collections.singleton("${word}")))) .thenReturn(Futures.immediateFuture(timeseries)); node.onMsg(ctx, msg); verify(ctx).tellSuccess(msg); - assertEquals(msg.getMetaData().getValue("tempo"), "highest"); + assertEquals(msg.getMetaData().getValue("result"), "highest"); } private void entityAttributeFetched(CustomerId customerId) { List attributes = Lists.newArrayList(new BaseAttributeKvEntry(new StringDataEntry("temperature", "high"), 1L)); when(ctx.getAttributesService()).thenReturn(attributesService); - when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("temperature")))) + when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}")))) .thenReturn(Futures.immediateFuture(attributes)); node.onMsg(ctx, msg); verify(ctx).tellSuccess(msg); - assertEquals(msg.getMetaData().getValue("tempo"), "high"); + assertEquals(msg.getMetaData().getValue("result"), "high"); } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNodeTest.java new file mode 100644 index 0000000000..820d3118d4 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNodeTest.java @@ -0,0 +1,299 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.metadata; + +import com.datastax.oss.driver.api.core.uuid.Uuids; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.id.AssetId; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.data.id.RuleNodeId; +import org.thingsboard.server.common.data.id.UserId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.StringDataEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.relation.EntityRelation; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgDataType; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.asset.AssetService; +import org.thingsboard.server.dao.attributes.AttributesService; +import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.dao.relation.RelationService; +import org.thingsboard.server.dao.timeseries.TimeseriesService; +import org.thingsboard.server.dao.user.UserService; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE; +import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE; + +@RunWith(MockitoJUnitRunner.Silent.class) +public class TbGetRelatedAttributeNodeTest { + private final CustomerId customerId = new CustomerId(Uuids.timeBased()); + private final RuleChainId ruleChainId = new RuleChainId(Uuids.timeBased()); + private final RuleNodeId ruleNodeId = new RuleNodeId(Uuids.timeBased()); + private TbGetRelatedAttributeNode node; + @Mock + private TbContext ctx; + @Mock + private AttributesService attributesService; + @Mock + private TimeseriesService timeseriesService; + @Mock + private UserService userService; + @Mock + private AssetService assetService; + @Mock + private DeviceService deviceService; + @Mock + private RelationService relationService; + private TbMsg msg; + private Map metaData; + private EntityRelation entityRelation; + + @Before + public void init() throws TbNodeException { + TbGetRelatedAttrNodeConfiguration config = new TbGetRelatedAttrNodeConfiguration(); + config = config.defaultConfiguration(); + Map conf = new HashMap<>(); + conf.put("${word}", "result"); + config.setAttrMapping(conf); + config.setTelemetry(false); + ObjectMapper mapper = new ObjectMapper(); + TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config)); + + metaData = new HashMap<>(); + metaData.putIfAbsent("word", "temperature"); + + entityRelation = new EntityRelation(); + entityRelation.setTo(customerId); + entityRelation.setType(EntityRelation.CONTAINS_TYPE); + when(ctx.getRelationService()).thenReturn(relationService); + + node = new TbGetRelatedAttributeNode(); + node.init(null, nodeConfiguration); + } + + @Test + public void errorThrownIfCannotLoadAttributes() { + UserId userId = new UserId(Uuids.timeBased()); + User user = new User(); + user.setCustomerId(customerId); + entityRelation.setFrom(userId); + when(relationService.findByQuery(any(), any())).thenReturn(Futures.immediateFuture(List.of(entityRelation))); + + msg = TbMsg.newMsg("USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); + + when(ctx.getUserService()).thenReturn(userService); + when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(user)); + + when(ctx.getAttributesService()).thenReturn(attributesService); + when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}")))) + .thenThrow(new IllegalStateException("something wrong")); + + node.onMsg(ctx, msg); + final ArgumentCaptor captor = ArgumentCaptor.forClass(Throwable.class); + verify(ctx).tellFailure(same(msg), captor.capture()); + + Throwable value = captor.getValue(); + assertEquals("something wrong", value.getMessage()); + assertTrue(msg.getMetaData().getData().isEmpty()); + } + + @Test + public void errorThrownIfCannotLoadAttributesAsync() { + UserId userId = new UserId(Uuids.timeBased()); + User user = new User(); + user.setCustomerId(customerId); + entityRelation.setFrom(userId); + when(relationService.findByQuery(any(), any())).thenReturn(Futures.immediateFuture(List.of(entityRelation))); + + msg = TbMsg.newMsg("USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); + + when(ctx.getUserService()).thenReturn(userService); + when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(user)); + + when(ctx.getAttributesService()).thenReturn(attributesService); + + when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}")))) + .thenReturn(Futures.immediateFailedFuture(new IllegalStateException("something wrong"))); + + node.onMsg(ctx, msg); + final ArgumentCaptor captor = ArgumentCaptor.forClass(Throwable.class); + verify(ctx).tellFailure(same(msg), captor.capture()); + + Throwable value = captor.getValue(); + assertEquals("something wrong", value.getMessage()); + assertTrue(msg.getMetaData().getData().isEmpty()); + } + + @Test + public void failedChainUsedIfCustomerCannotBeFound() { + UserId userId = new UserId(Uuids.timeBased()); + User user = new User(); + user.setCustomerId(customerId); + entityRelation.setFrom(customerId); + entityRelation.setTo(null); + when(relationService.findByQuery(any(), any())).thenReturn(Futures.immediateFuture(List.of(entityRelation))); + + msg = TbMsg.newMsg("USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); + + when(ctx.getUserService()).thenReturn(userService); + when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(null)); + + + node.onMsg(ctx, msg); + verify(ctx).tellNext(msg, FAILURE); + assertTrue(msg.getMetaData().getData().isEmpty()); + + entityRelation.setTo(customerId); + } + + @Test + public void customerAttributeAddedInMetadata() { + entityRelation.setFrom(customerId); + when(relationService.findByQuery(any(), any())).thenReturn(Futures.immediateFuture(List.of(entityRelation))); + msg = TbMsg.newMsg("CUSTOMER", customerId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); + entityAttributeFetched(customerId); + } + + @Test + public void usersCustomerAttributesFetched() { + UserId userId = new UserId(Uuids.timeBased()); + User user = new User(); + user.setCustomerId(customerId); + entityRelation.setFrom(userId); + when(relationService.findByQuery(any(), any())).thenReturn(Futures.immediateFuture(List.of(entityRelation))); + + msg = TbMsg.newMsg("USER", userId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); + + when(ctx.getUserService()).thenReturn(userService); + when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(user)); + + entityAttributeFetched(customerId); + } + + @Test + public void assetsCustomerAttributesFetched() { + AssetId assetId = new AssetId(Uuids.timeBased()); + Asset asset = new Asset(); + asset.setCustomerId(customerId); + entityRelation.setFrom(assetId); + when(relationService.findByQuery(any(), any())).thenReturn(Futures.immediateFuture(List.of(entityRelation))); + + msg = TbMsg.newMsg("USER", assetId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); + + when(ctx.getAssetService()).thenReturn(assetService); + when(assetService.findAssetByIdAsync(any(), eq(assetId))).thenReturn(Futures.immediateFuture(asset)); + + entityAttributeFetched(customerId); + } + + @Test + public void deviceCustomerAttributesFetched() { + DeviceId deviceId = new DeviceId(Uuids.timeBased()); + Device device = new Device(); + device.setCustomerId(customerId); + entityRelation.setFrom(deviceId); + when(relationService.findByQuery(any(), any())).thenReturn(Futures.immediateFuture(List.of(entityRelation))); + + msg = TbMsg.newMsg("USER", deviceId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); + + when(ctx.getDeviceService()).thenReturn(deviceService); + when(deviceService.findDeviceByIdAsync(any(), eq(deviceId))).thenReturn(Futures.immediateFuture(device)); + + entityAttributeFetched(customerId); + } + + @Test + public void deviceCustomerTelemetryFetched() throws TbNodeException { + TbGetRelatedAttrNodeConfiguration config = new TbGetRelatedAttrNodeConfiguration(); + config = config.defaultConfiguration(); + + Map conf = new HashMap<>(); + conf.put("${word}", "result"); + config.setAttrMapping(conf); + config.setTelemetry(true); + ObjectMapper mapper = new ObjectMapper(); + TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config)); + + node = new TbGetRelatedAttributeNode(); + node.init(null, nodeConfiguration); + + + DeviceId deviceId = new DeviceId(Uuids.timeBased()); + Device device = new Device(); + device.setCustomerId(customerId); + + entityRelation.setFrom(deviceId); + when(relationService.findByQuery(any(), any())).thenReturn(Futures.immediateFuture(List.of(entityRelation))); + + msg = TbMsg.newMsg("USER", deviceId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); + + when(ctx.getDeviceService()).thenReturn(deviceService); + when(deviceService.findDeviceByIdAsync(any(), eq(deviceId))).thenReturn(Futures.immediateFuture(device)); + + List timeseries = Lists.newArrayList(new BasicTsKvEntry(1L, new StringDataEntry("temperature", "highest"))); + + when(ctx.getTimeseriesService()).thenReturn(timeseriesService); + when(timeseriesService.findLatest(any(), eq(customerId), eq(Collections.singleton("${word}")))) + .thenReturn(Futures.immediateFuture(timeseries)); + + node.onMsg(ctx, msg); + verify(ctx).tellSuccess(msg); + assertEquals(msg.getMetaData().getValue("result"), "highest"); + } + + private void entityAttributeFetched(CustomerId customerId) { + List attributes = Lists.newArrayList(new BaseAttributeKvEntry(new StringDataEntry("temperature", "high"), 1L)); + + when(ctx.getAttributesService()).thenReturn(attributesService); + when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}")))) + .thenReturn(Futures.immediateFuture(attributes)); + + node.onMsg(ctx, msg); + verify(ctx).tellSuccess(msg); + assertEquals(msg.getMetaData().getValue("result"), "high"); + } +} \ No newline at end of file diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNodeTest.java new file mode 100644 index 0000000000..1e111d150c --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNodeTest.java @@ -0,0 +1,278 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.metadata; + +import com.datastax.oss.driver.api.core.uuid.Uuids; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.id.AssetId; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.data.id.RuleNodeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.id.UserId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.StringDataEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgDataType; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.asset.AssetService; +import org.thingsboard.server.dao.attributes.AttributesService; +import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.dao.timeseries.TimeseriesService; +import org.thingsboard.server.dao.user.UserService; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE; +import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE; + +@RunWith(MockitoJUnitRunner.Silent.class) +public class TbGetTenantAttributeNodeTest { + private TbGetTenantAttributeNode node; + + @Mock + private TbContext ctx; + + @Mock + private AttributesService attributesService; + @Mock + private TimeseriesService timeseriesService; + @Mock + private UserService userService; + @Mock + private AssetService assetService; + @Mock + private DeviceService deviceService; + + private TbMsg msg; + private Map metaData; + + private final RuleChainId ruleChainId = new RuleChainId(Uuids.timeBased()); + private final RuleNodeId ruleNodeId = new RuleNodeId(Uuids.timeBased()); + + @Before + public void init() throws TbNodeException { + TbGetEntityAttrNodeConfiguration config = new TbGetEntityAttrNodeConfiguration(); + Map conf = new HashMap<>(); + conf.put("${word}", "result"); + config.setAttrMapping(conf); + config.setTelemetry(false); + ObjectMapper mapper = new ObjectMapper(); + TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config)); + + metaData = new HashMap<>(); + metaData.putIfAbsent("word", "temperature"); + + node = new TbGetTenantAttributeNode(); + node.init(null, nodeConfiguration); + } + + @Test + public void errorThrownIfCannotLoadAttributes() { + UserId userId = new UserId(Uuids.timeBased()); + TenantId tenantId = new TenantId(Uuids.timeBased()); + User user = new User(); + user.setTenantId(tenantId); + + msg = TbMsg.newMsg("USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); + + when(ctx.getUserService()).thenReturn(userService); + when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(user)); + + when(ctx.getAttributesService()).thenReturn(attributesService); + when(attributesService.find(any(), eq(tenantId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}")))) + .thenThrow(new IllegalStateException("something wrong")); + + node.onMsg(ctx, msg); + final ArgumentCaptor captor = ArgumentCaptor.forClass(Throwable.class); + verify(ctx).tellFailure(same(msg), captor.capture()); + + Throwable value = captor.getValue(); + assertEquals("something wrong", value.getMessage()); + assertTrue(msg.getMetaData().getData().isEmpty()); + } + + @Test + public void errorThrownIfCannotLoadAttributesAsync() { + UserId userId = new UserId(Uuids.timeBased()); + TenantId tenantId = new TenantId(Uuids.timeBased()); + User user = new User(); + user.setTenantId(tenantId); + + msg = TbMsg.newMsg("USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); + + when(ctx.getUserService()).thenReturn(userService); + when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(user)); + + when(ctx.getAttributesService()).thenReturn(attributesService); + when(attributesService.find(any(), eq(tenantId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}")))) + .thenReturn(Futures.immediateFailedFuture(new IllegalStateException("something wrong"))); + + node.onMsg(ctx, msg); + final ArgumentCaptor captor = ArgumentCaptor.forClass(Throwable.class); + verify(ctx).tellFailure(same(msg), captor.capture()); + + Throwable value = captor.getValue(); + assertEquals("something wrong", value.getMessage()); + assertTrue(msg.getMetaData().getData().isEmpty()); + } + + @Test + public void failedChainUsedIfCustomerCannotBeFound() { + UserId userId = new UserId(Uuids.timeBased()); + CustomerId customerId = new CustomerId(Uuids.timeBased()); + User user = new User(); + user.setCustomerId(customerId); + + msg = TbMsg.newMsg("USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); + + when(ctx.getUserService()).thenReturn(userService); + when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(null)); + + + node.onMsg(ctx, msg); + verify(ctx).tellNext(msg, FAILURE); + assertTrue(msg.getMetaData().getData().isEmpty()); + } + + @Test + public void customerAttributeAddedInMetadata() { + TenantId tenantId = new TenantId(Uuids.timeBased()); + msg = TbMsg.newMsg("TENANT", tenantId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); + entityAttributeFetched(tenantId); + } + + @Test + public void usersCustomerAttributesFetched() { + UserId userId = new UserId(Uuids.timeBased()); + TenantId tenantId = new TenantId(Uuids.timeBased()); + User user = new User(); + user.setTenantId(tenantId); + + msg = TbMsg.newMsg("USER", userId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); + + when(ctx.getUserService()).thenReturn(userService); + when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(user)); + + entityAttributeFetched(tenantId); + } + + @Test + public void assetsCustomerAttributesFetched() { + AssetId assetId = new AssetId(Uuids.timeBased()); + TenantId tenantId = new TenantId(Uuids.timeBased()); + Asset asset = new Asset(); + asset.setTenantId(tenantId); + + msg = TbMsg.newMsg("USER", assetId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); + + when(ctx.getAssetService()).thenReturn(assetService); + when(assetService.findAssetByIdAsync(any(), eq(assetId))).thenReturn(Futures.immediateFuture(asset)); + + entityAttributeFetched(tenantId); + } + + @Test + public void deviceCustomerAttributesFetched() { + DeviceId deviceId = new DeviceId(Uuids.timeBased()); + TenantId tenantId = new TenantId(Uuids.timeBased()); + Device device = new Device(); + device.setTenantId(tenantId); + + msg = TbMsg.newMsg("USER", deviceId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); + + when(ctx.getDeviceService()).thenReturn(deviceService); + when(deviceService.findDeviceByIdAsync(any(), eq(deviceId))).thenReturn(Futures.immediateFuture(device)); + + entityAttributeFetched(tenantId); + } + + @Test + public void deviceCustomerTelemetryFetched() throws TbNodeException { + TbGetEntityAttrNodeConfiguration config = new TbGetEntityAttrNodeConfiguration(); + + Map conf = new HashMap<>(); + conf.put("${word}", "result"); + config.setAttrMapping(conf); + config.setTelemetry(true); + ObjectMapper mapper = new ObjectMapper(); + TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config)); + + node = new TbGetTenantAttributeNode(); + node.init(null, nodeConfiguration); + + + DeviceId deviceId = new DeviceId(Uuids.timeBased()); + TenantId tenantId = new TenantId(Uuids.timeBased()); + Device device = new Device(); + device.setTenantId(tenantId); + + msg = TbMsg.newMsg("USER", deviceId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId); + + when(ctx.getDeviceService()).thenReturn(deviceService); + when(deviceService.findDeviceByIdAsync(any(), eq(deviceId))).thenReturn(Futures.immediateFuture(device)); + + List timeseries = Lists.newArrayList(new BasicTsKvEntry(1L, new StringDataEntry("temperature", "highest"))); + + when(ctx.getTimeseriesService()).thenReturn(timeseriesService); + when(timeseriesService.findLatest(any(), eq(tenantId), eq(Collections.singleton("${word}")))) + .thenReturn(Futures.immediateFuture(timeseries)); + + node.onMsg(ctx, msg); + verify(ctx).tellSuccess(msg); + assertEquals(msg.getMetaData().getValue("result"), "highest"); + } + + private void entityAttributeFetched(TenantId customerId) { + List attributes = Lists.newArrayList(new BaseAttributeKvEntry(new StringDataEntry("temperature", "high"), 1L)); + + when(ctx.getAttributesService()).thenReturn(attributesService); + when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}")))) + .thenReturn(Futures.immediateFuture(attributes)); + + node.onMsg(ctx, msg); + verify(ctx).tellSuccess(msg); + assertEquals(msg.getMetaData().getValue("result"), "high"); + } +} \ No newline at end of file