@ -20,6 +20,7 @@ import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet ;
import com.datastax.driver.core.ResultSetFuture ;
import com.datastax.driver.core.Row ;
import com.datastax.driver.core.Statement ;
import com.datastax.driver.core.querybuilder.QueryBuilder ;
import com.datastax.driver.core.querybuilder.Select ;
import com.google.common.base.Function ;
@ -34,16 +35,17 @@ import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component ;
import org.thingsboard.server.common.data.id.EntityId ;
import org.thingsboard.server.common.data.kv.Aggregation ;
import org.thingsboard.server.common.data.kv.BaseTsKvQuery ;
import org.thingsboard.server.common.data.kv.BaseRead TsKvQuery ;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry ;
import org.thingsboard.server.common.data.kv.BooleanDataEntry ;
import org.thingsboard.server.common.data.kv.DataType ;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery ;
import org.thingsboard.server.common.data.kv.DoubleDataEntry ;
import org.thingsboard.server.common.data.kv.KvEntry ;
import org.thingsboard.server.common.data.kv.LongDataEntry ;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery ;
import org.thingsboard.server.common.data.kv.StringDataEntry ;
import org.thingsboard.server.common.data.kv.TsKvEntry ;
import org.thingsboard.server.common.data.kv.TsKvQuery ;
import org.thingsboard.server.dao.model.ModelConstants ;
import org.thingsboard.server.dao.nosql.CassandraAbstractAsyncDao ;
import org.thingsboard.server.dao.util.NoSqlDao ;
@ -54,11 +56,11 @@ import javax.annotation.PreDestroy;
import java.time.Instant ;
import java.time.LocalDateTime ;
import java.time.ZoneOffset ;
import java.util.ArrayList ;
import java.util.Arrays ;
import java.util.Collections ;
import java.util.List ;
import java.util.ArrayList ;
import java.util.Optional ;
import java.util.Collections ;
import java.util.stream.Collectors ;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq ;
@ -76,8 +78,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
public static final String GENERATED_QUERY_FOR_ENTITY_TYPE_AND_ENTITY_ID = "Generated query [{}] for entityType {} and entityId {}" ;
public static final String SELECT_PREFIX = "SELECT " ;
public static final String EQUALS_PARAM = " = ? " ;
public static final String ASC_ORDER = "ASC" ;
public static final String DESC_ORDER = "DESC" ;
private static List < Long > FIXED_PARTITION = Arrays . asList ( new Long [ ] { 0L } ) ;
@Autowired
@ -96,9 +98,12 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
private PreparedStatement latestInsertStmt ;
private PreparedStatement [ ] saveStmts ;
private PreparedStatement [ ] saveTtlStmts ;
private PreparedStatement [ ] fetchStmts ;
private PreparedStatement [ ] fetchStmtsAsc ;
private PreparedStatement [ ] fetchStmtsDesc ;
private PreparedStatement findLatestStmt ;
private PreparedStatement findAllLatestStmt ;
private PreparedStatement deleteStmt ;
private PreparedStatement deletePartitionStmt ;
private boolean isInstall ( ) {
return environment . acceptsProfiles ( "install" ) ;
@ -108,7 +113,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
public void init ( ) {
super . startExecutor ( ) ;
if ( ! isInstall ( ) ) {
getFetchStmt ( Aggregation . NONE ) ;
getFetchStmt ( Aggregation . NONE , DESC_ORDER ) ;
Optional < TsPartitionDate > partition = TsPartitionDate . parse ( partitioning ) ;
if ( partition . isPresent ( ) ) {
tsFormat = partition . get ( ) ;
@ -125,7 +130,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
}
@Override
public ListenableFuture < List < TsKvEntry > > findAllAsync ( EntityId entityId , List < TsKvQuery > queries ) {
public ListenableFuture < List < TsKvEntry > > findAllAsync ( EntityId entityId , List < Read TsKvQuery> queries ) {
List < ListenableFuture < List < TsKvEntry > > > futures = queries . stream ( ) . map ( query - > findAllAsync ( entityId , query ) ) . collect ( Collectors . toList ( ) ) ;
return Futures . transform ( Futures . allAsList ( futures ) , new Function < List < List < TsKvEntry > > , List < TsKvEntry > > ( ) {
@Nullable
@ -142,7 +147,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
}
private ListenableFuture < List < TsKvEntry > > findAllAsync ( EntityId entityId , TsKvQuery query ) {
private ListenableFuture < List < TsKvEntry > > findAllAsync ( EntityId entityId , Read TsKvQuery query ) {
if ( query . getAggregation ( ) = = Aggregation . NONE ) {
return findAllAsyncWithLimit ( entityId , query ) ;
} else {
@ -152,7 +157,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
while ( stepTs < query . getEndTs ( ) ) {
long startTs = stepTs ;
long endTs = stepTs + step ;
TsKvQuery subQuery = new BaseTsKvQuery ( query . getKey ( ) , startTs , endTs , step , 1 , query . getAggregation ( ) ) ;
Read TsKvQuery subQuery = new BaseRead TsKvQuery ( query . getKey ( ) , startTs , endTs , step , 1 , query . getAggregation ( ) , query . getOrderBy ( ) ) ;
futures . add ( findAndAggregateAsync ( entityId , subQuery , toPartitionTs ( startTs ) , toPartitionTs ( endTs ) ) ) ;
stepTs = endTs ;
}
@ -171,7 +176,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
return tsFormat . getTruncateUnit ( ) . equals ( TsPartitionDate . EPOCH_START ) ;
}
private ListenableFuture < List < Long > > getPartitionsFuture ( TsKvQuery query , EntityId entityId , long minPartition , long maxPartition ) {
private ListenableFuture < List < Long > > getPartitionsFuture ( Read TsKvQuery query , EntityId entityId , long minPartition , long maxPartition ) {
if ( isFixedPartitioning ( ) ) { //no need to fetch partitions from DB
return Futures . immediateFuture ( FIXED_PARTITION ) ;
}
@ -179,11 +184,9 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
return Futures . transform ( partitionsFuture , getPartitionsArrayFunction ( ) , readResultsProcessingExecutor ) ;
}
private ListenableFuture < List < TsKvEntry > > findAllAsyncWithLimit ( EntityId entityId , TsKvQuery query ) {
private ListenableFuture < List < TsKvEntry > > findAllAsyncWithLimit ( EntityId entityId , ReadTsKvQuery query ) {
long minPartition = toPartitionTs ( query . getStartTs ( ) ) ;
long maxPartition = toPartitionTs ( query . getEndTs ( ) ) ;
final ListenableFuture < List < Long > > partitionsListFuture = getPartitionsFuture ( query , entityId , minPartition , maxPartition ) ;
final SimpleListenableFuture < List < TsKvEntry > > resultFuture = new SimpleListenableFuture < > ( ) ;
@ -212,7 +215,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
if ( cursor . isFull ( ) | | ! cursor . hasNextPartition ( ) ) {
resultFuture . set ( cursor . getData ( ) ) ;
} else {
PreparedStatement proto = getFetchStmt ( Aggregation . NONE ) ;
PreparedStatement proto = getFetchStmt ( Aggregation . NONE , cursor . getOrderBy ( ) ) ;
BoundStatement stmt = proto . bind ( ) ;
stmt . setString ( 0 , cursor . getEntityType ( ) ) ;
stmt . setUUID ( 1 , cursor . getEntityId ( ) ) ;
@ -237,14 +240,12 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
}
}
private ListenableFuture < Optional < TsKvEntry > > findAndAggregateAsync ( EntityId entityId , TsKvQuery query , long minPartition , long maxPartition ) {
private ListenableFuture < Optional < TsKvEntry > > findAndAggregateAsync ( EntityId entityId , Read TsKvQuery query , long minPartition , long maxPartition ) {
final Aggregation aggregation = query . getAggregation ( ) ;
final String key = query . getKey ( ) ;
final long startTs = query . getStartTs ( ) ;
final long endTs = query . getEndTs ( ) ;
final long ts = startTs + ( endTs - startTs ) / 2 ;
ListenableFuture < List < Long > > partitionsListFuture = getPartitionsFuture ( query , entityId , minPartition , maxPartition ) ;
ListenableFuture < List < ResultSet > > aggregationChunks = Futures . transformAsync ( partitionsListFuture ,
getFetchChunksAsyncFunction ( entityId , key , aggregation , startTs , endTs ) , readResultsProcessingExecutor ) ;
@ -260,7 +261,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
private AsyncFunction < List < Long > , List < ResultSet > > getFetchChunksAsyncFunction ( EntityId entityId , String key , Aggregation aggregation , long startTs , long endTs ) {
return partitions - > {
try {
PreparedStatement proto = getFetchStmt ( aggregation ) ;
PreparedStatement proto = getFetchStmt ( aggregation , DESC_ORDER ) ;
List < ResultSetFuture > futures = new ArrayList < > ( partitions . size ( ) ) ;
for ( Long partition : partitions ) {
log . trace ( "Fetching data for partition [{}] for entityType {} and entityId {}" , partition , entityId . getEntityType ( ) , entityId . getId ( ) ) ;
@ -363,6 +364,204 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
return getFuture ( executeAsyncWrite ( stmt ) , rs - > null ) ;
}
@Override
public ListenableFuture < Void > remove ( EntityId entityId , DeleteTsKvQuery query ) {
long minPartition = toPartitionTs ( query . getStartTs ( ) ) ;
long maxPartition = toPartitionTs ( query . getEndTs ( ) ) ;
ResultSetFuture partitionsFuture = fetchPartitions ( entityId , query . getKey ( ) , minPartition , maxPartition ) ;
final SimpleListenableFuture < Void > resultFuture = new SimpleListenableFuture < > ( ) ;
final ListenableFuture < List < Long > > partitionsListFuture = Futures . transform ( partitionsFuture , getPartitionsArrayFunction ( ) , readResultsProcessingExecutor ) ;
Futures . addCallback ( partitionsListFuture , new FutureCallback < List < Long > > ( ) {
@Override
public void onSuccess ( @Nullable List < Long > partitions ) {
QueryCursor cursor = new QueryCursor ( entityId . getEntityType ( ) . name ( ) , entityId . getId ( ) , query , partitions ) ;
deleteAsync ( cursor , resultFuture ) ;
}
@Override
public void onFailure ( Throwable t ) {
log . error ( "[{}][{}] Failed to fetch partitions for interval {}-{}" , entityId . getEntityType ( ) . name ( ) , entityId . getId ( ) , minPartition , maxPartition , t ) ;
}
} , readResultsProcessingExecutor ) ;
return resultFuture ;
}
private void deleteAsync ( final QueryCursor cursor , final SimpleListenableFuture < Void > resultFuture ) {
if ( ! cursor . hasNextPartition ( ) ) {
resultFuture . set ( null ) ;
} else {
PreparedStatement proto = getDeleteStmt ( ) ;
BoundStatement stmt = proto . bind ( ) ;
stmt . setString ( 0 , cursor . getEntityType ( ) ) ;
stmt . setUUID ( 1 , cursor . getEntityId ( ) ) ;
stmt . setString ( 2 , cursor . getKey ( ) ) ;
stmt . setLong ( 3 , cursor . getNextPartition ( ) ) ;
stmt . setLong ( 4 , cursor . getStartTs ( ) ) ;
stmt . setLong ( 5 , cursor . getEndTs ( ) ) ;
Futures . addCallback ( executeAsyncWrite ( stmt ) , new FutureCallback < ResultSet > ( ) {
@Override
public void onSuccess ( @Nullable ResultSet result ) {
deleteAsync ( cursor , resultFuture ) ;
}
@Override
public void onFailure ( Throwable t ) {
log . error ( "[{}][{}] Failed to delete data for query {}-{}" , stmt , t ) ;
}
} , readResultsProcessingExecutor ) ;
}
}
private PreparedStatement getDeleteStmt ( ) {
if ( deleteStmt = = null ) {
deleteStmt = prepare ( "DELETE FROM " + ModelConstants . TS_KV_CF +
" WHERE " + ModelConstants . ENTITY_TYPE_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants . ENTITY_ID_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants . KEY_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants . PARTITION_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants . TS_COLUMN + " > ? "
+ "AND " + ModelConstants . TS_COLUMN + " <= ?" ) ;
}
return deleteStmt ;
}
@Override
public ListenableFuture < Void > removeLatest ( EntityId entityId , DeleteTsKvQuery query ) {
ListenableFuture < TsKvEntry > latestEntryFuture = findLatest ( entityId , query . getKey ( ) ) ;
ListenableFuture < Boolean > booleanFuture = Futures . transformAsync ( latestEntryFuture , latestEntry - > {
long ts = latestEntry . getTs ( ) ;
if ( ts > = query . getStartTs ( ) & & ts < = query . getEndTs ( ) ) {
return Futures . immediateFuture ( true ) ;
} else {
log . trace ( "Won't be deleted latest value for [{}], key - {}" , entityId , query . getKey ( ) ) ;
}
return Futures . immediateFuture ( false ) ;
} , readResultsProcessingExecutor ) ;
ListenableFuture < Void > removedLatestFuture = Futures . transformAsync ( booleanFuture , isRemove - > {
if ( isRemove ) {
return deleteLatest ( entityId , query . getKey ( ) ) ;
}
return Futures . immediateFuture ( null ) ;
} , readResultsProcessingExecutor ) ;
if ( query . getRewriteLatestIfDeleted ( ) ) {
ListenableFuture < Void > savedLatestFuture = Futures . transformAsync ( booleanFuture , isRemove - > {
if ( isRemove ) {
return getNewLatestEntryFuture ( entityId , query ) ;
}
return Futures . immediateFuture ( null ) ;
} , readResultsProcessingExecutor ) ;
return Futures . transformAsync ( Futures . allAsList ( Arrays . asList ( savedLatestFuture , removedLatestFuture ) ) ,
list - > Futures . immediateFuture ( null ) , readResultsProcessingExecutor ) ;
}
return removedLatestFuture ;
}
private ListenableFuture < Void > getNewLatestEntryFuture ( EntityId entityId , DeleteTsKvQuery query ) {
long startTs = 0 ;
long endTs = query . getStartTs ( ) - 1 ;
ReadTsKvQuery findNewLatestQuery = new BaseReadTsKvQuery ( query . getKey ( ) , startTs , endTs , endTs - startTs , 1 ,
Aggregation . NONE , DESC_ORDER ) ;
ListenableFuture < List < TsKvEntry > > future = findAllAsync ( entityId , findNewLatestQuery ) ;
return Futures . transformAsync ( future , entryList - > {
if ( entryList . size ( ) = = 1 ) {
return saveLatest ( entityId , entryList . get ( 0 ) ) ;
} else {
log . trace ( "Could not find new latest value for [{}], key - {}" , entityId , query . getKey ( ) ) ;
}
return Futures . immediateFuture ( null ) ;
} , readResultsProcessingExecutor ) ;
}
private ListenableFuture < Void > deleteLatest ( EntityId entityId , String key ) {
Statement delete = QueryBuilder . delete ( ) . all ( ) . from ( ModelConstants . TS_KV_LATEST_CF )
. where ( eq ( ModelConstants . ENTITY_TYPE_COLUMN , entityId . getEntityType ( ) ) )
. and ( eq ( ModelConstants . ENTITY_ID_COLUMN , entityId . getId ( ) ) )
. and ( eq ( ModelConstants . KEY_COLUMN , key ) ) ;
log . debug ( "Remove request: {}" , delete . toString ( ) ) ;
return getFuture ( executeAsyncWrite ( delete ) , rs - > null ) ;
}
@Override
public ListenableFuture < Void > removePartition ( EntityId entityId , DeleteTsKvQuery query ) {
long minPartition = toPartitionTs ( query . getStartTs ( ) ) ;
long maxPartition = toPartitionTs ( query . getEndTs ( ) ) ;
if ( minPartition = = maxPartition ) {
return Futures . immediateFuture ( null ) ;
} else {
ResultSetFuture partitionsFuture = fetchPartitions ( entityId , query . getKey ( ) , minPartition , maxPartition ) ;
final SimpleListenableFuture < Void > resultFuture = new SimpleListenableFuture < > ( ) ;
final ListenableFuture < List < Long > > partitionsListFuture = Futures . transform ( partitionsFuture , getPartitionsArrayFunction ( ) , readResultsProcessingExecutor ) ;
Futures . addCallback ( partitionsListFuture , new FutureCallback < List < Long > > ( ) {
@Override
public void onSuccess ( @Nullable List < Long > partitions ) {
int index = 0 ;
if ( minPartition ! = query . getStartTs ( ) ) {
index = 1 ;
}
List < Long > partitionsToDelete = new ArrayList < > ( ) ;
for ( int i = index ; i < partitions . size ( ) - 1 ; i + + ) {
partitionsToDelete . add ( partitions . get ( i ) ) ;
}
QueryCursor cursor = new QueryCursor ( entityId . getEntityType ( ) . name ( ) , entityId . getId ( ) , query , partitionsToDelete ) ;
deletePartitionAsync ( cursor , resultFuture ) ;
}
@Override
public void onFailure ( Throwable t ) {
log . error ( "[{}][{}] Failed to fetch partitions for interval {}-{}" , entityId . getEntityType ( ) . name ( ) , entityId . getId ( ) , minPartition , maxPartition , t ) ;
}
} , readResultsProcessingExecutor ) ;
return resultFuture ;
}
}
private void deletePartitionAsync ( final QueryCursor cursor , final SimpleListenableFuture < Void > resultFuture ) {
if ( ! cursor . hasNextPartition ( ) ) {
resultFuture . set ( null ) ;
} else {
PreparedStatement proto = getDeletePartitionStmt ( ) ;
BoundStatement stmt = proto . bind ( ) ;
stmt . setString ( 0 , cursor . getEntityType ( ) ) ;
stmt . setUUID ( 1 , cursor . getEntityId ( ) ) ;
stmt . setLong ( 2 , cursor . getNextPartition ( ) ) ;
stmt . setString ( 3 , cursor . getKey ( ) ) ;
Futures . addCallback ( executeAsyncWrite ( stmt ) , new FutureCallback < ResultSet > ( ) {
@Override
public void onSuccess ( @Nullable ResultSet result ) {
deletePartitionAsync ( cursor , resultFuture ) ;
}
@Override
public void onFailure ( Throwable t ) {
log . error ( "[{}][{}] Failed to delete data for query {}-{}" , stmt , t ) ;
}
} , readResultsProcessingExecutor ) ;
}
}
private PreparedStatement getDeletePartitionStmt ( ) {
if ( deletePartitionStmt = = null ) {
deletePartitionStmt = prepare ( "DELETE FROM " + ModelConstants . TS_KV_PARTITIONS_CF +
" WHERE " + ModelConstants . ENTITY_TYPE_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants . ENTITY_ID_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants . PARTITION_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants . KEY_COLUMN + EQUALS_PARAM ) ;
}
return deletePartitionStmt ;
}
private List < TsKvEntry > convertResultToTsKvEntryList ( List < Row > rows ) {
List < TsKvEntry > entries = new ArrayList < > ( rows . size ( ) ) ;
if ( ! rows . isEmpty ( ) ) {
@ -458,28 +657,43 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
return saveTtlStmts [ dataType . ordinal ( ) ] ;
}
private PreparedStatement getFetchStmt ( Aggregation aggType ) {
if ( fetchStmts = = null ) {
fetchStmts = new PreparedStatement [ Aggregation . values ( ) . length ] ;
for ( Aggregation type : Aggregation . values ( ) ) {
if ( type = = Aggregation . SUM & & fetchStmts [ Aggregation . AVG . ordinal ( ) ] ! = null ) {
fetchStmts [ type . ordinal ( ) ] = fetchStmts [ Aggregation . AVG . ordinal ( ) ] ;
} else if ( type = = Aggregation . AVG & & fetchStmts [ Aggregation . SUM . ordinal ( ) ] ! = null ) {
fetchStmts [ type . ordinal ( ) ] = fetchStmts [ Aggregation . SUM . ordinal ( ) ] ;
} else {
fetchStmts [ type . ordinal ( ) ] = prepare ( SELECT_PREFIX +
String . join ( ", " , ModelConstants . getFetchColumnNames ( type ) ) + " FROM " + ModelConstants . TS_KV_CF
+ " WHERE " + ModelConstants . ENTITY_TYPE_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants . ENTITY_ID_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants . KEY_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants . PARTITION_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants . TS_COLUMN + " > ? "
+ "AND " + ModelConstants . TS_COLUMN + " <= ?"
+ ( type = = Aggregation . NONE ? " ORDER BY " + ModelConstants . TS_COLUMN + " DESC LIMIT ?" : "" ) ) ;
private PreparedStatement getFetchStmt ( Aggregation aggType , String orderBy ) {
switch ( orderBy ) {
case ASC_ORDER :
if ( fetchStmtsAsc = = null ) {
fetchStmtsAsc = initFetchStmt ( orderBy ) ;
}
return fetchStmtsAsc [ aggType . ordinal ( ) ] ;
case DESC_ORDER :
if ( fetchStmtsDesc = = null ) {
fetchStmtsDesc = initFetchStmt ( orderBy ) ;
}
return fetchStmtsDesc [ aggType . ordinal ( ) ] ;
default :
throw new RuntimeException ( "Not supported" + orderBy + "order!" ) ;
}
}
private PreparedStatement [ ] initFetchStmt ( String orderBy ) {
PreparedStatement [ ] fetchStmts = new PreparedStatement [ Aggregation . values ( ) . length ] ;
for ( Aggregation type : Aggregation . values ( ) ) {
if ( type = = Aggregation . SUM & & fetchStmts [ Aggregation . AVG . ordinal ( ) ] ! = null ) {
fetchStmts [ type . ordinal ( ) ] = fetchStmts [ Aggregation . AVG . ordinal ( ) ] ;
} else if ( type = = Aggregation . AVG & & fetchStmts [ Aggregation . SUM . ordinal ( ) ] ! = null ) {
fetchStmts [ type . ordinal ( ) ] = fetchStmts [ Aggregation . SUM . ordinal ( ) ] ;
} else {
fetchStmts [ type . ordinal ( ) ] = prepare ( SELECT_PREFIX +
String . join ( ", " , ModelConstants . getFetchColumnNames ( type ) ) + " FROM " + ModelConstants . TS_KV_CF
+ " WHERE " + ModelConstants . ENTITY_TYPE_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants . ENTITY_ID_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants . KEY_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants . PARTITION_COLUMN + EQUALS_PARAM
+ "AND " + ModelConstants . TS_COLUMN + " > ? "
+ "AND " + ModelConstants . TS_COLUMN + " <= ?"
+ ( type = = Aggregation . NONE ? " ORDER BY " + ModelConstants . TS_COLUMN + " " + orderBy + " LIMIT ?" : "" ) ) ;
}
}
return fetchStmts [ aggType . ordinal ( ) ] ;
return fetchStmts ;
}
private PreparedStatement getLatestStmt ( ) {