From 88307d6ef1bfd599fb46f3f68b46662856b95a7a Mon Sep 17 00:00:00 2001 From: David Date: Thu, 16 Aug 2018 12:03:12 +0300 Subject: [PATCH 1/6] address issue Add partition granularity to time series data #1006 https://github.com/thingsboard/thingsboard/issues/1006 --- .../CassandraBaseTimeseriesDao.java | 37 +++++++++++++------ .../dao/timeseries/TsPartitionDate.java | 7 +++- 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 7aa317cbf9..591023443c 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -54,10 +54,7 @@ import javax.annotation.PreDestroy; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Optional; +import java.util.*; import java.util.stream.Collectors; import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; @@ -76,6 +73,9 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem public static final String SELECT_PREFIX = "SELECT "; public static final String EQUALS_PARAM = " = ? "; + + private static List FIXED_PARTITION = Arrays.asList(new Long[]{0l}); + @Autowired private Environment environment; @@ -163,14 +163,28 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem } } + public boolean isFixedPartitioning(){ + return tsFormat.getTruncateUnit().equals(TsPartitionDate.FIXED_PARTITION); + } + + private ListenableFuture> getPartitionsFuture(TsKvQuery query, EntityId entityId, long minPartition, long maxPartition){ + + if(isFixedPartitioning()){ //no need to fetch partitions from DB + return Futures.immediateFuture(FIXED_PARTITION); + } + ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition); + + return Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor); + + } + private ListenableFuture> findAllAsyncWithLimit(EntityId entityId, TsKvQuery query) { + long minPartition = toPartitionTs(query.getStartTs()); long maxPartition = toPartitionTs(query.getEndTs()); - ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition); - + final ListenableFuture> partitionsListFuture = getPartitionsFuture(query,entityId,minPartition,maxPartition); final SimpleListenableFuture> resultFuture = new SimpleListenableFuture<>(); - final ListenableFuture> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor); Futures.addCallback(partitionsListFuture, new FutureCallback>() { @Override @@ -181,7 +195,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @Override public void onFailure(Throwable t) { - log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t); + log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), toPartitionTs(query.getStartTs()), toPartitionTs(query.getEndTs()), t); } }, readResultsProcessingExecutor); @@ -229,10 +243,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem final long endTs = query.getEndTs(); final long ts = startTs + (endTs - startTs) / 2; - ResultSetFuture partitionsFuture = fetchPartitions(entityId, key, minPartition, maxPartition); - - ListenableFuture> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor); + ListenableFuture> partitionsListFuture = getPartitionsFuture(query,entityId, minPartition, maxPartition); ListenableFuture> aggregationChunks = Futures.transformAsync(partitionsListFuture, getFetchChunksAsyncFunction(entityId, key, aggregation, startTs, endTs), readResultsProcessingExecutor); @@ -308,6 +320,9 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @Override public ListenableFuture savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl) { + if(isFixedPartitioning()){ + return getFuture(null, rs -> null); + } ttl = computeTtl(ttl); long partition = toPartitionTs(tsKvEntryTs); log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsPartitionDate.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsPartitionDate.java index 4148004dad..d9dfc7dc93 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsPartitionDate.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsPartitionDate.java @@ -16,22 +16,25 @@ package org.thingsboard.server.dao.timeseries; import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.time.temporal.ChronoUnit; import java.time.temporal.TemporalUnit; import java.util.Optional; public enum TsPartitionDate { - MINUTES("yyyy-MM-dd-HH-mm", ChronoUnit.MINUTES), HOURS("yyyy-MM-dd-HH", ChronoUnit.HOURS), DAYS("yyyy-MM-dd", ChronoUnit.DAYS), MONTHS("yyyy-MM", ChronoUnit.MONTHS), YEARS("yyyy", ChronoUnit.YEARS); + MINUTES("yyyy-MM-dd-HH-mm", ChronoUnit.MINUTES), HOURS("yyyy-MM-dd-HH", ChronoUnit.HOURS), DAYS("yyyy-MM-dd", ChronoUnit.DAYS), MONTHS("yyyy-MM", ChronoUnit.MONTHS), YEARS("yyyy", ChronoUnit.YEARS),INDEFINITE("",ChronoUnit.FOREVER); private final String pattern; private final transient TemporalUnit truncateUnit; + public final static LocalDateTime FIXED_PARTITION = LocalDateTime.ofEpochSecond(0,0, ZoneOffset.UTC); TsPartitionDate(String pattern, TemporalUnit truncateUnit) { this.pattern = pattern; this.truncateUnit = truncateUnit; } + public String getPattern() { return pattern; } @@ -46,6 +49,8 @@ public enum TsPartitionDate { return time.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1); case YEARS: return time.truncatedTo(ChronoUnit.DAYS).withDayOfYear(1); + case INDEFINITE: + return FIXED_PARTITION; default: return time.truncatedTo(truncateUnit); } From 2b212d777c67d1eb7b6a30011ae267cf29c47554 Mon Sep 17 00:00:00 2001 From: David Date: Thu, 16 Aug 2018 17:24:59 +0300 Subject: [PATCH 2/6] formatting ,added INDEFINITE to possible partitioning in yml ,savePartition returns immediateFuture(null) if FixedPartition mode enabled --- .../src/main/resources/thingsboard.yml | 2 +- .../dao/timeseries/BaseTimeseriesService.java | 8 +++--- .../CassandraBaseTimeseriesDao.java | 26 +++++++++---------- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index f510ec48a5..4d7dc7f41e 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -201,7 +201,7 @@ cassandra: read_consistency_level: "${CASSANDRA_READ_CONSISTENCY_LEVEL:ONE}" write_consistency_level: "${CASSANDRA_WRITE_CONSISTENCY_LEVEL:ONE}" default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}" - # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS + # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS,INDEFINITE ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}" ts_key_value_ttl: "${TS_KV_TTL:0}" buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}" diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index c981378939..6c00735c4d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 591023443c..f25a8b4e63 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -163,18 +163,18 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem } } - public boolean isFixedPartitioning(){ - return tsFormat.getTruncateUnit().equals(TsPartitionDate.FIXED_PARTITION); + public boolean isFixedPartitioning() { + return tsFormat.getTruncateUnit().equals(TsPartitionDate.FIXED_PARTITION); } - private ListenableFuture> getPartitionsFuture(TsKvQuery query, EntityId entityId, long minPartition, long maxPartition){ + private ListenableFuture> getPartitionsFuture(TsKvQuery query, EntityId entityId, long minPartition, long maxPartition) { - if(isFixedPartitioning()){ //no need to fetch partitions from DB + if (isFixedPartitioning()) { //no need to fetch partitions from DB return Futures.immediateFuture(FIXED_PARTITION); } ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition); - return Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor); + return Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor); } @@ -183,7 +183,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem long minPartition = toPartitionTs(query.getStartTs()); long maxPartition = toPartitionTs(query.getEndTs()); - final ListenableFuture> partitionsListFuture = getPartitionsFuture(query,entityId,minPartition,maxPartition); + final ListenableFuture> partitionsListFuture = getPartitionsFuture(query, entityId, minPartition, maxPartition); final SimpleListenableFuture> resultFuture = new SimpleListenableFuture<>(); Futures.addCallback(partitionsListFuture, new FutureCallback>() { @@ -244,7 +244,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem final long ts = startTs + (endTs - startTs) / 2; - ListenableFuture> partitionsListFuture = getPartitionsFuture(query,entityId, minPartition, maxPartition); + ListenableFuture> partitionsListFuture = getPartitionsFuture(query, entityId, minPartition, maxPartition); ListenableFuture> aggregationChunks = Futures.transformAsync(partitionsListFuture, getFetchChunksAsyncFunction(entityId, key, aggregation, startTs, endTs), readResultsProcessingExecutor); @@ -320,8 +320,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @Override public ListenableFuture savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl) { - if(isFixedPartitioning()){ - return getFuture(null, rs -> null); + if (isFixedPartitioning()) { + return Futures.immediateFuture(null); } ttl = computeTtl(ttl); long partition = toPartitionTs(tsKvEntryTs); From 45cf12c4d9a40c34807f200a0a98a9f30189fd3e Mon Sep 17 00:00:00 2001 From: David Date: Thu, 16 Aug 2018 18:04:29 +0300 Subject: [PATCH 3/6] fixed License format --- .../server/dao/timeseries/BaseTimeseriesService.java | 8 ++++---- .../server/dao/timeseries/CassandraBaseTimeseriesDao.java | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index 6c00735c4d..c981378939 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index f25a8b4e63..7d5e7d4665 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. From 16fa955a4c737a8c57b855457b577c0f790966ab Mon Sep 17 00:00:00 2001 From: David Date: Thu, 16 Aug 2018 22:22:32 +0300 Subject: [PATCH 4/6] fixed wildcard imports --- .../server/dao/timeseries/CassandraBaseTimeseriesDao.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 7d5e7d4665..8aeb32aeca 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -54,7 +54,11 @@ import javax.annotation.PreDestroy; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; -import java.util.*; +import java.util.Arrays; +import java.util.List; +import java.util.ArrayList; +import java.util.Optional; +import java.util.Collections; import java.util.stream.Collectors; import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; @@ -164,7 +168,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem } public boolean isFixedPartitioning() { - return tsFormat.getTruncateUnit().equals(TsPartitionDate.FIXED_PARTITION); + return tsFormat.getTruncateUnit().equals(TsPartitionDate.EPOCH_START); } private ListenableFuture> getPartitionsFuture(TsKvQuery query, EntityId entityId, long minPartition, long maxPartition) { From 3363a33ccd2528445c3cbd711f1df58b5553f981 Mon Sep 17 00:00:00 2001 From: David Date: Thu, 16 Aug 2018 22:24:37 +0300 Subject: [PATCH 5/6] changed to EPOCH_START --- .../thingsboard/server/dao/timeseries/TsPartitionDate.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsPartitionDate.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsPartitionDate.java index d9dfc7dc93..93283fdda8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsPartitionDate.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsPartitionDate.java @@ -27,7 +27,7 @@ public enum TsPartitionDate { private final String pattern; private final transient TemporalUnit truncateUnit; - public final static LocalDateTime FIXED_PARTITION = LocalDateTime.ofEpochSecond(0,0, ZoneOffset.UTC); + public final static LocalDateTime EPOCH_START = LocalDateTime.ofEpochSecond(0,0, ZoneOffset.UTC); TsPartitionDate(String pattern, TemporalUnit truncateUnit) { this.pattern = pattern; @@ -50,7 +50,7 @@ public enum TsPartitionDate { case YEARS: return time.truncatedTo(ChronoUnit.DAYS).withDayOfYear(1); case INDEFINITE: - return FIXED_PARTITION; + return EPOCH_START; default: return time.truncatedTo(truncateUnit); } From 7f01a9800364c469dad63739b881b2921f5e8dfa Mon Sep 17 00:00:00 2001 From: David Date: Thu, 16 Aug 2018 22:46:28 +0300 Subject: [PATCH 6/6] extra space styling --- .../server/dao/timeseries/CassandraBaseTimeseriesDao.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 8aeb32aeca..c025e64a31 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -78,7 +78,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem public static final String EQUALS_PARAM = " = ? "; - private static List FIXED_PARTITION = Arrays.asList(new Long[]{0l}); + private static List FIXED_PARTITION = Arrays.asList(new Long[]{0L}); @Autowired private Environment environment; @@ -172,14 +172,11 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem } private ListenableFuture> getPartitionsFuture(TsKvQuery query, EntityId entityId, long minPartition, long maxPartition) { - if (isFixedPartitioning()) { //no need to fetch partitions from DB return Futures.immediateFuture(FIXED_PARTITION); } ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition); - return Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor); - } private ListenableFuture> findAllAsyncWithLimit(EntityId entityId, TsKvQuery query) {