Browse Source

Add Cassandra result set byte-size limit to prevent OOM on large queries

Adds page-level byte-size tracking in TbResultSet.allRows() using
ExecutionInfo.getResponseSizeInBytes() to fail early when accumulated
result set size exceeds the configurable limit (default 50MB).

Also fixes pre-existing bugs where onFailure callbacks in
CassandraBaseTimeseriesDao only logged errors but never completed
the future, causing callers to hang indefinitely on failures.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
pull/15058/head
Viacheslav Klimov 3 months ago
parent
commit
412fbefdbb
Failed to extract signature
  1. 2
      application/src/main/resources/thingsboard.yml
  2. 32
      common/dao-api/src/main/java/org/thingsboard/server/dao/nosql/ResultSetSizeLimitExceededException.java
  3. 24
      common/dao-api/src/main/java/org/thingsboard/server/dao/nosql/TbResultSet.java
  4. 140
      common/dao-api/src/test/java/org/thingsboard/server/dao/nosql/TbResultSetTest.java
  5. 3
      dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractAsyncDao.java
  6. 88
      dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
  7. 2
      dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java
  8. 4
      dao/src/main/java/org/thingsboard/server/dao/timeseries/SimpleListenableFuture.java
  9. 42
      dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlTest.java

2
application/src/main/resources/thingsboard.yml

@ -330,6 +330,8 @@ cassandra:
set_null_values_enabled: "${CASSANDRA_QUERY_SET_NULL_VALUES_ENABLED:true}"
# log one of cassandra queries with specified frequency (0 - logging is disabled)
print_queries_freq: "${CASSANDRA_QUERY_PRINT_FREQ:0}"
# Maximum total size in bytes of a Cassandra query result set across all pages. Default is 50MB. 0 means unlimited
max_result_set_size_in_bytes: "${CASSANDRA_QUERY_MAX_RESULT_SET_SIZE_IN_BYTES:52428800}"
tenant_rate_limits:
# Whether to print rate-limited tenant names when printing Cassandra query queue statistic
print_tenant_names: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_PRINT_TENANT_NAMES:false}"

32
common/dao-api/src/main/java/org/thingsboard/server/dao/nosql/ResultSetSizeLimitExceededException.java

