Browse Source

Merge pull request #10089 from dashevchenko/cassandra_set_null

Updated CassandraBaseTimeseriesDao to do one db insert instead of 5 when setNullValuesEnabled=true
pull/10149/head
Andrew Shvayka 2 years ago
committed by GitHub
parent
commit
74ea54d68b
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 132
      dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
  2. 38
      dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
  3. 72
      dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlSetNullEnabledTest.java
  4. 74
      dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlTest.java

132
dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java

@ -65,6 +65,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -85,6 +86,18 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
public static final String ASC_ORDER = "ASC";
public static final long SECONDS_IN_DAY = TimeUnit.DAYS.toSeconds(1);
protected static final List<Long> FIXED_PARTITION = List.of(0L);
protected static final String INSERT_WITH_NULL = INSERT_INTO + ModelConstants.TS_KV_CF +
"(" + ModelConstants.ENTITY_TYPE_COLUMN +
"," + ModelConstants.ENTITY_ID_COLUMN +
"," + ModelConstants.KEY_COLUMN +
"," + ModelConstants.PARTITION_COLUMN +
"," + ModelConstants.TS_COLUMN +
"," + ModelConstants.BOOLEAN_VALUE_COLUMN +
"," + ModelConstants.STRING_VALUE_COLUMN +
"," + ModelConstants.LONG_VALUE_COLUMN +
"," + ModelConstants.DOUBLE_VALUE_COLUMN +
"," + ModelConstants.JSON_VALUE_COLUMN + ")" +
" VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
private CassandraTsPartitionsCache cassandraTsPartitionsCache;
@ -117,6 +130,8 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
private PreparedStatement[] fetchStmtsAsc;
private PreparedStatement[] fetchStmtsDesc;
private PreparedStatement deleteStmt;
private PreparedStatement saveWithNullStmt;
private PreparedStatement saveWithNullWithTtlStmt;
private final Lock stmtCreationLock = new ReentrantLock();
private boolean isInstall() {
@ -159,19 +174,36 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
ttl = computeTtl(ttl);
int dataPointDays = tsKvEntry.getDataPoints() * Math.max(1, (int) (ttl / SECONDS_IN_DAY));
long partition = toPartitionTs(tsKvEntry.getTs());
String entityType = entityId.getEntityType().name();
UUID entityIdId = entityId.getId();
String entryKey = tsKvEntry.getKey();
long ts = tsKvEntry.getTs();
DataType type = tsKvEntry.getDataType();
BoundStatementBuilder stmtBuilder;
if (setNullValuesEnabled) {
processSetNullValues(tenantId, entityId, tsKvEntry, ttl, futures, partition, type);
}
BoundStatementBuilder stmtBuilder = new BoundStatementBuilder((ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind());
stmtBuilder.setString(0, entityId.getEntityType().name())
.setUuid(1, entityId.getId())
.setString(2, tsKvEntry.getKey())
.setLong(3, partition)
.setLong(4, tsKvEntry.getTs());
addValue(tsKvEntry, stmtBuilder, 5);
if (ttl > 0) {
stmtBuilder.setInt(6, (int) ttl);
Boolean booleanValue = tsKvEntry.getBooleanValue().orElse(null);
String strValue = tsKvEntry.getStrValue().orElse(null);
Long longValue = tsKvEntry.getLongValue().orElse(null);
Double doubleValue = tsKvEntry.getDoubleValue().orElse(null);
String jsonValue = tsKvEntry.getJsonValue().orElse(null);
if (ttl == 0) {
stmtBuilder = new BoundStatementBuilder(getSaveWithNullStmt()
.bind(entityType, entityIdId, entryKey, partition, ts, booleanValue, strValue, longValue, doubleValue, jsonValue));
} else {
stmtBuilder = new BoundStatementBuilder(getSaveWithNullWithTtlStmt()
.bind(entityType, entityIdId, entryKey, partition, ts, booleanValue, strValue, longValue, doubleValue, jsonValue, (int) ttl));
}
} else {
stmtBuilder = new BoundStatementBuilder((ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind());
stmtBuilder.setString(0, entityType)
.setUuid(1, entityIdId)
.setString(2, entryKey)
.setLong(3, partition)
.setLong(4, ts);
addValue(tsKvEntry, stmtBuilder, 5);
if (ttl > 0) {
stmtBuilder.setInt(6, (int) ttl);
}
}
BoundStatement stmt = stmtBuilder.build();
futures.add(getFuture(executeAsyncWrite(tenantId, stmt), rs -> null));
@ -449,56 +481,6 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
return tsFormat.getTruncateUnit().equals(ChronoUnit.FOREVER);
}
private void processSetNullValues(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl, List<ListenableFuture<Void>> futures, long partition, DataType type) {
switch (type) {
case LONG:
futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.BOOLEAN));
futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.DOUBLE));
futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.STRING));
futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.JSON));
break;
case BOOLEAN:
futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.DOUBLE));
futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.LONG));
futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.STRING));
futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.JSON));
break;
case DOUBLE:
futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.BOOLEAN));
futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.LONG));
futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.STRING));
futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.JSON));
break;
case STRING:
futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.BOOLEAN));
futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.DOUBLE));
futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.LONG));
futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.JSON));
break;
case JSON:
futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.BOOLEAN));
futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.DOUBLE));
futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.LONG));
futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.STRING));
break;
}
}
private ListenableFuture<Void> saveNull(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl, long partition, DataType type) {
BoundStatementBuilder stmtBuilder = new BoundStatementBuilder((ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind());
stmtBuilder.setString(0, entityId.getEntityType().name())
.setUuid(1, entityId.getId())
.setString(2, tsKvEntry.getKey())
.setLong(3, partition)
.setLong(4, tsKvEntry.getTs());
stmtBuilder.setToNull(getColumnName(type));
if (ttl > 0) {
stmtBuilder.setInt(6, (int) ttl);
}
BoundStatement stmt = stmtBuilder.build();
return getFuture(executeAsyncWrite(tenantId, stmt), rs -> null);
}
private ListenableFuture<Integer> doSavePartition(TenantId tenantId, EntityId entityId, String key, long ttl, long partition) {
log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key);
PreparedStatement preparedStatement = ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt();
@ -591,6 +573,34 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
return deleteStmt;
}
private PreparedStatement getSaveWithNullStmt() {
if (saveWithNullStmt == null) {
stmtCreationLock.lock();
try {
if (saveWithNullStmt == null) {
saveWithNullStmt = prepare(INSERT_WITH_NULL);
}
} finally {
stmtCreationLock.unlock();
}
}
return saveWithNullStmt;
}
private PreparedStatement getSaveWithNullWithTtlStmt() {
if (saveWithNullWithTtlStmt == null) {
stmtCreationLock.lock();
try {
if (saveWithNullWithTtlStmt == null) {
saveWithNullWithTtlStmt = prepare(INSERT_WITH_NULL + " USING TTL ?");
}
} finally {
stmtCreationLock.unlock();
}
}
return saveWithNullWithTtlStmt;
}
private PreparedStatement getSaveStmt(DataType dataType) {
if (saveStmts == null) {
stmtCreationLock.lock();

38
dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java

@ -33,6 +33,7 @@ import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.JsonDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
@ -54,7 +55,9 @@ import java.util.concurrent.TimeoutException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* @author Andrew Shvayka
@ -64,12 +67,12 @@ import static org.junit.Assert.assertNotNull;
public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
@Autowired
TimeseriesService tsService;
protected TimeseriesService tsService;
@Autowired
EntityViewService entityViewService;
static final int MAX_TIMEOUT = 30;
protected static final int MAX_TIMEOUT = 30;
private static final String STRING_KEY = "stringKey";
private static final String LONG_KEY = "longKey";
@ -84,7 +87,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
KvEntry doubleKvEntry = new DoubleDataEntry(DOUBLE_KEY, Double.MAX_VALUE);
KvEntry booleanKvEntry = new BooleanDataEntry(BOOLEAN_KEY, Boolean.TRUE);
private TenantId tenantId;
protected TenantId tenantId;
@Before
public void before() {
@ -673,6 +676,32 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
assertEquals(3, list.size());
}
@Test
public void shouldSaveEntryOfEachType() throws Exception {
BasicTsKvEntry booleanEntry = new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(1), new BooleanDataEntry("test", true));
BasicTsKvEntry stringEntry = new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(2), new StringDataEntry("test", "text"));
BasicTsKvEntry longEntry = new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(3), new LongDataEntry("test", 15L));
BasicTsKvEntry doubleEntry = new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(4), new DoubleDataEntry("test", 10.5));
BasicTsKvEntry jsonEntry = new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(5), new JsonDataEntry("test", "{\"test\":\"testValue\"}"));
List<TsKvEntry> timeseries = List.of(booleanEntry, stringEntry, longEntry, doubleEntry, jsonEntry);
DeviceId deviceId = new DeviceId(Uuids.timeBased());
for (TsKvEntry tsKvEntry : timeseries) {
save(tenantId, deviceId, tsKvEntry);
}
List<TsKvEntry> listUntil3Minutes = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("test", 0L,
TimeUnit.MINUTES.toMillis(3), 1000, 10, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS);
assertEquals(2, listUntil3Minutes.size());
assertThat(listUntil3Minutes).containsOnlyOnceElementsOf(List.of(
booleanEntry, stringEntry));
List<TsKvEntry> fullList = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("test", 0L,
TimeUnit.MINUTES.toMillis(6), 1000, 10, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS);
assertEquals(5, fullList.size());
assertThat(fullList).containsOnlyOnceElementsOf(timeseries);
}
private TsKvEntry save(DeviceId deviceId, long ts, long value) throws Exception {
TsKvEntry entry = new BasicTsKvEntry(ts, new LongDataEntry(LONG_KEY, value));
tsService.save(tenantId, deviceId, entry).get(MAX_TIMEOUT, TimeUnit.SECONDS);
@ -691,6 +720,9 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
return entry;
}
private void save(TenantId tenantId, DeviceId deviceId, TsKvEntry tsKvEntry) throws Exception {
tsService.save(tenantId, deviceId, tsKvEntry).get(MAX_TIMEOUT, TimeUnit.SECONDS);
}
private void saveEntries(DeviceId deviceId, long ts) throws ExecutionException, InterruptedException, TimeoutException {
tsService.save(tenantId, deviceId, toTsEntry(ts, stringKvEntry)).get(MAX_TIMEOUT, TimeUnit.SECONDS);

72
dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlSetNullEnabledTest.java

@ -0,0 +1,72 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.service.timeseries.nosql;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import org.junit.Test;
import org.springframework.test.context.TestPropertySource;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.service.DaoNoSqlTest;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@DaoNoSqlTest
@TestPropertySource(properties = {
"cassandra.query.set_null_values_enabled=true",
})
public class TimeseriesServiceNoSqlSetNullEnabledTest extends TimeseriesServiceNoSqlTest {
@Override
@Test
public void testNullValuesOfNoneTargetColumn() throws ExecutionException, InterruptedException, TimeoutException {
long ts = TimeUnit.MINUTES.toMillis(1);
TsKvEntry longEntry = new BasicTsKvEntry(ts, new LongDataEntry("temp", 0L));
double doubleValue = 20.6;
TsKvEntry doubleEntry = new BasicTsKvEntry(ts, new DoubleDataEntry("temp", doubleValue));
DeviceId deviceId = new DeviceId(Uuids.timeBased());
tsService.save(tenantId, deviceId, longEntry).get(MAX_TIMEOUT, TimeUnit.SECONDS);
tsService.save(tenantId, deviceId, doubleEntry).get(MAX_TIMEOUT, TimeUnit.SECONDS);
List<TsKvEntry> listWithoutAgg = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("temp", 0L,
ts + 1 , 1000, 3, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS);
assertEquals(1, listWithoutAgg.size());
assertFalse(listWithoutAgg.get(0).getLongValue().isPresent());
assertTrue(listWithoutAgg.get(0).getDoubleValue().isPresent());
assertThat(listWithoutAgg.get(0).getDoubleValue().get()).isEqualTo(doubleValue);
// long value should be set to null after second insert, so avg = doubleValue
List<TsKvEntry> listWithAgg = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("temp", 0L,
ts + 1 , 1000, 3, Aggregation.AVG))).get(MAX_TIMEOUT, TimeUnit.SECONDS);
assertEquals(1, listWithAgg.size());
assertTrue(listWithAgg.get(0).getDoubleValue().isPresent());
assertThat(listWithAgg.get(0).getDoubleValue().get()).isEqualTo(doubleValue);
}
}

