Browse Source

improve logic

pull/5905/head
van-vanich 4 years ago
parent
commit
7d7701723c
  1. 36
      dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java
  2. 12
      dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java
  3. 29
      dao/src/main/java/org/thingsboard/server/dao/sqlts/AggregationTimeseriesDao.java
  4. 23
      dao/src/main/java/org/thingsboard/server/dao/sqlts/BaseAbstractSqlTimeseriesDao.java
  5. 1
      dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java
  6. 5
      dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java
  7. 97
      dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregationTimeseriesDao.java
  8. 46
      dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
  9. 1
      dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java
  10. 45
      dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java

36
dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java

@ -26,6 +26,7 @@ import org.springframework.data.domain.Sort;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
@ -118,12 +119,28 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
}
@Override
public long getTsForReadTsKvQuery(long startTs, long endTs) {
return startTs + (endTs - startTs) / 2;
public ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
if (query.getAggregation() == Aggregation.NONE) {
return findAllAsyncWithLimit(entityId, query);
} else {
List<ListenableFuture<Optional<TsKvEntry>>> futures = new ArrayList<>();
long endPeriod = query.getEndTs();
long startPeriod = query.getStartTs();
long step = query.getInterval();
while (startPeriod <= endPeriod) {
long startTs = startPeriod;
long endTs = Math.min(startPeriod + step, endPeriod + 1);
long ts = startTs + (endTs - startTs) / 2;
ReadTsKvQuery subQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, ts, 1, query.getAggregation(), query.getOrder());
ListenableFuture<Optional<TsKvEntry>> aggregateTsKvEntry = findAndAggregateAsync(entityId, subQuery.getKey(), startTs, endTs, ts, query.getAggregation());
futures.add(aggregateTsKvEntry);
startPeriod = endTs;
}
return getTskvEntriesFuture(Futures.allAsList(futures));
}
}
@Override
public ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) {
Integer keyId = getOrSaveKeyId(query.getKey());
List<TsKvEntity> tsKvEntities = tsKvRepository.findAllWithLimit(
entityId.getId(),
@ -136,17 +153,6 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
return Futures.immediateFuture(DaoUtil.convertDataList(tsKvEntities));
}
@Override
public ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query, long startTs, long endTs, Aggregation aggregation) {
return findAndAggregateAsync(entityId,
query.getKey(),
startTs,
endTs,
getTsForReadTsKvQuery(startTs, endTs),
aggregation);
}
ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {
List<CompletableFuture<TsKvEntity>> entitiesFutures = new ArrayList<>();
switchAggregation(entityId, key, startTs, endTs, aggregation, entitiesFutures);

12
dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java

@ -27,7 +27,6 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
import org.thingsboard.server.dao.timeseries.AggregationTimeseriesDao;
import javax.annotation.Nullable;
import java.sql.Connection;
@ -36,7 +35,6 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -122,14 +120,4 @@ public abstract class AbstractSqlTimeseriesDao extends BaseAbstractSqlTimeseries
protected int getDataPointDays(TsKvEntry tsKvEntry, long ttl) {
return tsKvEntry.getDataPoints() * Math.max(1, (int) (ttl / SECONDS_IN_DAY));
}
@Override
public Executor getExecutor() {
return service;
}
@Override
public long getIntervalGreaterOrEqualsMinAggregationStep(long interval) {
return interval;
}
}

29
dao/src/main/java/org/thingsboard/server/dao/sqlts/AggregationTimeseriesDao.java

@ -0,0 +1,29 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sqlts;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import java.util.List;
public interface AggregationTimeseriesDao {
ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query);
}

23
dao/src/main/java/org/thingsboard/server/dao/sqlts/BaseAbstractSqlTimeseriesDao.java

