@ -25,7 +25,6 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.TsKvEntry ;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity ;
import org.thingsboard.server.dao.sqlts.AbstractChunkedAggregationTimeseriesDao ;
import org.thingsboard.server.dao.sqlts.EntityContainer ;
import org.thingsboard.server.dao.sqlts.insert.psql.PsqlPartitioningRepository ;
import org.thingsboard.server.dao.timeseries.PsqlPartition ;
import org.thingsboard.server.dao.timeseries.SqlTsPartitionDate ;
@ -42,8 +41,6 @@ import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.locks.ReentrantLock ;
import static org.thingsboard.server.dao.timeseries.SqlTsPartitionDate.EPOCH_START ;
@Component
@Slf4j
@ -58,7 +55,6 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
private PsqlPartitioningRepository partitioningRepository ;
private SqlTsPartitionDate tsFormat ;
private PsqlPartition indefinitePartition ;
@Value ( "${sql.postgres.ts_key_value_partitioning:MONTHS}" )
private String partitioning ;
@ -69,10 +65,6 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
Optional < SqlTsPartitionDate > partition = SqlTsPartitionDate . parse ( partitioning ) ;
if ( partition . isPresent ( ) ) {
tsFormat = partition . get ( ) ;
if ( tsFormat . equals ( SqlTsPartitionDate . INDEFINITE ) ) {
indefinitePartition = new PsqlPartition ( toMills ( EPOCH_START ) , Long . MAX_VALUE , tsFormat . getPattern ( ) ) ;
savePartition ( indefinitePartition ) ;
}
} else {
log . warn ( "Incorrect configuration of partitioning {}" , partitioning ) ;
throw new RuntimeException ( "Failed to parse partitioning property: " + partitioning + "!" ) ;
@ -81,6 +73,7 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
@Override
public ListenableFuture < Void > save ( TenantId tenantId , EntityId entityId , TsKvEntry tsKvEntry , long ttl ) {
savePartitionIfNotExist ( tsKvEntry . getTs ( ) ) ;
String strKey = tsKvEntry . getKey ( ) ;
Integer keyId = getOrSaveKeyId ( strKey ) ;
TsKvEntity entity = new TsKvEntity ( ) ;
@ -92,9 +85,23 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
entity . setLongValue ( tsKvEntry . getLongValue ( ) . orElse ( null ) ) ;
entity . setBooleanValue ( tsKvEntry . getBooleanValue ( ) . orElse ( null ) ) ;
entity . setJsonValue ( tsKvEntry . getJsonValue ( ) . orElse ( null ) ) ;
PsqlPartition psqlPartition = toPartition ( tsKvEntry . getTs ( ) ) ;
log . trace ( "Saving entity: {}" , entity ) ;
return tsQueue . add ( new EntityContainer ( entity , psqlPartition . getPartitionDate ( ) ) ) ;
return tsQueue . add ( entity ) ;
}
private void savePartitionIfNotExist ( long ts ) {
if ( ! tsFormat . equals ( SqlTsPartitionDate . INDEFINITE ) ) {
LocalDateTime time = LocalDateTime . ofInstant ( Instant . ofEpochMilli ( ts ) , ZoneOffset . UTC ) ;
LocalDateTime localDateTimeStart = tsFormat . trancateTo ( time ) ;
long partitionStartTs = toMills ( localDateTimeStart ) ;
if ( partitions . get ( partitionStartTs ) = = null ) {
LocalDateTime localDateTimeEnd = tsFormat . plusTo ( localDateTimeStart ) ;
long partitionEndTs = toMills ( localDateTimeEnd ) ;
ZonedDateTime zonedDateTime = localDateTimeStart . atZone ( ZoneOffset . UTC ) ;
String partitionDate = zonedDateTime . format ( DateTimeFormatter . ofPattern ( tsFormat . getPattern ( ) ) ) ;
savePartition ( new PsqlPartition ( partitionStartTs , partitionEndTs , partitionDate ) ) ;
}
}
}
private void savePartition ( PsqlPartition psqlPartition ) {
@ -111,28 +118,6 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
}
}
private PsqlPartition toPartition ( long ts ) {
if ( tsFormat . equals ( SqlTsPartitionDate . INDEFINITE ) ) {
return indefinitePartition ;
} else {
LocalDateTime time = LocalDateTime . ofInstant ( Instant . ofEpochMilli ( ts ) , ZoneOffset . UTC ) ;
LocalDateTime localDateTimeStart = tsFormat . trancateTo ( time ) ;
long partitionStartTs = toMills ( localDateTimeStart ) ;
PsqlPartition partition = partitions . get ( partitionStartTs ) ;
if ( partition ! = null ) {
return partition ;
} else {
LocalDateTime localDateTimeEnd = tsFormat . plusTo ( localDateTimeStart ) ;
long partitionEndTs = toMills ( localDateTimeEnd ) ;
ZonedDateTime zonedDateTime = localDateTimeStart . atZone ( ZoneOffset . UTC ) ;
String partitionDate = zonedDateTime . format ( DateTimeFormatter . ofPattern ( tsFormat . getPattern ( ) ) ) ;
partition = new PsqlPartition ( partitionStartTs , partitionEndTs , partitionDate ) ;
savePartition ( partition ) ;
return partition ;
}
}
}
private static long toMills ( LocalDateTime time ) {
return time . toInstant ( ZoneOffset . UTC ) . toEpochMilli ( ) ;
}