From e33a30c8f0eeafff4961173201bb42f57252d89d Mon Sep 17 00:00:00 2001 From: Dmytro Skarzhynets Date: Tue, 24 Feb 2026 22:35:17 +0200 Subject: [PATCH] Fix blocking JPA queries on access-validator single thread When agg=NONE (the default), findAllAsyncWithLimit() executed a synchronous JPA query on the calling thread and wrapped the result in Futures.immediateFuture(). Since this runs inside the access-validator single-thread executor callback chain, a slow query blocks all other telemetry and attribute requests. Offload findAllAsyncWithLimit() to the JpaExecutorService work-stealing pool via service.submit(), matching the pattern already used by aggregated queries (findAndAggregateAsync). Closes #15095 Co-Authored-By: Claude Opus 4.6 --- .../dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java | 4 ++-- .../server/dao/sqlts/timescale/TimescaleTimeseriesDao.java | 4 ++-- .../sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java index 0d35e08b5b..d9e77a34e1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java @@ -120,7 +120,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq public ListenableFuture findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { var aggParams = query.getAggParameters(); if (Aggregation.NONE.equals(aggParams.getAggregation()) || aggParams.getInterval() < 1) { - return Futures.immediateFuture(findAllAsyncWithLimit(entityId, query)); + return service.submit(() -> findAllWithLimit(entityId, query)); } else { List>> futures = new ArrayList<>(); var intervalType = aggParams.getIntervalType(); @@ -144,7 +144,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq } } - ReadTsKvQueryResult findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) { + ReadTsKvQueryResult findAllWithLimit(EntityId entityId, ReadTsKvQuery query) { Integer keyId = keyDictionaryDao.getOrSaveKeyId(query.getKey()); List tsKvEntities = tsKvRepository.findAllWithLimit( entityId.getId(), diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java index ae1a95c9ec..c273dd646a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java @@ -152,7 +152,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements var aggParams = query.getAggParameters(); var intervalType = aggParams.getIntervalType(); if (query.getAggregation() == Aggregation.NONE) { - return Futures.immediateFuture(findAllAsyncWithLimit(entityId, query)); + return service.submit(() -> findAllWithLimit(entityId, query)); } else if (IntervalType.MILLISECONDS.equals(intervalType)) { long startTs = query.getStartTs(); long endTs = Math.max(query.getStartTs() + 1, query.getEndTs()); @@ -179,7 +179,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements super.cleanup(systemTtl); } - private ReadTsKvQueryResult findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) { + private ReadTsKvQueryResult findAllWithLimit(EntityId entityId, ReadTsKvQuery query) { String strKey = query.getKey(); Integer keyId = keyDictionaryDao.getOrSaveKeyId(strKey); List timescaleTsKvEntities = tsKvRepository.findAllWithLimit( diff --git a/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java b/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java index 583fc4116d..8ffa1da2aa 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java @@ -51,7 +51,7 @@ public class AbstractChunkedAggregationTimeseriesDaoTest { Optional optionalListenableFuture = Optional.of(mock(TsKvEntry.class)); willReturn(Futures.immediateFuture(optionalListenableFuture)).given(tsDao).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any()); willReturn(Futures.immediateFuture(mock(ReadTsKvQueryResult.class))).given(tsDao).getReadTsKvQueryResultFuture(any(), any()); - willReturn(mock(ReadTsKvQueryResult.class)).given(tsDao).findAllAsyncWithLimit(any(), any()); + willReturn(mock(ReadTsKvQueryResult.class)).given(tsDao).findAllWithLimit(any(), any()); } @Test @@ -161,7 +161,7 @@ public class AbstractChunkedAggregationTimeseriesDaoTest { ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, 1, 3000, interval, LIMIT, COUNT, DESC); willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); - verify(tsDao, times(1)).findAllAsyncWithLimit(any(), any()); + verify(tsDao, times(1)).findAllWithLimit(any(), any()); verify(tsDao, times(0)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any()); }