@ -15,18 +15,25 @@
*/
package org.thingsboard.server.dao.sqlts;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.hibernate.exception.ConstraintViolationException;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.model.sqlts.dictionary.TsKvDictionary;
import org.thingsboard.server.dao.model.sqlts.dictionary.TsKvDictionaryCompositeKey;
import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
import org.thingsboard.server.dao.sqlts.dictionary.TsKvDictionaryRepository;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@Slf4j
public abstract class BaseAbstractSqlTimeseriesDao extends JpaAbstractDaoListeningExecutorService {
@ -72,4 +79,20 @@ public abstract class BaseAbstractSqlTimeseriesDao extends JpaAbstractDaoListeni
return keyId;
}
protected ListenableFuture<List<TsKvEntry>> getTskvEntriesFuture(ListenableFuture<List<Optional<TsKvEntry>>> future) {
return Futures.transform(future, new Function<List<Optional<TsKvEntry>>, List<TsKvEntry>>() {
@Nullable
@Override
public List<TsKvEntry> apply(@Nullable List<Optional<TsKvEntry>> results) {
if (results == null || results.isEmpty()) {
return null;
}
return results.stream()
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
}
}, service);
}
}

1
dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java

@ -45,7 +45,6 @@ import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository;
import org.thingsboard.server.dao.sqlts.latest.SearchTsKvLatestRepository;
import org.thingsboard.server.dao.sqlts.latest.TsKvLatestRepository;
import org.thingsboard.server.dao.timeseries.AggregationTimeseriesDao;
import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao;
import org.thingsboard.server.dao.util.SqlTsLatestAnyDao;

5
dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java

@ -165,11 +165,6 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
super.cleanup(systemTtl);
}
@Override
public ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
return findAllAsyncWithLimit(entityId, query);
}
private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) {
String strKey = query.getKey();
Integer keyId = getOrSaveKeyId(strKey);

97
dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregationTimeseriesDao.java

@ -1,97 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.timeseries;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
public interface AggregationTimeseriesDao {
default ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
if (query.getAggregation() == Aggregation.NONE) {
return findAllAsyncWithLimit(tenantId, entityId, query);
} else {
List<ListenableFuture<Optional<TsKvEntry>>> futures = findIntervals(tenantId, entityId, query);
return getTskvEntriesFuture(Futures.allAsList(futures));
}
}
default List<ListenableFuture<Optional<TsKvEntry>>> findIntervals(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
List<ListenableFuture<Optional<TsKvEntry>>> futures = new ArrayList<>();
long endPeriod = query.getEndTs();
long startPeriod = query.getStartTs();
long step = query.getInterval();
while (startPeriod <= endPeriod) {
long startTs = startPeriod;
long endTs = Math.min(startPeriod + step, endPeriod + 1);
long ts = getTsForReadTsKvQuery(startTs, endTs);
ReadTsKvQuery subQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, ts, 1, query.getAggregation(), query.getOrder());
ListenableFuture<Optional<TsKvEntry>> aggregateTsKvEntry = findAndAggregateAsync(tenantId, entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs), query.getAggregation());
futures.add(aggregateTsKvEntry);
startPeriod = endTs;
}
return futures;
}
default long getTsForReadTsKvQuery(long startTs, long endTs) {
return endTs - startTs;
}
long getIntervalGreaterOrEqualsMinAggregationStep(long interval);
default ListenableFuture<List<TsKvEntry>> getTskvEntriesFuture(ListenableFuture<List<Optional<TsKvEntry>>> allAsList) {
return Futures.transform(allAsList, new Function<>() {
@Nullable
@Override
public List<TsKvEntry> apply(@Nullable List<Optional<TsKvEntry>> results) {
if (results == null || results.isEmpty()) {
return null;
}
return results.stream()
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
}
}, getExecutor());
}
Executor getExecutor();
default ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery key, long startTs, long endTs, Aggregation aggregation) {
return Futures.immediateFuture(null);
}
ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query);
default long toPartitionTs(long ts) {
return ts;
}
}

46
dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java

