diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/Customer.java b/common/data/src/main/java/org/thingsboard/server/common/data/Customer.java index 03115a9940..078c97b1c0 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/Customer.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/Customer.java @@ -23,7 +23,7 @@ import org.thingsboard.server.common.data.id.TenantId; import com.fasterxml.jackson.databind.JsonNode; -public class Customer extends ContactBased implements HasName { +public class Customer extends ContactBased implements HasName, HasTenantId { private static final long serialVersionUID = -1599722990298929275L; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/Device.java b/common/data/src/main/java/org/thingsboard/server/common/data/Device.java index 13fa011d9d..95662c1292 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/Device.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/Device.java @@ -23,7 +23,7 @@ import org.thingsboard.server.common.data.id.TenantId; import com.fasterxml.jackson.databind.JsonNode; @EqualsAndHashCode(callSuper = true) -public class Device extends SearchTextBasedWithAdditionalInfo implements HasName { +public class Device extends SearchTextBasedWithAdditionalInfo implements HasName, HasTenantId, HasCustomerId { private static final long serialVersionUID = 2807343040519543363L; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/HasCustomerId.java b/common/data/src/main/java/org/thingsboard/server/common/data/HasCustomerId.java new file mode 100644 index 0000000000..e89eba8bbb --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/HasCustomerId.java @@ -0,0 +1,23 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data; + +import org.thingsboard.server.common.data.id.CustomerId; + +public interface HasCustomerId { + + CustomerId getCustomerId(); +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/HasTenantId.java b/common/data/src/main/java/org/thingsboard/server/common/data/HasTenantId.java new file mode 100644 index 0000000000..ebba003cc8 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/HasTenantId.java @@ -0,0 +1,23 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data; + +import org.thingsboard.server.common.data.id.TenantId; + +public interface HasTenantId { + + TenantId getTenantId(); +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/User.java b/common/data/src/main/java/org/thingsboard/server/common/data/User.java index c893d644d9..3b957764fc 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/User.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/User.java @@ -25,7 +25,7 @@ import org.thingsboard.server.common.data.security.Authority; import com.fasterxml.jackson.databind.JsonNode; @EqualsAndHashCode(callSuper = true) -public class User extends SearchTextBasedWithAdditionalInfo implements HasName { +public class User extends SearchTextBasedWithAdditionalInfo implements HasName, HasTenantId, HasCustomerId { private static final long serialVersionUID = 8250339805336035966L; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/alarm/Alarm.java b/common/data/src/main/java/org/thingsboard/server/common/data/alarm/Alarm.java index 70f5042a75..125406c4a1 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/alarm/Alarm.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/alarm/Alarm.java @@ -22,6 +22,7 @@ import lombok.Builder; import lombok.Data; import org.thingsboard.server.common.data.BaseData; import org.thingsboard.server.common.data.HasName; +import org.thingsboard.server.common.data.HasTenantId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -31,7 +32,7 @@ import org.thingsboard.server.common.data.id.TenantId; @Data @Builder @AllArgsConstructor -public class Alarm extends BaseData implements HasName { +public class Alarm extends BaseData implements HasName, HasTenantId { private TenantId tenantId; private String type; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/asset/Asset.java b/common/data/src/main/java/org/thingsboard/server/common/data/asset/Asset.java index cc3c111724..c7b246c1ee 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/asset/Asset.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/asset/Asset.java @@ -17,16 +17,13 @@ package org.thingsboard.server.common.data.asset; import com.fasterxml.jackson.databind.JsonNode; import lombok.EqualsAndHashCode; -import org.thingsboard.server.common.data.HasAdditionalInfo; -import org.thingsboard.server.common.data.HasName; -import org.thingsboard.server.common.data.SearchTextBased; -import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo; +import org.thingsboard.server.common.data.*; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; @EqualsAndHashCode(callSuper = true) -public class Asset extends SearchTextBasedWithAdditionalInfo implements HasName { +public class Asset extends SearchTextBasedWithAdditionalInfo implements HasName, HasTenantId, HasCustomerId { private static final long serialVersionUID = 2807343040519543363L; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/PluginMetaData.java b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/PluginMetaData.java index 8576264d5b..4c33ffe65c 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/PluginMetaData.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/PluginMetaData.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.HasName; +import org.thingsboard.server.common.data.HasTenantId; import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo; import org.thingsboard.server.common.data.id.PluginId; import org.thingsboard.server.common.data.id.TenantId; @@ -32,7 +33,7 @@ import java.io.IOException; @EqualsAndHashCode(callSuper = true) @Slf4j -public class PluginMetaData extends SearchTextBasedWithAdditionalInfo implements HasName { +public class PluginMetaData extends SearchTextBasedWithAdditionalInfo implements HasName, HasTenantId { private static final long serialVersionUID = 1L; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChain.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChain.java index e82c850b9d..f2ba0cc66e 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChain.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChain.java @@ -21,6 +21,7 @@ import lombok.Data; import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.HasName; +import org.thingsboard.server.common.data.HasTenantId; import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; @@ -29,7 +30,7 @@ import org.thingsboard.server.common.data.id.TenantId; @Data @EqualsAndHashCode(callSuper = true) @Slf4j -public class RuleChain extends SearchTextBasedWithAdditionalInfo implements HasName { +public class RuleChain extends SearchTextBasedWithAdditionalInfo implements HasName, HasTenantId { private static final long serialVersionUID = -5656679015121935465L; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleMetaData.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleMetaData.java index 98adeb7742..953e5eb6bc 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleMetaData.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleMetaData.java @@ -23,6 +23,7 @@ import lombok.Data; import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.HasName; +import org.thingsboard.server.common.data.HasTenantId; import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo; import org.thingsboard.server.common.data.id.RuleId; import org.thingsboard.server.common.data.id.TenantId; @@ -31,7 +32,7 @@ import org.thingsboard.server.common.data.plugin.ComponentLifecycleState; @Data @EqualsAndHashCode(callSuper = true) @Slf4j -public class RuleMetaData extends SearchTextBasedWithAdditionalInfo implements HasName { +public class RuleMetaData extends SearchTextBasedWithAdditionalInfo implements HasName, HasTenantId { private static final long serialVersionUID = -5656679015122935465L; diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java index 8d254a08c6..3782ed22ed 100644 --- a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java @@ -61,13 +61,14 @@ public class HostRequestIntervalRegistry { } public long tick(String clientHostId) { + IntervalCount intervalCount = hostCounts.computeIfAbsent(clientHostId, s -> new IntervalCount(intervalDurationMs)); + long currentCount = intervalCount.resetIfExpiredAndTick(); if (whiteList.contains(clientHostId)) { return 0; } else if (blackList.contains(clientHostId)) { return Long.MAX_VALUE; } - IntervalCount intervalCount = hostCounts.computeIfAbsent(clientHostId, s -> new IntervalCount(intervalDurationMs)); - return intervalCount.resetIfExpiredAndTick(); + return currentCount; } public void clean() { diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index a1c85e2bfb..ede076be4a 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -17,7 +17,17 @@ package org.thingsboard.rule.engine.api; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.cluster.ServerAddress; +import org.thingsboard.server.dao.alarm.AlarmService; +import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.attributes.AttributesService; +import org.thingsboard.server.dao.customer.CustomerService; +import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.dao.plugin.PluginService; +import org.thingsboard.server.dao.relation.RelationService; +import org.thingsboard.server.dao.rule.RuleChainService; +import org.thingsboard.server.dao.rule.RuleService; +import org.thingsboard.server.dao.timeseries.TimeseriesService; +import org.thingsboard.server.dao.user.UserService; import java.util.UUID; @@ -40,6 +50,28 @@ public interface TbContext { void ack(TbMsg msg); + void tellError(TbMsg msg, Throwable th); + AttributesService getAttributesService(); + CustomerService getCustomerService(); + + UserService getUserService(); + + RuleService getRuleService(); + + PluginService getPluginService(); + + AssetService getAssetService(); + + DeviceService getDeviceService(); + + AlarmService getAlarmService(); + + RuleChainService getRuleChainService(); + + TimeseriesService getTimeseriesService(); + + RelationService getRelationService(); + } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeException.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeException.java index 6766999d50..b42ec8ec13 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeException.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeException.java @@ -22,6 +22,10 @@ import com.fasterxml.jackson.core.JsonProcessingException; */ public class TbNodeException extends Exception { + public TbNodeException(String message) { + super(message); + } + public TbNodeException(Exception e) { super(e); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/DonAsynchron.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/DonAsynchron.java new file mode 100644 index 0000000000..4fed574638 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/DonAsynchron.java @@ -0,0 +1,45 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + +import javax.annotation.Nullable; +import java.util.function.Consumer; + +public class DonAsynchron { + + public static void withCallback(ListenableFuture future, Consumer onSuccess, Consumer onFailure) { + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable T result) { + try { + onSuccess.accept(result); + } catch (Throwable th) { + onFailure(th); + } + + } + + @Override + public void onFailure(Throwable t) { + onFailure.accept(t); + } + }); + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java new file mode 100644 index 0000000000..52850be9a6 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java @@ -0,0 +1,88 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.metadata; + +import com.google.common.base.Function; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.rule.engine.TbNodeUtils; +import org.thingsboard.rule.engine.api.*; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.msg.TbMsg; + +import java.util.List; +import java.util.stream.Collectors; + +import static org.thingsboard.rule.engine.DonAsynchron.withCallback; +import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE; + +public abstract class TbEntityGetAttrNode implements TbNode { + + private TbGetEntityAttrNodeConfiguration config; + + @Override + public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, TbGetEntityAttrNodeConfiguration.class); + } + + @Override + public void onMsg(TbContext ctx, TbMsg msg) { + try { + withCallback( + findEntityAsync(ctx, msg.getOriginator()), + entityId -> withCallback( + config.isTelemetry() ? getLatestTelemetry(ctx, entityId) : getAttributesAsync(ctx, entityId), + attributes -> putAttributesAndTell(ctx, msg, attributes), + t -> ctx.tellError(msg, t) + ), + t -> ctx.tellError(msg, t)); + } catch (Throwable th) { + ctx.tellError(msg, th); + } + } + + private ListenableFuture> getAttributesAsync(TbContext ctx, EntityId entityId) { + ListenableFuture> latest = ctx.getAttributesService().find(entityId, SERVER_SCOPE, config.getAttrMapping().keySet()); + return Futures.transform(latest, (Function, ? extends List>) l -> + l.stream().map(i -> (KvEntry) i).collect(Collectors.toList())); + } + + private ListenableFuture> getLatestTelemetry(TbContext ctx, EntityId entityId) { + ListenableFuture> latest = ctx.getTimeseriesService().findLatest(entityId, config.getAttrMapping().keySet()); + return Futures.transform(latest, (Function, ? extends List>) l -> + l.stream().map(i -> (KvEntry) i).collect(Collectors.toList())); + } + + + private void putAttributesAndTell(TbContext ctx, TbMsg msg, List attributes) { + attributes.forEach(r -> { + String attrName = config.getAttrMapping().get(r.getKey()); + msg.getMetaData().putValue(attrName, r.getValueAsString()); + }); + ctx.tellNext(msg); + } + + @Override + public void destroy() { + + } + + protected abstract ListenableFuture findEntityAsync(TbContext ctx, EntityId originator); + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java index 11c644c429..4d41921740 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java @@ -15,23 +15,27 @@ */ package org.thingsboard.rule.engine.metadata; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.TbNodeUtils; import org.thingsboard.rule.engine.api.*; -import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.dao.attributes.AttributesService; import java.util.List; +import static org.thingsboard.rule.engine.DonAsynchron.withCallback; +import static org.thingsboard.server.common.data.DataConstants.*; + /** * Created by ashvayka on 19.01.18. */ @Slf4j public class TbGetAttributesNode implements TbNode { - TbGetAttributesNodeConfiguration config; + private TbGetAttributesNodeConfiguration config; @Override public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { @@ -40,26 +44,25 @@ public class TbGetAttributesNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException { - try { - //TODO: refactor this to work async and fetch attributes from cache. - AttributesService service = ctx.getAttributesService(); - fetchAttributes(msg, service, config.getClientAttributeNames(), DataConstants.CLIENT_SCOPE, "cs."); - fetchAttributes(msg, service, config.getServerAttributeNames(), DataConstants.SERVER_SCOPE, "ss."); - fetchAttributes(msg, service, config.getSharedAttributeNames(), DataConstants.SHARED_SCOPE, "shared."); - ctx.tellNext(msg); - } catch (Exception e) { - log.warn("[{}][{}] Failed to fetch attributes", msg.getOriginator(), msg.getId(), e); - throw new TbNodeException(e); - } + ListenableFuture> future = Futures.allAsList( + putAttrAsync(ctx, msg, CLIENT_SCOPE, config.getClientAttributeNames(), "cs."), + putAttrAsync(ctx, msg, SHARED_SCOPE, config.getSharedAttributeNames(), "shared."), + putAttrAsync(ctx, msg, SERVER_SCOPE, config.getServerAttributeNames(), "ss.")); + + withCallback(future, i -> ctx.tellNext(msg), t -> ctx.tellError(msg, t)); } - private void fetchAttributes(TbMsg msg, AttributesService service, List attributeNames, String scope, String prefix) throws InterruptedException, java.util.concurrent.ExecutionException { - if (attributeNames != null && attributeNames.isEmpty()) { - List attributes = service.find(msg.getOriginator(), scope, attributeNames).get(); - attributes.forEach(attr -> msg.getMetaData().putValue(prefix + attr.getKey(), attr.getValueAsString())); - } + private ListenableFuture putAttributesAsync(TbMsg msg, List attributes, String prefix) { + attributes.forEach(r -> msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString())); + return Futures.immediateFuture(null); } + private ListenableFuture putAttrAsync(TbContext ctx, TbMsg msg, String scope, List attributes, String prefix) { + return Futures.transform(ctx.getAttributesService().find(msg.getOriginator(), scope, attributes), + (AsyncFunction, Void>) i -> putAttributesAsync(msg, i, prefix)); + } + + @Override public void destroy() { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java new file mode 100644 index 0000000000..57f9b79e7b --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java @@ -0,0 +1,51 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.metadata; + +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.HasCustomerId; +import org.thingsboard.server.common.data.id.*; + +public class TbGetCustomerAttributeNode extends TbEntityGetAttrNode { + + @Override + protected ListenableFuture findEntityAsync(TbContext ctx, EntityId originator) { + + switch (originator.getEntityType()) { + case CUSTOMER: + return Futures.immediateFuture((CustomerId) originator); + case USER: + return getCustomerAsync(ctx.getUserService().findUserByIdAsync((UserId) originator)); + case ASSET: + return getCustomerAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) originator)); + case DEVICE: + return getCustomerAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) originator)); + default: + return Futures.immediateFailedFuture(new TbNodeException("Unexpected originator EntityType " + originator)); + } + } + + private ListenableFuture getCustomerAsync(ListenableFuture future) { + return Futures.transform(future, (AsyncFunction) in -> { + return in != null ? Futures.immediateFuture(in.getCustomerId()) + : Futures.immediateFailedFuture(new IllegalStateException("Customer not found"));}); + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetEntityAttrNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetEntityAttrNodeConfiguration.java new file mode 100644 index 0000000000..a5e85c57d7 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetEntityAttrNodeConfiguration.java @@ -0,0 +1,28 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.metadata; + +import lombok.Data; + +import java.util.Map; +import java.util.Optional; + +@Data +public class TbGetEntityAttrNodeConfiguration { + + private Map attrMapping; + private boolean isTelemetry = false; +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java new file mode 100644 index 0000000000..75b0a6524b --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.metadata; + +import lombok.Data; +import org.thingsboard.server.common.data.relation.EntitySearchDirection; + +@Data +public class TbGetRelatedAttrNodeConfiguration { + + private String relationType; + private EntitySearchDirection direction; +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java new file mode 100644 index 0000000000..5823c181de --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java @@ -0,0 +1,62 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.metadata; + +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.commons.collections.CollectionUtils; +import org.thingsboard.rule.engine.TbNodeUtils; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.TbNodeState; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.relation.EntityRelation; +import org.thingsboard.server.common.data.relation.EntitySearchDirection; +import org.thingsboard.server.dao.relation.RelationService; + +import java.util.List; + +import static org.thingsboard.server.common.data.relation.RelationTypeGroup.COMMON; + +public class TbGetRelatedAttributeNode extends TbEntityGetAttrNode { + + private TbGetRelatedAttrNodeConfiguration config; + + @Override + public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, TbGetRelatedAttrNodeConfiguration.class); + } + + @Override + protected ListenableFuture findEntityAsync(TbContext ctx, EntityId originator) { + RelationService relationService = ctx.getRelationService(); + if (config.getDirection() == EntitySearchDirection.FROM) { + ListenableFuture> asyncRelation = relationService.findByFromAndTypeAsync(originator, config.getRelationType(), COMMON); + return Futures.transform(asyncRelation, (AsyncFunction, EntityId>) + r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo()) + : Futures.immediateFailedFuture(new IllegalStateException("Relation not found"))); + } else if (config.getDirection() == EntitySearchDirection.TO) { + ListenableFuture> asyncRelation = relationService.findByToAndTypeAsync(originator, config.getRelationType(), COMMON); + return Futures.transform(asyncRelation, (AsyncFunction, EntityId>) + r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom()) + : Futures.immediateFailedFuture(new IllegalStateException("Relation not found"))); + } + + return Futures.immediateFailedFuture(new IllegalStateException("Unknown direction")); + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java new file mode 100644 index 0000000000..2cf9a97d20 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java @@ -0,0 +1,64 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.metadata; + +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.HasTenantId; +import org.thingsboard.server.common.data.alarm.AlarmId; +import org.thingsboard.server.common.data.id.*; + +@Slf4j +public class TbGetTenantAttributeNode extends TbEntityGetAttrNode { + + @Override + protected ListenableFuture findEntityAsync(TbContext ctx, EntityId originator) { + + switch (originator.getEntityType()) { + case TENANT: + return Futures.immediateFuture((TenantId) originator); + case CUSTOMER: + return getTenantAsync(ctx.getCustomerService().findCustomerByIdAsync((CustomerId) originator)); + case USER: + return getTenantAsync(ctx.getUserService().findUserByIdAsync((UserId) originator)); + case RULE: + return getTenantAsync(ctx.getRuleService().findRuleByIdAsync((RuleId) originator)); + case PLUGIN: + return getTenantAsync(ctx.getPluginService().findPluginByIdAsync((PluginId) originator)); + case ASSET: + return getTenantAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) originator)); + case DEVICE: + return getTenantAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) originator)); + case ALARM: + return getTenantAsync(ctx.getAlarmService().findAlarmByIdAsync((AlarmId) originator)); + case RULE_CHAIN: + return getTenantAsync(ctx.getRuleChainService().findRuleChainByIdAsync((RuleChainId) originator)); + default: + return Futures.immediateFailedFuture(new TbNodeException("Unexpected originator EntityType " + originator)); + } + } + + private ListenableFuture getTenantAsync(ListenableFuture future) { + return Futures.transform(future, (AsyncFunction) in -> { + return in != null ? Futures.immediateFuture(in.getTenantId()) + : Futures.immediateFailedFuture(new IllegalStateException("Tenant not found"));}); + } + +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java new file mode 100644 index 0000000000..8e5ddb8069 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java @@ -0,0 +1,266 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.metadata; + +import com.datastax.driver.core.utils.UUIDs; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.id.AssetId; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.UserId; +import org.thingsboard.server.common.data.kv.*; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.asset.AssetService; +import org.thingsboard.server.dao.attributes.AttributesService; +import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.dao.timeseries.TimeseriesService; +import org.thingsboard.server.dao.user.UserService; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE; + +@RunWith(MockitoJUnitRunner.class) +public class TbGetCustomerAttributeNodeTest { + + private TbGetCustomerAttributeNode node; + + @Mock + private TbContext ctx; + + @Mock + private AttributesService attributesService; + @Mock + private TimeseriesService timeseriesService; + @Mock + private UserService userService; + @Mock + private AssetService assetService; + @Mock + private DeviceService deviceService; + + private TbMsg msg; + + @Before + public void init() throws TbNodeException { + TbGetEntityAttrNodeConfiguration config = new TbGetEntityAttrNodeConfiguration(); + Map attrMapping = new HashMap<>(); + attrMapping.putIfAbsent("temperature", "tempo"); + config.setAttrMapping(attrMapping); + config.setTelemetry(false); + ObjectMapper mapper = new ObjectMapper(); + TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(); + nodeConfiguration.setData(mapper.valueToTree(config)); + + node = new TbGetCustomerAttributeNode(); + node.init(nodeConfiguration, null); + } + + @Test + public void errorThrownIfCannotLoadAttributes() { + UserId userId = new UserId(UUIDs.timeBased()); + CustomerId customerId = new CustomerId(UUIDs.timeBased()); + User user = new User(); + user.setCustomerId(customerId); + + msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), new byte[4]); + + when(ctx.getUserService()).thenReturn(userService); + when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(user)); + + when(ctx.getAttributesService()).thenReturn(attributesService); + when(attributesService.find(customerId, SERVER_SCOPE, Collections.singleton("temperature"))) + .thenThrow(new IllegalStateException("something wrong")); + + node.onMsg(ctx, msg); + final ArgumentCaptor captor = ArgumentCaptor.forClass(Throwable.class); + verify(ctx).tellError(same(msg), captor.capture()); + + Throwable value = captor.getValue(); + assertEquals("something wrong", value.getMessage()); + assertTrue(msg.getMetaData().getData().isEmpty()); + } + + @Test + public void errorThrownIfCannotLoadAttributesAsync() { + UserId userId = new UserId(UUIDs.timeBased()); + CustomerId customerId = new CustomerId(UUIDs.timeBased()); + User user = new User(); + user.setCustomerId(customerId); + + msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), new byte[4]); + + when(ctx.getUserService()).thenReturn(userService); + when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(user)); + + when(ctx.getAttributesService()).thenReturn(attributesService); + when(attributesService.find(customerId, SERVER_SCOPE, Collections.singleton("temperature"))) + .thenReturn(Futures.immediateFailedFuture(new IllegalStateException("something wrong"))); + + node.onMsg(ctx, msg); + final ArgumentCaptor captor = ArgumentCaptor.forClass(Throwable.class); + verify(ctx).tellError(same(msg), captor.capture()); + + Throwable value = captor.getValue(); + assertEquals("something wrong", value.getMessage()); + assertTrue(msg.getMetaData().getData().isEmpty()); + } + + @Test + public void errorThrownIfCustomerCannotBeFound() { + UserId userId = new UserId(UUIDs.timeBased()); + CustomerId customerId = new CustomerId(UUIDs.timeBased()); + User user = new User(); + user.setCustomerId(customerId); + + msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), new byte[4]); + + when(ctx.getUserService()).thenReturn(userService); + when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(null)); + + node.onMsg(ctx, msg); + final ArgumentCaptor captor = ArgumentCaptor.forClass(Throwable.class); + verify(ctx).tellError(same(msg), captor.capture()); + + Throwable value = captor.getValue(); + assertEquals(IllegalStateException.class, value.getClass()); + assertEquals("Customer not found", value.getMessage()); + assertTrue(msg.getMetaData().getData().isEmpty()); + } + + @Test + public void customerAttributeAddedInMetadata() { + CustomerId customerId = new CustomerId(UUIDs.timeBased()); + msg = new TbMsg(UUIDs.timeBased(), "CUSTOMER", customerId, new TbMsgMetaData(), new byte[4]); + entityAttributeFetched(customerId); + } + + @Test + public void usersCustomerAttributesFetched() { + UserId userId = new UserId(UUIDs.timeBased()); + CustomerId customerId = new CustomerId(UUIDs.timeBased()); + User user = new User(); + user.setCustomerId(customerId); + + msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), new byte[4]); + + when(ctx.getUserService()).thenReturn(userService); + when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(user)); + + entityAttributeFetched(customerId); + } + + @Test + public void assetsCustomerAttributesFetched() { + AssetId assetId = new AssetId(UUIDs.timeBased()); + CustomerId customerId = new CustomerId(UUIDs.timeBased()); + Asset asset = new Asset(); + asset.setCustomerId(customerId); + + msg = new TbMsg(UUIDs.timeBased(), "USER", assetId, new TbMsgMetaData(), new byte[4]); + + when(ctx.getAssetService()).thenReturn(assetService); + when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset)); + + entityAttributeFetched(customerId); + } + + @Test + public void deviceCustomerAttributesFetched() { + DeviceId deviceId = new DeviceId(UUIDs.timeBased()); + CustomerId customerId = new CustomerId(UUIDs.timeBased()); + Device device = new Device(); + device.setCustomerId(customerId); + + msg = new TbMsg(UUIDs.timeBased(), "USER", deviceId, new TbMsgMetaData(), new byte[4]); + + when(ctx.getDeviceService()).thenReturn(deviceService); + when(deviceService.findDeviceByIdAsync(deviceId)).thenReturn(Futures.immediateFuture(device)); + + entityAttributeFetched(customerId); + } + + @Test + public void deviceCustomerTelemetryFetched() throws TbNodeException { + TbGetEntityAttrNodeConfiguration config = new TbGetEntityAttrNodeConfiguration(); + Map attrMapping = new HashMap<>(); + attrMapping.putIfAbsent("temperature", "tempo"); + config.setAttrMapping(attrMapping); + config.setTelemetry(true); + ObjectMapper mapper = new ObjectMapper(); + TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(); + nodeConfiguration.setData(mapper.valueToTree(config)); + + node = new TbGetCustomerAttributeNode(); + node.init(nodeConfiguration, null); + + + DeviceId deviceId = new DeviceId(UUIDs.timeBased()); + CustomerId customerId = new CustomerId(UUIDs.timeBased()); + Device device = new Device(); + device.setCustomerId(customerId); + + msg = new TbMsg(UUIDs.timeBased(), "USER", deviceId, new TbMsgMetaData(), new byte[4]); + + when(ctx.getDeviceService()).thenReturn(deviceService); + when(deviceService.findDeviceByIdAsync(deviceId)).thenReturn(Futures.immediateFuture(device)); + + List timeseries = Lists.newArrayList(new BasicTsKvEntry(1L, new StringDataEntry("temperature", "highest"))); + + when(ctx.getTimeseriesService()).thenReturn(timeseriesService); + when(timeseriesService.findLatest(customerId, Collections.singleton("temperature"))) + .thenReturn(Futures.immediateFuture(timeseries)); + + node.onMsg(ctx, msg); + verify(ctx).tellNext(msg); + assertEquals(msg.getMetaData().getValue("tempo"), "highest"); + } + + private void entityAttributeFetched(CustomerId customerId) { + List attributes = Lists.newArrayList(new BaseAttributeKvEntry(new StringDataEntry("temperature", "high"), 1L)); + + when(ctx.getAttributesService()).thenReturn(attributesService); + when(attributesService.find(customerId, SERVER_SCOPE, Collections.singleton("temperature"))) + .thenReturn(Futures.immediateFuture(attributes)); + + node.onMsg(ctx, msg); + verify(ctx).tellNext(msg); + assertEquals(msg.getMetaData().getValue("tempo"), "high"); + } +} \ No newline at end of file