Browse Source

Merge pull request #7629 from smatvienko-tb/feature/cassandra_partition_always_exists_mode

[3.4.2] Cassandra partition always exists mode
pull/7646/head
Andrew Shvayka 4 years ago
committed by GitHub
parent
commit
48bfa86ea9
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      application/src/main/resources/thingsboard.yml
  2. 5
      dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java
  3. 8
      dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java
  4. 2
      dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
  5. 79
      dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
  6. 3
      dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
  7. 133
      dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest.java
  8. 134
      dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest.java
  9. 78
      dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningIndefiniteAlwaysExistsTest.java
  10. 121
      dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest.java
  11. 134
      dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest.java
  12. 116
      dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest.java

1
application/src/main/resources/thingsboard.yml

@ -227,6 +227,7 @@ cassandra:
default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}"
# Specify partitioning size for timestamp key-value storage. Example: MINUTES, HOURS, DAYS, MONTHS, INDEFINITE
ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
use_ts_key_value_partitioning_on_read: "${USE_TS_KV_PARTITIONING_ON_READ:true}"
ts_key_value_partitions_max_cache_size: "${TS_KV_PARTITIONS_MAX_CACHE_SIZE:100000}"
ts_key_value_ttl: "${TS_KV_TTL:0}"
buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}"

5
dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java

