From 412fbefdbbb4678da582304e2ea598b5303dda1e Mon Sep 17 00:00:00 2001 From: Viacheslav Klimov Date: Thu, 19 Feb 2026 14:28:04 +0200 Subject: [PATCH] Add Cassandra result set byte-size limit to prevent OOM on large queries Adds page-level byte-size tracking in TbResultSet.allRows() using ExecutionInfo.getResponseSizeInBytes() to fail early when accumulated result set size exceeds the configurable limit (default 50MB). Also fixes pre-existing bugs where onFailure callbacks in CassandraBaseTimeseriesDao only logged errors but never completed the future, causing callers to hang indefinitely on failures. Co-Authored-By: Claude Opus 4.6 --- .../src/main/resources/thingsboard.yml | 2 + .../ResultSetSizeLimitExceededException.java | 32 ++++ .../server/dao/nosql/TbResultSet.java | 24 ++- .../server/dao/nosql/TbResultSetTest.java | 140 ++++++++++++++++++ .../dao/nosql/CassandraAbstractAsyncDao.java | 3 + .../CassandraBaseTimeseriesDao.java | 88 ++++++----- .../CassandraBaseTimeseriesLatestDao.java | 2 +- .../timeseries/SimpleListenableFuture.java | 4 + .../nosql/TimeseriesServiceNoSqlTest.java | 42 ++++++ 9 files changed, 295 insertions(+), 42 deletions(-) create mode 100644 common/dao-api/src/main/java/org/thingsboard/server/dao/nosql/ResultSetSizeLimitExceededException.java create mode 100644 common/dao-api/src/test/java/org/thingsboard/server/dao/nosql/TbResultSetTest.java diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 25463bdc6c..43ad34ac61 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -330,6 +330,8 @@ cassandra: set_null_values_enabled: "${CASSANDRA_QUERY_SET_NULL_VALUES_ENABLED:true}" # log one of cassandra queries with specified frequency (0 - logging is disabled) print_queries_freq: "${CASSANDRA_QUERY_PRINT_FREQ:0}" + # Maximum total size in bytes of a Cassandra query result set across all pages. Default is 50MB. 0 means unlimited + max_result_set_size_in_bytes: "${CASSANDRA_QUERY_MAX_RESULT_SET_SIZE_IN_BYTES:52428800}" tenant_rate_limits: # Whether to print rate-limited tenant names when printing Cassandra query queue statistic print_tenant_names: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_PRINT_TENANT_NAMES:false}" diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/nosql/ResultSetSizeLimitExceededException.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/nosql/ResultSetSizeLimitExceededException.java new file mode 100644 index 0000000000..f20a8ebde0 --- /dev/null +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/nosql/ResultSetSizeLimitExceededException.java @@ -0,0 +1,32 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.nosql; + +import lombok.Getter; + +@Getter +public class ResultSetSizeLimitExceededException extends IllegalArgumentException { + + private final long limitBytes; + private final long actualBytes; + + public ResultSetSizeLimitExceededException(long limitBytes, long actualBytes) { + super("Result set size exceeds the maximum allowed limit. Please narrow your query"); + this.limitBytes = limitBytes; + this.actualBytes = actualBytes; + } + +} diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/nosql/TbResultSet.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/nosql/TbResultSet.java index 9dd5c6f4b1..c2f12524b7 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/nosql/TbResultSet.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/nosql/TbResultSet.java @@ -34,6 +34,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; public class TbResultSet implements AsyncResultSet { @@ -89,9 +90,14 @@ public class TbResultSet implements AsyncResultSet { } public ListenableFuture> allRows(Executor executor) { + return allRows(executor, 0); + } + + public ListenableFuture> allRows(Executor executor, long maxResultSetSizeBytes) { List allRows = new ArrayList<>(); SettableFuture> resultFuture = SettableFuture.create(); - this.processRows(originalStatement, delegate, allRows, resultFuture, executor); + AtomicLong accumulatedBytes = new AtomicLong(0); + this.processRows(originalStatement, delegate, allRows, resultFuture, executor, maxResultSetSizeBytes, accumulatedBytes); return resultFuture; } @@ -99,7 +105,19 @@ public class TbResultSet implements AsyncResultSet { AsyncResultSet resultSet, List allRows, SettableFuture> resultFuture, - Executor executor) { + Executor executor, + long maxResultSetSizeBytes, + AtomicLong accumulatedBytes) { + if (maxResultSetSizeBytes > 0) { + int pageSize = resultSet.getExecutionInfo().getResponseSizeInBytes(); + if (pageSize > 0) { + accumulatedBytes.addAndGet(pageSize); + } + if (accumulatedBytes.get() > maxResultSetSizeBytes) { + resultFuture.setException(new ResultSetSizeLimitExceededException(maxResultSetSizeBytes, accumulatedBytes.get())); + return; + } + } allRows.addAll(loadRows(resultSet)); if (resultSet.hasMorePages()) { ByteBuffer nextPagingState = resultSet.getExecutionInfo().getPagingState(); @@ -110,7 +128,7 @@ public class TbResultSet implements AsyncResultSet { @Override public void onSuccess(@Nullable TbResultSet result) { processRows(nextStatement, result, - allRows, resultFuture, executor); + allRows, resultFuture, executor, maxResultSetSizeBytes, accumulatedBytes); } @Override diff --git a/common/dao-api/src/test/java/org/thingsboard/server/dao/nosql/TbResultSetTest.java b/common/dao-api/src/test/java/org/thingsboard/server/dao/nosql/TbResultSetTest.java new file mode 100644 index 0000000000..01c1c2c506 --- /dev/null +++ b/common/dao-api/src/test/java/org/thingsboard/server/dao/nosql/TbResultSetTest.java @@ -0,0 +1,140 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.nosql; + +import com.datastax.oss.driver.api.core.cql.AsyncResultSet; +import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.cql.Statement; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class TbResultSetTest { + + @Test + void allRows_withinLimit_returnsAllRows() throws Exception { + Row row = mock(Row.class); + AsyncResultSet asyncResultSet = createMockResultSet(List.of(row), false, 1000); + Statement statement = mock(Statement.class); + + TbResultSet tbResultSet = new TbResultSet(statement, asyncResultSet, s -> null); + ListenableFuture> future = tbResultSet.allRows(MoreExecutors.directExecutor(), 5000); + + List result = future.get(); + assertThat(result).hasSize(1); + assertThat(result.get(0)).isSameAs(row); + } + + @Test + void allRows_exceedsLimitOnFirstPage_failsWithException() { + Row row = mock(Row.class); + AsyncResultSet asyncResultSet = createMockResultSet(List.of(row), false, 6000); + Statement statement = mock(Statement.class); + + TbResultSet tbResultSet = new TbResultSet(statement, asyncResultSet, s -> null); + ListenableFuture> future = tbResultSet.allRows(MoreExecutors.directExecutor(), 5000); + + assertThatThrownBy(future::get) + .isInstanceOf(ExecutionException.class) + .hasCauseInstanceOf(ResultSetSizeLimitExceededException.class); + } + + @Test + void allRows_exceedsLimitOnSecondPage_failsAfterSecondPage() { + Row row1 = mock(Row.class); + Row row2 = mock(Row.class); + Statement statement = mock(Statement.class); + doReturn(statement).when(statement).setPagingState((ByteBuffer) null); + + AsyncResultSet page2 = createMockResultSet(List.of(row2), false, 3000); + TbResultSet tbResultSetPage2 = new TbResultSet(statement, page2, s -> null); + SettableFuture page2Future = SettableFuture.create(); + page2Future.set(tbResultSetPage2); + TbResultSetFuture tbPage2Future = new TbResultSetFuture(page2Future); + + ExecutionInfo page1ExecInfo = mock(ExecutionInfo.class); + when(page1ExecInfo.getResponseSizeInBytes()).thenReturn(3000); + when(page1ExecInfo.getPagingState()).thenReturn(null); + + AsyncResultSet page1 = createMockResultSet(List.of(row1), true, 3000); + when(page1.getExecutionInfo()).thenReturn(page1ExecInfo); + + Function executeAsync = s -> tbPage2Future; + TbResultSet tbResultSet = new TbResultSet(statement, page1, executeAsync); + ListenableFuture> future = tbResultSet.allRows(MoreExecutors.directExecutor(), 5000); + + assertThatThrownBy(future::get) + .isInstanceOf(ExecutionException.class) + .hasCauseInstanceOf(ResultSetSizeLimitExceededException.class); + } + + @Test + void allRows_unlimitedWithZero_returnsAllRowsRegardlessOfSize() throws Exception { + Row row = mock(Row.class); + AsyncResultSet asyncResultSet = createMockResultSet(List.of(row), false, 999999); + Statement statement = mock(Statement.class); + + TbResultSet tbResultSet = new TbResultSet(statement, asyncResultSet, s -> null); + ListenableFuture> future = tbResultSet.allRows(MoreExecutors.directExecutor(), 0); + + List result = future.get(); + assertThat(result).hasSize(1); + } + + @Test + void allRows_noLimitOverload_returnsAllRows() throws Exception { + Row row = mock(Row.class); + AsyncResultSet asyncResultSet = createMockResultSet(List.of(row), false, 999999); + Statement statement = mock(Statement.class); + + TbResultSet tbResultSet = new TbResultSet(statement, asyncResultSet, s -> null); + ListenableFuture> future = tbResultSet.allRows(MoreExecutors.directExecutor()); + + List result = future.get(); + assertThat(result).hasSize(1); + } + + private AsyncResultSet createMockResultSet(List rows, boolean hasMorePages, int responseSizeInBytes) { + AsyncResultSet resultSet = mock(AsyncResultSet.class); + ExecutionInfo executionInfo = mock(ExecutionInfo.class); + ColumnDefinitions columnDefs = mock(ColumnDefinitions.class); + + when(executionInfo.getResponseSizeInBytes()).thenReturn(responseSizeInBytes); + when(executionInfo.getPagingState()).thenReturn(null); + when(resultSet.getExecutionInfo()).thenReturn(executionInfo); + when(resultSet.getColumnDefinitions()).thenReturn(columnDefs); + when(resultSet.currentPage()).thenReturn(rows); + when(resultSet.hasMorePages()).thenReturn(hasMorePages); + when(resultSet.remaining()).thenReturn(rows.size()); + + return resultSet; + } + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractAsyncDao.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractAsyncDao.java index 900958dcba..3d2ebfd1fb 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractAsyncDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractAsyncDao.java @@ -37,6 +37,9 @@ public abstract class CassandraAbstractAsyncDao extends CassandraAbstractDao { @Value("${cassandra.query.result_processing_threads:50}") private int threadPoolSize; + @Value("${cassandra.query.max_result_set_size_in_bytes:52428800}") + protected long maxResultSetSizeBytes; + @PostConstruct public void startExecutor() { readResultsProcessingExecutor = ThingsBoardExecutors.newWorkStealingPool(threadPoolSize, "cassandra-callback"); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index e68e315b4b..35da636429 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -52,6 +52,7 @@ import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntryAggWrapper; import org.thingsboard.server.common.data.kv.TsKvQuery; import org.thingsboard.server.dao.model.ModelConstants; +import org.thingsboard.server.dao.nosql.ResultSetSizeLimitExceededException; import org.thingsboard.server.dao.nosql.TbResultSet; import org.thingsboard.server.dao.nosql.TbResultSetFuture; import org.thingsboard.server.dao.sqlts.AggregationTimeseriesDao; @@ -257,6 +258,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD @Override public void onFailure(Throwable t) { log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t); + resultFuture.setException(t); } }, readResultsProcessingExecutor); return resultFuture; @@ -331,6 +333,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD @Override public void onFailure(Throwable t) { log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), toPartitionTs(query.getStartTs()), toPartitionTs(query.getEndTs()), t); + resultFuture.setException(t); } }, readResultsProcessingExecutor); @@ -372,8 +375,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD cursor.addData(convertResultToTsKvEntryList(Collections.emptyList())); findAllAsyncSequentiallyWithLimit(tenantId, cursor, resultFuture); } else { - Futures.addCallback(result.allRows(readResultsProcessingExecutor), new FutureCallback>() { - + Futures.addCallback(result.allRows(readResultsProcessingExecutor, maxResultSetSizeBytes), new FutureCallback>() { @Override public void onSuccess(@Nullable List result) { cursor.addData(convertResultToTsKvEntryList(result == null ? Collections.emptyList() : result)); @@ -382,7 +384,13 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD @Override public void onFailure(Throwable t) { - log.error("[{}][{}] Failed to fetch data for query {}-{}", stmt, t); + if (t instanceof ResultSetSizeLimitExceededException e) { + log.warn("[{}][{}] Result set size limit exceeded for key [{}], query [{}]: {} bytes, limit {} bytes", + cursor.getEntityType(), cursor.getEntityId(), cursor.getKey(), stmt.getPreparedStatement().getQuery(), e.getActualBytes(), e.getLimitBytes()); + } else { + log.error("[{}][{}] Failed to fetch data for key [{}], query [{}]", cursor.getEntityType(), cursor.getEntityId(), cursor.getKey(), stmt.getPreparedStatement().getQuery(), t); + } + resultFuture.setException(t); } }, readResultsProcessingExecutor); @@ -392,7 +400,8 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD @Override public void onFailure(Throwable t) { - log.error("[{}][{}] Failed to fetch data for query {}-{}", stmt, t); + log.error("[{}][{}] Failed to fetch data for key [{}], query [{}]", cursor.getEntityType(), cursor.getEntityId(), cursor.getKey(), stmt.getPreparedStatement().getQuery(), t); + resultFuture.setException(t); } }, readResultsProcessingExecutor); } @@ -425,7 +434,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD } if (!isUseTsKeyValuePartitioningOnRead()) { final long estimatedPartitionCount = estimatePartitionCount(minPartition, maxPartition); - if (estimatedPartitionCount <= useTsKeyValuePartitioningOnReadMaxEstimatedPartitionCount) { + if (estimatedPartitionCount <= useTsKeyValuePartitioningOnReadMaxEstimatedPartitionCount) { return Futures.immediateFuture(calculatePartitions(minPartition, maxPartition, (int) estimatedPartitionCount)); } } @@ -446,7 +455,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD } List calculatePartitions(long minPartition, long maxPartition) { - return calculatePartitions(minPartition, maxPartition, 0); + return calculatePartitions(minPartition, maxPartition, 0); } List calculatePartitions(long minPartition, long maxPartition, int estimatedPartitionCount) { @@ -533,6 +542,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD public void onFailure(Throwable t) { } + } private long computeTtl(long ttl) { @@ -569,7 +579,8 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD @Override public void onFailure(Throwable t) { - log.error("[{}][{}] Failed to delete data for query {}-{}", stmt, t); + log.error("[{}][{}] Failed to delete data for key [{}], query [{}]", cursor.getEntityType(), cursor.getEntityId(), cursor.getKey(), stmt.getPreparedStatement().getQuery(), t); + resultFuture.setException(t); } }, readResultsProcessingExecutor); } @@ -581,12 +592,12 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD try { if (deleteStmt == null) { deleteStmt = prepare("DELETE FROM " + ModelConstants.TS_KV_CF + - " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM - + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM - + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM - + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM - + "AND " + ModelConstants.TS_COLUMN + " >= ? " - + "AND " + ModelConstants.TS_COLUMN + " < ?"); + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM + + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM + + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM + + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM + + "AND " + ModelConstants.TS_COLUMN + " >= ? " + + "AND " + ModelConstants.TS_COLUMN + " < ?"); } } finally { stmtCreationLock.unlock(); @@ -661,13 +672,13 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD private String getPreparedStatementQuery(DataType type) { return INSERT_INTO + ModelConstants.TS_KV_CF + - "(" + ModelConstants.ENTITY_TYPE_COLUMN + - "," + ModelConstants.ENTITY_ID_COLUMN + - "," + ModelConstants.KEY_COLUMN + - "," + ModelConstants.PARTITION_COLUMN + - "," + ModelConstants.TS_COLUMN + - "," + getColumnName(type) + ")" + - " VALUES(?, ?, ?, ?, ?, ?)"; + "(" + ModelConstants.ENTITY_TYPE_COLUMN + + "," + ModelConstants.ENTITY_ID_COLUMN + + "," + ModelConstants.KEY_COLUMN + + "," + ModelConstants.PARTITION_COLUMN + + "," + ModelConstants.TS_COLUMN + + "," + getColumnName(type) + ")" + + " VALUES(?, ?, ?, ?, ?, ?)"; } private String getPreparedStatementQueryWithTtl(DataType type) { @@ -680,11 +691,11 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD try { if (partitionInsertStmt == null) { partitionInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF + - "(" + ModelConstants.ENTITY_TYPE_COLUMN + - "," + ModelConstants.ENTITY_ID_COLUMN + - "," + ModelConstants.PARTITION_COLUMN + - "," + ModelConstants.KEY_COLUMN + ")" + - " VALUES(?, ?, ?, ?)"); + "(" + ModelConstants.ENTITY_TYPE_COLUMN + + "," + ModelConstants.ENTITY_ID_COLUMN + + "," + ModelConstants.PARTITION_COLUMN + + "," + ModelConstants.KEY_COLUMN + ")" + + " VALUES(?, ?, ?, ?)"); } } finally { stmtCreationLock.unlock(); @@ -699,11 +710,11 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD try { if (partitionInsertTtlStmt == null) { partitionInsertTtlStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF + - "(" + ModelConstants.ENTITY_TYPE_COLUMN + - "," + ModelConstants.ENTITY_ID_COLUMN + - "," + ModelConstants.PARTITION_COLUMN + - "," + ModelConstants.KEY_COLUMN + ")" + - " VALUES(?, ?, ?, ?) USING TTL ?"); + "(" + ModelConstants.ENTITY_TYPE_COLUMN + + "," + ModelConstants.ENTITY_ID_COLUMN + + "," + ModelConstants.PARTITION_COLUMN + + "," + ModelConstants.KEY_COLUMN + ")" + + " VALUES(?, ?, ?, ?) USING TTL ?"); } } finally { stmtCreationLock.unlock(); @@ -809,16 +820,17 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()]; } else { fetchStmts[type.ordinal()] = prepare(SELECT_PREFIX + - String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF - + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM - + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM - + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM - + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM - + "AND " + ModelConstants.TS_COLUMN + " >= ? " - + "AND " + ModelConstants.TS_COLUMN + " < ?" - + (type == Aggregation.NONE ? " ORDER BY " + ModelConstants.TS_COLUMN + " " + orderBy + " LIMIT ?" : "")); + String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF + + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM + + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM + + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM + + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM + + "AND " + ModelConstants.TS_COLUMN + " >= ? " + + "AND " + ModelConstants.TS_COLUMN + " < ?" + + (type == Aggregation.NONE ? " ORDER BY " + ModelConstants.TS_COLUMN + " " + orderBy + " LIMIT ?" : "")); } } return fetchStmts; } + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java index cc9c2919f2..72f667bfcc 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java @@ -184,7 +184,7 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes } private ListenableFuture> convertAsyncResultSetToTsKvEntryList(TbResultSet rs) { - return Futures.transform(rs.allRows(readResultsProcessingExecutor), + return Futures.transform(rs.allRows(readResultsProcessingExecutor, maxResultSetSizeBytes), rows -> this.convertResultToTsKvEntryList(rows), readResultsProcessingExecutor); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/SimpleListenableFuture.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/SimpleListenableFuture.java index 01581026d6..ded8c74b98 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/SimpleListenableFuture.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/SimpleListenableFuture.java @@ -26,4 +26,8 @@ public class SimpleListenableFuture extends AbstractFuture { return super.set(value); } + public boolean setException(Throwable throwable) { + return super.setException(throwable); + } + } diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlTest.java index 495ffc131b..146542f501 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlTest.java @@ -16,7 +16,10 @@ package org.thingsboard.server.dao.service.timeseries.nosql; import com.datastax.oss.driver.api.core.uuid.Uuids; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.kv.Aggregation; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; @@ -27,9 +30,12 @@ import org.thingsboard.server.common.data.kv.JsonDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.dao.nosql.ResultSetSizeLimitExceededException; import org.thingsboard.server.dao.service.DaoNoSqlTest; import org.thingsboard.server.dao.service.timeseries.BaseTimeseriesServiceTest; +import org.thingsboard.server.dao.timeseries.CassandraBaseTimeseriesDao; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutionException; @@ -37,6 +43,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -44,6 +51,9 @@ import static org.junit.Assert.assertTrue; @DaoNoSqlTest public class TimeseriesServiceNoSqlTest extends BaseTimeseriesServiceTest { + @Autowired + private CassandraBaseTimeseriesDao cassandraBaseTimeseriesDao; + @Test public void shouldSaveEntryOfEachTypeWithTtl() throws ExecutionException, InterruptedException, TimeoutException { long ttlInSec = TimeUnit.SECONDS.toSeconds(3); @@ -94,4 +104,36 @@ public class TimeseriesServiceNoSqlTest extends BaseTimeseriesServiceTest { double expectedValue = (doubleValue + longValue)/ 2; assertThat(listWithAgg.get(0).getDoubleValue().get()).isEqualTo(expectedValue); } + + @Test + public void testResultSetSizeLimitExceeded() throws Exception { + DeviceId deviceId = new DeviceId(Uuids.timeBased()); + + String value = "x".repeat(500); + List entries = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + entries.add(new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(i + 1), new StringDataEntry("bigKey", value))); + } + tsService.save(tenantId, deviceId, entries, 0); + + long originalLimit = (long) ReflectionTestUtils.getField(cassandraBaseTimeseriesDao, "maxResultSetSizeBytes"); + try { + // Set a very low limit to trigger the exception + ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "maxResultSetSizeBytes", 1024L); + + assertThatThrownBy(() -> tsService.findAll(tenantId, deviceId, Collections.singletonList( + new BaseReadTsKvQuery("bigKey", 0L, TimeUnit.MINUTES.toMillis(6), 1000, 10, Aggregation.NONE) + )).get(MAX_TIMEOUT, TimeUnit.SECONDS)) + .isInstanceOf(ExecutionException.class) + .satisfies(e -> assertThat(ExceptionUtils.getRootCause(e)).isInstanceOf(ResultSetSizeLimitExceededException.class)); + } finally { + ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "maxResultSetSizeBytes", originalLimit); + } + + // Verify query succeeds with original limit restored + List result = tsService.findAll(tenantId, deviceId, Collections.singletonList( + new BaseReadTsKvQuery("bigKey", 0L, TimeUnit.MINUTES.toMillis(6), 1000, 10, Aggregation.NONE) + )).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertThat(result).hasSize(5); + } }