Browse Source

add process pattern for rule node:

- customer attribute
 - tenant attribute
 - related attribute

also, refactoring old tests for customer attribute and add tests related and tenant attribute.
pull/5550/head
van-vanich 5 years ago
parent
commit
6d657fed46
  1. 25
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java
  2. 49
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
  3. 299
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNodeTest.java
  4. 278
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNodeTest.java

25
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java

@ -30,12 +30,14 @@ import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.msg.TbMsg;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.thingsboard.common.util.DonAsynchron.withCallback;
import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
@Slf4j
@ -84,10 +86,29 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
private void putAttributesAndTell(TbContext ctx, TbMsg msg, List<? extends KvEntry> attributes) {
log.info("attr " + attributes.toString());
log.info("conf attr " + config.getAttrMapping().toString());
List<String> attrProcessPattern = new ArrayList<>();
log.info("msg {}", msg);
log.info("result process {}", attrProcessPattern);
Map<String, String> updConf = new HashMap<>();
config.getAttrMapping().forEach((key, value) -> {
String processPattern = TbNodeUtils.processPattern(key, msg);
updConf.put(processPattern, value);
attrProcessPattern.add(processPattern);
});
attributes.forEach(r -> {
String attrName = config.getAttrMapping().get(r.getKey());
log.info("r {}", r);
log.info("rkey {}", r.getKey());
log.info("index {}", attrProcessPattern.indexOf(r.getKey()));
log.info("getByKey {}", updConf.get(r.getKey()));
String attrName = updConf.get(r.getKey());
log.info("attrName {}", attrName);
msg.getMetaData().putValue(attrName, r.getValueAsString());
log.info(msg.getMetaData().toString());
});
log.info(msg.toString());
ctx.tellSuccess(msg);
}

49
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java

