From 7a2a4b9d01dea906fb9588154d7280bf38df4c04 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Sat, 18 Sep 2021 16:59:14 +0300 Subject: [PATCH 1/3] TbGetTelemetryNode Aggregation feature allows you to fetch aggregated telemetry as a single value by AVG, COUNT, SUM, MIN, MAX, NONE. --- .../engine/metadata/TbGetTelemetryNode.java | 16 +++- .../TbGetTelemetryNodeConfiguration.java | 3 + .../metadata/TbGetTelemetryNodeTest.java | 83 +++++++++++++++++++ 3 files changed, 99 insertions(+), 3 deletions(-) create mode 100644 rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java index 3c3fa8e21a..8bc3336fe5 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java @@ -15,7 +15,6 @@ */ package org.thingsboard.rule.engine.metadata; -import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.json.JsonWriteFeature; import com.fasterxml.jackson.databind.ObjectMapper; @@ -35,6 +34,7 @@ import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.server.common.data.kv.Aggregation; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; @@ -50,7 +50,6 @@ import java.util.stream.Collectors; import static org.thingsboard.rule.engine.metadata.TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL; import static org.thingsboard.rule.engine.metadata.TbGetTelemetryNodeConfiguration.FETCH_MODE_FIRST; import static org.thingsboard.rule.engine.metadata.TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE; -import static org.thingsboard.server.common.data.kv.Aggregation.NONE; /** * Created by mshvayka on 04.09.18. @@ -64,6 +63,7 @@ import static org.thingsboard.server.common.data.kv.Aggregation.NONE; "If selected fetch mode ALL Telemetry will be added like array into Message Metadata where key is Timestamp and value is value of Telemetry.
" + "If selected fetch mode FIRST or LAST Telemetry will be added like string without Timestamp.
" + "Also, the rule node allows you to select telemetry sampling order: ASC or DESC.
" + + "Aggregation feature allows you to fetch aggregated telemetry as a single value by AVG, COUNT, SUM, MIN, MAX, NONE.
" + "Note: The maximum size of the fetched array is 1000 records.\n ", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbEnrichmentNodeGetTelemetryFromDatabase") @@ -78,6 +78,7 @@ public class TbGetTelemetryNode implements TbNode { private ObjectMapper mapper; private String fetchMode; private String orderByFetchAll; + private Aggregation aggregation; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { @@ -89,11 +90,20 @@ public class TbGetTelemetryNode implements TbNode { if (StringUtils.isEmpty(orderByFetchAll)) { orderByFetchAll = ASC_ORDER; } + aggregation = parseAggregationConfig(config.getAggregation()); + mapper = new ObjectMapper(); mapper.configure(JsonWriteFeature.QUOTE_FIELD_NAMES.mappedFeature(), false); mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); } + Aggregation parseAggregationConfig(String aggName) { + if (StringUtils.isEmpty(aggName)) { + return Aggregation.NONE; + } + return Aggregation.valueOf(aggName); + } + @Override public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { if (tsKeyNames.isEmpty()) { @@ -121,7 +131,7 @@ public class TbGetTelemetryNode implements TbNode { private List buildQueries(TbMsg msg, List keys) { return keys.stream() - .map(key -> new BaseReadTsKvQuery(key, getInterval(msg).getStartTs(), getInterval(msg).getEndTs(), 1, limit, NONE, getOrderBy())) + .map(key -> new BaseReadTsKvQuery(key, getInterval(msg).getStartTs(), getInterval(msg).getEndTs(), 1, limit, aggregation, getOrderBy())) .collect(Collectors.toList()); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeConfiguration.java index e4e673a2ab..1233191af5 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeConfiguration.java @@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.metadata; import lombok.Data; import org.thingsboard.rule.engine.api.NodeConfiguration; +import org.thingsboard.server.common.data.kv.Aggregation; import java.util.Collections; import java.util.List; @@ -46,6 +47,7 @@ public class TbGetTelemetryNodeConfiguration implements NodeConfiguration latestTsKeyNames; @@ -63,6 +65,7 @@ public class TbGetTelemetryNodeConfiguration implements NodeConfiguration Date: Tue, 28 Sep 2021 16:38:53 +0300 Subject: [PATCH 2/3] TbGetTelemetryNode Aggregation feature - count aggIntervalStep when buildQueries for aggregation --- .../rule/engine/metadata/TbGetTelemetryNode.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java index 8bc3336fe5..03e0135715 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java @@ -130,8 +130,14 @@ public class TbGetTelemetryNode implements TbNode { } private List buildQueries(TbMsg msg, List keys) { + final Interval interval = getInterval(msg); + final long aggIntervalStep = Aggregation.NONE.equals(aggregation) ? 1 : + // exact how it validates on BaseTimeseriesService.validate() + // see CassandraBaseTimeseriesDao.findAllAsync() + interval.getEndTs() - interval.getStartTs(); + return keys.stream() - .map(key -> new BaseReadTsKvQuery(key, getInterval(msg).getStartTs(), getInterval(msg).getEndTs(), 1, limit, aggregation, getOrderBy())) + .map(key -> new BaseReadTsKvQuery(key, interval.getStartTs(), interval.getEndTs(), aggIntervalStep, limit, aggregation, getOrderBy())) .collect(Collectors.toList()); } From 3820d2bb1c4fa375ca76f3f37d896aa681e67e4a Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 12 Oct 2021 14:08:21 +0300 Subject: [PATCH 3/3] TbGetTelemetryNodeTest: fixed CE licence header --- .../metadata/TbGetTelemetryNodeTest.java | 35 ++++++------------- 1 file changed, 10 insertions(+), 25 deletions(-) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java index d20685329a..533bdff03f 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java @@ -1,32 +1,17 @@ /** - * ThingsBoard, Inc. ("COMPANY") CONFIDENTIAL + * Copyright © 2016-2021 The Thingsboard Authors * - * Copyright © 2016-2021 ThingsBoard, Inc. All Rights Reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * NOTICE: All information contained herein is, and remains - * the property of ThingsBoard, Inc. and its suppliers, - * if any. The intellectual and technical concepts contained - * herein are proprietary to ThingsBoard, Inc. - * and its suppliers and may be covered by U.S. and Foreign Patents, - * patents in process, and are protected by trade secret or copyright law. + * http://www.apache.org/licenses/LICENSE-2.0 * - * Dissemination of this information or reproduction of this material is strictly forbidden - * unless prior written permission is obtained from COMPANY. - * - * Access to the source code contained herein is hereby forbidden to anyone except current COMPANY employees, - * managers or contractors who have executed Confidentiality and Non-disclosure agreements - * explicitly covering such access. - * - * The copyright notice above does not evidence any actual or intended publication - * or disclosure of this source code, which includes - * information that is confidential and/or proprietary, and is a trade secret, of COMPANY. - * ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE, - * OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE CODE WITHOUT - * THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED, - * AND IN VIOLATION OF APPLICABLE LAWS AND INTERNATIONAL TREATIES. - * THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION - * DOES NOT CONVEY OR IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS, - * OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT MAY DESCRIBE, IN WHOLE OR IN PART. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.thingsboard.rule.engine.metadata;