Browse Source

Fix blocking JPA queries on access-validator single thread

When agg=NONE (the default), findAllAsyncWithLimit() executed a
synchronous JPA query on the calling thread and wrapped the result in
Futures.immediateFuture(). Since this runs inside the access-validator
single-thread executor callback chain, a slow query blocks all other
telemetry and attribute requests.

Offload findAllAsyncWithLimit() to the JpaExecutorService work-stealing
pool via service.submit(), matching the pattern already used by
aggregated queries (findAndAggregateAsync).

Closes #15095

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
pull/15101/head
Dmytro Skarzhynets 3 months ago
parent
commit
e33a30c8f0
No known key found for this signature in database GPG Key ID: 2B51652F224037DF
  1. 4
      dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java
  2. 4
      dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java
  3. 4
      dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java

4
dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java

@ -120,7 +120,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
public ListenableFuture<ReadTsKvQueryResult> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
var aggParams = query.getAggParameters();
if (Aggregation.NONE.equals(aggParams.getAggregation()) || aggParams.getInterval() < 1) {
return Futures.immediateFuture(findAllAsyncWithLimit(entityId, query));
return service.submit(() -> findAllWithLimit(entityId, query));
} else {
List<ListenableFuture<Optional<TsKvEntity>>> futures = new ArrayList<>();
var intervalType = aggParams.getIntervalType();
@ -144,7 +144,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
}
}
ReadTsKvQueryResult findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) {
ReadTsKvQueryResult findAllWithLimit(EntityId entityId, ReadTsKvQuery query) {
Integer keyId = keyDictionaryDao.getOrSaveKeyId(query.getKey());
List<TsKvEntity> tsKvEntities = tsKvRepository.findAllWithLimit(
entityId.getId(),

4
dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java

@ -152,7 +152,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
var aggParams = query.getAggParameters();
var intervalType = aggParams.getIntervalType();
if (query.getAggregation() == Aggregation.NONE) {
return Futures.immediateFuture(findAllAsyncWithLimit(entityId, query));
return service.submit(() -> findAllWithLimit(entityId, query));
} else if (IntervalType.MILLISECONDS.equals(intervalType)) {
long startTs = query.getStartTs();
long endTs = Math.max(query.getStartTs() + 1, query.getEndTs());
@ -179,7 +179,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
super.cleanup(systemTtl);
}
private ReadTsKvQueryResult findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) {
private ReadTsKvQueryResult findAllWithLimit(EntityId entityId, ReadTsKvQuery query) {
String strKey = query.getKey();
Integer keyId = keyDictionaryDao.getOrSaveKeyId(strKey);
List<TimescaleTsKvEntity> timescaleTsKvEntities = tsKvRepository.findAllWithLimit(

4
dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java

@ -51,7 +51,7 @@ public class AbstractChunkedAggregationTimeseriesDaoTest {
Optional<TsKvEntry> optionalListenableFuture = Optional.of(mock(TsKvEntry.class));
willReturn(Futures.immediateFuture(optionalListenableFuture)).given(tsDao).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any());
willReturn(Futures.immediateFuture(mock(ReadTsKvQueryResult.class))).given(tsDao).getReadTsKvQueryResultFuture(any(), any());
willReturn(mock(ReadTsKvQueryResult.class)).given(tsDao).findAllAsyncWithLimit(any(), any());
willReturn(mock(ReadTsKvQueryResult.class)).given(tsDao).findAllWithLimit(any(), any());
}
@Test
@ -161,7 +161,7 @@ public class AbstractChunkedAggregationTimeseriesDaoTest {
ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, 1, 3000, interval, LIMIT, COUNT, DESC);
willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
verify(tsDao, times(1)).findAllAsyncWithLimit(any(), any());
verify(tsDao, times(1)).findAllWithLimit(any(), any());
verify(tsDao, times(0)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any());
}

Loading…
Cancel
Save