@ -105,11 +105,6 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
return Futures.immediateFuture(null);
}
@Override
public ListenableFuture<Void> removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
return Futures.immediateFuture(null);
}
@Override
public ListenableFuture<List<ReadTsKvQueryResult>> findAllAsync(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries) {
return processFindAllAsync(tenantId, entityId, queries);

8
dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java

@ -18,7 +18,6 @@ package org.thingsboard.server.dao.sqlts.timescale;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
@ -36,7 +35,6 @@ import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;
import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao;
@ -52,7 +50,6 @@ import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
@Component
@ -144,11 +141,6 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
});
}
@Override
public ListenableFuture<Void> removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
return service.submit(() -> null);
}
@Override
public ListenableFuture<ReadTsKvQueryResult> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
if (query.getAggregation() == Aggregation.NONE) {

2
dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java

@ -46,7 +46,6 @@ import org.thingsboard.server.dao.service.Validator;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@ -277,7 +276,6 @@ public class BaseTimeseriesService implements TimeseriesService {
private void deleteAndRegisterFutures(TenantId tenantId, List<ListenableFuture<TsKvLatestRemovingResult>> futures, EntityId entityId, DeleteTsKvQuery query) {
futures.add(Futures.transform(timeseriesDao.remove(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor()));
futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query));
futures.add(Futures.transform(timeseriesDao.removePartition(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor()));
}
private static void validate(EntityId entityId) {

79
dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java

@ -28,6 +28,7 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@ -59,7 +60,6 @@ import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@ -82,17 +82,21 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
protected static final int MIN_AGGREGATION_STEP_MS = 1000;
public static final String ASC_ORDER = "ASC";
public static final long SECONDS_IN_DAY = TimeUnit.DAYS.toSeconds(1);
protected static List<Long> FIXED_PARTITION = Arrays.asList(new Long[]{0L});
protected static final List<Long> FIXED_PARTITION = List.of(0L);
private CassandraTsPartitionsCache cassandraTsPartitionsCache;
@Autowired
private Environment environment;
@Getter
@Value("${cassandra.query.ts_key_value_partitioning}")
private String partitioning;
@Getter
@Value("${cassandra.query.use_ts_key_value_partitioning_on_read:true}")
private boolean useTsKeyValuePartitioningOnRead;
@Value("${cassandra.query.ts_key_value_partitions_max_cache_size:100000}")
private long partitionsCacheSize;
@ -222,46 +226,6 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
return resultFuture;
}
@Override
public ListenableFuture<Void> removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
long minPartition = toPartitionTs(query.getStartTs());
long maxPartition = toPartitionTs(query.getEndTs());
if (minPartition == maxPartition) {
return Futures.immediateFuture(null);
} else {
TbResultSetFuture partitionsFuture = fetchPartitions(tenantId, entityId, query.getKey(), minPartition, maxPartition);
final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
final ListenableFuture<List<Long>> partitionsListFuture = Futures.transformAsync(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
@Override
public void onSuccess(@Nullable List<Long> partitions) {
int index = 0;
if (minPartition != query.getStartTs()) {
index = 1;
}
List<Long> partitionsToDelete = new ArrayList<>();
for (int i = index; i < partitions.size() - 1; i++) {
partitionsToDelete.add(partitions.get(i));
}
QueryCursor cursor = new QueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitionsToDelete);
deletePartitionAsync(tenantId, cursor, resultFuture);
for (Long partition : partitionsToDelete) {
cassandraTsPartitionsCache.invalidate(new CassandraPartitionCacheKey(entityId, query.getKey(), partition));
}
}
@Override
public void onFailure(Throwable t) {
log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t);
}
}, readResultsProcessingExecutor);
return resultFuture;
}
}
@Override
public ListenableFuture<ReadTsKvQueryResult> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
if (query.getAggregation() == Aggregation.NONE) {
@ -337,7 +301,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
}, MoreExecutors.directExecutor());
}
private long toPartitionTs(long ts) {
long toPartitionTs(long ts) {
LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
return tsFormat.truncatedTo(time).toInstant(ZoneOffset.UTC).toEpochMilli();
}
@ -417,10 +381,37 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
if (isFixedPartitioning()) { //no need to fetch partitions from DB
return Futures.immediateFuture(FIXED_PARTITION);
}
if (!isUseTsKeyValuePartitioningOnRead()) {
return Futures.immediateFuture(calculatePartitions(minPartition, maxPartition));
}
TbResultSetFuture partitionsFuture = fetchPartitions(tenantId, entityId, query.getKey(), minPartition, maxPartition);
return Futures.transformAsync(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
}
List<Long> calculatePartitions(long minPartition, long maxPartition) {
if (minPartition == maxPartition) {
return Collections.singletonList(minPartition);
}
List<Long> partitions = new ArrayList<>();
long currentPartition = minPartition;
LocalDateTime currentPartitionTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentPartition), ZoneOffset.UTC);
while (maxPartition > currentPartition) {
partitions.add(currentPartition);
currentPartitionTime = calculateNextPartition(currentPartitionTime);
currentPartition = currentPartitionTime.toInstant(ZoneOffset.UTC).toEpochMilli();
}
partitions.add(maxPartition);
return partitions;
}
private LocalDateTime calculateNextPartition(LocalDateTime time) {
return time.plus(1, tsFormat.getTruncateUnit());
}
private AsyncFunction<List<Long>, List<TbResultSet>> getFetchChunksAsyncFunction(TenantId tenantId, EntityId entityId, String key, Aggregation aggregation, long startTs, long endTs) {
return partitions -> {
try {

3
dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java

@ -24,7 +24,6 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import java.util.List;
import java.util.Map;
/**
* @author Andrew Shvayka
@ -39,7 +38,5 @@ public interface TimeseriesDao {
ListenableFuture<Void> remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query);
ListenableFuture<Void> removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query);
void cleanup(long systemTtl);
}

133
dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest.java

@ -0,0 +1,133 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.timeseries;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import org.thingsboard.server.dao.cassandra.CassandraCluster;
import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor;
import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor;
import java.text.ParseException;
import java.util.List;
import static org.apache.commons.lang3.time.DateFormatUtils.ISO_DATETIME_TIME_ZONE_FORMAT;
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = CassandraBaseTimeseriesDao.class)
@TestPropertySource(properties = {
"database.ts.type=cassandra",
"cassandra.query.ts_key_value_partitioning=DAYS",
"cassandra.query.use_ts_key_value_partitioning_on_read=false",
"cassandra.query.ts_key_value_partitions_max_cache_size=100000",
"cassandra.query.ts_key_value_partitions_cache_stats_enabled=true",
"cassandra.query.ts_key_value_partitions_cache_stats_interval=60",
"cassandra.query.ts_key_value_ttl=0",
"cassandra.query.set_null_values_enabled=false",
})
@Slf4j
public class CassandraBaseTimeseriesDaoPartitioningDaysAlwaysExistsTest {
@Autowired
CassandraBaseTimeseriesDao tsDao;
@MockBean(answer = Answers.RETURNS_MOCKS)
@Qualifier("CassandraCluster")
CassandraCluster cassandraCluster;
@MockBean
CassandraBufferedRateReadExecutor cassandraBufferedRateReadExecutor;
@MockBean
CassandraBufferedRateWriteExecutor cassandraBufferedRateWriteExecutor;
@Test
public void testToPartitionsDays() throws ParseException {
assertThat(tsDao.getPartitioning()).isEqualTo("DAYS");
assertThat(tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime())).isEqualTo(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime());
assertThat(tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-02T00:00:00Z").getTime())).isEqualTo(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-02T00:00:00Z").getTime());
assertThat(tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-03T00:00:01Z").getTime())).isEqualTo(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-03T00:00:00Z").getTime());
assertThat(tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T23:59:59Z").getTime())).isEqualTo(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T00:00:00Z").getTime());
assertThat(tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T23:59:59Z").getTime())).isEqualTo(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T00:00:00Z").getTime());
}
@Test
public void testCalculatePartitionsDays() throws ParseException {
long startTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime());
long nextTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-12T23:59:59Z").getTime());
long endTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-15T00:00:00Z").getTime());
log.info("startTs {}, nextTs {}, endTs {}", startTs, nextTs, endTs);
assertThat(tsDao.calculatePartitions(0, 0)).isEqualTo(List.of(0L));
assertThat(tsDao.calculatePartitions(0, 1)).isEqualTo(List.of(0L, 1L));
assertThat(tsDao.calculatePartitions(startTs, startTs)).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime()));
assertThat(tsDao.calculatePartitions(startTs, nextTs)).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-11T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-12T00:00:00Z").getTime()));
assertThat(tsDao.calculatePartitions(startTs, endTs)).hasSize(6).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-11T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-12T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-13T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-14T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-15T00:00:00Z").getTime()));
long leapStartTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-02-27T00:00:00Z").getTime());
long leapEndTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-03-01T00:00:00Z").getTime());
assertThat(tsDao.calculatePartitions(leapStartTs, leapEndTs)).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-02-27T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-02-28T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-02-29T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-03-01T00:00:00Z").getTime()));
long newYearStartTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-12-30T00:00:00Z").getTime());
long newYearEndTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-01T00:00:00Z").getTime());
assertThat(tsDao.calculatePartitions(newYearStartTs, newYearEndTs)).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-12-30T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-12-31T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-01T00:00:00Z").getTime()));
}
}

134
dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest.java

@ -0,0 +1,134 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.timeseries;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import org.thingsboard.server.dao.cassandra.CassandraCluster;
import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor;
import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor;
import java.text.ParseException;
import java.util.List;
import static org.apache.commons.lang3.time.DateFormatUtils.ISO_DATETIME_TIME_ZONE_FORMAT;
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = CassandraBaseTimeseriesDao.class)
@TestPropertySource(properties = {
"database.ts.type=cassandra",
"cassandra.query.ts_key_value_partitioning=HOURS",
"cassandra.query.use_ts_key_value_partitioning_on_read=false",
"cassandra.query.ts_key_value_partitions_max_cache_size=100000",
"cassandra.query.ts_key_value_partitions_cache_stats_enabled=true",
"cassandra.query.ts_key_value_partitions_cache_stats_interval=60",
"cassandra.query.ts_key_value_ttl=0",
"cassandra.query.set_null_values_enabled=false",
})
@Slf4j
public class CassandraBaseTimeseriesDaoPartitioningHoursAlwaysExistsTest {
@Autowired
CassandraBaseTimeseriesDao tsDao;
@MockBean(answer = Answers.RETURNS_MOCKS)
@Qualifier("CassandraCluster")
CassandraCluster cassandraCluster;
@MockBean
CassandraBufferedRateReadExecutor cassandraBufferedRateReadExecutor;
@MockBean
CassandraBufferedRateWriteExecutor cassandraBufferedRateWriteExecutor;
@Test
public void testToPartitionsHours() throws ParseException {
assertThat(tsDao.getPartitioning()).isEqualTo("HOURS");
assertThat(tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime())).isEqualTo(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime());
assertThat(tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-02T01:00:00Z").getTime())).isEqualTo(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-02T01:00:00Z").getTime());
assertThat(tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-03T02:00:01Z").getTime())).isEqualTo(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-03T02:00:00Z").getTime());
assertThat(tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T23:59:59Z").getTime())).isEqualTo(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T23:00:00Z").getTime());
assertThat(tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T23:59:59Z").getTime())).isEqualTo(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T23:00:00Z").getTime());
}
@Test
public void testCalculatePartitionsHours() throws ParseException {
long startTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime());
long nextTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T03:59:59Z").getTime());
long endTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-11T00:59:00Z").getTime());
log.info("startTs {}, nextTs {}, endTs {}", startTs, nextTs, endTs);
assertThat(tsDao.calculatePartitions(0, 0)).isEqualTo(List.of(0L));
assertThat(tsDao.calculatePartitions(0, 1)).isEqualTo(List.of(0L, 1L));
assertThat(tsDao.calculatePartitions(startTs, startTs)).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime()));
assertThat(tsDao.calculatePartitions(startTs, nextTs)).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T01:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T02:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T03:00:00Z").getTime()));
assertThat(tsDao.calculatePartitions(startTs, endTs)).hasSize(25).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T01:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T02:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T03:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T04:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T05:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T06:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T07:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T08:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T09:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T10:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T11:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T12:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T13:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T14:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T15:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T16:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T17:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T18:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T19:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T20:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T21:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T22:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T23:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-11T00:00:00Z").getTime()));
}
}

78
dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningIndefiniteAlwaysExistsTest.java

@ -0,0 +1,78 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.timeseries;
import lombok.extern.slf4j.Slf4j;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import org.thingsboard.server.dao.cassandra.CassandraCluster;
import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor;
import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor;
import java.text.ParseException;
import java.util.List;
import static org.apache.commons.lang3.time.DateFormatUtils.ISO_DATETIME_TIME_ZONE_FORMAT;
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = CassandraBaseTimeseriesDao.class)
@TestPropertySource(properties = {
"database.ts.type=cassandra",
"cassandra.query.ts_key_value_partitioning=INDEFINITE",
"cassandra.query.use_ts_key_value_partitioning_on_read=false",
"cassandra.query.ts_key_value_partitions_max_cache_size=100000",
"cassandra.query.ts_key_value_partitions_cache_stats_enabled=true",
"cassandra.query.ts_key_value_partitions_cache_stats_interval=60",
"cassandra.query.ts_key_value_ttl=0",
"cassandra.query.set_null_values_enabled=false",
})
@Slf4j
public class CassandraBaseTimeseriesDaoPartitioningIndefiniteAlwaysExistsTest {
@Autowired
CassandraBaseTimeseriesDao tsDao;
@MockBean(answer = Answers.RETURNS_MOCKS)
@Qualifier("CassandraCluster")
CassandraCluster cassandraCluster;
@MockBean
CassandraBufferedRateReadExecutor cassandraBufferedRateReadExecutor;
@MockBean
CassandraBufferedRateWriteExecutor cassandraBufferedRateWriteExecutor;
@Test
public void testToPartitionsIndefinite() throws ParseException {
assertThat(tsDao.getPartitioning()).isEqualTo("INDEFINITE");
assertThat(tsDao.toPartitionTs(ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime())).isEqualTo(0L);
}
@Test
public void testCalculatePartitionsIndefinite() throws ParseException {
//Indefinite partitioning should never call tsDao.calculatePartitions()
}
}

121
dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest.java

@ -0,0 +1,121 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.timeseries;
import lombok.extern.slf4j.Slf4j;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import org.thingsboard.server.dao.cassandra.CassandraCluster;
import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor;
import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor;
import java.text.ParseException;
import java.util.List;
import static org.apache.commons.lang3.time.DateFormatUtils.ISO_DATETIME_TIME_ZONE_FORMAT;
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = CassandraBaseTimeseriesDao.class)
@TestPropertySource(properties = {
"database.ts.type=cassandra",
"cassandra.query.ts_key_value_partitioning=MINUTES",
"cassandra.query.use_ts_key_value_partitioning_on_read=false",
"cassandra.query.ts_key_value_partitions_max_cache_size=100000",
"cassandra.query.ts_key_value_partitions_cache_stats_enabled=true",
"cassandra.query.ts_key_value_partitions_cache_stats_interval=60",
"cassandra.query.ts_key_value_ttl=0",
"cassandra.query.set_null_values_enabled=false",
})
@Slf4j
public class CassandraBaseTimeseriesDaoPartitioningMinutesAlwaysExistsTest {
@Autowired
CassandraBaseTimeseriesDao tsDao;
@MockBean(answer = Answers.RETURNS_MOCKS)
@Qualifier("CassandraCluster")
CassandraCluster cassandraCluster;
@MockBean
CassandraBufferedRateReadExecutor cassandraBufferedRateReadExecutor;
@MockBean
CassandraBufferedRateWriteExecutor cassandraBufferedRateWriteExecutor;
@Test
public void testToPartitionsMinutes() throws ParseException {
assertThat(tsDao.getPartitioning()).isEqualTo("MINUTES");
assertThat(tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime())).isEqualTo(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime());
assertThat(tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-02T00:01:00Z").getTime())).isEqualTo(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-02T00:01:00Z").getTime());
assertThat(tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-03T00:02:01Z").getTime())).isEqualTo(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-03T00:02:00Z").getTime());
assertThat(tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T23:59:59Z").getTime())).isEqualTo(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T23:59:00Z").getTime());
assertThat(tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T23:59:59Z").getTime())).isEqualTo(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T23:59:00Z").getTime());
}
@Test
public void testCalculatePartitionsMinutes() throws ParseException {
long startTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime());
long nextTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:02:59Z").getTime());
long endTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:10:00Z").getTime());
log.info("startTs {}, nextTs {}, endTs {}", startTs, nextTs, endTs);
assertThat(tsDao.calculatePartitions(0, 0)).isEqualTo(List.of(0L));
assertThat(tsDao.calculatePartitions(0, 1)).isEqualTo(List.of(0L, 1L));
assertThat(tsDao.calculatePartitions(startTs, startTs)).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime()));
assertThat(tsDao.calculatePartitions(startTs, nextTs)).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:01:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:02:00Z").getTime()));
assertThat(tsDao.calculatePartitions(startTs, endTs)).hasSize(11).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:01:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:02:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:03:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:04:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:05:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:06:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:07:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:08:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:09:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-10-10T00:10:00Z").getTime()));
}
}

134
dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest.java

@ -0,0 +1,134 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.timeseries;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import org.thingsboard.server.dao.cassandra.CassandraCluster;
import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor;
import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor;
import java.text.ParseException;
import java.util.List;
import static org.apache.commons.lang3.time.DateFormatUtils.ISO_DATETIME_TIME_ZONE_FORMAT;
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = CassandraBaseTimeseriesDao.class)
@TestPropertySource(properties = {
"database.ts.type=cassandra",
"cassandra.query.ts_key_value_partitioning=MONTHS",
"cassandra.query.use_ts_key_value_partitioning_on_read=false",
"cassandra.query.ts_key_value_partitions_max_cache_size=100000",
"cassandra.query.ts_key_value_partitions_cache_stats_enabled=true",
"cassandra.query.ts_key_value_partitions_cache_stats_interval=60",
"cassandra.query.ts_key_value_ttl=0",
"cassandra.query.set_null_values_enabled=false",
})
@Slf4j
public class CassandraBaseTimeseriesDaoPartitioningMonthsAlwaysExistsTest {
@Autowired
CassandraBaseTimeseriesDao tsDao;
@MockBean(answer = Answers.RETURNS_MOCKS)
@Qualifier("CassandraCluster")
CassandraCluster cassandraCluster;
@MockBean
CassandraBufferedRateReadExecutor cassandraBufferedRateReadExecutor;
@MockBean
CassandraBufferedRateWriteExecutor cassandraBufferedRateWriteExecutor;
@Test
public void testToPartitionsMonths() throws ParseException {
assertThat(tsDao.getPartitioning()).isEqualTo("MONTHS");
assertThat(tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime())).isEqualTo(1640995200000L).isEqualTo(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime());
assertThat(tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-01T00:00:00Z").getTime())).isEqualTo(1651363200000L).isEqualTo(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-01T00:00:00Z").getTime());
assertThat(tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-01T00:00:01Z").getTime())).isEqualTo(1651363200000L).isEqualTo(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-01T00:00:00Z").getTime());
assertThat(tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T23:59:59Z").getTime())).isEqualTo(1651363200000L).isEqualTo(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-01T00:00:00Z").getTime());
assertThat(tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T23:59:59Z").getTime())).isEqualTo(1701388800000L).isEqualTo(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-01T00:00:00Z").getTime());
}
@Test
public void testCalculatePartitionsMonths() throws ParseException {
long startTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-12T00:00:00Z").getTime());
long nextTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-01-31T23:59:59Z").getTime());
long leapTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-02-29T23:59:59Z").getTime());
long endTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-31T23:59:59Z").getTime());
log.info("startTs {}, nextTs {}, leapTs {}, endTs {}", startTs, nextTs, leapTs, endTs);
assertThat(tsDao.calculatePartitions(0, 0)).isEqualTo(List.of(0L));
assertThat(tsDao.calculatePartitions(0, 1)).isEqualTo(List.of(0L, 1L));
assertThat(tsDao.calculatePartitions(startTs, startTs)).isEqualTo(List.of(1575158400000L)).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-01T00:00:00Z").getTime()));
assertThat(tsDao.calculatePartitions(startTs, nextTs)).isEqualTo(List.of(1575158400000L, 1577836800000L)).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-01-01T00:00:00Z").getTime()));
assertThat(tsDao.calculatePartitions(startTs, leapTs)).isEqualTo(List.of(1575158400000L, 1577836800000L, 1580515200000L)).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-01-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-02-01T00:00:00Z").getTime()));
assertThat(tsDao.calculatePartitions(startTs, endTs)).hasSize(14).isEqualTo(List.of(
1575158400000L,
1577836800000L, 1580515200000L, 1583020800000L,
1585699200000L, 1588291200000L, 1590969600000L,
1593561600000L, 1596240000000L, 1598918400000L,
1601510400000L, 1604188800000L, 1606780800000L,
1609459200000L)).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-12-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-01-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-02-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-03-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-04-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-05-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-06-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-07-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-08-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-09-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-10-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-11-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-12-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-01T00:00:00Z").getTime()));
}
}

116
dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest.java

@ -0,0 +1,116 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.timeseries;
import lombok.extern.slf4j.Slf4j;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import org.thingsboard.server.dao.cassandra.CassandraCluster;
import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor;
import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor;
import java.text.ParseException;
import java.util.List;
import static org.apache.commons.lang3.time.DateFormatUtils.ISO_DATETIME_TIME_ZONE_FORMAT;
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = CassandraBaseTimeseriesDao.class)
@TestPropertySource(properties = {
"database.ts.type=cassandra",
"cassandra.query.ts_key_value_partitioning=YEARS",
"cassandra.query.use_ts_key_value_partitioning_on_read=false",
"cassandra.query.ts_key_value_partitions_max_cache_size=100000",
"cassandra.query.ts_key_value_partitions_cache_stats_enabled=true",
"cassandra.query.ts_key_value_partitions_cache_stats_interval=60",
"cassandra.query.ts_key_value_ttl=0",
"cassandra.query.set_null_values_enabled=false",
})
@Slf4j
public class CassandraBaseTimeseriesDaoPartitioningYearsAlwaysExistsTest {
@Autowired
CassandraBaseTimeseriesDao tsDao;
@MockBean(answer = Answers.RETURNS_MOCKS)
@Qualifier("CassandraCluster")
CassandraCluster cassandraCluster;
@MockBean
CassandraBufferedRateReadExecutor cassandraBufferedRateReadExecutor;
@MockBean
CassandraBufferedRateWriteExecutor cassandraBufferedRateWriteExecutor;
@Test
public void testToPartitionsYears() throws ParseException {
assertThat(tsDao.getPartitioning()).isEqualTo("YEARS");
assertThat(tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime())).isEqualTo(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime());
assertThat(tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-01T00:00:00Z").getTime())).isEqualTo(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime());
assertThat(tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-01T00:00:01Z").getTime())).isEqualTo(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime());
assertThat(tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-05-31T23:59:59Z").getTime())).isEqualTo(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime());
assertThat(tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-12-31T23:59:59Z").getTime())).isEqualTo(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-01-01T00:00:00Z").getTime());
}
@Test
public void testCalculatePartitionsYears() throws ParseException {
long startTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-01-01T00:00:00Z").getTime());
long nextTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-10-12T23:59:59Z").getTime());
long endTs = tsDao.toPartitionTs(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2025-07-15T00:00:00Z").getTime());
log.info("startTs {}, nextTs {}, endTs {}", startTs, nextTs, endTs);
assertThat(tsDao.calculatePartitions(0, 0)).isEqualTo(List.of(0L));
assertThat(tsDao.calculatePartitions(0, 1)).isEqualTo(List.of(0L, 1L));
assertThat(tsDao.calculatePartitions(startTs, startTs)).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-01-01T00:00:00Z").getTime()));
assertThat(tsDao.calculatePartitions(startTs, nextTs)).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-01-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-01-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-01T00:00:00Z").getTime()));
assertThat(tsDao.calculatePartitions(startTs, endTs)).hasSize(7).isEqualTo(List.of(
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2019-01-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2020-01-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2021-01-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2022-01-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2023-01-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2024-01-01T00:00:00Z").getTime(),
ISO_DATETIME_TIME_ZONE_FORMAT.parse("2025-01-01T00:00:00Z").getTime()));
}
}
Loading…
Cancel
Save