Browse Source

ts_key_value_partitioning_always_exist_in_reading added to skip Cassandra ts_kv_partitions_cf reads

pull/7629/head
Sergey Matvienko 4 years ago
parent
commit
3f39305252
  1. 1
      application/src/main/resources/thingsboard.yml
  2. 21
      dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
  3. 78
      dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoTest.java

1
application/src/main/resources/thingsboard.yml

@ -227,6 +227,7 @@ cassandra:
default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}"
# Specify partitioning size for timestamp key-value storage. Example: MINUTES, HOURS, DAYS, MONTHS, INDEFINITE
ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
ts_key_value_partitioning_always_exist_in_reading: "${TS_KV_PARTITIONING_ALWAYS_EXIST_IN_READING:false}"
ts_key_value_partitions_max_cache_size: "${TS_KV_PARTITIONS_MAX_CACHE_SIZE:100000}"
ts_key_value_ttl: "${TS_KV_TTL:0}"
buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}"

21
dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java

@ -93,6 +93,10 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
@Value("${cassandra.query.ts_key_value_partitioning}")
private String partitioning;
@Getter
@Value("${cassandra.query.ts_key_value_partitioning_always_exist_in_reading:false}")
private boolean partitioningAlwaysExistInReading;
@Value("${cassandra.query.ts_key_value_partitions_max_cache_size:100000}")
private long partitionsCacheSize;
@ -417,10 +421,27 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
if (isFixedPartitioning()) { //no need to fetch partitions from DB
return Futures.immediateFuture(FIXED_PARTITION);
}
if (isPartitioningAlwaysExistInReading()) {
return Futures.immediateFuture(calculatePartitions(minPartition, maxPartition));
}
TbResultSetFuture partitionsFuture = fetchPartitions(tenantId, entityId, query.getKey(), minPartition, maxPartition);
return Futures.transformAsync(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
}
List<Long> calculatePartitions(long minPartition, long maxPartition) {
if (minPartition == maxPartition) {
return Collections.singletonList(minPartition);
}
List<Long> partitions = Arrays.asList(minPartition, maxPartition);
long currentPartition = minPartition;
while (maxPartition > (currentPartition = toPartitionTs(currentPartition + TimeUnit.DAYS.toMillis(32)))){
partitions.add(currentPartition);
}
return partitions;
}
private AsyncFunction<List<Long>, List<TbResultSet>> getFetchChunksAsyncFunction(TenantId tenantId, EntityId entityId, String key, Aggregation aggregation, long startTs, long endTs) {
return partitions -> {
try {

78
dao/src/test/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDaoTest.java

@ -0,0 +1,78 @@
/**
* ThingsBoard, Inc. ("COMPANY") CONFIDENTIAL
*
* Copyright © 2016-2021 ThingsBoard, Inc. All Rights Reserved.
*
* NOTICE: All information contained herein is, and remains
* the property of ThingsBoard, Inc. and its suppliers,
* if any. The intellectual and technical concepts contained
* herein are proprietary to ThingsBoard, Inc.
* and its suppliers and may be covered by U.S. and Foreign Patents,
* patents in process, and are protected by trade secret or copyright law.
*
* Dissemination of this information or reproduction of this material is strictly forbidden
* unless prior written permission is obtained from COMPANY.
*
* Access to the source code contained herein is hereby forbidden to anyone except current COMPANY employees,
* managers or contractors who have executed Confidentiality and Non-disclosure agreements
* explicitly covering such access.
*
* The copyright notice above does not evidence any actual or intended publication
* or disclosure of this source code, which includes
* information that is confidential and/or proprietary, and is a trade secret, of COMPANY.
* ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE,
* OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE CODE WITHOUT
* THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED,
* AND IN VIOLATION OF APPLICABLE LAWS AND INTERNATIONAL TREATIES.
* THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION
* DOES NOT CONVEY OR IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS,
* OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT MAY DESCRIBE, IN WHOLE OR IN PART.
*/
package org.thingsboard.server.dao.timeseries;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import org.thingsboard.server.dao.cassandra.CassandraCluster;
import org.thingsboard.server.dao.nosql.CassandraBufferedRateExecutor;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = CassandraBaseTimeseriesDao.class)
@TestPropertySource(properties = {
"cassandra.query.ts_key_value_partitioning=MONTHS",
"cassandra.query.ts_key_value_partitioning_always_exist_in_reading=true",
"cassandra.query.ts_key_value_partitions_max_cache_size=100000",
"cassandra.query.ts_key_value_partitions_cache_stats_enabled=true",
"cassandra.query.ts_key_value_partitions_cache_stats_interval=60",
"cassandra.query.ts_key_value_ttl=0",
"cassandra.query.set_null_values_enabled=false",
})
@Slf4j
public class CassandraBaseTimeseriesDaoTest {
@SpyBean
CassandraBaseTimeseriesDao tsDao;
@MockBean(answer = Answers.RETURNS_MOCKS)
@Qualifier("CassandraCluster")
CassandraCluster cassandraCluster;
@MockBean
CassandraBufferedRateExecutor cassandraBufferedRateExecutor;
@Test
public void testCalculatePartitions() {
assertThat(tsDao.calculatePartitions(0, 0)).isEqualTo(List.of(0L));
assertThat(tsDao.calculatePartitions(0, 1)).isEqualTo(List.of(0L, 1L));
}
}
Loading…
Cancel
Save