diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 28149246c9..fcf62a4262 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -227,6 +227,7 @@ cassandra: default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}" # Specify partitioning size for timestamp key-value storage. Example: MINUTES, HOURS, DAYS, MONTHS, INDEFINITE ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}" + ts_key_value_partitioning_always_exist_in_reading: "${TS_KV_PARTITIONING_ALWAYS_EXIST_IN_READING:false}" ts_key_value_partitions_max_cache_size: "${TS_KV_PARTITIONS_MAX_CACHE_SIZE:100000}" ts_key_value_ttl: "${TS_KV_TTL:0}" buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}" diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 3d704a0429..639693acac 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -93,6 +93,10 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD @Value("${cassandra.query.ts_key_value_partitioning}") private String partitioning; + @Getter + @Value("${cassandra.query.ts_key_value_partitioning_always_exist_in_reading:false}") + private boolean partitioningAlwaysExistInReading; + @Value("${cassandra.query.ts_key_value_partitions_max_cache_size:100000}") private long partitionsCacheSize; @@ -417,10 +421,27 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD if (isFixedPartitioning()) { //no need to fetch partitions from DB return Futures.immediateFuture(FIXED_PARTITION); } + if (isPartitioningAlwaysExistInReading()) { + return Futures.immediateFuture(calculatePartitions(minPartition, maxPartition)); + } TbResultSetFuture partitionsFuture = fetchPartitions(tenantId, entityId, query.getKey(), minPartition, maxPartition); return Futures.transformAsync(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor); } + List calculatePartitions(long minPartition, long maxPartition) { + if (minPartition == maxPartition) { + return Collections.singletonList(minPartition); + } + + List partitions = Arrays.asList(minPartition, maxPartition); + long currentPartition = minPartition; + while (maxPartition > (currentPartition = toPartitionTs(currentPartition + TimeUnit.DAYS.toMillis(32)))){ + partitions.add(currentPartition); + } + + return partitions; + } + private AsyncFunction, List> getFetchChunksAsyncFunction(TenantId tenantId, EntityId entityId, String key, Aggregation aggregation, long startTs, long endTs) { return partitions -> { try { diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoTest.java new file mode 100644 index 0000000000..c2ca768649 --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoTest.java @@ -0,0 +1,78 @@ +/** + * ThingsBoard, Inc. ("COMPANY") CONFIDENTIAL + * + * Copyright © 2016-2021 ThingsBoard, Inc. All Rights Reserved. + * + * NOTICE: All information contained herein is, and remains + * the property of ThingsBoard, Inc. and its suppliers, + * if any. The intellectual and technical concepts contained + * herein are proprietary to ThingsBoard, Inc. + * and its suppliers and may be covered by U.S. and Foreign Patents, + * patents in process, and are protected by trade secret or copyright law. + * + * Dissemination of this information or reproduction of this material is strictly forbidden + * unless prior written permission is obtained from COMPANY. + * + * Access to the source code contained herein is hereby forbidden to anyone except current COMPANY employees, + * managers or contractors who have executed Confidentiality and Non-disclosure agreements + * explicitly covering such access. + * + * The copyright notice above does not evidence any actual or intended publication + * or disclosure of this source code, which includes + * information that is confidential and/or proprietary, and is a trade secret, of COMPANY. + * ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE, + * OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE CODE WITHOUT + * THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED, + * AND IN VIOLATION OF APPLICABLE LAWS AND INTERNATIONAL TREATIES. + * THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION + * DOES NOT CONVEY OR IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS, + * OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT MAY DESCRIBE, IN WHOLE OR IN PART. + */ +package org.thingsboard.server.dao.timeseries; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; +import org.thingsboard.server.dao.cassandra.CassandraCluster; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateExecutor; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = CassandraBaseTimeseriesDao.class) +@TestPropertySource(properties = { + "cassandra.query.ts_key_value_partitioning=MONTHS", + "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true", + "cassandra.query.ts_key_value_partitions_max_cache_size=100000", + "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", + "cassandra.query.ts_key_value_partitions_cache_stats_interval=60", + "cassandra.query.ts_key_value_ttl=0", + "cassandra.query.set_null_values_enabled=false", +}) +@Slf4j +public class CassandraBaseTimeseriesDaoTest { + + @SpyBean + CassandraBaseTimeseriesDao tsDao; + + @MockBean(answer = Answers.RETURNS_MOCKS) + @Qualifier("CassandraCluster") + CassandraCluster cassandraCluster; + @MockBean + CassandraBufferedRateExecutor cassandraBufferedRateExecutor; + + @Test + public void testCalculatePartitions() { + assertThat(tsDao.calculatePartitions(0, 0)).isEqualTo(List.of(0L)); + assertThat(tsDao.calculatePartitions(0, 1)).isEqualTo(List.of(0L, 1L)); + } +} \ No newline at end of file