@ -0,0 +1,32 @@
/**
* Copyright © 2016-2026 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.nosql;
import lombok.Getter;
@Getter
public class ResultSetSizeLimitExceededException extends IllegalArgumentException {
private final long limitBytes;
private final long actualBytes;
public ResultSetSizeLimitExceededException(long limitBytes, long actualBytes) {
super("Result set size exceeds the maximum allowed limit. Please narrow your query");
this.limitBytes = limitBytes;
this.actualBytes = actualBytes;
}
}

24
common/dao-api/src/main/java/org/thingsboard/server/dao/nosql/TbResultSet.java

@ -34,6 +34,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
public class TbResultSet implements AsyncResultSet {
@ -89,9 +90,14 @@ public class TbResultSet implements AsyncResultSet {
}
public ListenableFuture<List<Row>> allRows(Executor executor) {
return allRows(executor, 0);
}
public ListenableFuture<List<Row>> allRows(Executor executor, long maxResultSetSizeBytes) {
List<Row> allRows = new ArrayList<>();
SettableFuture<List<Row>> resultFuture = SettableFuture.create();
this.processRows(originalStatement, delegate, allRows, resultFuture, executor);
AtomicLong accumulatedBytes = new AtomicLong(0);
this.processRows(originalStatement, delegate, allRows, resultFuture, executor, maxResultSetSizeBytes, accumulatedBytes);
return resultFuture;
}
@ -99,7 +105,19 @@ public class TbResultSet implements AsyncResultSet {
AsyncResultSet resultSet,
List<Row> allRows,
SettableFuture<List<Row>> resultFuture,
Executor executor) {
Executor executor,
long maxResultSetSizeBytes,
AtomicLong accumulatedBytes) {
if (maxResultSetSizeBytes > 0) {
int pageSize = resultSet.getExecutionInfo().getResponseSizeInBytes();
if (pageSize > 0) {
accumulatedBytes.addAndGet(pageSize);
}
if (accumulatedBytes.get() > maxResultSetSizeBytes) {
resultFuture.setException(new ResultSetSizeLimitExceededException(maxResultSetSizeBytes, accumulatedBytes.get()));
return;
}
}
allRows.addAll(loadRows(resultSet));
if (resultSet.hasMorePages()) {
ByteBuffer nextPagingState = resultSet.getExecutionInfo().getPagingState();
@ -110,7 +128,7 @@ public class TbResultSet implements AsyncResultSet {
@Override
public void onSuccess(@Nullable TbResultSet result) {
processRows(nextStatement, result,
allRows, resultFuture, executor);
allRows, resultFuture, executor, maxResultSetSizeBytes, accumulatedBytes);
}
@Override

140
common/dao-api/src/test/java/org/thingsboard/server/dao/nosql/TbResultSetTest.java

@ -0,0 +1,140 @@
/**
* Copyright © 2016-2026 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.nosql;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
class TbResultSetTest {
@Test
void allRows_withinLimit_returnsAllRows() throws Exception {
Row row = mock(Row.class);
AsyncResultSet asyncResultSet = createMockResultSet(List.of(row), false, 1000);
Statement<?> statement = mock(Statement.class);
TbResultSet tbResultSet = new TbResultSet(statement, asyncResultSet, s -> null);
ListenableFuture<List<Row>> future = tbResultSet.allRows(MoreExecutors.directExecutor(), 5000);
List<Row> result = future.get();
assertThat(result).hasSize(1);
assertThat(result.get(0)).isSameAs(row);
}
@Test
void allRows_exceedsLimitOnFirstPage_failsWithException() {
Row row = mock(Row.class);
AsyncResultSet asyncResultSet = createMockResultSet(List.of(row), false, 6000);
Statement<?> statement = mock(Statement.class);
TbResultSet tbResultSet = new TbResultSet(statement, asyncResultSet, s -> null);
ListenableFuture<List<Row>> future = tbResultSet.allRows(MoreExecutors.directExecutor(), 5000);
assertThatThrownBy(future::get)
.isInstanceOf(ExecutionException.class)
.hasCauseInstanceOf(ResultSetSizeLimitExceededException.class);
}
@Test
void allRows_exceedsLimitOnSecondPage_failsAfterSecondPage() {
Row row1 = mock(Row.class);
Row row2 = mock(Row.class);
Statement<?> statement = mock(Statement.class);
doReturn(statement).when(statement).setPagingState((ByteBuffer) null);
AsyncResultSet page2 = createMockResultSet(List.of(row2), false, 3000);
TbResultSet tbResultSetPage2 = new TbResultSet(statement, page2, s -> null);
SettableFuture<TbResultSet> page2Future = SettableFuture.create();
page2Future.set(tbResultSetPage2);
TbResultSetFuture tbPage2Future = new TbResultSetFuture(page2Future);
ExecutionInfo page1ExecInfo = mock(ExecutionInfo.class);
when(page1ExecInfo.getResponseSizeInBytes()).thenReturn(3000);
when(page1ExecInfo.getPagingState()).thenReturn(null);
AsyncResultSet page1 = createMockResultSet(List.of(row1), true, 3000);
when(page1.getExecutionInfo()).thenReturn(page1ExecInfo);
Function<Statement, TbResultSetFuture> executeAsync = s -> tbPage2Future;
TbResultSet tbResultSet = new TbResultSet(statement, page1, executeAsync);
ListenableFuture<List<Row>> future = tbResultSet.allRows(MoreExecutors.directExecutor(), 5000);
assertThatThrownBy(future::get)
.isInstanceOf(ExecutionException.class)
.hasCauseInstanceOf(ResultSetSizeLimitExceededException.class);
}
@Test
void allRows_unlimitedWithZero_returnsAllRowsRegardlessOfSize() throws Exception {
Row row = mock(Row.class);
AsyncResultSet asyncResultSet = createMockResultSet(List.of(row), false, 999999);
Statement<?> statement = mock(Statement.class);
TbResultSet tbResultSet = new TbResultSet(statement, asyncResultSet, s -> null);
ListenableFuture<List<Row>> future = tbResultSet.allRows(MoreExecutors.directExecutor(), 0);
List<Row> result = future.get();
assertThat(result).hasSize(1);
}
@Test
void allRows_noLimitOverload_returnsAllRows() throws Exception {
Row row = mock(Row.class);
AsyncResultSet asyncResultSet = createMockResultSet(List.of(row), false, 999999);
Statement<?> statement = mock(Statement.class);
TbResultSet tbResultSet = new TbResultSet(statement, asyncResultSet, s -> null);
ListenableFuture<List<Row>> future = tbResultSet.allRows(MoreExecutors.directExecutor());
List<Row> result = future.get();
assertThat(result).hasSize(1);
}
private AsyncResultSet createMockResultSet(List<Row> rows, boolean hasMorePages, int responseSizeInBytes) {
AsyncResultSet resultSet = mock(AsyncResultSet.class);
ExecutionInfo executionInfo = mock(ExecutionInfo.class);
ColumnDefinitions columnDefs = mock(ColumnDefinitions.class);
when(executionInfo.getResponseSizeInBytes()).thenReturn(responseSizeInBytes);
when(executionInfo.getPagingState()).thenReturn(null);
when(resultSet.getExecutionInfo()).thenReturn(executionInfo);
when(resultSet.getColumnDefinitions()).thenReturn(columnDefs);
when(resultSet.currentPage()).thenReturn(rows);
when(resultSet.hasMorePages()).thenReturn(hasMorePages);
when(resultSet.remaining()).thenReturn(rows.size());
return resultSet;
}
}

3
dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractAsyncDao.java

@ -37,6 +37,9 @@ public abstract class CassandraAbstractAsyncDao extends CassandraAbstractDao {
@Value("${cassandra.query.result_processing_threads:50}")
private int threadPoolSize;
@Value("${cassandra.query.max_result_set_size_in_bytes:52428800}")
protected long maxResultSetSizeBytes;
@PostConstruct
public void startExecutor() {
readResultsProcessingExecutor = ThingsBoardExecutors.newWorkStealingPool(threadPoolSize, "cassandra-callback");

88
dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java

@ -52,6 +52,7 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntryAggWrapper;
import org.thingsboard.server.common.data.kv.TsKvQuery;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.nosql.ResultSetSizeLimitExceededException;
import org.thingsboard.server.dao.nosql.TbResultSet;
import org.thingsboard.server.dao.nosql.TbResultSetFuture;
import org.thingsboard.server.dao.sqlts.AggregationTimeseriesDao;
@ -257,6 +258,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
@Override
public void onFailure(Throwable t) {
log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t);
resultFuture.setException(t);
}
}, readResultsProcessingExecutor);
return resultFuture;
@ -331,6 +333,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
@Override
public void onFailure(Throwable t) {
log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), toPartitionTs(query.getStartTs()), toPartitionTs(query.getEndTs()), t);
resultFuture.setException(t);
}
}, readResultsProcessingExecutor);
@ -372,8 +375,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
cursor.addData(convertResultToTsKvEntryList(Collections.emptyList()));
findAllAsyncSequentiallyWithLimit(tenantId, cursor, resultFuture);
} else {
Futures.addCallback(result.allRows(readResultsProcessingExecutor), new FutureCallback<List<Row>>() {
Futures.addCallback(result.allRows(readResultsProcessingExecutor, maxResultSetSizeBytes), new FutureCallback<List<Row>>() {
@Override
public void onSuccess(@Nullable List<Row> result) {
cursor.addData(convertResultToTsKvEntryList(result == null ? Collections.emptyList() : result));
@ -382,7 +384,13 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
@Override
public void onFailure(Throwable t) {
log.error("[{}][{}] Failed to fetch data for query {}-{}", stmt, t);
if (t instanceof ResultSetSizeLimitExceededException e) {
log.warn("[{}][{}] Result set size limit exceeded for key [{}], query [{}]: {} bytes, limit {} bytes",
cursor.getEntityType(), cursor.getEntityId(), cursor.getKey(), stmt.getPreparedStatement().getQuery(), e.getActualBytes(), e.getLimitBytes());
} else {
log.error("[{}][{}] Failed to fetch data for key [{}], query [{}]", cursor.getEntityType(), cursor.getEntityId(), cursor.getKey(), stmt.getPreparedStatement().getQuery(), t);
}
resultFuture.setException(t);
}
}, readResultsProcessingExecutor);
@ -392,7 +400,8 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
@Override
public void onFailure(Throwable t) {
log.error("[{}][{}] Failed to fetch data for query {}-{}", stmt, t);
log.error("[{}][{}] Failed to fetch data for key [{}], query [{}]", cursor.getEntityType(), cursor.getEntityId(), cursor.getKey(), stmt.getPreparedStatement().getQuery(), t);
resultFuture.setException(t);
}
}, readResultsProcessingExecutor);
}
@ -425,7 +434,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
}
if (!isUseTsKeyValuePartitioningOnRead()) {
final long estimatedPartitionCount = estimatePartitionCount(minPartition, maxPartition);
if (estimatedPartitionCount <= useTsKeyValuePartitioningOnReadMaxEstimatedPartitionCount) {
if (estimatedPartitionCount <= useTsKeyValuePartitioningOnReadMaxEstimatedPartitionCount) {
return Futures.immediateFuture(calculatePartitions(minPartition, maxPartition, (int) estimatedPartitionCount));
}
}
@ -446,7 +455,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
}
List<Long> calculatePartitions(long minPartition, long maxPartition) {
return calculatePartitions(minPartition, maxPartition, 0);
return calculatePartitions(minPartition, maxPartition, 0);
}
List<Long> calculatePartitions(long minPartition, long maxPartition, int estimatedPartitionCount) {
@ -533,6 +542,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
public void onFailure(Throwable t) {
}
}
private long computeTtl(long ttl) {
@ -569,7 +579,8 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
@Override
public void onFailure(Throwable t) {
log.error("[{}][{}] Failed to delete data for query {}-{}", stmt, t);
log.error("[{}][{}] Failed to delete data for key [{}], query [{}]", cursor.getEntityType(), cursor.getEntityId(), cursor.getKey(), stmt.getPreparedStatement().getQuery(), t);
resultFuture.setException(t);
}
}, readResultsProcessingExecutor);
}
@ -581,12 +592,12 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
try {
if (deleteStmt == null) {
deleteStmt = prepare("DELETE FROM " + ModelConstants.TS_KV_CF +
" WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.TS_COLUMN + " >= ? "
+ "AND " + ModelConstants.TS_COLUMN + " < ?");
" WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.TS_COLUMN + " >= ? "
+ "AND " + ModelConstants.TS_COLUMN + " < ?");
}
} finally {
stmtCreationLock.unlock();
@ -661,13 +672,13 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
private String getPreparedStatementQuery(DataType type) {
return INSERT_INTO + ModelConstants.TS_KV_CF +
"(" + ModelConstants.ENTITY_TYPE_COLUMN +
"," + ModelConstants.ENTITY_ID_COLUMN +
"," + ModelConstants.KEY_COLUMN +
"," + ModelConstants.PARTITION_COLUMN +
"," + ModelConstants.TS_COLUMN +
"," + getColumnName(type) + ")" +
" VALUES(?, ?, ?, ?, ?, ?)";
"(" + ModelConstants.ENTITY_TYPE_COLUMN +
"," + ModelConstants.ENTITY_ID_COLUMN +
"," + ModelConstants.KEY_COLUMN +
"," + ModelConstants.PARTITION_COLUMN +
"," + ModelConstants.TS_COLUMN +
"," + getColumnName(type) + ")" +
" VALUES(?, ?, ?, ?, ?, ?)";
}
private String getPreparedStatementQueryWithTtl(DataType type) {
@ -680,11 +691,11 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
try {
if (partitionInsertStmt == null) {
partitionInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF +
"(" + ModelConstants.ENTITY_TYPE_COLUMN +
"," + ModelConstants.ENTITY_ID_COLUMN +
"," + ModelConstants.PARTITION_COLUMN +
"," + ModelConstants.KEY_COLUMN + ")" +
" VALUES(?, ?, ?, ?)");
"(" + ModelConstants.ENTITY_TYPE_COLUMN +
"," + ModelConstants.ENTITY_ID_COLUMN +
"," + ModelConstants.PARTITION_COLUMN +
"," + ModelConstants.KEY_COLUMN + ")" +
" VALUES(?, ?, ?, ?)");
}
} finally {
stmtCreationLock.unlock();
@ -699,11 +710,11 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
try {
if (partitionInsertTtlStmt == null) {
partitionInsertTtlStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF +
"(" + ModelConstants.ENTITY_TYPE_COLUMN +
"," + ModelConstants.ENTITY_ID_COLUMN +
"," + ModelConstants.PARTITION_COLUMN +
"," + ModelConstants.KEY_COLUMN + ")" +
" VALUES(?, ?, ?, ?) USING TTL ?");
"(" + ModelConstants.ENTITY_TYPE_COLUMN +
"," + ModelConstants.ENTITY_ID_COLUMN +
"," + ModelConstants.PARTITION_COLUMN +
"," + ModelConstants.KEY_COLUMN + ")" +
" VALUES(?, ?, ?, ?) USING TTL ?");
}
} finally {
stmtCreationLock.unlock();
@ -809,16 +820,17 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()];
} else {
fetchStmts[type.ordinal()] = prepare(SELECT_PREFIX +
String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
+ " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.TS_COLUMN + " >= ? "
+ "AND " + ModelConstants.TS_COLUMN + " < ?"
+ (type == Aggregation.NONE ? " ORDER BY " + ModelConstants.TS_COLUMN + " " + orderBy + " LIMIT ?" : ""));
String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
+ " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants.TS_COLUMN + " >= ? "
+ "AND " + ModelConstants.TS_COLUMN + " < ?"
+ (type == Aggregation.NONE ? " ORDER BY " + ModelConstants.TS_COLUMN + " " + orderBy + " LIMIT ?" : ""));
}
}
return fetchStmts;
}
}

2
dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java

@ -184,7 +184,7 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes
}
private ListenableFuture<List<TsKvEntry>> convertAsyncResultSetToTsKvEntryList(TbResultSet rs) {
return Futures.transform(rs.allRows(readResultsProcessingExecutor),
return Futures.transform(rs.allRows(readResultsProcessingExecutor, maxResultSetSizeBytes),
rows -> this.convertResultToTsKvEntryList(rows), readResultsProcessingExecutor);
}

4
dao/src/main/java/org/thingsboard/server/dao/timeseries/SimpleListenableFuture.java

@ -26,4 +26,8 @@ public class SimpleListenableFuture<V> extends AbstractFuture<V> {
return super.set(value);
}
public boolean setException(Throwable throwable) {
return super.setException(throwable);
}
}

42
dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlTest.java

@ -16,7 +16,10 @@
package org.thingsboard.server.dao.service.timeseries.nosql;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
@ -27,9 +30,12 @@ import org.thingsboard.server.common.data.kv.JsonDataEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.nosql.ResultSetSizeLimitExceededException;
import org.thingsboard.server.dao.service.DaoNoSqlTest;
import org.thingsboard.server.dao.service.timeseries.BaseTimeseriesServiceTest;
import org.thingsboard.server.dao.timeseries.CassandraBaseTimeseriesDao;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
@ -37,6 +43,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@ -44,6 +51,9 @@ import static org.junit.Assert.assertTrue;
@DaoNoSqlTest
public class TimeseriesServiceNoSqlTest extends BaseTimeseriesServiceTest {
@Autowired
private CassandraBaseTimeseriesDao cassandraBaseTimeseriesDao;
@Test
public void shouldSaveEntryOfEachTypeWithTtl() throws ExecutionException, InterruptedException, TimeoutException {
long ttlInSec = TimeUnit.SECONDS.toSeconds(3);
@ -94,4 +104,36 @@ public class TimeseriesServiceNoSqlTest extends BaseTimeseriesServiceTest {
double expectedValue = (doubleValue + longValue)/ 2;
assertThat(listWithAgg.get(0).getDoubleValue().get()).isEqualTo(expectedValue);
}
@Test
public void testResultSetSizeLimitExceeded() throws Exception {
DeviceId deviceId = new DeviceId(Uuids.timeBased());
String value = "x".repeat(500);
List<TsKvEntry> entries = new ArrayList<>();
for (int i = 0; i < 5; i++) {
entries.add(new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(i + 1), new StringDataEntry("bigKey", value)));
}
tsService.save(tenantId, deviceId, entries, 0);
long originalLimit = (long) ReflectionTestUtils.getField(cassandraBaseTimeseriesDao, "maxResultSetSizeBytes");
try {
// Set a very low limit to trigger the exception
ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "maxResultSetSizeBytes", 1024L);
assertThatThrownBy(() -> tsService.findAll(tenantId, deviceId, Collections.singletonList(
new BaseReadTsKvQuery("bigKey", 0L, TimeUnit.MINUTES.toMillis(6), 1000, 10, Aggregation.NONE)
)).get(MAX_TIMEOUT, TimeUnit.SECONDS))
.isInstanceOf(ExecutionException.class)
.satisfies(e -> assertThat(ExceptionUtils.getRootCause(e)).isInstanceOf(ResultSetSizeLimitExceededException.class));
} finally {
ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "maxResultSetSizeBytes", originalLimit);
}
// Verify query succeeds with original limit restored
List<TsKvEntry> result = tsService.findAll(tenantId, deviceId, Collections.singletonList(
new BaseReadTsKvQuery("bigKey", 0L, TimeUnit.MINUTES.toMillis(6), 1000, 10, Aggregation.NONE)
)).get(MAX_TIMEOUT, TimeUnit.SECONDS);
assertThat(result).hasSize(5);
}
}

Loading…
Cancel
Save