From 3f3930525283259d433d1d73d5c191337a70b478 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 4 May 2022 10:15:51 +0300 Subject: [PATCH 1/8] ts_key_value_partitioning_always_exist_in_reading added to skip Cassandra ts_kv_partitions_cf reads --- .../src/main/resources/thingsboard.yml | 1 + .../CassandraBaseTimeseriesDao.java | 21 +++++ .../CassandraBaseTimeseriesDaoTest.java | 78 +++++++++++++++++++ 3 files changed, 100 insertions(+) create mode 100644 dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoTest.java diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 28149246c9..fcf62a4262 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -227,6 +227,7 @@ cassandra: default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}" # Specify partitioning size for timestamp key-value storage. Example: MINUTES, HOURS, DAYS, MONTHS, INDEFINITE ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}" + ts_key_value_partitioning_always_exist_in_reading: "${TS_KV_PARTITIONING_ALWAYS_EXIST_IN_READING:false}" ts_key_value_partitions_max_cache_size: "${TS_KV_PARTITIONS_MAX_CACHE_SIZE:100000}" ts_key_value_ttl: "${TS_KV_TTL:0}" buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}" diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 3d704a0429..639693acac 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -93,6 +93,10 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD @Value("${cassandra.query.ts_key_value_partitioning}") private String partitioning; + @Getter + @Value("${cassandra.query.ts_key_value_partitioning_always_exist_in_reading:false}") + private boolean partitioningAlwaysExistInReading; + @Value("${cassandra.query.ts_key_value_partitions_max_cache_size:100000}") private long partitionsCacheSize; @@ -417,10 +421,27 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD if (isFixedPartitioning()) { //no need to fetch partitions from DB return Futures.immediateFuture(FIXED_PARTITION); } + if (isPartitioningAlwaysExistInReading()) { + return Futures.immediateFuture(calculatePartitions(minPartition, maxPartition)); + } TbResultSetFuture partitionsFuture = fetchPartitions(tenantId, entityId, query.getKey(), minPartition, maxPartition); return Futures.transformAsync(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor); } + List calculatePartitions(long minPartition, long maxPartition) { + if (minPartition == maxPartition) { + return Collections.singletonList(minPartition); + } + + List partitions = Arrays.asList(minPartition, maxPartition); + long currentPartition = minPartition; + while (maxPartition > (currentPartition = toPartitionTs(currentPartition + TimeUnit.DAYS.toMillis(32)))){ + partitions.add(currentPartition); + } + + return partitions; + } + private AsyncFunction, List> getFetchChunksAsyncFunction(TenantId tenantId, EntityId entityId, String key, Aggregation aggregation, long startTs, long endTs) { return partitions -> { try { diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoTest.java new file mode 100644 index 0000000000..c2ca768649 --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoTest.java @@ -0,0 +1,78 @@ +/** + * ThingsBoard, Inc. ("COMPANY") CONFIDENTIAL + * + * Copyright © 2016-2021 ThingsBoard, Inc. All Rights Reserved. + * + * NOTICE: All information contained herein is, and remains + * the property of ThingsBoard, Inc. and its suppliers, + * if any. The intellectual and technical concepts contained + * herein are proprietary to ThingsBoard, Inc. + * and its suppliers and may be covered by U.S. and Foreign Patents, + * patents in process, and are protected by trade secret or copyright law. + * + * Dissemination of this information or reproduction of this material is strictly forbidden + * unless prior written permission is obtained from COMPANY. + * + * Access to the source code contained herein is hereby forbidden to anyone except current COMPANY employees, + * managers or contractors who have executed Confidentiality and Non-disclosure agreements + * explicitly covering such access. + * + * The copyright notice above does not evidence any actual or intended publication + * or disclosure of this source code, which includes + * information that is confidential and/or proprietary, and is a trade secret, of COMPANY. + * ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE, + * OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE CODE WITHOUT + * THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED, + * AND IN VIOLATION OF APPLICABLE LAWS AND INTERNATIONAL TREATIES. + * THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION + * DOES NOT CONVEY OR IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS, + * OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT MAY DESCRIBE, IN WHOLE OR IN PART. + */ +package org.thingsboard.server.dao.timeseries; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; +import org.thingsboard.server.dao.cassandra.CassandraCluster; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateExecutor; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = CassandraBaseTimeseriesDao.class) +@TestPropertySource(properties = { + "cassandra.query.ts_key_value_partitioning=MONTHS", + "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true", + "cassandra.query.ts_key_value_partitions_max_cache_size=100000", + "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", + "cassandra.query.ts_key_value_partitions_cache_stats_interval=60", + "cassandra.query.ts_key_value_ttl=0", + "cassandra.query.set_null_values_enabled=false", +}) +@Slf4j +public class CassandraBaseTimeseriesDaoTest { + + @SpyBean + CassandraBaseTimeseriesDao tsDao; + + @MockBean(answer = Answers.RETURNS_MOCKS) + @Qualifier("CassandraCluster") + CassandraCluster cassandraCluster; + @MockBean + CassandraBufferedRateExecutor cassandraBufferedRateExecutor; + + @Test + public void testCalculatePartitions() { + assertThat(tsDao.calculatePartitions(0, 0)).isEqualTo(List.of(0L)); + assertThat(tsDao.calculatePartitions(0, 1)).isEqualTo(List.of(0L, 1L)); + } +} \ No newline at end of file From e19ce5a823d65848509193568224346df1e73929 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 4 May 2022 14:47:02 +0300 Subject: [PATCH 2/8] testCalculatePartitions for cassandra ts dao --- .../CassandraBaseTimeseriesDao.java | 11 +++-- .../CassandraBaseTimeseriesDaoTest.java | 41 +++++++++++++++++-- 2 files changed, 44 insertions(+), 8 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 639693acac..6521c88a6a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -82,8 +82,9 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD protected static final int MIN_AGGREGATION_STEP_MS = 1000; public static final String ASC_ORDER = "ASC"; public static final long SECONDS_IN_DAY = TimeUnit.DAYS.toSeconds(1); + static final long DAYS_32_MS = TimeUnit.DAYS.toMillis(32); - protected static List FIXED_PARTITION = Arrays.asList(new Long[]{0L}); + protected static final List FIXED_PARTITION = List.of(0L); private CassandraTsPartitionsCache cassandraTsPartitionsCache; @@ -341,7 +342,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD }, MoreExecutors.directExecutor()); } - private long toPartitionTs(long ts) { + long toPartitionTs(long ts) { LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC); return tsFormat.truncatedTo(time).toInstant(ZoneOffset.UTC).toEpochMilli(); } @@ -432,13 +433,15 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD if (minPartition == maxPartition) { return Collections.singletonList(minPartition); } + List partitions = new ArrayList<>(); + partitions.add(minPartition); - List partitions = Arrays.asList(minPartition, maxPartition); long currentPartition = minPartition; - while (maxPartition > (currentPartition = toPartitionTs(currentPartition + TimeUnit.DAYS.toMillis(32)))){ + while (maxPartition > (currentPartition = toPartitionTs(currentPartition + DAYS_32_MS))){ partitions.add(currentPartition); } + partitions.add(maxPartition); return partitions; } diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoTest.java index c2ca768649..fd9736c96c 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoTest.java @@ -34,22 +34,25 @@ import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Answers; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.mock.mockito.MockBean; -import org.springframework.boot.test.mock.mockito.SpyBean; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit4.SpringRunner; import org.thingsboard.server.dao.cassandra.CassandraCluster; import org.thingsboard.server.dao.nosql.CassandraBufferedRateExecutor; +import java.text.ParseException; import java.util.List; +import static org.apache.commons.lang3.time.DateFormatUtils.ISO_DATETIME_TIME_ZONE_FORMAT; import static org.assertj.core.api.Assertions.assertThat; @RunWith(SpringRunner.class) @SpringBootTest(classes = CassandraBaseTimeseriesDao.class) @TestPropertySource(properties = { + "database.ts.type=cassandra", "cassandra.query.ts_key_value_partitioning=MONTHS", "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true", "cassandra.query.ts_key_value_partitions_max_cache_size=100000", @@ -61,7 +64,7 @@ import static org.assertj.core.api.Assertions.assertThat; @Slf4j public class CassandraBaseTimeseriesDaoTest { - @SpyBean + @Autowired CassandraBaseTimeseriesDao tsDao; @MockBean(answer = Answers.RETURNS_MOCKS) @@ -71,8 +74,38 @@ public class CassandraBaseTimeseriesDaoTest { CassandraBufferedRateExecutor cassandraBufferedRateExecutor; @Test - public void testCalculatePartitions() { + public void testToPartitionsMonths() throws ParseException { + assertThat(tsDao.getPartitioning()).isEqualTo("MONTHS"); + assertThat(tsDao.toPartitionTs(ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime())).isEqualTo(1640995200000L); + assertThat(tsDao.toPartitionTs(ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-01T00:00:00Z").getTime())).isEqualTo(1651363200000L); + assertThat(tsDao.toPartitionTs(ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-01T00:00:01Z").getTime())).isEqualTo(1651363200000L); + assertThat(tsDao.toPartitionTs(ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T23:59:59Z").getTime())).isEqualTo(1651363200000L); + assertThat(tsDao.toPartitionTs(ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T23:59:59Z").getTime())).isEqualTo(1701388800000L); + } + + @Test + public void testCalculatePartitions() throws ParseException { + long startTs = tsDao.toPartitionTs(ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-12T00:00:00Z").getTime()); + long nextTs = tsDao.toPartitionTs(ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-01-31T23:59:59Z").getTime()); + long leapTs = tsDao.toPartitionTs(ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-02-29T23:59:59Z").getTime()); + long endTs = tsDao.toPartitionTs(ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-31T23:59:59Z").getTime()); + + log.warn("startTs {}, nextTs {}, leapTs {}, endTs {}", startTs, nextTs, leapTs, endTs); + assertThat(tsDao.calculatePartitions(0, 0)).isEqualTo(List.of(0L)); assertThat(tsDao.calculatePartitions(0, 1)).isEqualTo(List.of(0L, 1L)); + assertThat(tsDao.calculatePartitions(startTs, startTs)).isEqualTo(List.of(1575158400000L)); + assertThat(tsDao.calculatePartitions(startTs, nextTs)).isEqualTo(List.of(1575158400000L, 1577836800000L)); + assertThat(tsDao.calculatePartitions(startTs, leapTs)).isEqualTo(List.of(1575158400000L, 1577836800000L, 1580515200000L)); + + assertThat(tsDao.calculatePartitions(startTs, endTs)).hasSize(14); + assertThat(tsDao.calculatePartitions(startTs, endTs)).isEqualTo(List.of( + 1575158400000L, + 1577836800000L, 1580515200000L, 1583020800000L, + 1585699200000L, 1588291200000L, 1590969600000L, + 1593561600000L, 1596240000000L, 1598918400000L, + 1601510400000L, 1604188800000L, 1606780800000L, + 1609459200000L)); } -} \ No newline at end of file + +} From 55192e5c8f831ee879bc344a2005549b5d6ffaf9 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 16 Nov 2022 10:03:20 +0100 Subject: [PATCH 3/8] cassandra partition - minor adjustments after chery pick --- .../CassandraBaseTimeseriesDao.java | 2 + .../CassandraBaseTimeseriesDaoTest.java | 43 +++++++------------ 2 files changed, 18 insertions(+), 27 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 6521c88a6a..8602d6a4ab 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -28,6 +28,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -91,6 +92,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD @Autowired private Environment environment; + @Getter @Value("${cassandra.query.ts_key_value_partitioning}") private String partitioning; diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoTest.java index fd9736c96c..962425bd14 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoTest.java @@ -1,32 +1,17 @@ /** - * ThingsBoard, Inc. ("COMPANY") CONFIDENTIAL + * Copyright © 2016-2022 The Thingsboard Authors * - * Copyright © 2016-2021 ThingsBoard, Inc. All Rights Reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * NOTICE: All information contained herein is, and remains - * the property of ThingsBoard, Inc. and its suppliers, - * if any. The intellectual and technical concepts contained - * herein are proprietary to ThingsBoard, Inc. - * and its suppliers and may be covered by U.S. and Foreign Patents, - * patents in process, and are protected by trade secret or copyright law. + * http://www.apache.org/licenses/LICENSE-2.0 * - * Dissemination of this information or reproduction of this material is strictly forbidden - * unless prior written permission is obtained from COMPANY. - * - * Access to the source code contained herein is hereby forbidden to anyone except current COMPANY employees, - * managers or contractors who have executed Confidentiality and Non-disclosure agreements - * explicitly covering such access. - * - * The copyright notice above does not evidence any actual or intended publication - * or disclosure of this source code, which includes - * information that is confidential and/or proprietary, and is a trade secret, of COMPANY. - * ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE, - * OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE CODE WITHOUT - * THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED, - * AND IN VIOLATION OF APPLICABLE LAWS AND INTERNATIONAL TREATIES. - * THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION - * DOES NOT CONVEY OR IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS, - * OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT MAY DESCRIBE, IN WHOLE OR IN PART. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.thingsboard.server.dao.timeseries; @@ -41,7 +26,8 @@ import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit4.SpringRunner; import org.thingsboard.server.dao.cassandra.CassandraCluster; -import org.thingsboard.server.dao.nosql.CassandraBufferedRateExecutor; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor; import java.text.ParseException; import java.util.List; @@ -70,8 +56,11 @@ public class CassandraBaseTimeseriesDaoTest { @MockBean(answer = Answers.RETURNS_MOCKS) @Qualifier("CassandraCluster") CassandraCluster cassandraCluster; + + @MockBean + CassandraBufferedRateReadExecutor cassandraBufferedRateReadExecutor; @MockBean - CassandraBufferedRateExecutor cassandraBufferedRateExecutor; + CassandraBufferedRateWriteExecutor cassandraBufferedRateWriteExecutor; @Test public void testToPartitionsMonths() throws ParseException { From 293a7ab0786ca4bfc03c15b9ae26a9538d3e5c45 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 16 Nov 2022 13:42:49 +0100 Subject: [PATCH 4/8] cassandra partition - implemented configurable period support for always_exist_in_reading mode --- .../CassandraBaseTimeseriesDao.java | 21 ++- ...esDaoPartitioningDaysAlwaysExistsTest.java | 93 ++++++++++++ ...sDaoPartitioningHoursAlwaysExistsTest.java | 93 ++++++++++++ ...rtitioningIndefiniteAlwaysExistsTest.java} | 40 ++---- ...aoPartitioningMinutesAlwaysExistsTest.java | 93 ++++++++++++ ...DaoPartitioningMonthsAlwaysExistsTest.java | 134 ++++++++++++++++++ ...sDaoPartitioningYearsAlwaysExistsTest.java | 92 ++++++++++++ 7 files changed, 531 insertions(+), 35 deletions(-) create mode 100644 dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest.java create mode 100644 dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest.java rename dao/src/test/java/org/thingsboard/server/dao/timeseries/{CassandraBaseTimeseriesDaoTest.java => CassandraBaseTimeseriesDaoPartitioningIndefiniteAlwaysExistsTest.java} (51%) create mode 100644 dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest.java create mode 100644 dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest.java create mode 100644 dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest.java diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 8602d6a4ab..7f37f6cc93 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -60,7 +60,6 @@ import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.temporal.ChronoUnit; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -83,8 +82,6 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD protected static final int MIN_AGGREGATION_STEP_MS = 1000; public static final String ASC_ORDER = "ASC"; public static final long SECONDS_IN_DAY = TimeUnit.DAYS.toSeconds(1); - static final long DAYS_32_MS = TimeUnit.DAYS.toMillis(32); - protected static final List FIXED_PARTITION = List.of(0L); private CassandraTsPartitionsCache cassandraTsPartitionsCache; @@ -110,6 +107,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD private boolean setNullValuesEnabled; private NoSqlTsPartitionDate tsFormat; + private long partitionDurationPlusOneMs; private PreparedStatement partitionInsertStmt; private PreparedStatement partitionInsertTtlStmt; @@ -134,6 +132,9 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD Optional partition = NoSqlTsPartitionDate.parse(partitioning); if (partition.isPresent()) { tsFormat = partition.get(); + if (!isFixedPartitioning()) { + partitionDurationPlusOneMs = partitionToMs(tsFormat) + 1; + } if (!isFixedPartitioning() && partitionsCacheSize > 0) { cassandraTsPartitionsCache = new CassandraTsPartitionsCache(partitionsCacheSize); } @@ -143,6 +144,18 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD } } + long partitionToMs(final NoSqlTsPartitionDate partition) { + switch (partition) { + case MINUTES: return TimeUnit.MINUTES.toMillis(1); + case HOURS: return TimeUnit.HOURS.toMillis(1); + case DAYS: return TimeUnit.DAYS.toMillis(1); + case MONTHS: return TimeUnit.DAYS.toMillis(31); //the longest month + case YEARS: return TimeUnit.DAYS.toMillis(366); // leap year + } + log.warn("Can not convert partition to milliseconds. There are no mapping [{}] for partitioning [{}]", partition, partitioning); + throw new RuntimeException("Failed to convert partition to ms: " + partitioning + "!"); + } + @PreDestroy public void stop() { super.stopExecutor(); @@ -439,7 +452,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD partitions.add(minPartition); long currentPartition = minPartition; - while (maxPartition > (currentPartition = toPartitionTs(currentPartition + DAYS_32_MS))){ + while (maxPartition > (currentPartition = toPartitionTs(currentPartition + partitionDurationPlusOneMs))){ partitions.add(currentPartition); } diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest.java new file mode 100644 index 0000000000..864fc12146 --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest.java @@ -0,0 +1,93 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.timeseries; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; +import org.thingsboard.server.dao.cassandra.CassandraCluster; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor; + +import java.text.ParseException; +import java.util.List; + +import static org.apache.commons.lang3.time.DateFormatUtils.ISO_DATETIME_TIME_ZONE_FORMAT; +import static org.assertj.core.api.Assertions.assertThat; + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = CassandraBaseTimeseriesDao.class) +@TestPropertySource(properties = { + "database.ts.type=cassandra", + "cassandra.query.ts_key_value_partitioning=DAYS", + "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true", + "cassandra.query.ts_key_value_partitions_max_cache_size=100000", + "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", + "cassandra.query.ts_key_value_partitions_cache_stats_interval=60", + "cassandra.query.ts_key_value_ttl=0", + "cassandra.query.set_null_values_enabled=false", +}) +@Slf4j +public class CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest { + + @Autowired + CassandraBaseTimeseriesDao tsDao; + + @MockBean(answer = Answers.RETURNS_MOCKS) + @Qualifier("CassandraCluster") + CassandraCluster cassandraCluster; + + @MockBean + CassandraBufferedRateReadExecutor cassandraBufferedRateReadExecutor; + @MockBean + CassandraBufferedRateWriteExecutor cassandraBufferedRateWriteExecutor; + + @Test + public void testToPartitionsDays() throws ParseException { + assertThat(tsDao.getPartitioning()).isEqualTo("DAYS"); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-02T00:00:00Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-02T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-03T00:00:01Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-03T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T23:59:59Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T23:59:59Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T00:00:00Z").getTime()); + } + + + @Ignore //TODO make test for Days + @Test + public void testCalculatePartitionsDays() throws ParseException { + + } + +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest.java new file mode 100644 index 0000000000..a0dd21bc2b --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest.java @@ -0,0 +1,93 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.timeseries; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; +import org.thingsboard.server.dao.cassandra.CassandraCluster; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor; + +import java.text.ParseException; +import java.util.List; + +import static org.apache.commons.lang3.time.DateFormatUtils.ISO_DATETIME_TIME_ZONE_FORMAT; +import static org.assertj.core.api.Assertions.assertThat; + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = CassandraBaseTimeseriesDao.class) +@TestPropertySource(properties = { + "database.ts.type=cassandra", + "cassandra.query.ts_key_value_partitioning=HOURS", + "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true", + "cassandra.query.ts_key_value_partitions_max_cache_size=100000", + "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", + "cassandra.query.ts_key_value_partitions_cache_stats_interval=60", + "cassandra.query.ts_key_value_ttl=0", + "cassandra.query.set_null_values_enabled=false", +}) +@Slf4j +public class CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest { + + @Autowired + CassandraBaseTimeseriesDao tsDao; + + @MockBean(answer = Answers.RETURNS_MOCKS) + @Qualifier("CassandraCluster") + CassandraCluster cassandraCluster; + + @MockBean + CassandraBufferedRateReadExecutor cassandraBufferedRateReadExecutor; + @MockBean + CassandraBufferedRateWriteExecutor cassandraBufferedRateWriteExecutor; + + @Test + public void testToPartitionsHours() throws ParseException { + assertThat(tsDao.getPartitioning()).isEqualTo("HOURS"); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-02T01:00:00Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-02T01:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-03T02:00:01Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-03T02:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T23:59:59Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T23:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T23:59:59Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T23:00:00Z").getTime()); + } + + + @Ignore //TODO make test for Hours + @Test + public void testCalculatePartitionsHours() throws ParseException { + + } + +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningIndefiniteAlwaysExistsTest.java similarity index 51% rename from dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoTest.java rename to dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningIndefiniteAlwaysExistsTest.java index 962425bd14..f83632c229 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningIndefiniteAlwaysExistsTest.java @@ -16,6 +16,7 @@ package org.thingsboard.server.dao.timeseries; import lombok.extern.slf4j.Slf4j; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Answers; @@ -39,7 +40,7 @@ import static org.assertj.core.api.Assertions.assertThat; @SpringBootTest(classes = CassandraBaseTimeseriesDao.class) @TestPropertySource(properties = { "database.ts.type=cassandra", - "cassandra.query.ts_key_value_partitioning=MONTHS", + "cassandra.query.ts_key_value_partitioning=INDEFINITE", "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true", "cassandra.query.ts_key_value_partitions_max_cache_size=100000", "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", @@ -48,7 +49,7 @@ import static org.assertj.core.api.Assertions.assertThat; "cassandra.query.set_null_values_enabled=false", }) @Slf4j -public class CassandraBaseTimeseriesDaoTest { +public class CassandraBaseTimeseriesDaoPartitioningIndefiniteAlwaysExistsTest { @Autowired CassandraBaseTimeseriesDao tsDao; @@ -63,38 +64,15 @@ public class CassandraBaseTimeseriesDaoTest { CassandraBufferedRateWriteExecutor cassandraBufferedRateWriteExecutor; @Test - public void testToPartitionsMonths() throws ParseException { - assertThat(tsDao.getPartitioning()).isEqualTo("MONTHS"); - assertThat(tsDao.toPartitionTs(ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime())).isEqualTo(1640995200000L); - assertThat(tsDao.toPartitionTs(ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-01T00:00:00Z").getTime())).isEqualTo(1651363200000L); - assertThat(tsDao.toPartitionTs(ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-01T00:00:01Z").getTime())).isEqualTo(1651363200000L); - assertThat(tsDao.toPartitionTs(ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T23:59:59Z").getTime())).isEqualTo(1651363200000L); - assertThat(tsDao.toPartitionTs(ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T23:59:59Z").getTime())).isEqualTo(1701388800000L); + public void testToPartitionsIndefinite() throws ParseException { + assertThat(tsDao.getPartitioning()).isEqualTo("INDEFINITE"); + assertThat(tsDao.toPartitionTs(ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime())).isEqualTo(0L); } - @Test - public void testCalculatePartitions() throws ParseException { - long startTs = tsDao.toPartitionTs(ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-12T00:00:00Z").getTime()); - long nextTs = tsDao.toPartitionTs(ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-01-31T23:59:59Z").getTime()); - long leapTs = tsDao.toPartitionTs(ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-02-29T23:59:59Z").getTime()); - long endTs = tsDao.toPartitionTs(ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-31T23:59:59Z").getTime()); - - log.warn("startTs {}, nextTs {}, leapTs {}, endTs {}", startTs, nextTs, leapTs, endTs); - assertThat(tsDao.calculatePartitions(0, 0)).isEqualTo(List.of(0L)); - assertThat(tsDao.calculatePartitions(0, 1)).isEqualTo(List.of(0L, 1L)); - assertThat(tsDao.calculatePartitions(startTs, startTs)).isEqualTo(List.of(1575158400000L)); - assertThat(tsDao.calculatePartitions(startTs, nextTs)).isEqualTo(List.of(1575158400000L, 1577836800000L)); - assertThat(tsDao.calculatePartitions(startTs, leapTs)).isEqualTo(List.of(1575158400000L, 1577836800000L, 1580515200000L)); - - assertThat(tsDao.calculatePartitions(startTs, endTs)).hasSize(14); - assertThat(tsDao.calculatePartitions(startTs, endTs)).isEqualTo(List.of( - 1575158400000L, - 1577836800000L, 1580515200000L, 1583020800000L, - 1585699200000L, 1588291200000L, 1590969600000L, - 1593561600000L, 1596240000000L, 1598918400000L, - 1601510400000L, 1604188800000L, 1606780800000L, - 1609459200000L)); + @Test + public void testCalculatePartitionsIndefinite() throws ParseException { + //Indefinite partitioning should never call tsDao.calculatePartitions() } } diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest.java new file mode 100644 index 0000000000..9afe556b42 --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest.java @@ -0,0 +1,93 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.timeseries; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; +import org.thingsboard.server.dao.cassandra.CassandraCluster; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor; + +import java.text.ParseException; +import java.util.List; + +import static org.apache.commons.lang3.time.DateFormatUtils.ISO_DATETIME_TIME_ZONE_FORMAT; +import static org.assertj.core.api.Assertions.assertThat; + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = CassandraBaseTimeseriesDao.class) +@TestPropertySource(properties = { + "database.ts.type=cassandra", + "cassandra.query.ts_key_value_partitioning=MINUTES", + "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true", + "cassandra.query.ts_key_value_partitions_max_cache_size=100000", + "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", + "cassandra.query.ts_key_value_partitions_cache_stats_interval=60", + "cassandra.query.ts_key_value_ttl=0", + "cassandra.query.set_null_values_enabled=false", +}) +@Slf4j +public class CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest { + + @Autowired + CassandraBaseTimeseriesDao tsDao; + + @MockBean(answer = Answers.RETURNS_MOCKS) + @Qualifier("CassandraCluster") + CassandraCluster cassandraCluster; + + @MockBean + CassandraBufferedRateReadExecutor cassandraBufferedRateReadExecutor; + @MockBean + CassandraBufferedRateWriteExecutor cassandraBufferedRateWriteExecutor; + + @Test + public void testToPartitionsMinutes() throws ParseException { + assertThat(tsDao.getPartitioning()).isEqualTo("MINUTES"); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-02T00:01:00Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-02T00:01:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-03T00:02:01Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-03T00:02:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T23:59:59Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T23:59:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T23:59:59Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T23:59:00Z").getTime()); + } + + + @Ignore //TODO make test for Minutes + @Test + public void testCalculatePartitionsMinutes() throws ParseException { + + } + +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest.java new file mode 100644 index 0000000000..f9a90e0d41 --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest.java @@ -0,0 +1,134 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.timeseries; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; +import org.thingsboard.server.dao.cassandra.CassandraCluster; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor; + +import java.text.ParseException; +import java.util.List; + +import static org.apache.commons.lang3.time.DateFormatUtils.ISO_DATETIME_TIME_ZONE_FORMAT; +import static org.assertj.core.api.Assertions.assertThat; + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = CassandraBaseTimeseriesDao.class) +@TestPropertySource(properties = { + "database.ts.type=cassandra", + "cassandra.query.ts_key_value_partitioning=MONTHS", + "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true", + "cassandra.query.ts_key_value_partitions_max_cache_size=100000", + "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", + "cassandra.query.ts_key_value_partitions_cache_stats_interval=60", + "cassandra.query.ts_key_value_ttl=0", + "cassandra.query.set_null_values_enabled=false", +}) +@Slf4j +public class CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest { + + @Autowired + CassandraBaseTimeseriesDao tsDao; + + @MockBean(answer = Answers.RETURNS_MOCKS) + @Qualifier("CassandraCluster") + CassandraCluster cassandraCluster; + + @MockBean + CassandraBufferedRateReadExecutor cassandraBufferedRateReadExecutor; + @MockBean + CassandraBufferedRateWriteExecutor cassandraBufferedRateWriteExecutor; + + @Test + public void testToPartitionsMonths() throws ParseException { + assertThat(tsDao.getPartitioning()).isEqualTo("MONTHS"); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime())).isEqualTo(1640995200000L).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-01T00:00:00Z").getTime())).isEqualTo(1651363200000L).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-01T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-01T00:00:01Z").getTime())).isEqualTo(1651363200000L).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-01T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T23:59:59Z").getTime())).isEqualTo(1651363200000L).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-01T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T23:59:59Z").getTime())).isEqualTo(1701388800000L).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-01T00:00:00Z").getTime()); + } + + @Test + public void testCalculatePartitionsMonths() throws ParseException { + long startTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-12T00:00:00Z").getTime()); + long nextTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-01-31T23:59:59Z").getTime()); + long leapTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-02-29T23:59:59Z").getTime()); + long endTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-31T23:59:59Z").getTime()); + log.info("startTs {}, nextTs {}, leapTs {}, endTs {}", startTs, nextTs, leapTs, endTs); + + assertThat(tsDao.calculatePartitions(0, 0)).isEqualTo(List.of(0L)); + assertThat(tsDao.calculatePartitions(0, 1)).isEqualTo(List.of(0L, 1L)); + + assertThat(tsDao.calculatePartitions(startTs, startTs)).isEqualTo(List.of(1575158400000L)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-01T00:00:00Z").getTime())); + assertThat(tsDao.calculatePartitions(startTs, nextTs)).isEqualTo(List.of(1575158400000L, 1577836800000L)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-01-01T00:00:00Z").getTime())); + + assertThat(tsDao.calculatePartitions(startTs, leapTs)).isEqualTo(List.of(1575158400000L, 1577836800000L, 1580515200000L)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-01-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-02-01T00:00:00Z").getTime())); + + assertThat(tsDao.calculatePartitions(startTs, endTs)).hasSize(14).isEqualTo(List.of( + 1575158400000L, + 1577836800000L, 1580515200000L, 1583020800000L, + 1585699200000L, 1588291200000L, 1590969600000L, + 1593561600000L, 1596240000000L, 1598918400000L, + 1601510400000L, 1604188800000L, 1606780800000L, + 1609459200000L)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-01-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-02-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-03-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-04-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-05-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-06-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-07-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-08-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-09-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-10-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-11-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-12-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-01T00:00:00Z").getTime())); + } + +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest.java new file mode 100644 index 0000000000..0ef0a334ea --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest.java @@ -0,0 +1,92 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.timeseries; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; +import org.thingsboard.server.dao.cassandra.CassandraCluster; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor; + +import java.text.ParseException; +import java.util.List; + +import static org.apache.commons.lang3.time.DateFormatUtils.ISO_DATETIME_TIME_ZONE_FORMAT; +import static org.assertj.core.api.Assertions.assertThat; + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = CassandraBaseTimeseriesDao.class) +@TestPropertySource(properties = { + "database.ts.type=cassandra", + "cassandra.query.ts_key_value_partitioning=YEARS", + "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true", + "cassandra.query.ts_key_value_partitions_max_cache_size=100000", + "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", + "cassandra.query.ts_key_value_partitions_cache_stats_interval=60", + "cassandra.query.ts_key_value_ttl=0", + "cassandra.query.set_null_values_enabled=false", +}) +@Slf4j +public class CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest { + + @Autowired + CassandraBaseTimeseriesDao tsDao; + + @MockBean(answer = Answers.RETURNS_MOCKS) + @Qualifier("CassandraCluster") + CassandraCluster cassandraCluster; + + @MockBean + CassandraBufferedRateReadExecutor cassandraBufferedRateReadExecutor; + @MockBean + CassandraBufferedRateWriteExecutor cassandraBufferedRateWriteExecutor; + + @Test + public void testToPartitionsYears() throws ParseException { + assertThat(tsDao.getPartitioning()).isEqualTo("YEARS"); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-01T00:00:00Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-01T00:00:01Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T23:59:59Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T23:59:59Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-01-01T00:00:00Z").getTime()); + } + + @Ignore //TODO make test for Years + @Test + public void testCalculatePartitionsYears() throws ParseException { + + } + +} From 8a276149365cec861064b61b92ef175e4746e5c4 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 16 Nov 2022 15:44:16 +0100 Subject: [PATCH 5/8] calculatePartitions improvements and added tests for cassandra ts dao --- .../CassandraBaseTimeseriesDao.java | 23 +++------ ...esDaoPartitioningDaysAlwaysExistsTest.java | 26 +++++++++- ...sDaoPartitioningHoursAlwaysExistsTest.java | 47 +++++++++++++++++-- ...aoPartitioningMinutesAlwaysExistsTest.java | 30 +++++++++++- ...sDaoPartitioningYearsAlwaysExistsTest.java | 26 +++++++++- 5 files changed, 128 insertions(+), 24 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 7f37f6cc93..651fb65150 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -107,7 +107,6 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD private boolean setNullValuesEnabled; private NoSqlTsPartitionDate tsFormat; - private long partitionDurationPlusOneMs; private PreparedStatement partitionInsertStmt; private PreparedStatement partitionInsertTtlStmt; @@ -132,9 +131,6 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD Optional partition = NoSqlTsPartitionDate.parse(partitioning); if (partition.isPresent()) { tsFormat = partition.get(); - if (!isFixedPartitioning()) { - partitionDurationPlusOneMs = partitionToMs(tsFormat) + 1; - } if (!isFixedPartitioning() && partitionsCacheSize > 0) { cassandraTsPartitionsCache = new CassandraTsPartitionsCache(partitionsCacheSize); } @@ -144,18 +140,6 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD } } - long partitionToMs(final NoSqlTsPartitionDate partition) { - switch (partition) { - case MINUTES: return TimeUnit.MINUTES.toMillis(1); - case HOURS: return TimeUnit.HOURS.toMillis(1); - case DAYS: return TimeUnit.DAYS.toMillis(1); - case MONTHS: return TimeUnit.DAYS.toMillis(31); //the longest month - case YEARS: return TimeUnit.DAYS.toMillis(366); // leap year - } - log.warn("Can not convert partition to milliseconds. There are no mapping [{}] for partitioning [{}]", partition, partitioning); - throw new RuntimeException("Failed to convert partition to ms: " + partitioning + "!"); - } - @PreDestroy public void stop() { super.stopExecutor(); @@ -452,7 +436,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD partitions.add(minPartition); long currentPartition = minPartition; - while (maxPartition > (currentPartition = toPartitionTs(currentPartition + partitionDurationPlusOneMs))){ + while (maxPartition > (currentPartition = calculateNextPartition(currentPartition))) { partitions.add(currentPartition); } @@ -460,6 +444,11 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD return partitions; } + private long calculateNextPartition(long ts) { + LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC); + return time.plus(1, tsFormat.getTruncateUnit()).toInstant(ZoneOffset.UTC).toEpochMilli(); + } + private AsyncFunction, List> getFetchChunksAsyncFunction(TenantId tenantId, EntityId entityId, String key, Aggregation aggregation, long startTs, long endTs) { return partitions -> { try { diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest.java index 864fc12146..56b0fb87a9 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest.java @@ -16,7 +16,6 @@ package org.thingsboard.server.dao.timeseries; import lombok.extern.slf4j.Slf4j; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Answers; @@ -84,10 +83,33 @@ public class CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest { } - @Ignore //TODO make test for Days @Test public void testCalculatePartitionsDays() throws ParseException { + long startTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime()); + long nextTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-12T23:59:59Z").getTime()); + long endTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-15T00:00:00Z").getTime()); + log.info("startTs {}, nextTs {}, endTs {}", startTs, nextTs, endTs); + assertThat(tsDao.calculatePartitions(0, 0)).isEqualTo(List.of(0L)); + assertThat(tsDao.calculatePartitions(0, 1)).isEqualTo(List.of(0L, 1L)); + + assertThat(tsDao.calculatePartitions(startTs, startTs)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime())); + assertThat(tsDao.calculatePartitions(startTs, nextTs)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-11T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-12T00:00:00Z").getTime())); + + assertThat(tsDao.calculatePartitions(startTs, endTs)).hasSize(6).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-11T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-12T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-13T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-14T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-15T00:00:00Z").getTime())); } } diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest.java index a0dd21bc2b..4ac057aae0 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest.java @@ -16,7 +16,6 @@ package org.thingsboard.server.dao.timeseries; import lombok.extern.slf4j.Slf4j; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Answers; @@ -83,11 +82,53 @@ public class CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest { ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T23:00:00Z").getTime()); } - - @Ignore //TODO make test for Hours @Test public void testCalculatePartitionsHours() throws ParseException { + long startTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime()); + long nextTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T03:59:59Z").getTime()); + long endTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-11T00:59:00Z").getTime()); + log.info("startTs {}, nextTs {}, endTs {}", startTs, nextTs, endTs); + + assertThat(tsDao.calculatePartitions(0, 0)).isEqualTo(List.of(0L)); + assertThat(tsDao.calculatePartitions(0, 1)).isEqualTo(List.of(0L, 1L)); + + assertThat(tsDao.calculatePartitions(startTs, startTs)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime())); + assertThat(tsDao.calculatePartitions(startTs, nextTs)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T01:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T02:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T03:00:00Z").getTime())); + assertThat(tsDao.calculatePartitions(startTs, endTs)).hasSize(25).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T01:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T02:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T03:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T04:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T05:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T06:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T07:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T08:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T09:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T10:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T11:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T12:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T13:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T14:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T15:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T16:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T17:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T18:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T19:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T20:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T21:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T22:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T23:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-11T00:00:00Z").getTime())); } } diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest.java index 9afe556b42..056ee55562 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest.java @@ -84,10 +84,38 @@ public class CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest { } - @Ignore //TODO make test for Minutes @Test public void testCalculatePartitionsMinutes() throws ParseException { + long startTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime()); + long nextTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:02:59Z").getTime()); + long endTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:10:00Z").getTime()); + log.info("startTs {}, nextTs {}, endTs {}", startTs, nextTs, endTs); + assertThat(tsDao.calculatePartitions(0, 0)).isEqualTo(List.of(0L)); + assertThat(tsDao.calculatePartitions(0, 1)).isEqualTo(List.of(0L, 1L)); + + assertThat(tsDao.calculatePartitions(startTs, startTs)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime())); + assertThat(tsDao.calculatePartitions(startTs, nextTs)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:01:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:02:00Z").getTime())); + + assertThat(tsDao.calculatePartitions(startTs, endTs)).hasSize(11).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:01:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:02:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:03:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:04:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:05:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:06:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:07:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:08:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:09:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:10:00Z").getTime())); } } diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest.java index 0ef0a334ea..3afca0b244 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest.java @@ -83,10 +83,34 @@ public class CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest { ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-01-01T00:00:00Z").getTime()); } - @Ignore //TODO make test for Years @Test public void testCalculatePartitionsYears() throws ParseException { + long startTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-01-01T00:00:00Z").getTime()); + long nextTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-10-12T23:59:59Z").getTime()); + long endTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2025-07-15T00:00:00Z").getTime()); + log.info("startTs {}, nextTs {}, endTs {}", startTs, nextTs, endTs); + assertThat(tsDao.calculatePartitions(0, 0)).isEqualTo(List.of(0L)); + assertThat(tsDao.calculatePartitions(0, 1)).isEqualTo(List.of(0L, 1L)); + + assertThat(tsDao.calculatePartitions(startTs, startTs)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-01-01T00:00:00Z").getTime())); + assertThat(tsDao.calculatePartitions(startTs, nextTs)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-01-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-01-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-01T00:00:00Z").getTime())); + + assertThat(tsDao.calculatePartitions(startTs, endTs)).hasSize(7).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-01-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-01-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-01-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2024-01-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2025-01-01T00:00:00Z").getTime())); } } From 7b427d1fe6ea8c0986d0b5a4857c1de5cba1332f Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 16 Nov 2022 15:52:11 +0100 Subject: [PATCH 6/8] deleted removePartitions from TimeseriesDao --- ...stractChunkedAggregationTimeseriesDao.java | 5 --- .../timescale/TimescaleTimeseriesDao.java | 8 ---- .../dao/timeseries/BaseTimeseriesService.java | 2 - .../CassandraBaseTimeseriesDao.java | 40 ------------------- .../server/dao/timeseries/TimeseriesDao.java | 3 -- 5 files changed, 58 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java index cf726c32c4..4875468c09 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java @@ -105,11 +105,6 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq return Futures.immediateFuture(null); } - @Override - public ListenableFuture removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { - return Futures.immediateFuture(null); - } - @Override public ListenableFuture> findAllAsync(TenantId tenantId, EntityId entityId, List queries) { return processFindAllAsync(tenantId, entityId, queries); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java index c9daf1ec6d..08089a818e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java @@ -18,7 +18,6 @@ package org.thingsboard.server.dao.sqlts.timescale; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.PageRequest; @@ -36,7 +35,6 @@ import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity; -import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper; import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao; @@ -52,7 +50,6 @@ import java.util.Comparator; import java.util.List; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import java.util.function.Function; @Component @@ -144,11 +141,6 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements }); } - @Override - public ListenableFuture removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { - return service.submit(() -> null); - } - @Override public ListenableFuture findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { if (query.getAggregation() == Aggregation.NONE) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index c8a5076c65..a2905a7a64 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -46,7 +46,6 @@ import org.thingsboard.server.dao.service.Validator; import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -277,7 +276,6 @@ public class BaseTimeseriesService implements TimeseriesService { private void deleteAndRegisterFutures(TenantId tenantId, List> futures, EntityId entityId, DeleteTsKvQuery query) { futures.add(Futures.transform(timeseriesDao.remove(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor())); futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query)); - futures.add(Futures.transform(timeseriesDao.removePartition(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor())); } private static void validate(EntityId entityId) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 651fb65150..ffc52787f9 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -226,46 +226,6 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD return resultFuture; } - @Override - public ListenableFuture removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { - long minPartition = toPartitionTs(query.getStartTs()); - long maxPartition = toPartitionTs(query.getEndTs()); - if (minPartition == maxPartition) { - return Futures.immediateFuture(null); - } else { - TbResultSetFuture partitionsFuture = fetchPartitions(tenantId, entityId, query.getKey(), minPartition, maxPartition); - - final SimpleListenableFuture resultFuture = new SimpleListenableFuture<>(); - final ListenableFuture> partitionsListFuture = Futures.transformAsync(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor); - - Futures.addCallback(partitionsListFuture, new FutureCallback>() { - @Override - public void onSuccess(@Nullable List partitions) { - int index = 0; - if (minPartition != query.getStartTs()) { - index = 1; - } - List partitionsToDelete = new ArrayList<>(); - for (int i = index; i < partitions.size() - 1; i++) { - partitionsToDelete.add(partitions.get(i)); - } - QueryCursor cursor = new QueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitionsToDelete); - deletePartitionAsync(tenantId, cursor, resultFuture); - - for (Long partition : partitionsToDelete) { - cassandraTsPartitionsCache.invalidate(new CassandraPartitionCacheKey(entityId, query.getKey(), partition)); - } - } - - @Override - public void onFailure(Throwable t) { - log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t); - } - }, readResultsProcessingExecutor); - return resultFuture; - } - } - @Override public ListenableFuture findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { if (query.getAggregation() == Aggregation.NONE) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java index 5fd26d400a..4878fdd293 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java @@ -24,7 +24,6 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult; import org.thingsboard.server.common.data.kv.TsKvEntry; import java.util.List; -import java.util.Map; /** * @author Andrew Shvayka @@ -39,7 +38,5 @@ public interface TimeseriesDao { ListenableFuture remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query); - ListenableFuture removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query); - void cleanup(long systemTtl); } From ec2305df6475b462f04478d80bd58909f8b753a8 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 16 Nov 2022 16:24:47 +0100 Subject: [PATCH 7/8] added extra tests calculate partitioning days --- ...esDaoPartitioningDaysAlwaysExistsTest.java | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest.java index 56b0fb87a9..4b043b660f 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest.java @@ -82,7 +82,6 @@ public class CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest { ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T00:00:00Z").getTime()); } - @Test public void testCalculatePartitionsDays() throws ParseException { long startTs = tsDao.toPartitionTs( @@ -110,6 +109,25 @@ public class CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest { ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-13T00:00:00Z").getTime(), ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-14T00:00:00Z").getTime(), ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-15T00:00:00Z").getTime())); + + long leapStartTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-02-27T00:00:00Z").getTime()); + long leapEndTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-03-01T00:00:00Z").getTime()); + assertThat(tsDao.calculatePartitions(leapStartTs, leapEndTs)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-02-27T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-02-28T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-02-29T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-03-01T00:00:00Z").getTime())); + + long newYearStartTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-12-30T00:00:00Z").getTime()); + long newYearEndTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-01T00:00:00Z").getTime()); + assertThat(tsDao.calculatePartitions(newYearStartTs, newYearEndTs)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-12-30T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-12-31T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-01T00:00:00Z").getTime())); } } From 7d79fa0a628d5656ee0ce3b22e9fe9d9d3425b78 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Thu, 17 Nov 2022 11:07:15 +0100 Subject: [PATCH 8/8] refactoring --- .../src/main/resources/thingsboard.yml | 2 +- .../CassandraBaseTimeseriesDao.java | 19 +++++++++++-------- ...esDaoPartitioningDaysAlwaysExistsTest.java | 2 +- ...sDaoPartitioningHoursAlwaysExistsTest.java | 2 +- ...artitioningIndefiniteAlwaysExistsTest.java | 2 +- ...aoPartitioningMinutesAlwaysExistsTest.java | 2 +- ...DaoPartitioningMonthsAlwaysExistsTest.java | 2 +- ...sDaoPartitioningYearsAlwaysExistsTest.java | 2 +- 8 files changed, 18 insertions(+), 15 deletions(-) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index fcf62a4262..6028a843ea 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -227,7 +227,7 @@ cassandra: default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}" # Specify partitioning size for timestamp key-value storage. Example: MINUTES, HOURS, DAYS, MONTHS, INDEFINITE ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}" - ts_key_value_partitioning_always_exist_in_reading: "${TS_KV_PARTITIONING_ALWAYS_EXIST_IN_READING:false}" + use_ts_key_value_partitioning_on_read: "${USE_TS_KV_PARTITIONING_ON_READ:true}" ts_key_value_partitions_max_cache_size: "${TS_KV_PARTITIONS_MAX_CACHE_SIZE:100000}" ts_key_value_ttl: "${TS_KV_TTL:0}" buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}" diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index ffc52787f9..737ef8d8f6 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -94,8 +94,8 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD private String partitioning; @Getter - @Value("${cassandra.query.ts_key_value_partitioning_always_exist_in_reading:false}") - private boolean partitioningAlwaysExistInReading; + @Value("${cassandra.query.use_ts_key_value_partitioning_on_read:true}") + private boolean useTsKeyValuePartitioningOnRead; @Value("${cassandra.query.ts_key_value_partitions_max_cache_size:100000}") private long partitionsCacheSize; @@ -381,7 +381,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD if (isFixedPartitioning()) { //no need to fetch partitions from DB return Futures.immediateFuture(FIXED_PARTITION); } - if (isPartitioningAlwaysExistInReading()) { + if (!isUseTsKeyValuePartitioningOnRead()) { return Futures.immediateFuture(calculatePartitions(minPartition, maxPartition)); } TbResultSetFuture partitionsFuture = fetchPartitions(tenantId, entityId, query.getKey(), minPartition, maxPartition); @@ -393,20 +393,23 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD return Collections.singletonList(minPartition); } List partitions = new ArrayList<>(); - partitions.add(minPartition); long currentPartition = minPartition; - while (maxPartition > (currentPartition = calculateNextPartition(currentPartition))) { + LocalDateTime currentPartitionTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentPartition), ZoneOffset.UTC); + + while (maxPartition > currentPartition) { partitions.add(currentPartition); + currentPartitionTime = calculateNextPartition(currentPartitionTime); + currentPartition = currentPartitionTime.toInstant(ZoneOffset.UTC).toEpochMilli(); } partitions.add(maxPartition); + return partitions; } - private long calculateNextPartition(long ts) { - LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC); - return time.plus(1, tsFormat.getTruncateUnit()).toInstant(ZoneOffset.UTC).toEpochMilli(); + private LocalDateTime calculateNextPartition(LocalDateTime time) { + return time.plus(1, tsFormat.getTruncateUnit()); } private AsyncFunction, List> getFetchChunksAsyncFunction(TenantId tenantId, EntityId entityId, String key, Aggregation aggregation, long startTs, long endTs) { diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest.java index 4b043b660f..d5e8350104 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest.java @@ -40,7 +40,7 @@ import static org.assertj.core.api.Assertions.assertThat; @TestPropertySource(properties = { "database.ts.type=cassandra", "cassandra.query.ts_key_value_partitioning=DAYS", - "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true", + "cassandra.query.use_ts_key_value_partitioning_on_read=false", "cassandra.query.ts_key_value_partitions_max_cache_size=100000", "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", "cassandra.query.ts_key_value_partitions_cache_stats_interval=60", diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest.java index 4ac057aae0..d5381653b3 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest.java @@ -40,7 +40,7 @@ import static org.assertj.core.api.Assertions.assertThat; @TestPropertySource(properties = { "database.ts.type=cassandra", "cassandra.query.ts_key_value_partitioning=HOURS", - "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true", + "cassandra.query.use_ts_key_value_partitioning_on_read=false", "cassandra.query.ts_key_value_partitions_max_cache_size=100000", "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", "cassandra.query.ts_key_value_partitions_cache_stats_interval=60", diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningIndefiniteAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningIndefiniteAlwaysExistsTest.java index f83632c229..82f94cde36 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningIndefiniteAlwaysExistsTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningIndefiniteAlwaysExistsTest.java @@ -41,7 +41,7 @@ import static org.assertj.core.api.Assertions.assertThat; @TestPropertySource(properties = { "database.ts.type=cassandra", "cassandra.query.ts_key_value_partitioning=INDEFINITE", - "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true", + "cassandra.query.use_ts_key_value_partitioning_on_read=false", "cassandra.query.ts_key_value_partitions_max_cache_size=100000", "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", "cassandra.query.ts_key_value_partitions_cache_stats_interval=60", diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest.java index 056ee55562..ffdf18c42b 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest.java @@ -41,7 +41,7 @@ import static org.assertj.core.api.Assertions.assertThat; @TestPropertySource(properties = { "database.ts.type=cassandra", "cassandra.query.ts_key_value_partitioning=MINUTES", - "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true", + "cassandra.query.use_ts_key_value_partitioning_on_read=false", "cassandra.query.ts_key_value_partitions_max_cache_size=100000", "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", "cassandra.query.ts_key_value_partitions_cache_stats_interval=60", diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest.java index f9a90e0d41..8c7d12da86 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest.java @@ -40,7 +40,7 @@ import static org.assertj.core.api.Assertions.assertThat; @TestPropertySource(properties = { "database.ts.type=cassandra", "cassandra.query.ts_key_value_partitioning=MONTHS", - "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true", + "cassandra.query.use_ts_key_value_partitioning_on_read=false", "cassandra.query.ts_key_value_partitions_max_cache_size=100000", "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", "cassandra.query.ts_key_value_partitions_cache_stats_interval=60", diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest.java index 3afca0b244..4b2bcb6455 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest.java @@ -41,7 +41,7 @@ import static org.assertj.core.api.Assertions.assertThat; @TestPropertySource(properties = { "database.ts.type=cassandra", "cassandra.query.ts_key_value_partitioning=YEARS", - "cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true", + "cassandra.query.use_ts_key_value_partitioning_on_read=false", "cassandra.query.ts_key_value_partitions_max_cache_size=100000", "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", "cassandra.query.ts_key_value_partitions_cache_stats_interval=60",