diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java index 57b458a663..d68271798a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java @@ -26,6 +26,7 @@ import org.springframework.data.domain.Sort; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.Aggregation; +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; @@ -118,12 +119,28 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq } @Override - public long getTsForReadTsKvQuery(long startTs, long endTs) { - return startTs + (endTs - startTs) / 2; + public ListenableFuture> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { + if (query.getAggregation() == Aggregation.NONE) { + return findAllAsyncWithLimit(entityId, query); + } else { + List>> futures = new ArrayList<>(); + long endPeriod = query.getEndTs(); + long startPeriod = query.getStartTs(); + long step = query.getInterval(); + while (startPeriod <= endPeriod) { + long startTs = startPeriod; + long endTs = Math.min(startPeriod + step, endPeriod + 1); + long ts = startTs + (endTs - startTs) / 2; + ReadTsKvQuery subQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, ts, 1, query.getAggregation(), query.getOrder()); + ListenableFuture> aggregateTsKvEntry = findAndAggregateAsync(entityId, subQuery.getKey(), startTs, endTs, ts, query.getAggregation()); + futures.add(aggregateTsKvEntry); + startPeriod = endTs; + } + return getTskvEntriesFuture(Futures.allAsList(futures)); + } } - @Override - public ListenableFuture> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { + private ListenableFuture> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) { Integer keyId = getOrSaveKeyId(query.getKey()); List tsKvEntities = tsKvRepository.findAllWithLimit( entityId.getId(), @@ -136,17 +153,6 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq return Futures.immediateFuture(DaoUtil.convertDataList(tsKvEntities)); } - @Override - public ListenableFuture> findAndAggregateAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query, long startTs, long endTs, Aggregation aggregation) { - return findAndAggregateAsync(entityId, - query.getKey(), - startTs, - endTs, - getTsForReadTsKvQuery(startTs, endTs), - aggregation); - - } - ListenableFuture> findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) { List> entitiesFutures = new ArrayList<>(); switchAggregation(entityId, key, startTs, endTs, aggregation, entitiesFutures); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java index c32123cb94..aa0d9b1c47 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java @@ -27,7 +27,6 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent; -import org.thingsboard.server.dao.timeseries.AggregationTimeseriesDao; import javax.annotation.Nullable; import java.sql.Connection; @@ -36,7 +35,6 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.List; import java.util.Objects; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -122,14 +120,4 @@ public abstract class AbstractSqlTimeseriesDao extends BaseAbstractSqlTimeseries protected int getDataPointDays(TsKvEntry tsKvEntry, long ttl) { return tsKvEntry.getDataPoints() * Math.max(1, (int) (ttl / SECONDS_IN_DAY)); } - - @Override - public Executor getExecutor() { - return service; - } - - @Override - public long getIntervalGreaterOrEqualsMinAggregationStep(long interval) { - return interval; - } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AggregationTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AggregationTimeseriesDao.java new file mode 100644 index 0000000000..31270bacc7 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AggregationTimeseriesDao.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.sqlts; + +import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.ReadTsKvQuery; +import org.thingsboard.server.common.data.kv.TsKvEntry; + +import java.util.List; + +public interface AggregationTimeseriesDao { + + ListenableFuture> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query); +} \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/BaseAbstractSqlTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/BaseAbstractSqlTimeseriesDao.java index 83a5d11623..dd9d15b8b1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/BaseAbstractSqlTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/BaseAbstractSqlTimeseriesDao.java @@ -15,18 +15,25 @@ */ package org.thingsboard.server.dao.sqlts; +import com.google.common.base.Function; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.hibernate.exception.ConstraintViolationException; import org.springframework.beans.factory.annotation.Autowired; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.dao.model.sqlts.dictionary.TsKvDictionary; import org.thingsboard.server.dao.model.sqlts.dictionary.TsKvDictionaryCompositeKey; import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService; import org.thingsboard.server.dao.sqlts.dictionary.TsKvDictionaryRepository; +import javax.annotation.Nullable; +import java.util.List; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; @Slf4j public abstract class BaseAbstractSqlTimeseriesDao extends JpaAbstractDaoListeningExecutorService { @@ -72,4 +79,20 @@ public abstract class BaseAbstractSqlTimeseriesDao extends JpaAbstractDaoListeni return keyId; } + protected ListenableFuture> getTskvEntriesFuture(ListenableFuture>> future) { + return Futures.transform(future, new Function>, List>() { + @Nullable + @Override + public List apply(@Nullable List> results) { + if (results == null || results.isEmpty()) { + return null; + } + return results.stream() + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + } + }, service); + } + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java index 7808f0e1bd..db30adead2 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java @@ -45,7 +45,6 @@ import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper; import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository; import org.thingsboard.server.dao.sqlts.latest.SearchTsKvLatestRepository; import org.thingsboard.server.dao.sqlts.latest.TsKvLatestRepository; -import org.thingsboard.server.dao.timeseries.AggregationTimeseriesDao; import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao; import org.thingsboard.server.dao.util.SqlTsLatestAnyDao; diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java index 15875eb275..9c03a9e3dd 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java @@ -165,11 +165,6 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements super.cleanup(systemTtl); } - @Override - public ListenableFuture> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { - return findAllAsyncWithLimit(entityId, query); - } - private ListenableFuture> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) { String strKey = query.getKey(); Integer keyId = getOrSaveKeyId(strKey); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregationTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregationTimeseriesDao.java deleted file mode 100644 index b8e42e3165..0000000000 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregationTimeseriesDao.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Copyright © 2016-2022 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.dao.timeseries; - -import com.google.common.base.Function; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.kv.Aggregation; -import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; -import org.thingsboard.server.common.data.kv.ReadTsKvQuery; -import org.thingsboard.server.common.data.kv.TsKvEntry; - -import javax.annotation.Nullable; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.Executor; -import java.util.stream.Collectors; - -public interface AggregationTimeseriesDao { - - default ListenableFuture> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { - if (query.getAggregation() == Aggregation.NONE) { - return findAllAsyncWithLimit(tenantId, entityId, query); - } else { - List>> futures = findIntervals(tenantId, entityId, query); - return getTskvEntriesFuture(Futures.allAsList(futures)); - } - } - - default List>> findIntervals(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { - List>> futures = new ArrayList<>(); - long endPeriod = query.getEndTs(); - long startPeriod = query.getStartTs(); - long step = query.getInterval(); - while (startPeriod <= endPeriod) { - long startTs = startPeriod; - long endTs = Math.min(startPeriod + step, endPeriod + 1); - long ts = getTsForReadTsKvQuery(startTs, endTs); - ReadTsKvQuery subQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, ts, 1, query.getAggregation(), query.getOrder()); - ListenableFuture> aggregateTsKvEntry = findAndAggregateAsync(tenantId, entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs), query.getAggregation()); - futures.add(aggregateTsKvEntry); - startPeriod = endTs; - } - return futures; - } - - default long getTsForReadTsKvQuery(long startTs, long endTs) { - return endTs - startTs; - } - - long getIntervalGreaterOrEqualsMinAggregationStep(long interval); - - default ListenableFuture> getTskvEntriesFuture(ListenableFuture>> allAsList) { - return Futures.transform(allAsList, new Function<>() { - @Nullable - @Override - public List apply(@Nullable List> results) { - if (results == null || results.isEmpty()) { - return null; - } - return results.stream() - .filter(Optional::isPresent) - .map(Optional::get) - .collect(Collectors.toList()); - } - }, getExecutor()); - } - - Executor getExecutor(); - - default ListenableFuture> findAndAggregateAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery key, long startTs, long endTs, Aggregation aggregation) { - return Futures.immediateFuture(null); - } - - ListenableFuture> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query); - - default long toPartitionTs(long ts) { - return ts; - } - -} diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 8b58e547c9..bb3930b99c 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -37,6 +37,7 @@ import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.Aggregation; +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.DataType; import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; import org.thingsboard.server.common.data.kv.KvEntry; @@ -45,6 +46,7 @@ import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.nosql.TbResultSet; import org.thingsboard.server.dao.nosql.TbResultSetFuture; +import org.thingsboard.server.dao.sqlts.AggregationTimeseriesDao; import org.thingsboard.server.dao.util.NoSqlTsDao; import javax.annotation.Nullable; @@ -59,7 +61,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Optional; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -265,13 +266,31 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD } @Override - public long getIntervalGreaterOrEqualsMinAggregationStep(long interval) { - return Math.max(interval, MIN_AGGREGATION_STEP_MS); - } - - @Override - public ListenableFuture> findAndAggregateAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query, long startTs, long endTs, Aggregation aggregation) { - return findAndAggregateAsync(tenantId, entityId, query, startTs, endTs); + public ListenableFuture> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { + if (query.getAggregation() == Aggregation.NONE) { + return findAllAsyncWithLimit(tenantId, entityId, query); + } else { + long startPeriod = query.getStartTs(); + long endPeriod = query.getEndTs(); + long step = Math.max(query.getInterval(), MIN_AGGREGATION_STEP_MS); + List>> futures = new ArrayList<>(); + while (startPeriod <= endPeriod) { + long startTs = startPeriod; + long endTs = Math.min(startPeriod + step, endPeriod + 1); + long ts = endTs - startTs; + ReadTsKvQuery subQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, ts, 1, query.getAggregation(), query.getOrder()); + futures.add(findAndAggregateAsync(tenantId, entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs))); + startPeriod = endTs; + } + ListenableFuture>> future = Futures.allAsList(futures); + return Futures.transform(future, new Function<>() { + @Nullable + @Override + public List apply(@Nullable List> input) { + return input == null ? Collections.emptyList() : input.stream().filter(v -> v.isPresent()).map(v -> v.get()).collect(Collectors.toList()); + } + }, readResultsProcessingExecutor); + } } @Override @@ -279,8 +298,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD //Cleanup by TTL is native for Cassandra } - @Override - public ListenableFuture> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { + private ListenableFuture> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { long minPartition = toPartitionTs(query.getStartTs()); long maxPartition = toPartitionTs(query.getEndTs()); final ListenableFuture> partitionsListFuture = getPartitionsFuture(tenantId, query, entityId, minPartition, maxPartition); @@ -302,17 +320,11 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD return resultFuture; } - @Override - public long toPartitionTs(long ts) { + private long toPartitionTs(long ts) { LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC); return tsFormat.truncatedTo(time).toInstant(ZoneOffset.UTC).toEpochMilli(); } - @Override - public Executor getExecutor() { - return readResultsProcessingExecutor; - } - private void findAllAsyncSequentiallyWithLimit(TenantId tenantId, final TsKvQueryCursor cursor, final SimpleListenableFuture> resultFuture) { if (cursor.isFull() || !cursor.hasNextPartition()) { resultFuture.set(cursor.getData()); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java index e7099db3d2..124af03a1a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java @@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.nosql.TbResultSet; +import org.thingsboard.server.dao.sqlts.AggregationTimeseriesDao; import org.thingsboard.server.dao.util.NoSqlTsLatestDao; import java.util.Collections; diff --git a/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java b/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java index cab869a647..db216f7f6e 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java @@ -59,9 +59,9 @@ public class AbstractChunkedAggregationTimeseriesDaoTest { ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, 1, 2001, 1001, LIMIT, COUNT, DESC); ReadTsKvQuery subQuerySecond = new BaseReadTsKvQuery(TEMP, 2001, 3001, 2501, LIMIT, COUNT, DESC); tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); - verify(tsDao, times(2)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any()); - verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, SYS_TENANT_ID, subQueryFirst, 1, 2001, COUNT); - verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, SYS_TENANT_ID, subQuerySecond, 2001, 3000 + 1, COUNT); + verify(tsDao, times(2)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any()); + verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 2001, getTsForReadTsKvQuery(1, 2001), COUNT); + verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQuerySecond.getKey(), 2001, 3000 + 1, getTsForReadTsKvQuery(2001, 3001), COUNT); } @Test @@ -70,8 +70,8 @@ public class AbstractChunkedAggregationTimeseriesDaoTest { ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, 1, 3001, 1501, LIMIT, COUNT, DESC); willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); assertThat(tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query)).isNotNull(); - verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any()); - verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, SYS_TENANT_ID, subQueryFirst, 1, 3000 + 1, COUNT); + verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any()); + verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 3000 + 1, getTsForReadTsKvQuery(1, 3001), COUNT); } @Test @@ -81,9 +81,9 @@ public class AbstractChunkedAggregationTimeseriesDaoTest { ReadTsKvQuery subQuerySecond = new BaseReadTsKvQuery(TEMP, 3000, 3001, 3000, LIMIT, COUNT, DESC); willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); - verify(tsDao, times(2)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any()); - verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, SYS_TENANT_ID, subQueryFirst, 1, 3000, COUNT); - verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, SYS_TENANT_ID, subQuerySecond, 3000, 3001, COUNT); + verify(tsDao, times(2)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any()); + verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 3000, getTsForReadTsKvQuery(1, 3000), COUNT); + verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQuerySecond.getKey(), 3000, 3001, getTsForReadTsKvQuery(3000, 3001), COUNT); } @@ -93,8 +93,8 @@ public class AbstractChunkedAggregationTimeseriesDaoTest { ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, 1, 3001, 1501, LIMIT, COUNT, DESC); willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); - verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any()); - verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, SYS_TENANT_ID, subQueryFirst, 1, 3001, COUNT); + verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any()); + verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 3001, getTsForReadTsKvQuery(1, 3001), COUNT); } @Test @@ -103,8 +103,8 @@ public class AbstractChunkedAggregationTimeseriesDaoTest { ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, 0, 1, 0, LIMIT, COUNT, DESC); willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); - verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any()); - verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, SYS_TENANT_ID, subQueryFirst, 0, 1, COUNT); + verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any()); + verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 0, 1, getTsForReadTsKvQuery(0, 1), COUNT); } @Test @@ -113,8 +113,8 @@ public class AbstractChunkedAggregationTimeseriesDaoTest { ReadTsKvQuery subQuery = new BaseReadTsKvQuery(TEMP, 1, 2, 1, LIMIT, COUNT, DESC); willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); - verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any()); - verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, SYS_TENANT_ID, subQuery, 1, 2, COUNT); + verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any()); + verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQuery.getKey(), 1, 2, getTsForReadTsKvQuery(1, 2), COUNT); } @Test @@ -123,8 +123,8 @@ public class AbstractChunkedAggregationTimeseriesDaoTest { ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, Integer.MAX_VALUE, Integer.MAX_VALUE + 1L, Integer.MAX_VALUE, LIMIT, COUNT, DESC); willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); - verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any()); - verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, SYS_TENANT_ID, subQueryFirst, Integer.MAX_VALUE, Integer.MAX_VALUE + 1L, COUNT); + verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any()); + verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), Integer.MAX_VALUE, 1L + Integer.MAX_VALUE, getTsForReadTsKvQuery(Integer.MAX_VALUE, 1L + Integer.MAX_VALUE), COUNT); } @Test @@ -133,8 +133,8 @@ public class AbstractChunkedAggregationTimeseriesDaoTest { ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, 1, 3001, 1501, LIMIT, COUNT, DESC); willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); - verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any()); - verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, SYS_TENANT_ID, subQueryFirst, 1, 3001, COUNT); + verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any()); + verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 3001, getTsForReadTsKvQuery(1, 3001), COUNT); } @Test @@ -142,10 +142,15 @@ public class AbstractChunkedAggregationTimeseriesDaoTest { ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, 1, 3000, 3, LIMIT, COUNT, DESC); willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); - verify(tsDao, times(1000)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any()); + verify(tsDao, times(1000)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any()); for (long i = 1; i <= 3000; i += 3) { ReadTsKvQuery querySub = new BaseReadTsKvQuery(TEMP, i, i + 3, i + (i + 3 - i) / 2, LIMIT, COUNT, DESC); - verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, SYS_TENANT_ID, querySub, i, i + 3, COUNT); + verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, querySub.getKey(), i, i + 3, getTsForReadTsKvQuery(i, i + 3), COUNT); } } + + long getTsForReadTsKvQuery(long startTs, long endTs) { + return startTs + (endTs - startTs) / 2L; + } + }