diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 31e29434e8..6b2ab7e07b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -65,6 +65,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -85,6 +86,18 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD public static final String ASC_ORDER = "ASC"; public static final long SECONDS_IN_DAY = TimeUnit.DAYS.toSeconds(1); protected static final List FIXED_PARTITION = List.of(0L); + protected static final String INSERT_WITH_NULL = INSERT_INTO + ModelConstants.TS_KV_CF + + "(" + ModelConstants.ENTITY_TYPE_COLUMN + + "," + ModelConstants.ENTITY_ID_COLUMN + + "," + ModelConstants.KEY_COLUMN + + "," + ModelConstants.PARTITION_COLUMN + + "," + ModelConstants.TS_COLUMN + + "," + ModelConstants.BOOLEAN_VALUE_COLUMN + + "," + ModelConstants.STRING_VALUE_COLUMN + + "," + ModelConstants.LONG_VALUE_COLUMN + + "," + ModelConstants.DOUBLE_VALUE_COLUMN + + "," + ModelConstants.JSON_VALUE_COLUMN + ")" + + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; private CassandraTsPartitionsCache cassandraTsPartitionsCache; @@ -117,6 +130,8 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD private PreparedStatement[] fetchStmtsAsc; private PreparedStatement[] fetchStmtsDesc; private PreparedStatement deleteStmt; + private PreparedStatement saveWithNullStmt; + private PreparedStatement saveWithNullWithTtlStmt; private final Lock stmtCreationLock = new ReentrantLock(); private boolean isInstall() { @@ -159,19 +174,36 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD ttl = computeTtl(ttl); int dataPointDays = tsKvEntry.getDataPoints() * Math.max(1, (int) (ttl / SECONDS_IN_DAY)); long partition = toPartitionTs(tsKvEntry.getTs()); + String entityType = entityId.getEntityType().name(); + UUID entityIdId = entityId.getId(); + String entryKey = tsKvEntry.getKey(); + long ts = tsKvEntry.getTs(); DataType type = tsKvEntry.getDataType(); + BoundStatementBuilder stmtBuilder; if (setNullValuesEnabled) { - processSetNullValues(tenantId, entityId, tsKvEntry, ttl, futures, partition, type); - } - BoundStatementBuilder stmtBuilder = new BoundStatementBuilder((ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind()); - stmtBuilder.setString(0, entityId.getEntityType().name()) - .setUuid(1, entityId.getId()) - .setString(2, tsKvEntry.getKey()) - .setLong(3, partition) - .setLong(4, tsKvEntry.getTs()); - addValue(tsKvEntry, stmtBuilder, 5); - if (ttl > 0) { - stmtBuilder.setInt(6, (int) ttl); + Boolean booleanValue = tsKvEntry.getBooleanValue().orElse(null); + String strValue = tsKvEntry.getStrValue().orElse(null); + Long longValue = tsKvEntry.getLongValue().orElse(null); + Double doubleValue = tsKvEntry.getDoubleValue().orElse(null); + String jsonValue = tsKvEntry.getJsonValue().orElse(null); + if (ttl == 0) { + stmtBuilder = new BoundStatementBuilder(getSaveWithNullStmt() + .bind(entityType, entityIdId, entryKey, partition, ts, booleanValue, strValue, longValue, doubleValue, jsonValue)); + } else { + stmtBuilder = new BoundStatementBuilder(getSaveWithNullWithTtlStmt() + .bind(entityType, entityIdId, entryKey, partition, ts, booleanValue, strValue, longValue, doubleValue, jsonValue, (int) ttl)); + } + } else { + stmtBuilder = new BoundStatementBuilder((ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind()); + stmtBuilder.setString(0, entityType) + .setUuid(1, entityIdId) + .setString(2, entryKey) + .setLong(3, partition) + .setLong(4, ts); + addValue(tsKvEntry, stmtBuilder, 5); + if (ttl > 0) { + stmtBuilder.setInt(6, (int) ttl); + } } BoundStatement stmt = stmtBuilder.build(); futures.add(getFuture(executeAsyncWrite(tenantId, stmt), rs -> null)); @@ -449,56 +481,6 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD return tsFormat.getTruncateUnit().equals(ChronoUnit.FOREVER); } - private void processSetNullValues(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl, List> futures, long partition, DataType type) { - switch (type) { - case LONG: - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.BOOLEAN)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.DOUBLE)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.STRING)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.JSON)); - break; - case BOOLEAN: - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.DOUBLE)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.LONG)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.STRING)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.JSON)); - break; - case DOUBLE: - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.BOOLEAN)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.LONG)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.STRING)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.JSON)); - break; - case STRING: - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.BOOLEAN)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.DOUBLE)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.LONG)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.JSON)); - break; - case JSON: - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.BOOLEAN)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.DOUBLE)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.LONG)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.STRING)); - break; - } - } - - private ListenableFuture saveNull(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl, long partition, DataType type) { - BoundStatementBuilder stmtBuilder = new BoundStatementBuilder((ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind()); - stmtBuilder.setString(0, entityId.getEntityType().name()) - .setUuid(1, entityId.getId()) - .setString(2, tsKvEntry.getKey()) - .setLong(3, partition) - .setLong(4, tsKvEntry.getTs()); - stmtBuilder.setToNull(getColumnName(type)); - if (ttl > 0) { - stmtBuilder.setInt(6, (int) ttl); - } - BoundStatement stmt = stmtBuilder.build(); - return getFuture(executeAsyncWrite(tenantId, stmt), rs -> null); - } - private ListenableFuture doSavePartition(TenantId tenantId, EntityId entityId, String key, long ttl, long partition) { log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key); PreparedStatement preparedStatement = ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt(); @@ -591,6 +573,34 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD return deleteStmt; } + private PreparedStatement getSaveWithNullStmt() { + if (saveWithNullStmt == null) { + stmtCreationLock.lock(); + try { + if (saveWithNullStmt == null) { + saveWithNullStmt = prepare(INSERT_WITH_NULL); + } + } finally { + stmtCreationLock.unlock(); + } + } + return saveWithNullStmt; + } + + private PreparedStatement getSaveWithNullWithTtlStmt() { + if (saveWithNullWithTtlStmt == null) { + stmtCreationLock.lock(); + try { + if (saveWithNullWithTtlStmt == null) { + saveWithNullWithTtlStmt = prepare(INSERT_WITH_NULL + " USING TTL ?"); + } + } finally { + stmtCreationLock.unlock(); + } + } + return saveWithNullWithTtlStmt; + } + private PreparedStatement getSaveStmt(DataType dataType) { if (saveStmts == null) { stmtCreationLock.lock(); diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java index ad351bb9f2..a624a2b6e5 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java @@ -33,6 +33,7 @@ import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry; import org.thingsboard.server.common.data.kv.DoubleDataEntry; +import org.thingsboard.server.common.data.kv.JsonDataEntry; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; @@ -54,7 +55,9 @@ import java.util.concurrent.TimeoutException; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; /** * @author Andrew Shvayka @@ -64,12 +67,12 @@ import static org.junit.Assert.assertNotNull; public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Autowired - TimeseriesService tsService; + protected TimeseriesService tsService; @Autowired EntityViewService entityViewService; - static final int MAX_TIMEOUT = 30; + protected static final int MAX_TIMEOUT = 30; private static final String STRING_KEY = "stringKey"; private static final String LONG_KEY = "longKey"; @@ -84,7 +87,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { KvEntry doubleKvEntry = new DoubleDataEntry(DOUBLE_KEY, Double.MAX_VALUE); KvEntry booleanKvEntry = new BooleanDataEntry(BOOLEAN_KEY, Boolean.TRUE); - private TenantId tenantId; + protected TenantId tenantId; @Before public void before() { @@ -673,6 +676,32 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { assertEquals(3, list.size()); } + @Test + public void shouldSaveEntryOfEachType() throws Exception { + BasicTsKvEntry booleanEntry = new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(1), new BooleanDataEntry("test", true)); + BasicTsKvEntry stringEntry = new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(2), new StringDataEntry("test", "text")); + BasicTsKvEntry longEntry = new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(3), new LongDataEntry("test", 15L)); + BasicTsKvEntry doubleEntry = new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(4), new DoubleDataEntry("test", 10.5)); + BasicTsKvEntry jsonEntry = new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(5), new JsonDataEntry("test", "{\"test\":\"testValue\"}")); + List timeseries = List.of(booleanEntry, stringEntry, longEntry, doubleEntry, jsonEntry); + + DeviceId deviceId = new DeviceId(Uuids.timeBased()); + for (TsKvEntry tsKvEntry : timeseries) { + save(tenantId, deviceId, tsKvEntry); + } + + List listUntil3Minutes = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("test", 0L, + TimeUnit.MINUTES.toMillis(3), 1000, 10, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertEquals(2, listUntil3Minutes.size()); + assertThat(listUntil3Minutes).containsOnlyOnceElementsOf(List.of( + booleanEntry, stringEntry)); + + List fullList = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("test", 0L, + TimeUnit.MINUTES.toMillis(6), 1000, 10, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertEquals(5, fullList.size()); + assertThat(fullList).containsOnlyOnceElementsOf(timeseries); + } + private TsKvEntry save(DeviceId deviceId, long ts, long value) throws Exception { TsKvEntry entry = new BasicTsKvEntry(ts, new LongDataEntry(LONG_KEY, value)); tsService.save(tenantId, deviceId, entry).get(MAX_TIMEOUT, TimeUnit.SECONDS); @@ -691,6 +720,9 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { return entry; } + private void save(TenantId tenantId, DeviceId deviceId, TsKvEntry tsKvEntry) throws Exception { + tsService.save(tenantId, deviceId, tsKvEntry).get(MAX_TIMEOUT, TimeUnit.SECONDS); + } private void saveEntries(DeviceId deviceId, long ts) throws ExecutionException, InterruptedException, TimeoutException { tsService.save(tenantId, deviceId, toTsEntry(ts, stringKvEntry)).get(MAX_TIMEOUT, TimeUnit.SECONDS); diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlSetNullEnabledTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlSetNullEnabledTest.java new file mode 100644 index 0000000000..1bbb37659c --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlSetNullEnabledTest.java @@ -0,0 +1,72 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.service.timeseries.nosql; + +import com.datastax.oss.driver.api.core.uuid.Uuids; +import org.junit.Test; +import org.springframework.test.context.TestPropertySource; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.kv.Aggregation; +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.DoubleDataEntry; +import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.dao.service.DaoNoSqlTest; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@DaoNoSqlTest +@TestPropertySource(properties = { + "cassandra.query.set_null_values_enabled=true", +}) +public class TimeseriesServiceNoSqlSetNullEnabledTest extends TimeseriesServiceNoSqlTest { + + @Override + @Test + public void testNullValuesOfNoneTargetColumn() throws ExecutionException, InterruptedException, TimeoutException { + long ts = TimeUnit.MINUTES.toMillis(1); + TsKvEntry longEntry = new BasicTsKvEntry(ts, new LongDataEntry("temp", 0L)); + double doubleValue = 20.6; + TsKvEntry doubleEntry = new BasicTsKvEntry(ts, new DoubleDataEntry("temp", doubleValue)); + DeviceId deviceId = new DeviceId(Uuids.timeBased()); + tsService.save(tenantId, deviceId, longEntry).get(MAX_TIMEOUT, TimeUnit.SECONDS); + tsService.save(tenantId, deviceId, doubleEntry).get(MAX_TIMEOUT, TimeUnit.SECONDS); + + List listWithoutAgg = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("temp", 0L, + ts + 1 , 1000, 3, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertEquals(1, listWithoutAgg.size()); + assertFalse(listWithoutAgg.get(0).getLongValue().isPresent()); + assertTrue(listWithoutAgg.get(0).getDoubleValue().isPresent()); + assertThat(listWithoutAgg.get(0).getDoubleValue().get()).isEqualTo(doubleValue); + + // long value should be set to null after second insert, so avg = doubleValue + List listWithAgg = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("temp", 0L, + ts + 1 , 1000, 3, Aggregation.AVG))).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertEquals(1, listWithAgg.size()); + assertTrue(listWithAgg.get(0).getDoubleValue().isPresent()); + assertThat(listWithAgg.get(0).getDoubleValue().get()).isEqualTo(doubleValue); + } +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlTest.java index 229d5f5842..b8970feca3 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlTest.java @@ -15,9 +15,83 @@ */ package org.thingsboard.server.dao.service.timeseries.nosql; +import com.datastax.oss.driver.api.core.uuid.Uuids; +import org.junit.Test; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.kv.Aggregation; +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.BooleanDataEntry; +import org.thingsboard.server.common.data.kv.DoubleDataEntry; +import org.thingsboard.server.common.data.kv.JsonDataEntry; +import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.data.kv.StringDataEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.dao.service.DaoNoSqlTest; import org.thingsboard.server.dao.service.timeseries.BaseTimeseriesServiceTest; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + @DaoNoSqlTest public class TimeseriesServiceNoSqlTest extends BaseTimeseriesServiceTest { + + @Test + public void shouldSaveEntryOfEachTypeWithTtl() throws ExecutionException, InterruptedException, TimeoutException { + long ttlInSec = TimeUnit.SECONDS.toSeconds(3); + List timeseries = List.of( + new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(1), new BooleanDataEntry("test", true)), + new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(2), new StringDataEntry("test", "text")), + new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(3), new LongDataEntry("test", 15L)), + new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(4), new DoubleDataEntry("test", 10.5)), + new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(5), new JsonDataEntry("test", "{\"test\":\"testValue\"}"))); + + DeviceId deviceId = new DeviceId(Uuids.timeBased()); + tsService.save(tenantId, deviceId, timeseries, ttlInSec); + + List fullList = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("test", 0L, + TimeUnit.MINUTES.toMillis(6), 1000, 10, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertEquals(5, fullList.size()); + + // check entries after ttl + Thread.sleep(TimeUnit.SECONDS.toMillis(ttlInSec + 1)); + List listAfterTtl = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("test", 0L, + TimeUnit.MINUTES.toMillis(6), 1000, 10, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertEquals(0, listAfterTtl.size()); + } + + @Test + public void testNullValuesOfNoneTargetColumn() throws ExecutionException, InterruptedException, TimeoutException { + long ts = TimeUnit.MINUTES.toMillis(1); + long longValue = 10L; + TsKvEntry longEntry = new BasicTsKvEntry(ts, new LongDataEntry("temp", longValue)); + double doubleValue = 20.6; + TsKvEntry doubleEntry = new BasicTsKvEntry(ts, new DoubleDataEntry("temp", doubleValue)); + DeviceId deviceId = new DeviceId(Uuids.timeBased()); + tsService.save(tenantId, deviceId, longEntry).get(MAX_TIMEOUT, TimeUnit.SECONDS); + tsService.save(tenantId, deviceId, doubleEntry).get(MAX_TIMEOUT, TimeUnit.SECONDS); + + List listWithoutAgg = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("temp", 0L, + ts + 1 , 1000, 3, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertEquals(1, listWithoutAgg.size()); + assertTrue(listWithoutAgg.get(0).getLongValue().isPresent()); + assertFalse(listWithoutAgg.get(0).getDoubleValue().isPresent()); + assertThat(listWithoutAgg.get(0).getLongValue().get()).isEqualTo(longValue); + + // long value should not be reset to null, so avg = (doubleValue + longValue)/ 2 + List listWithAgg = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("temp", 0L, + ts + 1, 200000, 3, Aggregation.AVG))).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertEquals(1, listWithAgg.size()); + assertTrue(listWithAgg.get(0).getDoubleValue().isPresent()); + double expectedValue = (doubleValue + longValue)/ 2; + assertThat(listWithAgg.get(0).getDoubleValue().get()).isEqualTo(expectedValue); + } }