diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 025446a1da..8e26d46ffc 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -227,6 +227,7 @@ cassandra: default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}" # Specify partitioning size for timestamp key-value storage. Example: MINUTES, HOURS, DAYS, MONTHS, INDEFINITE ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}" + use_ts_key_value_partitioning_on_read: "${USE_TS_KV_PARTITIONING_ON_READ:true}" ts_key_value_partitions_max_cache_size: "${TS_KV_PARTITIONS_MAX_CACHE_SIZE:100000}" ts_key_value_ttl: "${TS_KV_TTL:0}" buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}" diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java index cf726c32c4..4875468c09 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java @@ -105,11 +105,6 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq return Futures.immediateFuture(null); } - @Override - public ListenableFuture removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { - return Futures.immediateFuture(null); - } - @Override public ListenableFuture> findAllAsync(TenantId tenantId, EntityId entityId, List queries) { return processFindAllAsync(tenantId, entityId, queries); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java index c9daf1ec6d..08089a818e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java @@ -18,7 +18,6 @@ package org.thingsboard.server.dao.sqlts.timescale; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.PageRequest; @@ -36,7 +35,6 @@ import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity; -import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper; import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao; @@ -52,7 +50,6 @@ import java.util.Comparator; import java.util.List; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import java.util.function.Function; @Component @@ -144,11 +141,6 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements }); } - @Override - public ListenableFuture removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { - return service.submit(() -> null); - } - @Override public ListenableFuture findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { if (query.getAggregation() == Aggregation.NONE) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index c8a5076c65..a2905a7a64 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -46,7 +46,6 @@ import org.thingsboard.server.dao.service.Validator; import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -277,7 +276,6 @@ public class BaseTimeseriesService implements TimeseriesService { private void deleteAndRegisterFutures(TenantId tenantId, List> futures, EntityId entityId, DeleteTsKvQuery query) { futures.add(Futures.transform(timeseriesDao.remove(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor())); futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query)); - futures.add(Futures.transform(timeseriesDao.removePartition(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor())); } private static void validate(EntityId entityId) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 3d704a0429..737ef8d8f6 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -28,6 +28,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -59,7 +60,6 @@ import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.temporal.ChronoUnit; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -82,17 +82,21 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD protected static final int MIN_AGGREGATION_STEP_MS = 1000; public static final String ASC_ORDER = "ASC"; public static final long SECONDS_IN_DAY = TimeUnit.DAYS.toSeconds(1); - - protected static List FIXED_PARTITION = Arrays.asList(new Long[]{0L}); + protected static final List FIXED_PARTITION = List.of(0L); private CassandraTsPartitionsCache cassandraTsPartitionsCache; @Autowired private Environment environment; + @Getter @Value("${cassandra.query.ts_key_value_partitioning}") private String partitioning; + @Getter + @Value("${cassandra.query.use_ts_key_value_partitioning_on_read:true}") + private boolean useTsKeyValuePartitioningOnRead; + @Value("${cassandra.query.ts_key_value_partitions_max_cache_size:100000}") private long partitionsCacheSize; @@ -222,46 +226,6 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD return resultFuture; } - @Override - public ListenableFuture removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { - long minPartition = toPartitionTs(query.getStartTs()); - long maxPartition = toPartitionTs(query.getEndTs()); - if (minPartition == maxPartition) { - return Futures.immediateFuture(null); - } else { - TbResultSetFuture partitionsFuture = fetchPartitions(tenantId, entityId, query.getKey(), minPartition, maxPartition); - - final SimpleListenableFuture resultFuture = new SimpleListenableFuture<>(); - final ListenableFuture> partitionsListFuture = Futures.transformAsync(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor); - - Futures.addCallback(partitionsListFuture, new FutureCallback>() { - @Override - public void onSuccess(@Nullable List partitions) { - int index = 0; - if (minPartition != query.getStartTs()) { - index = 1; - } - List partitionsToDelete = new ArrayList<>(); - for (int i = index; i < partitions.size() - 1; i++) { - partitionsToDelete.add(partitions.get(i)); - } - QueryCursor cursor = new QueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitionsToDelete); - deletePartitionAsync(tenantId, cursor, resultFuture); - - for (Long partition : partitionsToDelete) { - cassandraTsPartitionsCache.invalidate(new CassandraPartitionCacheKey(entityId, query.getKey(), partition)); - } - } - - @Override - public void onFailure(Throwable t) { - log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t); - } - }, readResultsProcessingExecutor); - return resultFuture; - } - } - @Override public ListenableFuture findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { if (query.getAggregation() == Aggregation.NONE) { @@ -337,7 +301,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD }, MoreExecutors.directExecutor()); } - private long toPartitionTs(long ts) { + long toPartitionTs(long ts) { LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC); return tsFormat.truncatedTo(time).toInstant(ZoneOffset.UTC).toEpochMilli(); } @@ -417,10 +381,37 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD if (isFixedPartitioning()) { //no need to fetch partitions from DB return Futures.immediateFuture(FIXED_PARTITION); } + if (!isUseTsKeyValuePartitioningOnRead()) { + return Futures.immediateFuture(calculatePartitions(minPartition, maxPartition)); + } TbResultSetFuture partitionsFuture = fetchPartitions(tenantId, entityId, query.getKey(), minPartition, maxPartition); return Futures.transformAsync(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor); } + List calculatePartitions(long minPartition, long maxPartition) { + if (minPartition == maxPartition) { + return Collections.singletonList(minPartition); + } + List partitions = new ArrayList<>(); + + long currentPartition = minPartition; + LocalDateTime currentPartitionTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentPartition), ZoneOffset.UTC); + + while (maxPartition > currentPartition) { + partitions.add(currentPartition); + currentPartitionTime = calculateNextPartition(currentPartitionTime); + currentPartition = currentPartitionTime.toInstant(ZoneOffset.UTC).toEpochMilli(); + } + + partitions.add(maxPartition); + + return partitions; + } + + private LocalDateTime calculateNextPartition(LocalDateTime time) { + return time.plus(1, tsFormat.getTruncateUnit()); + } + private AsyncFunction, List> getFetchChunksAsyncFunction(TenantId tenantId, EntityId entityId, String key, Aggregation aggregation, long startTs, long endTs) { return partitions -> { try { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java index 5fd26d400a..4878fdd293 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java @@ -24,7 +24,6 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult; import org.thingsboard.server.common.data.kv.TsKvEntry; import java.util.List; -import java.util.Map; /** * @author Andrew Shvayka @@ -39,7 +38,5 @@ public interface TimeseriesDao { ListenableFuture remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query); - ListenableFuture removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query); - void cleanup(long systemTtl); } diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest.java new file mode 100644 index 0000000000..d5e8350104 --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest.java @@ -0,0 +1,133 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.timeseries; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; +import org.thingsboard.server.dao.cassandra.CassandraCluster; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor; + +import java.text.ParseException; +import java.util.List; + +import static org.apache.commons.lang3.time.DateFormatUtils.ISO_DATETIME_TIME_ZONE_FORMAT; +import static org.assertj.core.api.Assertions.assertThat; + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = CassandraBaseTimeseriesDao.class) +@TestPropertySource(properties = { + "database.ts.type=cassandra", + "cassandra.query.ts_key_value_partitioning=DAYS", + "cassandra.query.use_ts_key_value_partitioning_on_read=false", + "cassandra.query.ts_key_value_partitions_max_cache_size=100000", + "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", + "cassandra.query.ts_key_value_partitions_cache_stats_interval=60", + "cassandra.query.ts_key_value_ttl=0", + "cassandra.query.set_null_values_enabled=false", +}) +@Slf4j +public class CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest { + + @Autowired + CassandraBaseTimeseriesDao tsDao; + + @MockBean(answer = Answers.RETURNS_MOCKS) + @Qualifier("CassandraCluster") + CassandraCluster cassandraCluster; + + @MockBean + CassandraBufferedRateReadExecutor cassandraBufferedRateReadExecutor; + @MockBean + CassandraBufferedRateWriteExecutor cassandraBufferedRateWriteExecutor; + + @Test + public void testToPartitionsDays() throws ParseException { + assertThat(tsDao.getPartitioning()).isEqualTo("DAYS"); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-02T00:00:00Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-02T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-03T00:00:01Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-03T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T23:59:59Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T23:59:59Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T00:00:00Z").getTime()); + } + + @Test + public void testCalculatePartitionsDays() throws ParseException { + long startTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime()); + long nextTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-12T23:59:59Z").getTime()); + long endTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-15T00:00:00Z").getTime()); + log.info("startTs {}, nextTs {}, endTs {}", startTs, nextTs, endTs); + + assertThat(tsDao.calculatePartitions(0, 0)).isEqualTo(List.of(0L)); + assertThat(tsDao.calculatePartitions(0, 1)).isEqualTo(List.of(0L, 1L)); + + assertThat(tsDao.calculatePartitions(startTs, startTs)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime())); + assertThat(tsDao.calculatePartitions(startTs, nextTs)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-11T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-12T00:00:00Z").getTime())); + + assertThat(tsDao.calculatePartitions(startTs, endTs)).hasSize(6).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-11T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-12T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-13T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-14T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-15T00:00:00Z").getTime())); + + long leapStartTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-02-27T00:00:00Z").getTime()); + long leapEndTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-03-01T00:00:00Z").getTime()); + assertThat(tsDao.calculatePartitions(leapStartTs, leapEndTs)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-02-27T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-02-28T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-02-29T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-03-01T00:00:00Z").getTime())); + + long newYearStartTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-12-30T00:00:00Z").getTime()); + long newYearEndTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-01T00:00:00Z").getTime()); + assertThat(tsDao.calculatePartitions(newYearStartTs, newYearEndTs)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-12-30T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-12-31T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-01T00:00:00Z").getTime())); + } + +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest.java new file mode 100644 index 0000000000..d5381653b3 --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest.java @@ -0,0 +1,134 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.timeseries; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; +import org.thingsboard.server.dao.cassandra.CassandraCluster; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor; + +import java.text.ParseException; +import java.util.List; + +import static org.apache.commons.lang3.time.DateFormatUtils.ISO_DATETIME_TIME_ZONE_FORMAT; +import static org.assertj.core.api.Assertions.assertThat; + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = CassandraBaseTimeseriesDao.class) +@TestPropertySource(properties = { + "database.ts.type=cassandra", + "cassandra.query.ts_key_value_partitioning=HOURS", + "cassandra.query.use_ts_key_value_partitioning_on_read=false", + "cassandra.query.ts_key_value_partitions_max_cache_size=100000", + "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", + "cassandra.query.ts_key_value_partitions_cache_stats_interval=60", + "cassandra.query.ts_key_value_ttl=0", + "cassandra.query.set_null_values_enabled=false", +}) +@Slf4j +public class CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest { + + @Autowired + CassandraBaseTimeseriesDao tsDao; + + @MockBean(answer = Answers.RETURNS_MOCKS) + @Qualifier("CassandraCluster") + CassandraCluster cassandraCluster; + + @MockBean + CassandraBufferedRateReadExecutor cassandraBufferedRateReadExecutor; + @MockBean + CassandraBufferedRateWriteExecutor cassandraBufferedRateWriteExecutor; + + @Test + public void testToPartitionsHours() throws ParseException { + assertThat(tsDao.getPartitioning()).isEqualTo("HOURS"); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-02T01:00:00Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-02T01:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-03T02:00:01Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-03T02:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T23:59:59Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T23:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T23:59:59Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T23:00:00Z").getTime()); + } + + @Test + public void testCalculatePartitionsHours() throws ParseException { + long startTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime()); + long nextTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T03:59:59Z").getTime()); + long endTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-11T00:59:00Z").getTime()); + log.info("startTs {}, nextTs {}, endTs {}", startTs, nextTs, endTs); + + assertThat(tsDao.calculatePartitions(0, 0)).isEqualTo(List.of(0L)); + assertThat(tsDao.calculatePartitions(0, 1)).isEqualTo(List.of(0L, 1L)); + + assertThat(tsDao.calculatePartitions(startTs, startTs)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime())); + assertThat(tsDao.calculatePartitions(startTs, nextTs)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T01:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T02:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T03:00:00Z").getTime())); + + assertThat(tsDao.calculatePartitions(startTs, endTs)).hasSize(25).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T01:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T02:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T03:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T04:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T05:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T06:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T07:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T08:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T09:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T10:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T11:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T12:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T13:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T14:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T15:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T16:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T17:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T18:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T19:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T20:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T21:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T22:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T23:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-11T00:00:00Z").getTime())); + } + +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningIndefiniteAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningIndefiniteAlwaysExistsTest.java new file mode 100644 index 0000000000..82f94cde36 --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningIndefiniteAlwaysExistsTest.java @@ -0,0 +1,78 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.timeseries; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; +import org.thingsboard.server.dao.cassandra.CassandraCluster; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor; + +import java.text.ParseException; +import java.util.List; + +import static org.apache.commons.lang3.time.DateFormatUtils.ISO_DATETIME_TIME_ZONE_FORMAT; +import static org.assertj.core.api.Assertions.assertThat; + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = CassandraBaseTimeseriesDao.class) +@TestPropertySource(properties = { + "database.ts.type=cassandra", + "cassandra.query.ts_key_value_partitioning=INDEFINITE", + "cassandra.query.use_ts_key_value_partitioning_on_read=false", + "cassandra.query.ts_key_value_partitions_max_cache_size=100000", + "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", + "cassandra.query.ts_key_value_partitions_cache_stats_interval=60", + "cassandra.query.ts_key_value_ttl=0", + "cassandra.query.set_null_values_enabled=false", +}) +@Slf4j +public class CassandraBaseTimeseriesDaoPartitioningIndefiniteAlwaysExistsTest { + + @Autowired + CassandraBaseTimeseriesDao tsDao; + + @MockBean(answer = Answers.RETURNS_MOCKS) + @Qualifier("CassandraCluster") + CassandraCluster cassandraCluster; + + @MockBean + CassandraBufferedRateReadExecutor cassandraBufferedRateReadExecutor; + @MockBean + CassandraBufferedRateWriteExecutor cassandraBufferedRateWriteExecutor; + + @Test + public void testToPartitionsIndefinite() throws ParseException { + assertThat(tsDao.getPartitioning()).isEqualTo("INDEFINITE"); + assertThat(tsDao.toPartitionTs(ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime())).isEqualTo(0L); + } + + + @Test + public void testCalculatePartitionsIndefinite() throws ParseException { + //Indefinite partitioning should never call tsDao.calculatePartitions() + } + +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest.java new file mode 100644 index 0000000000..ffdf18c42b --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest.java @@ -0,0 +1,121 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.timeseries; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; +import org.thingsboard.server.dao.cassandra.CassandraCluster; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor; + +import java.text.ParseException; +import java.util.List; + +import static org.apache.commons.lang3.time.DateFormatUtils.ISO_DATETIME_TIME_ZONE_FORMAT; +import static org.assertj.core.api.Assertions.assertThat; + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = CassandraBaseTimeseriesDao.class) +@TestPropertySource(properties = { + "database.ts.type=cassandra", + "cassandra.query.ts_key_value_partitioning=MINUTES", + "cassandra.query.use_ts_key_value_partitioning_on_read=false", + "cassandra.query.ts_key_value_partitions_max_cache_size=100000", + "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", + "cassandra.query.ts_key_value_partitions_cache_stats_interval=60", + "cassandra.query.ts_key_value_ttl=0", + "cassandra.query.set_null_values_enabled=false", +}) +@Slf4j +public class CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest { + + @Autowired + CassandraBaseTimeseriesDao tsDao; + + @MockBean(answer = Answers.RETURNS_MOCKS) + @Qualifier("CassandraCluster") + CassandraCluster cassandraCluster; + + @MockBean + CassandraBufferedRateReadExecutor cassandraBufferedRateReadExecutor; + @MockBean + CassandraBufferedRateWriteExecutor cassandraBufferedRateWriteExecutor; + + @Test + public void testToPartitionsMinutes() throws ParseException { + assertThat(tsDao.getPartitioning()).isEqualTo("MINUTES"); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-02T00:01:00Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-02T00:01:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-03T00:02:01Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-03T00:02:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T23:59:59Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T23:59:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T23:59:59Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T23:59:00Z").getTime()); + } + + + @Test + public void testCalculatePartitionsMinutes() throws ParseException { + long startTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime()); + long nextTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:02:59Z").getTime()); + long endTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:10:00Z").getTime()); + log.info("startTs {}, nextTs {}, endTs {}", startTs, nextTs, endTs); + + assertThat(tsDao.calculatePartitions(0, 0)).isEqualTo(List.of(0L)); + assertThat(tsDao.calculatePartitions(0, 1)).isEqualTo(List.of(0L, 1L)); + + assertThat(tsDao.calculatePartitions(startTs, startTs)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime())); + assertThat(tsDao.calculatePartitions(startTs, nextTs)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:01:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:02:00Z").getTime())); + + assertThat(tsDao.calculatePartitions(startTs, endTs)).hasSize(11).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:01:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:02:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:03:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:04:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:05:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:06:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:07:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:08:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:09:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:10:00Z").getTime())); + } + +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest.java new file mode 100644 index 0000000000..8c7d12da86 --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest.java @@ -0,0 +1,134 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.timeseries; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; +import org.thingsboard.server.dao.cassandra.CassandraCluster; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor; + +import java.text.ParseException; +import java.util.List; + +import static org.apache.commons.lang3.time.DateFormatUtils.ISO_DATETIME_TIME_ZONE_FORMAT; +import static org.assertj.core.api.Assertions.assertThat; + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = CassandraBaseTimeseriesDao.class) +@TestPropertySource(properties = { + "database.ts.type=cassandra", + "cassandra.query.ts_key_value_partitioning=MONTHS", + "cassandra.query.use_ts_key_value_partitioning_on_read=false", + "cassandra.query.ts_key_value_partitions_max_cache_size=100000", + "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", + "cassandra.query.ts_key_value_partitions_cache_stats_interval=60", + "cassandra.query.ts_key_value_ttl=0", + "cassandra.query.set_null_values_enabled=false", +}) +@Slf4j +public class CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest { + + @Autowired + CassandraBaseTimeseriesDao tsDao; + + @MockBean(answer = Answers.RETURNS_MOCKS) + @Qualifier("CassandraCluster") + CassandraCluster cassandraCluster; + + @MockBean + CassandraBufferedRateReadExecutor cassandraBufferedRateReadExecutor; + @MockBean + CassandraBufferedRateWriteExecutor cassandraBufferedRateWriteExecutor; + + @Test + public void testToPartitionsMonths() throws ParseException { + assertThat(tsDao.getPartitioning()).isEqualTo("MONTHS"); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime())).isEqualTo(1640995200000L).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-01T00:00:00Z").getTime())).isEqualTo(1651363200000L).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-01T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-01T00:00:01Z").getTime())).isEqualTo(1651363200000L).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-01T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T23:59:59Z").getTime())).isEqualTo(1651363200000L).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-01T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T23:59:59Z").getTime())).isEqualTo(1701388800000L).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-01T00:00:00Z").getTime()); + } + + @Test + public void testCalculatePartitionsMonths() throws ParseException { + long startTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-12T00:00:00Z").getTime()); + long nextTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-01-31T23:59:59Z").getTime()); + long leapTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-02-29T23:59:59Z").getTime()); + long endTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-31T23:59:59Z").getTime()); + log.info("startTs {}, nextTs {}, leapTs {}, endTs {}", startTs, nextTs, leapTs, endTs); + + assertThat(tsDao.calculatePartitions(0, 0)).isEqualTo(List.of(0L)); + assertThat(tsDao.calculatePartitions(0, 1)).isEqualTo(List.of(0L, 1L)); + + assertThat(tsDao.calculatePartitions(startTs, startTs)).isEqualTo(List.of(1575158400000L)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-01T00:00:00Z").getTime())); + assertThat(tsDao.calculatePartitions(startTs, nextTs)).isEqualTo(List.of(1575158400000L, 1577836800000L)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-01-01T00:00:00Z").getTime())); + + assertThat(tsDao.calculatePartitions(startTs, leapTs)).isEqualTo(List.of(1575158400000L, 1577836800000L, 1580515200000L)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-01-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-02-01T00:00:00Z").getTime())); + + assertThat(tsDao.calculatePartitions(startTs, endTs)).hasSize(14).isEqualTo(List.of( + 1575158400000L, + 1577836800000L, 1580515200000L, 1583020800000L, + 1585699200000L, 1588291200000L, 1590969600000L, + 1593561600000L, 1596240000000L, 1598918400000L, + 1601510400000L, 1604188800000L, 1606780800000L, + 1609459200000L)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-01-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-02-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-03-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-04-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-05-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-06-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-07-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-08-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-09-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-10-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-11-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-12-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-01T00:00:00Z").getTime())); + } + +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest.java new file mode 100644 index 0000000000..4b2bcb6455 --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest.java @@ -0,0 +1,116 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.timeseries; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; +import org.thingsboard.server.dao.cassandra.CassandraCluster; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor; +import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor; + +import java.text.ParseException; +import java.util.List; + +import static org.apache.commons.lang3.time.DateFormatUtils.ISO_DATETIME_TIME_ZONE_FORMAT; +import static org.assertj.core.api.Assertions.assertThat; + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = CassandraBaseTimeseriesDao.class) +@TestPropertySource(properties = { + "database.ts.type=cassandra", + "cassandra.query.ts_key_value_partitioning=YEARS", + "cassandra.query.use_ts_key_value_partitioning_on_read=false", + "cassandra.query.ts_key_value_partitions_max_cache_size=100000", + "cassandra.query.ts_key_value_partitions_cache_stats_enabled=true", + "cassandra.query.ts_key_value_partitions_cache_stats_interval=60", + "cassandra.query.ts_key_value_ttl=0", + "cassandra.query.set_null_values_enabled=false", +}) +@Slf4j +public class CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest { + + @Autowired + CassandraBaseTimeseriesDao tsDao; + + @MockBean(answer = Answers.RETURNS_MOCKS) + @Qualifier("CassandraCluster") + CassandraCluster cassandraCluster; + + @MockBean + CassandraBufferedRateReadExecutor cassandraBufferedRateReadExecutor; + @MockBean + CassandraBufferedRateWriteExecutor cassandraBufferedRateWriteExecutor; + + @Test + public void testToPartitionsYears() throws ParseException { + assertThat(tsDao.getPartitioning()).isEqualTo("YEARS"); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-01T00:00:00Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-01T00:00:01Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T23:59:59Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime()); + assertThat(tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T23:59:59Z").getTime())).isEqualTo( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-01-01T00:00:00Z").getTime()); + } + + @Test + public void testCalculatePartitionsYears() throws ParseException { + long startTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-01-01T00:00:00Z").getTime()); + long nextTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-10-12T23:59:59Z").getTime()); + long endTs = tsDao.toPartitionTs( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2025-07-15T00:00:00Z").getTime()); + log.info("startTs {}, nextTs {}, endTs {}", startTs, nextTs, endTs); + + assertThat(tsDao.calculatePartitions(0, 0)).isEqualTo(List.of(0L)); + assertThat(tsDao.calculatePartitions(0, 1)).isEqualTo(List.of(0L, 1L)); + + assertThat(tsDao.calculatePartitions(startTs, startTs)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-01-01T00:00:00Z").getTime())); + assertThat(tsDao.calculatePartitions(startTs, nextTs)).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-01-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-01-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-01T00:00:00Z").getTime())); + + assertThat(tsDao.calculatePartitions(startTs, endTs)).hasSize(7).isEqualTo(List.of( + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-01-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-01-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-01-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2024-01-01T00:00:00Z").getTime(), + ISO_DATETIME_TIME_ZONE_FORMAT.parse("2025-01-01T00:00:00Z").getTime())); + } + +}