@ -86,20 +86,24 @@ public class TbGetCustomerAttributeNodeTest {
private DeviceService deviceService;
private TbMsg msg;
private Map metaData;
private RuleChainId ruleChainId = new RuleChainId(Uuids.timeBased());
private RuleNodeId ruleNodeId = new RuleNodeId(Uuids.timeBased());
private final RuleChainId ruleChainId = new RuleChainId(Uuids.timeBased());
private final RuleNodeId ruleNodeId = new RuleNodeId(Uuids.timeBased());
@Before
public void init() throws TbNodeException {
TbGetEntityAttrNodeConfiguration config = new TbGetEntityAttrNodeConfiguration();
Map<String, String> attrMapping = new HashMap<>();
attrMapping.putIfAbsent("temperature", "tempo");
config.setAttrMapping(attrMapping);
Map<String, String> conf = new HashMap<>();
conf.put("${word}", "result");
config.setAttrMapping(conf);
config.setTelemetry(false);
ObjectMapper mapper = new ObjectMapper();
TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
metaData = new HashMap<>();
metaData.putIfAbsent("word", "temperature");
node = new TbGetCustomerAttributeNode();
node.init(null, nodeConfiguration);
}
@ -111,13 +115,13 @@ public class TbGetCustomerAttributeNodeTest {
User user = new User();
user.setCustomerId(customerId);
msg = TbMsg.newMsg( "USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
msg = TbMsg.newMsg("USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
when(ctx.getUserService()).thenReturn(userService);
when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(user));
when(ctx.getAttributesService()).thenReturn(attributesService);
when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("temperature"))))
when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}"))))
.thenThrow(new IllegalStateException("something wrong"));
node.onMsg(ctx, msg);
@ -136,13 +140,13 @@ public class TbGetCustomerAttributeNodeTest {
User user = new User();
user.setCustomerId(customerId);
msg = TbMsg.newMsg( "USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
msg = TbMsg.newMsg("USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
when(ctx.getUserService()).thenReturn(userService);
when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(user));
when(ctx.getAttributesService()).thenReturn(attributesService);
when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("temperature"))))
when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}"))))
.thenReturn(Futures.immediateFailedFuture(new IllegalStateException("something wrong")));
node.onMsg(ctx, msg);
@ -161,7 +165,7 @@ public class TbGetCustomerAttributeNodeTest {
User user = new User();
user.setCustomerId(customerId);
msg = TbMsg.newMsg( "USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON,"{}", ruleChainId, ruleNodeId);
msg = TbMsg.newMsg("USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
when(ctx.getUserService()).thenReturn(userService);
when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(null));
@ -175,7 +179,7 @@ public class TbGetCustomerAttributeNodeTest {
@Test
public void customerAttributeAddedInMetadata() {
CustomerId customerId = new CustomerId(Uuids.timeBased());
msg = TbMsg.newMsg( "CUSTOMER", customerId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
msg = TbMsg.newMsg("CUSTOMER", customerId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
entityAttributeFetched(customerId);
}
@ -186,7 +190,7 @@ public class TbGetCustomerAttributeNodeTest {
User user = new User();
user.setCustomerId(customerId);
msg = TbMsg.newMsg( "USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
msg = TbMsg.newMsg("USER", userId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
when(ctx.getUserService()).thenReturn(userService);
when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(user));
@ -201,7 +205,7 @@ public class TbGetCustomerAttributeNodeTest {
Asset asset = new Asset();
asset.setCustomerId(customerId);
msg = TbMsg.newMsg( "USER", assetId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
msg = TbMsg.newMsg("USER", assetId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
when(ctx.getAssetService()).thenReturn(assetService);
when(assetService.findAssetByIdAsync(any(), eq(assetId))).thenReturn(Futures.immediateFuture(asset));
@ -216,7 +220,7 @@ public class TbGetCustomerAttributeNodeTest {
Device device = new Device();
device.setCustomerId(customerId);
msg = TbMsg.newMsg( "USER", deviceId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
msg = TbMsg.newMsg("USER", deviceId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
when(ctx.getDeviceService()).thenReturn(deviceService);
when(deviceService.findDeviceByIdAsync(any(), eq(deviceId))).thenReturn(Futures.immediateFuture(device));
@ -227,9 +231,10 @@ public class TbGetCustomerAttributeNodeTest {
@Test
public void deviceCustomerTelemetryFetched() throws TbNodeException {
TbGetEntityAttrNodeConfiguration config = new TbGetEntityAttrNodeConfiguration();
Map<String, String> attrMapping = new HashMap<>();
attrMapping.putIfAbsent("temperature", "tempo");
config.setAttrMapping(attrMapping);
Map<String, String> conf = new HashMap<>();
conf.put("${word}", "result");
config.setAttrMapping(conf);
config.setTelemetry(true);
ObjectMapper mapper = new ObjectMapper();
TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
@ -243,7 +248,7 @@ public class TbGetCustomerAttributeNodeTest {
Device device = new Device();
device.setCustomerId(customerId);
msg = TbMsg.newMsg( "USER", deviceId, new TbMsgMetaData(), TbMsgDataType.JSON,"{}", ruleChainId, ruleNodeId);
msg = TbMsg.newMsg("USER", deviceId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
when(ctx.getDeviceService()).thenReturn(deviceService);
when(deviceService.findDeviceByIdAsync(any(), eq(deviceId))).thenReturn(Futures.immediateFuture(device));
@ -251,23 +256,23 @@ public class TbGetCustomerAttributeNodeTest {
List<TsKvEntry> timeseries = Lists.newArrayList(new BasicTsKvEntry(1L, new StringDataEntry("temperature", "highest")));
when(ctx.getTimeseriesService()).thenReturn(timeseriesService);
when(timeseriesService.findLatest(any(), eq(customerId), eq(Collections.singleton("temperature"))))
when(timeseriesService.findLatest(any(), eq(customerId), eq(Collections.singleton("${word}"))))
.thenReturn(Futures.immediateFuture(timeseries));
node.onMsg(ctx, msg);
verify(ctx).tellSuccess(msg);
assertEquals(msg.getMetaData().getValue("tempo"), "highest");
assertEquals(msg.getMetaData().getValue("result"), "highest");
}
private void entityAttributeFetched(CustomerId customerId) {
List<AttributeKvEntry> attributes = Lists.newArrayList(new BaseAttributeKvEntry(new StringDataEntry("temperature", "high"), 1L));
when(ctx.getAttributesService()).thenReturn(attributesService);
when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("temperature"))))
when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}"))))
.thenReturn(Futures.immediateFuture(attributes));
node.onMsg(ctx, msg);
verify(ctx).tellSuccess(msg);
assertEquals(msg.getMetaData().getValue("tempo"), "high");
assertEquals(msg.getMetaData().getValue("result"), "high");
}
}

299
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNodeTest.java

@ -0,0 +1,299 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.metadata;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.user.UserService;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
@RunWith(MockitoJUnitRunner.Silent.class)
public class TbGetRelatedAttributeNodeTest {
private final CustomerId customerId = new CustomerId(Uuids.timeBased());
private final RuleChainId ruleChainId = new RuleChainId(Uuids.timeBased());
private final RuleNodeId ruleNodeId = new RuleNodeId(Uuids.timeBased());
private TbGetRelatedAttributeNode node;
@Mock
private TbContext ctx;
@Mock
private AttributesService attributesService;
@Mock
private TimeseriesService timeseriesService;
@Mock
private UserService userService;
@Mock
private AssetService assetService;
@Mock
private DeviceService deviceService;
@Mock
private RelationService relationService;
private TbMsg msg;
private Map<String, String> metaData;
private EntityRelation entityRelation;
@Before
public void init() throws TbNodeException {
TbGetRelatedAttrNodeConfiguration config = new TbGetRelatedAttrNodeConfiguration();
config = config.defaultConfiguration();
Map<String, String> conf = new HashMap<>();
conf.put("${word}", "result");
config.setAttrMapping(conf);
config.setTelemetry(false);
ObjectMapper mapper = new ObjectMapper();
TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
metaData = new HashMap<>();
metaData.putIfAbsent("word", "temperature");
entityRelation = new EntityRelation();
entityRelation.setTo(customerId);
entityRelation.setType(EntityRelation.CONTAINS_TYPE);
when(ctx.getRelationService()).thenReturn(relationService);
node = new TbGetRelatedAttributeNode();
node.init(null, nodeConfiguration);
}
@Test
public void errorThrownIfCannotLoadAttributes() {
UserId userId = new UserId(Uuids.timeBased());
User user = new User();
user.setCustomerId(customerId);
entityRelation.setFrom(userId);
when(relationService.findByQuery(any(), any())).thenReturn(Futures.immediateFuture(List.of(entityRelation)));
msg = TbMsg.newMsg("USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
when(ctx.getUserService()).thenReturn(userService);
when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(user));
when(ctx.getAttributesService()).thenReturn(attributesService);
when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}"))))
.thenThrow(new IllegalStateException("something wrong"));
node.onMsg(ctx, msg);
final ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
verify(ctx).tellFailure(same(msg), captor.capture());
Throwable value = captor.getValue();
assertEquals("something wrong", value.getMessage());
assertTrue(msg.getMetaData().getData().isEmpty());
}
@Test
public void errorThrownIfCannotLoadAttributesAsync() {
UserId userId = new UserId(Uuids.timeBased());
User user = new User();
user.setCustomerId(customerId);
entityRelation.setFrom(userId);
when(relationService.findByQuery(any(), any())).thenReturn(Futures.immediateFuture(List.of(entityRelation)));
msg = TbMsg.newMsg("USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
when(ctx.getUserService()).thenReturn(userService);
when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(user));
when(ctx.getAttributesService()).thenReturn(attributesService);
when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}"))))
.thenReturn(Futures.immediateFailedFuture(new IllegalStateException("something wrong")));
node.onMsg(ctx, msg);
final ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
verify(ctx).tellFailure(same(msg), captor.capture());
Throwable value = captor.getValue();
assertEquals("something wrong", value.getMessage());
assertTrue(msg.getMetaData().getData().isEmpty());
}
@Test
public void failedChainUsedIfCustomerCannotBeFound() {
UserId userId = new UserId(Uuids.timeBased());
User user = new User();
user.setCustomerId(customerId);
entityRelation.setFrom(customerId);
entityRelation.setTo(null);
when(relationService.findByQuery(any(), any())).thenReturn(Futures.immediateFuture(List.of(entityRelation)));
msg = TbMsg.newMsg("USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
when(ctx.getUserService()).thenReturn(userService);
when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(null));
node.onMsg(ctx, msg);
verify(ctx).tellNext(msg, FAILURE);
assertTrue(msg.getMetaData().getData().isEmpty());
entityRelation.setTo(customerId);
}
@Test
public void customerAttributeAddedInMetadata() {
entityRelation.setFrom(customerId);
when(relationService.findByQuery(any(), any())).thenReturn(Futures.immediateFuture(List.of(entityRelation)));
msg = TbMsg.newMsg("CUSTOMER", customerId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
entityAttributeFetched(customerId);
}
@Test
public void usersCustomerAttributesFetched() {
UserId userId = new UserId(Uuids.timeBased());
User user = new User();
user.setCustomerId(customerId);
entityRelation.setFrom(userId);
when(relationService.findByQuery(any(), any())).thenReturn(Futures.immediateFuture(List.of(entityRelation)));
msg = TbMsg.newMsg("USER", userId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
when(ctx.getUserService()).thenReturn(userService);
when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(user));
entityAttributeFetched(customerId);
}
@Test
public void assetsCustomerAttributesFetched() {
AssetId assetId = new AssetId(Uuids.timeBased());
Asset asset = new Asset();
asset.setCustomerId(customerId);
entityRelation.setFrom(assetId);
when(relationService.findByQuery(any(), any())).thenReturn(Futures.immediateFuture(List.of(entityRelation)));
msg = TbMsg.newMsg("USER", assetId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
when(ctx.getAssetService()).thenReturn(assetService);
when(assetService.findAssetByIdAsync(any(), eq(assetId))).thenReturn(Futures.immediateFuture(asset));
entityAttributeFetched(customerId);
}
@Test
public void deviceCustomerAttributesFetched() {
DeviceId deviceId = new DeviceId(Uuids.timeBased());
Device device = new Device();
device.setCustomerId(customerId);
entityRelation.setFrom(deviceId);
when(relationService.findByQuery(any(), any())).thenReturn(Futures.immediateFuture(List.of(entityRelation)));
msg = TbMsg.newMsg("USER", deviceId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
when(ctx.getDeviceService()).thenReturn(deviceService);
when(deviceService.findDeviceByIdAsync(any(), eq(deviceId))).thenReturn(Futures.immediateFuture(device));
entityAttributeFetched(customerId);
}
@Test
public void deviceCustomerTelemetryFetched() throws TbNodeException {
TbGetRelatedAttrNodeConfiguration config = new TbGetRelatedAttrNodeConfiguration();
config = config.defaultConfiguration();
Map<String, String> conf = new HashMap<>();
conf.put("${word}", "result");
config.setAttrMapping(conf);
config.setTelemetry(true);
ObjectMapper mapper = new ObjectMapper();
TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
node = new TbGetRelatedAttributeNode();
node.init(null, nodeConfiguration);
DeviceId deviceId = new DeviceId(Uuids.timeBased());
Device device = new Device();
device.setCustomerId(customerId);
entityRelation.setFrom(deviceId);
when(relationService.findByQuery(any(), any())).thenReturn(Futures.immediateFuture(List.of(entityRelation)));
msg = TbMsg.newMsg("USER", deviceId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
when(ctx.getDeviceService()).thenReturn(deviceService);
when(deviceService.findDeviceByIdAsync(any(), eq(deviceId))).thenReturn(Futures.immediateFuture(device));
List<TsKvEntry> timeseries = Lists.newArrayList(new BasicTsKvEntry(1L, new StringDataEntry("temperature", "highest")));
when(ctx.getTimeseriesService()).thenReturn(timeseriesService);
when(timeseriesService.findLatest(any(), eq(customerId), eq(Collections.singleton("${word}"))))
.thenReturn(Futures.immediateFuture(timeseries));
node.onMsg(ctx, msg);
verify(ctx).tellSuccess(msg);
assertEquals(msg.getMetaData().getValue("result"), "highest");
}
private void entityAttributeFetched(CustomerId customerId) {
List<AttributeKvEntry> attributes = Lists.newArrayList(new BaseAttributeKvEntry(new StringDataEntry("temperature", "high"), 1L));
when(ctx.getAttributesService()).thenReturn(attributesService);
when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}"))))
.thenReturn(Futures.immediateFuture(attributes));
node.onMsg(ctx, msg);
verify(ctx).tellSuccess(msg);
assertEquals(msg.getMetaData().getValue("result"), "high");
}
}

278
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNodeTest.java

@ -0,0 +1,278 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.metadata;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.user.UserService;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
@RunWith(MockitoJUnitRunner.Silent.class)
public class TbGetTenantAttributeNodeTest {
private TbGetTenantAttributeNode node;
@Mock
private TbContext ctx;
@Mock
private AttributesService attributesService;
@Mock
private TimeseriesService timeseriesService;
@Mock
private UserService userService;
@Mock
private AssetService assetService;
@Mock
private DeviceService deviceService;
private TbMsg msg;
private Map metaData;
private final RuleChainId ruleChainId = new RuleChainId(Uuids.timeBased());
private final RuleNodeId ruleNodeId = new RuleNodeId(Uuids.timeBased());
@Before
public void init() throws TbNodeException {
TbGetEntityAttrNodeConfiguration config = new TbGetEntityAttrNodeConfiguration();
Map<String, String> conf = new HashMap<>();
conf.put("${word}", "result");
config.setAttrMapping(conf);
config.setTelemetry(false);
ObjectMapper mapper = new ObjectMapper();
TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
metaData = new HashMap<>();
metaData.putIfAbsent("word", "temperature");
node = new TbGetTenantAttributeNode();
node.init(null, nodeConfiguration);
}
@Test
public void errorThrownIfCannotLoadAttributes() {
UserId userId = new UserId(Uuids.timeBased());
TenantId tenantId = new TenantId(Uuids.timeBased());
User user = new User();
user.setTenantId(tenantId);
msg = TbMsg.newMsg("USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
when(ctx.getUserService()).thenReturn(userService);
when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(user));
when(ctx.getAttributesService()).thenReturn(attributesService);
when(attributesService.find(any(), eq(tenantId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}"))))
.thenThrow(new IllegalStateException("something wrong"));
node.onMsg(ctx, msg);
final ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
verify(ctx).tellFailure(same(msg), captor.capture());
Throwable value = captor.getValue();
assertEquals("something wrong", value.getMessage());
assertTrue(msg.getMetaData().getData().isEmpty());
}
@Test
public void errorThrownIfCannotLoadAttributesAsync() {
UserId userId = new UserId(Uuids.timeBased());
TenantId tenantId = new TenantId(Uuids.timeBased());
User user = new User();
user.setTenantId(tenantId);
msg = TbMsg.newMsg("USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
when(ctx.getUserService()).thenReturn(userService);
when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(user));
when(ctx.getAttributesService()).thenReturn(attributesService);
when(attributesService.find(any(), eq(tenantId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}"))))
.thenReturn(Futures.immediateFailedFuture(new IllegalStateException("something wrong")));
node.onMsg(ctx, msg);
final ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
verify(ctx).tellFailure(same(msg), captor.capture());
Throwable value = captor.getValue();
assertEquals("something wrong", value.getMessage());
assertTrue(msg.getMetaData().getData().isEmpty());
}
@Test
public void failedChainUsedIfCustomerCannotBeFound() {
UserId userId = new UserId(Uuids.timeBased());
CustomerId customerId = new CustomerId(Uuids.timeBased());
User user = new User();
user.setCustomerId(customerId);
msg = TbMsg.newMsg("USER", userId, new TbMsgMetaData(), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
when(ctx.getUserService()).thenReturn(userService);
when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(null));
node.onMsg(ctx, msg);
verify(ctx).tellNext(msg, FAILURE);
assertTrue(msg.getMetaData().getData().isEmpty());
}
@Test
public void customerAttributeAddedInMetadata() {
TenantId tenantId = new TenantId(Uuids.timeBased());
msg = TbMsg.newMsg("TENANT", tenantId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
entityAttributeFetched(tenantId);
}
@Test
public void usersCustomerAttributesFetched() {
UserId userId = new UserId(Uuids.timeBased());
TenantId tenantId = new TenantId(Uuids.timeBased());
User user = new User();
user.setTenantId(tenantId);
msg = TbMsg.newMsg("USER", userId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
when(ctx.getUserService()).thenReturn(userService);
when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(user));
entityAttributeFetched(tenantId);
}
@Test
public void assetsCustomerAttributesFetched() {
AssetId assetId = new AssetId(Uuids.timeBased());
TenantId tenantId = new TenantId(Uuids.timeBased());
Asset asset = new Asset();
asset.setTenantId(tenantId);
msg = TbMsg.newMsg("USER", assetId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
when(ctx.getAssetService()).thenReturn(assetService);
when(assetService.findAssetByIdAsync(any(), eq(assetId))).thenReturn(Futures.immediateFuture(asset));
entityAttributeFetched(tenantId);
}
@Test
public void deviceCustomerAttributesFetched() {
DeviceId deviceId = new DeviceId(Uuids.timeBased());
TenantId tenantId = new TenantId(Uuids.timeBased());
Device device = new Device();
device.setTenantId(tenantId);
msg = TbMsg.newMsg("USER", deviceId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
when(ctx.getDeviceService()).thenReturn(deviceService);
when(deviceService.findDeviceByIdAsync(any(), eq(deviceId))).thenReturn(Futures.immediateFuture(device));
entityAttributeFetched(tenantId);
}
@Test
public void deviceCustomerTelemetryFetched() throws TbNodeException {
TbGetEntityAttrNodeConfiguration config = new TbGetEntityAttrNodeConfiguration();
Map<String, String> conf = new HashMap<>();
conf.put("${word}", "result");
config.setAttrMapping(conf);
config.setTelemetry(true);
ObjectMapper mapper = new ObjectMapper();
TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
node = new TbGetTenantAttributeNode();
node.init(null, nodeConfiguration);
DeviceId deviceId = new DeviceId(Uuids.timeBased());
TenantId tenantId = new TenantId(Uuids.timeBased());
Device device = new Device();
device.setTenantId(tenantId);
msg = TbMsg.newMsg("USER", deviceId, new TbMsgMetaData(metaData), TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
when(ctx.getDeviceService()).thenReturn(deviceService);
when(deviceService.findDeviceByIdAsync(any(), eq(deviceId))).thenReturn(Futures.immediateFuture(device));
List<TsKvEntry> timeseries = Lists.newArrayList(new BasicTsKvEntry(1L, new StringDataEntry("temperature", "highest")));
when(ctx.getTimeseriesService()).thenReturn(timeseriesService);
when(timeseriesService.findLatest(any(), eq(tenantId), eq(Collections.singleton("${word}"))))
.thenReturn(Futures.immediateFuture(timeseries));
node.onMsg(ctx, msg);
verify(ctx).tellSuccess(msg);
assertEquals(msg.getMetaData().getValue("result"), "highest");
}
private void entityAttributeFetched(TenantId customerId) {
List<AttributeKvEntry> attributes = Lists.newArrayList(new BaseAttributeKvEntry(new StringDataEntry("temperature", "high"), 1L));
when(ctx.getAttributesService()).thenReturn(attributesService);
when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}"))))
.thenReturn(Futures.immediateFuture(attributes));
node.onMsg(ctx, msg);
verify(ctx).tellSuccess(msg);
assertEquals(msg.getMetaData().getValue("result"), "high");
}
}
Loading…
Cancel
Save