74
dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlTest.java

@ -15,9 +15,83 @@
*/
package org.thingsboard.server.dao.service.timeseries.nosql;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import org.junit.Test;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.JsonDataEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.service.DaoNoSqlTest;
import org.thingsboard.server.dao.service.timeseries.BaseTimeseriesServiceTest;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@DaoNoSqlTest
public class TimeseriesServiceNoSqlTest extends BaseTimeseriesServiceTest {
@Test
public void shouldSaveEntryOfEachTypeWithTtl() throws ExecutionException, InterruptedException, TimeoutException {
long ttlInSec = TimeUnit.SECONDS.toSeconds(3);
List<TsKvEntry> timeseries = List.of(
new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(1), new BooleanDataEntry("test", true)),
new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(2), new StringDataEntry("test", "text")),
new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(3), new LongDataEntry("test", 15L)),
new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(4), new DoubleDataEntry("test", 10.5)),
new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(5), new JsonDataEntry("test", "{\"test\":\"testValue\"}")));
DeviceId deviceId = new DeviceId(Uuids.timeBased());
tsService.save(tenantId, deviceId, timeseries, ttlInSec);
List<TsKvEntry> fullList = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("test", 0L,
TimeUnit.MINUTES.toMillis(6), 1000, 10, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS);
assertEquals(5, fullList.size());
// check entries after ttl
Thread.sleep(TimeUnit.SECONDS.toMillis(ttlInSec + 1));
List<TsKvEntry> listAfterTtl = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("test", 0L,
TimeUnit.MINUTES.toMillis(6), 1000, 10, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS);
assertEquals(0, listAfterTtl.size());
}
@Test
public void testNullValuesOfNoneTargetColumn() throws ExecutionException, InterruptedException, TimeoutException {
long ts = TimeUnit.MINUTES.toMillis(1);
long longValue = 10L;
TsKvEntry longEntry = new BasicTsKvEntry(ts, new LongDataEntry("temp", longValue));
double doubleValue = 20.6;
TsKvEntry doubleEntry = new BasicTsKvEntry(ts, new DoubleDataEntry("temp", doubleValue));
DeviceId deviceId = new DeviceId(Uuids.timeBased());
tsService.save(tenantId, deviceId, longEntry).get(MAX_TIMEOUT, TimeUnit.SECONDS);
tsService.save(tenantId, deviceId, doubleEntry).get(MAX_TIMEOUT, TimeUnit.SECONDS);
List<TsKvEntry> listWithoutAgg = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("temp", 0L,
ts + 1 , 1000, 3, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS);
assertEquals(1, listWithoutAgg.size());
assertTrue(listWithoutAgg.get(0).getLongValue().isPresent());
assertFalse(listWithoutAgg.get(0).getDoubleValue().isPresent());
assertThat(listWithoutAgg.get(0).getLongValue().get()).isEqualTo(longValue);
// long value should not be reset to null, so avg = (doubleValue + longValue)/ 2
List<TsKvEntry> listWithAgg = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("temp", 0L,
ts + 1, 200000, 3, Aggregation.AVG))).get(MAX_TIMEOUT, TimeUnit.SECONDS);
assertEquals(1, listWithAgg.size());
assertTrue(listWithAgg.get(0).getDoubleValue().isPresent());
double expectedValue = (doubleValue + longValue)/ 2;
assertThat(listWithAgg.get(0).getDoubleValue().get()).isEqualTo(expectedValue);
}
}

Loading…
Cancel
Save