@ -37,6 +37,7 @@ import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.KvEntry;
@ -45,6 +46,7 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.nosql.TbResultSet;
import org.thingsboard.server.dao.nosql.TbResultSetFuture;
import org.thingsboard.server.dao.sqlts.AggregationTimeseriesDao;
import org.thingsboard.server.dao.util.NoSqlTsDao;
import javax.annotation.Nullable;
@ -59,7 +61,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -265,13 +266,31 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
}
@Override
public long getIntervalGreaterOrEqualsMinAggregationStep(long interval) {
return Math.max(interval, MIN_AGGREGATION_STEP_MS);
}
@Override
public ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query, long startTs, long endTs, Aggregation aggregation) {
return findAndAggregateAsync(tenantId, entityId, query, startTs, endTs);
public ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
if (query.getAggregation() == Aggregation.NONE) {
return findAllAsyncWithLimit(tenantId, entityId, query);
} else {
long startPeriod = query.getStartTs();
long endPeriod = query.getEndTs();
long step = Math.max(query.getInterval(), MIN_AGGREGATION_STEP_MS);
List<ListenableFuture<Optional<TsKvEntry>>> futures = new ArrayList<>();
while (startPeriod <= endPeriod) {
long startTs = startPeriod;
long endTs = Math.min(startPeriod + step, endPeriod + 1);
long ts = endTs - startTs;
ReadTsKvQuery subQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, ts, 1, query.getAggregation(), query.getOrder());
futures.add(findAndAggregateAsync(tenantId, entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs)));
startPeriod = endTs;
}
ListenableFuture<List<Optional<TsKvEntry>>> future = Futures.allAsList(futures);
return Futures.transform(future, new Function<>() {
@Nullable
@Override
public List<TsKvEntry> apply(@Nullable List<Optional<TsKvEntry>> input) {
return input == null ? Collections.emptyList() : input.stream().filter(v -> v.isPresent()).map(v -> v.get()).collect(Collectors.toList());
}
}, readResultsProcessingExecutor);
}
}
@Override
@ -279,8 +298,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
//Cleanup by TTL is native for Cassandra
}
@Override
public ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
long minPartition = toPartitionTs(query.getStartTs());
long maxPartition = toPartitionTs(query.getEndTs());
final ListenableFuture<List<Long>> partitionsListFuture = getPartitionsFuture(tenantId, query, entityId, minPartition, maxPartition);
@ -302,17 +320,11 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
return resultFuture;
}
@Override
public long toPartitionTs(long ts) {
private long toPartitionTs(long ts) {
LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
return tsFormat.truncatedTo(time).toInstant(ZoneOffset.UTC).toEpochMilli();
}
@Override
public Executor getExecutor() {
return readResultsProcessingExecutor;
}
private void findAllAsyncSequentiallyWithLimit(TenantId tenantId, final TsKvQueryCursor cursor, final SimpleListenableFuture<List<TsKvEntry>> resultFuture) {
if (cursor.isFull() || !cursor.hasNextPartition()) {
resultFuture.set(cursor.getData());

1
dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java

@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.nosql.TbResultSet;
import org.thingsboard.server.dao.sqlts.AggregationTimeseriesDao;
import org.thingsboard.server.dao.util.NoSqlTsLatestDao;
import java.util.Collections;

45
dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java

@ -59,9 +59,9 @@ public class AbstractChunkedAggregationTimeseriesDaoTest {
ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, 1, 2001, 1001, LIMIT, COUNT, DESC);
ReadTsKvQuery subQuerySecond = new BaseReadTsKvQuery(TEMP, 2001, 3001, 2501, LIMIT, COUNT, DESC);
tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
verify(tsDao, times(2)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, SYS_TENANT_ID, subQueryFirst, 1, 2001, COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, SYS_TENANT_ID, subQuerySecond, 2001, 3000 + 1, COUNT);
verify(tsDao, times(2)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 2001, getTsForReadTsKvQuery(1, 2001), COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQuerySecond.getKey(), 2001, 3000 + 1, getTsForReadTsKvQuery(2001, 3001), COUNT);
}
@Test
@ -70,8 +70,8 @@ public class AbstractChunkedAggregationTimeseriesDaoTest {
ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, 1, 3001, 1501, LIMIT, COUNT, DESC);
willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
assertThat(tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query)).isNotNull();
verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, SYS_TENANT_ID, subQueryFirst, 1, 3000 + 1, COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 3000 + 1, getTsForReadTsKvQuery(1, 3001), COUNT);
}
@Test
@ -81,9 +81,9 @@ public class AbstractChunkedAggregationTimeseriesDaoTest {
ReadTsKvQuery subQuerySecond = new BaseReadTsKvQuery(TEMP, 3000, 3001, 3000, LIMIT, COUNT, DESC);
willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
verify(tsDao, times(2)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, SYS_TENANT_ID, subQueryFirst, 1, 3000, COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, SYS_TENANT_ID, subQuerySecond, 3000, 3001, COUNT);
verify(tsDao, times(2)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 3000, getTsForReadTsKvQuery(1, 3000), COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQuerySecond.getKey(), 3000, 3001, getTsForReadTsKvQuery(3000, 3001), COUNT);
}
@ -93,8 +93,8 @@ public class AbstractChunkedAggregationTimeseriesDaoTest {
ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, 1, 3001, 1501, LIMIT, COUNT, DESC);
willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, SYS_TENANT_ID, subQueryFirst, 1, 3001, COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 3001, getTsForReadTsKvQuery(1, 3001), COUNT);
}
@Test
@ -103,8 +103,8 @@ public class AbstractChunkedAggregationTimeseriesDaoTest {
ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, 0, 1, 0, LIMIT, COUNT, DESC);
willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, SYS_TENANT_ID, subQueryFirst, 0, 1, COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 0, 1, getTsForReadTsKvQuery(0, 1), COUNT);
}
@Test
@ -113,8 +113,8 @@ public class AbstractChunkedAggregationTimeseriesDaoTest {
ReadTsKvQuery subQuery = new BaseReadTsKvQuery(TEMP, 1, 2, 1, LIMIT, COUNT, DESC);
willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, SYS_TENANT_ID, subQuery, 1, 2, COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQuery.getKey(), 1, 2, getTsForReadTsKvQuery(1, 2), COUNT);
}
@Test
@ -123,8 +123,8 @@ public class AbstractChunkedAggregationTimeseriesDaoTest {
ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, Integer.MAX_VALUE, Integer.MAX_VALUE + 1L, Integer.MAX_VALUE, LIMIT, COUNT, DESC);
willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, SYS_TENANT_ID, subQueryFirst, Integer.MAX_VALUE, Integer.MAX_VALUE + 1L, COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), Integer.MAX_VALUE, 1L + Integer.MAX_VALUE, getTsForReadTsKvQuery(Integer.MAX_VALUE, 1L + Integer.MAX_VALUE), COUNT);
}
@Test
@ -133,8 +133,8 @@ public class AbstractChunkedAggregationTimeseriesDaoTest {
ReadTsKvQuery subQueryFirst = new BaseReadTsKvQuery(TEMP, 1, 3001, 1501, LIMIT, COUNT, DESC);
willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, SYS_TENANT_ID, subQueryFirst, 1, 3001, COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any());
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, subQueryFirst.getKey(), 1, 3001, getTsForReadTsKvQuery(1, 3001), COUNT);
}
@Test
@ -142,10 +142,15 @@ public class AbstractChunkedAggregationTimeseriesDaoTest {
ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, 1, 3000, 3, LIMIT, COUNT, DESC);
willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query);
verify(tsDao, times(1000)).findAndAggregateAsync(any(), any(), any(), anyLong(), anyLong(), any());
verify(tsDao, times(1000)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any());
for (long i = 1; i <= 3000; i += 3) {
ReadTsKvQuery querySub = new BaseReadTsKvQuery(TEMP, i, i + 3, i + (i + 3 - i) / 2, LIMIT, COUNT, DESC);
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, SYS_TENANT_ID, querySub, i, i + 3, COUNT);
verify(tsDao, times(1)).findAndAggregateAsync(SYS_TENANT_ID, querySub.getKey(), i, i + 3, getTsForReadTsKvQuery(i, i + 3), COUNT);
}
}
long getTsForReadTsKvQuery(long startTs, long endTs) {
return startTs + (endTs - startTs) / 2L;
}
}

Loading…
Cancel
Save