From 7fc712a3f7aaaf243eda351c7d095032ba70b396 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Tue, 21 Mar 2017 17:21:57 +0200 Subject: [PATCH] NPE fix for data aggregation function in case of TEXT values --- .../AggregatePartitionsFunction.java | 201 +++++++++--------- .../dao/timeseries/BaseTimeseriesDao.java | 1 + 2 files changed, 106 insertions(+), 96 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java index f099eec004..2362f5822e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.timeseries; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.kv.*; import javax.annotation.Nullable; @@ -26,6 +27,7 @@ import java.util.Optional; /** * Created by ashvayka on 20.02.17. */ +@Slf4j public class AggregatePartitionsFunction implements com.google.common.base.Function, Optional> { private static final int LONG_CNT_POS = 0; @@ -50,111 +52,118 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct @Nullable @Override public Optional apply(@Nullable List rsList) { - if (rsList == null || rsList.size() == 0) { - return Optional.empty(); - } - long count = 0; - DataType dataType = null; - - Boolean bValue = null; - String sValue = null; - Double dValue = null; - Long lValue = null; - - for (ResultSet rs : rsList) { - for (Row row : rs.all()) { - long curCount; - - Long curLValue = null; - Double curDValue = null; - Boolean curBValue = null; - String curSValue = null; - - long longCount = row.getLong(LONG_CNT_POS); - long doubleCount = row.getLong(DOUBLE_CNT_POS); - long boolCount = row.getLong(BOOL_CNT_POS); - long strCount = row.getLong(STR_CNT_POS); - - if (longCount > 0) { - dataType = DataType.LONG; - curCount = longCount; - curLValue = getLongValue(row); - } else if (doubleCount > 0) { - dataType = DataType.DOUBLE; - curCount = doubleCount; - curDValue = getDoubleValue(row); - } else if (boolCount > 0) { - dataType = DataType.BOOLEAN; - curCount = boolCount; - curBValue = getBooleanValue(row); - } else if (strCount > 0) { - dataType = DataType.STRING; - curCount = strCount; - curSValue = getStringValue(row); - } else { - continue; - } - - if (aggregation == Aggregation.COUNT) { - count += curCount; - } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) { - count += curCount; - if (curDValue != null) { - dValue = dValue == null ? curDValue : dValue + curDValue; - } else if (curLValue != null) { - lValue = lValue == null ? curLValue : lValue + curLValue; + try { + log.trace("[{}][{}][{}] Going to aggregate data", key, ts, aggregation); + if (rsList == null || rsList.size() == 0) { + return Optional.empty(); + } + long count = 0; + DataType dataType = null; + + Boolean bValue = null; + String sValue = null; + Double dValue = null; + Long lValue = null; + + for (ResultSet rs : rsList) { + for (Row row : rs.all()) { + long curCount; + + Long curLValue = null; + Double curDValue = null; + Boolean curBValue = null; + String curSValue = null; + + long longCount = row.getLong(LONG_CNT_POS); + long doubleCount = row.getLong(DOUBLE_CNT_POS); + long boolCount = row.getLong(BOOL_CNT_POS); + long strCount = row.getLong(STR_CNT_POS); + + if (longCount > 0) { + dataType = DataType.LONG; + curCount = longCount; + curLValue = getLongValue(row); + } else if (doubleCount > 0) { + dataType = DataType.DOUBLE; + curCount = doubleCount; + curDValue = getDoubleValue(row); + } else if (boolCount > 0) { + dataType = DataType.BOOLEAN; + curCount = boolCount; + curBValue = getBooleanValue(row); + } else if (strCount > 0) { + dataType = DataType.STRING; + curCount = strCount; + curSValue = getStringValue(row); + } else { + continue; } - } else if (aggregation == Aggregation.MIN) { - if (curDValue != null) { - dValue = dValue == null ? curDValue : Math.min(dValue, curDValue); - } else if (curLValue != null) { - lValue = lValue == null ? curLValue : Math.min(lValue, curLValue); - } else if (curBValue != null) { - bValue = bValue == null ? curBValue : bValue && curBValue; - } else if (curSValue != null) { - if (sValue == null || curSValue.compareTo(sValue) < 0) { - sValue = curSValue; + + if (aggregation == Aggregation.COUNT) { + count += curCount; + } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) { + count += curCount; + if (curDValue != null) { + dValue = dValue == null ? curDValue : dValue + curDValue; + } else if (curLValue != null) { + lValue = lValue == null ? curLValue : lValue + curLValue; } - } - } else if (aggregation == Aggregation.MAX) { - if (curDValue != null) { - dValue = dValue == null ? curDValue : Math.max(dValue, curDValue); - } else if (curLValue != null) { - lValue = lValue == null ? curLValue : Math.max(lValue, curLValue); - } else if (curBValue != null) { - bValue = bValue == null ? curBValue : bValue || curBValue; - } else if (curSValue != null) { - if (sValue == null || curSValue.compareTo(sValue) > 0) { - sValue = curSValue; + } else if (aggregation == Aggregation.MIN) { + if (curDValue != null) { + dValue = dValue == null ? curDValue : Math.min(dValue, curDValue); + } else if (curLValue != null) { + lValue = lValue == null ? curLValue : Math.min(lValue, curLValue); + } else if (curBValue != null) { + bValue = bValue == null ? curBValue : bValue && curBValue; + } else if (curSValue != null) { + if (sValue == null || curSValue.compareTo(sValue) < 0) { + sValue = curSValue; + } + } + } else if (aggregation == Aggregation.MAX) { + if (curDValue != null) { + dValue = dValue == null ? curDValue : Math.max(dValue, curDValue); + } else if (curLValue != null) { + lValue = lValue == null ? curLValue : Math.max(lValue, curLValue); + } else if (curBValue != null) { + bValue = bValue == null ? curBValue : bValue || curBValue; + } else if (curSValue != null) { + if (sValue == null || curSValue.compareTo(sValue) > 0) { + sValue = curSValue; + } } } } } - } - if (dataType == null) { - return Optional.empty(); - } else if (aggregation == Aggregation.COUNT) { - return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, (long) count))); - } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) { - if (count == 0 || (dataType == DataType.DOUBLE && dValue == null) || (dataType == DataType.LONG && lValue == null)) { + if (dataType == null) { return Optional.empty(); - } else if (dataType == DataType.DOUBLE) { - return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.SUM ? dValue : (dValue / count)))); - } else if (dataType == DataType.LONG) { - return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, aggregation == Aggregation.SUM ? lValue : (lValue / count)))); - } - } else if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX) { - if (dataType == DataType.DOUBLE) { - return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, dValue))); - } else if (dataType == DataType.LONG) { - return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, lValue))); - } else if (dataType == DataType.STRING) { - return Optional.of(new BasicTsKvEntry(ts, new StringDataEntry(key, sValue))); - } else { - return Optional.of(new BasicTsKvEntry(ts, new BooleanDataEntry(key, bValue))); + } else if (aggregation == Aggregation.COUNT) { + return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, (long) count))); + } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) { + if (count == 0 || (dataType == DataType.DOUBLE && dValue == null) || (dataType == DataType.LONG && lValue == null)) { + return Optional.empty(); + } else if (dataType == DataType.DOUBLE) { + return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.SUM ? dValue : (dValue / count)))); + } else if (dataType == DataType.LONG) { + return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, aggregation == Aggregation.SUM ? lValue : (lValue / count)))); + } + } else if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX) { + if (dataType == DataType.DOUBLE) { + return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, dValue))); + } else if (dataType == DataType.LONG) { + return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, lValue))); + } else if (dataType == DataType.STRING) { + return Optional.of(new BasicTsKvEntry(ts, new StringDataEntry(key, sValue))); + } else { + return Optional.of(new BasicTsKvEntry(ts, new BooleanDataEntry(key, bValue))); + } } + log.trace("[{}][{}][{}] Aggregated data is empty.", key, ts, aggregation); + return Optional.empty(); + }catch (Exception e){ + log.error("[{}][{}][{}] Failed to aggregate data", key, ts, aggregation, e); + return Optional.empty(); } - return null; } private Boolean getBooleanValue(Row row) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java index c7584fe511..cbf27fa9ac 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java @@ -215,6 +215,7 @@ public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao PreparedStatement proto = getFetchStmt(aggregation); List futures = new ArrayList<>(partitions.size()); for (Long partition : partitions) { + log.trace("Fetching data for partition [{}] for entityType {} and entityId {}", partition, entityType, entityId); BoundStatement stmt = proto.bind(); stmt.setString(0, entityType); stmt.setUUID(1, entityId);