From 53b57b1351143ed6f23422e7f983e563b3860fb1 Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Fri, 2 Feb 2024 14:45:28 +0200 Subject: [PATCH 1/4] updated CassandraBaseTimeseriesDao to do one db insert instead of 5 when setNullValuesEnabled=true --- .../CassandraBaseTimeseriesDao.java | 142 ++++++++++-------- .../timeseries/BaseTimeseriesServiceTest.java | 35 ++++- ...eseriesServiceNoSqlSetNullEnabledTest.java | 72 +++++++++ .../nosql/TimeseriesServiceNoSqlTest.java | 96 ++++++++++++ 4 files changed, 281 insertions(+), 64 deletions(-) create mode 100644 dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlSetNullEnabledTest.java diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 31e29434e8..8d96dd293b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -65,6 +65,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -117,6 +118,8 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD private PreparedStatement[] fetchStmtsAsc; private PreparedStatement[] fetchStmtsDesc; private PreparedStatement deleteStmt; + private PreparedStatement saveWithNullStmt; + private PreparedStatement saveWithNullWithTtlStmt; private final Lock stmtCreationLock = new ReentrantLock(); private boolean isInstall() { @@ -159,19 +162,36 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD ttl = computeTtl(ttl); int dataPointDays = tsKvEntry.getDataPoints() * Math.max(1, (int) (ttl / SECONDS_IN_DAY)); long partition = toPartitionTs(tsKvEntry.getTs()); + String entityType = entityId.getEntityType().name(); + UUID entityIdId = entityId.getId(); + String entryKey = tsKvEntry.getKey(); + long ts = tsKvEntry.getTs(); DataType type = tsKvEntry.getDataType(); + BoundStatementBuilder stmtBuilder; if (setNullValuesEnabled) { - processSetNullValues(tenantId, entityId, tsKvEntry, ttl, futures, partition, type); - } - BoundStatementBuilder stmtBuilder = new BoundStatementBuilder((ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind()); - stmtBuilder.setString(0, entityId.getEntityType().name()) - .setUuid(1, entityId.getId()) - .setString(2, tsKvEntry.getKey()) - .setLong(3, partition) - .setLong(4, tsKvEntry.getTs()); - addValue(tsKvEntry, stmtBuilder, 5); - if (ttl > 0) { - stmtBuilder.setInt(6, (int) ttl); + Boolean booleanValue = tsKvEntry.getBooleanValue().orElse(null); + String strValue = tsKvEntry.getStrValue().orElse(null); + Long longValue = tsKvEntry.getLongValue().orElse(null); + Double doubleValue = tsKvEntry.getDoubleValue().orElse(null); + String jsonValue = tsKvEntry.getJsonValue().orElse(null); + if (ttl == 0) { + stmtBuilder = new BoundStatementBuilder(getSaveWithNullStmt() + .bind(entityType, entityIdId, entryKey, partition, ts, booleanValue, strValue, longValue, doubleValue, jsonValue)); + } else { + stmtBuilder = new BoundStatementBuilder(getSaveWithNullWithTtlStmt() + .bind(entityType, entityIdId, entryKey, partition, ts, booleanValue, strValue, longValue, doubleValue, jsonValue, (int) ttl)); + } + } else { + stmtBuilder = new BoundStatementBuilder((ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind()); + stmtBuilder.setString(0, entityType) + .setUuid(1, entityIdId) + .setString(2, entryKey) + .setLong(3, partition) + .setLong(4, ts); + addValue(tsKvEntry, stmtBuilder, 5); + if (ttl > 0) { + stmtBuilder.setInt(6, (int) ttl); + } } BoundStatement stmt = stmtBuilder.build(); futures.add(getFuture(executeAsyncWrite(tenantId, stmt), rs -> null)); @@ -449,56 +469,6 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD return tsFormat.getTruncateUnit().equals(ChronoUnit.FOREVER); } - private void processSetNullValues(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl, List> futures, long partition, DataType type) { - switch (type) { - case LONG: - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.BOOLEAN)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.DOUBLE)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.STRING)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.JSON)); - break; - case BOOLEAN: - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.DOUBLE)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.LONG)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.STRING)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.JSON)); - break; - case DOUBLE: - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.BOOLEAN)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.LONG)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.STRING)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.JSON)); - break; - case STRING: - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.BOOLEAN)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.DOUBLE)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.LONG)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.JSON)); - break; - case JSON: - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.BOOLEAN)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.DOUBLE)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.LONG)); - futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.STRING)); - break; - } - } - - private ListenableFuture saveNull(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl, long partition, DataType type) { - BoundStatementBuilder stmtBuilder = new BoundStatementBuilder((ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind()); - stmtBuilder.setString(0, entityId.getEntityType().name()) - .setUuid(1, entityId.getId()) - .setString(2, tsKvEntry.getKey()) - .setLong(3, partition) - .setLong(4, tsKvEntry.getTs()); - stmtBuilder.setToNull(getColumnName(type)); - if (ttl > 0) { - stmtBuilder.setInt(6, (int) ttl); - } - BoundStatement stmt = stmtBuilder.build(); - return getFuture(executeAsyncWrite(tenantId, stmt), rs -> null); - } - private ListenableFuture doSavePartition(TenantId tenantId, EntityId entityId, String key, long ttl, long partition) { log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key); PreparedStatement preparedStatement = ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt(); @@ -591,6 +561,56 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD return deleteStmt; } + private PreparedStatement getSaveWithNullStmt() { + if (saveWithNullStmt == null) { + stmtCreationLock.lock(); + try { + if (saveWithNullStmt == null) { + saveWithNullStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_CF + + "(" + ModelConstants.ENTITY_TYPE_COLUMN + + "," + ModelConstants.ENTITY_ID_COLUMN + + "," + ModelConstants.KEY_COLUMN + + "," + ModelConstants.PARTITION_COLUMN + + "," + ModelConstants.TS_COLUMN + + "," + ModelConstants.BOOLEAN_VALUE_COLUMN + + "," + ModelConstants.STRING_VALUE_COLUMN + + "," + ModelConstants.LONG_VALUE_COLUMN + + "," + ModelConstants.DOUBLE_VALUE_COLUMN + + "," + ModelConstants.JSON_VALUE_COLUMN + ")" + + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"); + } + } finally { + stmtCreationLock.unlock(); + } + } + return saveWithNullStmt; + } + + private PreparedStatement getSaveWithNullWithTtlStmt() { + if (saveWithNullWithTtlStmt == null) { + stmtCreationLock.lock(); + try { + if (saveWithNullWithTtlStmt == null) { + saveWithNullWithTtlStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_CF + + "(" + ModelConstants.ENTITY_TYPE_COLUMN + + "," + ModelConstants.ENTITY_ID_COLUMN + + "," + ModelConstants.KEY_COLUMN + + "," + ModelConstants.PARTITION_COLUMN + + "," + ModelConstants.TS_COLUMN + + "," + ModelConstants.BOOLEAN_VALUE_COLUMN + + "," + ModelConstants.STRING_VALUE_COLUMN + + "," + ModelConstants.LONG_VALUE_COLUMN + + "," + ModelConstants.DOUBLE_VALUE_COLUMN + + "," + ModelConstants.JSON_VALUE_COLUMN + ")" + + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?) USING TTL ?"); + } + } finally { + stmtCreationLock.unlock(); + } + } + return saveWithNullWithTtlStmt; + } + private PreparedStatement getSaveStmt(DataType dataType) { if (saveStmts == null) { stmtCreationLock.lock(); diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java index ad351bb9f2..b06394e8d2 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java @@ -33,6 +33,7 @@ import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry; import org.thingsboard.server.common.data.kv.DoubleDataEntry; +import org.thingsboard.server.common.data.kv.JsonDataEntry; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; @@ -54,7 +55,9 @@ import java.util.concurrent.TimeoutException; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; /** * @author Andrew Shvayka @@ -64,12 +67,12 @@ import static org.junit.Assert.assertNotNull; public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Autowired - TimeseriesService tsService; + protected TimeseriesService tsService; @Autowired EntityViewService entityViewService; - static final int MAX_TIMEOUT = 30; + protected static final int MAX_TIMEOUT = 30; private static final String STRING_KEY = "stringKey"; private static final String LONG_KEY = "longKey"; @@ -84,7 +87,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { KvEntry doubleKvEntry = new DoubleDataEntry(DOUBLE_KEY, Double.MAX_VALUE); KvEntry booleanKvEntry = new BooleanDataEntry(BOOLEAN_KEY, Boolean.TRUE); - private TenantId tenantId; + protected TenantId tenantId; @Before public void before() { @@ -673,6 +676,29 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { assertEquals(3, list.size()); } + @Test + public void shouldSaveEntryOfEachType() throws Exception { + List timeseries = List.of( + new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(1), new BooleanDataEntry("test", true)), + new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(2), new StringDataEntry("test", "text")), + new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(3), new LongDataEntry("test", 15L)), + new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(4), new DoubleDataEntry("test", 10.5)), + new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(5), new JsonDataEntry("test", "{\"test\":\"testValue\"}"))); + + DeviceId deviceId = new DeviceId(Uuids.timeBased()); + for (TsKvEntry tsKvEntry : timeseries) { + save(tenantId, deviceId, tsKvEntry); + } + + List listUntil3Minutes = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("test", 0L, + TimeUnit.MINUTES.toMillis(3), 1000, 10, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertEquals(2, listUntil3Minutes.size()); + + List fullList = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("test", 0L, + TimeUnit.MINUTES.toMillis(6), 1000, 10, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertEquals(5, fullList.size()); + } + private TsKvEntry save(DeviceId deviceId, long ts, long value) throws Exception { TsKvEntry entry = new BasicTsKvEntry(ts, new LongDataEntry(LONG_KEY, value)); tsService.save(tenantId, deviceId, entry).get(MAX_TIMEOUT, TimeUnit.SECONDS); @@ -691,6 +717,9 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { return entry; } + private void save(TenantId tenantId, DeviceId deviceId, TsKvEntry tsKvEntry) throws Exception { + tsService.save(tenantId, deviceId, tsKvEntry).get(MAX_TIMEOUT, TimeUnit.SECONDS); + } private void saveEntries(DeviceId deviceId, long ts) throws ExecutionException, InterruptedException, TimeoutException { tsService.save(tenantId, deviceId, toTsEntry(ts, stringKvEntry)).get(MAX_TIMEOUT, TimeUnit.SECONDS); diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlSetNullEnabledTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlSetNullEnabledTest.java new file mode 100644 index 0000000000..1bbb37659c --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlSetNullEnabledTest.java @@ -0,0 +1,72 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.service.timeseries.nosql; + +import com.datastax.oss.driver.api.core.uuid.Uuids; +import org.junit.Test; +import org.springframework.test.context.TestPropertySource; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.kv.Aggregation; +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.DoubleDataEntry; +import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.dao.service.DaoNoSqlTest; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@DaoNoSqlTest +@TestPropertySource(properties = { + "cassandra.query.set_null_values_enabled=true", +}) +public class TimeseriesServiceNoSqlSetNullEnabledTest extends TimeseriesServiceNoSqlTest { + + @Override + @Test + public void testNullValuesOfNoneTargetColumn() throws ExecutionException, InterruptedException, TimeoutException { + long ts = TimeUnit.MINUTES.toMillis(1); + TsKvEntry longEntry = new BasicTsKvEntry(ts, new LongDataEntry("temp", 0L)); + double doubleValue = 20.6; + TsKvEntry doubleEntry = new BasicTsKvEntry(ts, new DoubleDataEntry("temp", doubleValue)); + DeviceId deviceId = new DeviceId(Uuids.timeBased()); + tsService.save(tenantId, deviceId, longEntry).get(MAX_TIMEOUT, TimeUnit.SECONDS); + tsService.save(tenantId, deviceId, doubleEntry).get(MAX_TIMEOUT, TimeUnit.SECONDS); + + List listWithoutAgg = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("temp", 0L, + ts + 1 , 1000, 3, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertEquals(1, listWithoutAgg.size()); + assertFalse(listWithoutAgg.get(0).getLongValue().isPresent()); + assertTrue(listWithoutAgg.get(0).getDoubleValue().isPresent()); + assertThat(listWithoutAgg.get(0).getDoubleValue().get()).isEqualTo(doubleValue); + + // long value should be set to null after second insert, so avg = doubleValue + List listWithAgg = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("temp", 0L, + ts + 1 , 1000, 3, Aggregation.AVG))).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertEquals(1, listWithAgg.size()); + assertTrue(listWithAgg.get(0).getDoubleValue().isPresent()); + assertThat(listWithAgg.get(0).getDoubleValue().get()).isEqualTo(doubleValue); + } +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlTest.java index 229d5f5842..82916aad1c 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlTest.java @@ -15,9 +15,105 @@ */ package org.thingsboard.server.dao.service.timeseries.nosql; +import com.datastax.oss.driver.api.core.uuid.Uuids; +import org.junit.Test; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.kv.Aggregation; +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.BooleanDataEntry; +import org.thingsboard.server.common.data.kv.DoubleDataEntry; +import org.thingsboard.server.common.data.kv.JsonDataEntry; +import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.data.kv.StringDataEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.dao.service.DaoNoSqlTest; import org.thingsboard.server.dao.service.timeseries.BaseTimeseriesServiceTest; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + @DaoNoSqlTest public class TimeseriesServiceNoSqlTest extends BaseTimeseriesServiceTest { + + @Test + public void shouldSaveEntryOfEachTypeWithTtl() throws ExecutionException, InterruptedException, TimeoutException { + long ttlInSec = TimeUnit.SECONDS.toSeconds(3); + List timeseries = List.of( + new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(1), new BooleanDataEntry("test", true)), + new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(2), new StringDataEntry("test", "text")), + new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(3), new LongDataEntry("test", 15L)), + new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(4), new DoubleDataEntry("test", 10.5)), + new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(5), new JsonDataEntry("test", "{\"test\":\"testValue\"}"))); + + DeviceId deviceId = new DeviceId(Uuids.timeBased()); + tsService.save(tenantId, deviceId, timeseries, ttlInSec); + + List fullList = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("test", 0L, + TimeUnit.MINUTES.toMillis(6), 1000, 10, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertEquals(5, fullList.size()); + + // check entries after ttl + Thread.sleep(TimeUnit.SECONDS.toMillis(ttlInSec)); + List listAfterTtl = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("test", 0L, + TimeUnit.MINUTES.toMillis(6), 1000, 10, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertEquals(0, listAfterTtl.size()); + } + + @Test + public void shouldDeleteValueAfterTtl() throws ExecutionException, InterruptedException, TimeoutException { + long longEntryTs = TimeUnit.MINUTES.toMillis(2); + long doubleEntryTs = TimeUnit.MINUTES.toMillis(3); + long ttlInSec = TimeUnit.SECONDS.toSeconds(3); + long longValue = 0L; + TsKvEntry longEntry = new BasicTsKvEntry(longEntryTs, new LongDataEntry("temp", longValue)); + double doubleValue = 20.6; + TsKvEntry doubleEntry = new BasicTsKvEntry(doubleEntryTs, new DoubleDataEntry("temp", doubleValue)); + DeviceId deviceId = new DeviceId(Uuids.timeBased()); + tsService.save(tenantId, deviceId, longEntry).get(MAX_TIMEOUT, TimeUnit.SECONDS); + tsService.save(tenantId, deviceId, Collections.singletonList(doubleEntry), ttlInSec).get(MAX_TIMEOUT, TimeUnit.SECONDS); + + Thread.sleep(TimeUnit.SECONDS.toMillis(ttlInSec)); + List listAfterTtl = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("temp", 0L, + doubleEntryTs + 1 , 1000, 3, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertEquals(1, listAfterTtl.size()); + assertTrue(listAfterTtl.get(0).getLongValue().isPresent()); + assertThat(listAfterTtl.get(0).getLongValue().get()).isEqualTo(longValue); + assertFalse(listAfterTtl.get(0).getDoubleValue().isPresent()); + } + + @Test + public void testNullValuesOfNoneTargetColumn() throws ExecutionException, InterruptedException, TimeoutException { + long ts = TimeUnit.MINUTES.toMillis(1); + long longValue = 10L; + TsKvEntry longEntry = new BasicTsKvEntry(ts, new LongDataEntry("temp", longValue)); + double doubleValue = 20.6; + TsKvEntry doubleEntry = new BasicTsKvEntry(ts, new DoubleDataEntry("temp", doubleValue)); + DeviceId deviceId = new DeviceId(Uuids.timeBased()); + tsService.save(tenantId, deviceId, longEntry).get(MAX_TIMEOUT, TimeUnit.SECONDS); + tsService.save(tenantId, deviceId, doubleEntry).get(MAX_TIMEOUT, TimeUnit.SECONDS); + + List listWithoutAgg = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("temp", 0L, + ts + 1 , 1000, 3, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertEquals(1, listWithoutAgg.size()); + assertTrue(listWithoutAgg.get(0).getLongValue().isPresent()); + assertFalse(listWithoutAgg.get(0).getDoubleValue().isPresent()); + assertThat(listWithoutAgg.get(0).getLongValue().get()).isEqualTo(longValue); + + // long value should not be reset to null, so avg = (doubleValue + longValue)/ 2 + List listWithAgg = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("temp", 0L, + ts + 1, 200000, 3, Aggregation.AVG))).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertEquals(1, listWithAgg.size()); + assertTrue(listWithAgg.get(0).getDoubleValue().isPresent()); + double expectedValue = (doubleValue + longValue)/ 2; + assertThat(listWithAgg.get(0).getDoubleValue().get()).isEqualTo(expectedValue); + } } From 9fa357735a7a327082e32dd883e79f63220453c5 Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Fri, 2 Feb 2024 15:55:18 +0200 Subject: [PATCH 2/4] timeseries test improvements --- .../timeseries/BaseTimeseriesServiceTest.java | 15 +++++++----- .../nosql/TimeseriesServiceNoSqlTest.java | 24 +------------------ 2 files changed, 10 insertions(+), 29 deletions(-) diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java index b06394e8d2..a624a2b6e5 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java @@ -678,12 +678,12 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void shouldSaveEntryOfEachType() throws Exception { - List timeseries = List.of( - new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(1), new BooleanDataEntry("test", true)), - new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(2), new StringDataEntry("test", "text")), - new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(3), new LongDataEntry("test", 15L)), - new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(4), new DoubleDataEntry("test", 10.5)), - new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(5), new JsonDataEntry("test", "{\"test\":\"testValue\"}"))); + BasicTsKvEntry booleanEntry = new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(1), new BooleanDataEntry("test", true)); + BasicTsKvEntry stringEntry = new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(2), new StringDataEntry("test", "text")); + BasicTsKvEntry longEntry = new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(3), new LongDataEntry("test", 15L)); + BasicTsKvEntry doubleEntry = new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(4), new DoubleDataEntry("test", 10.5)); + BasicTsKvEntry jsonEntry = new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(5), new JsonDataEntry("test", "{\"test\":\"testValue\"}")); + List timeseries = List.of(booleanEntry, stringEntry, longEntry, doubleEntry, jsonEntry); DeviceId deviceId = new DeviceId(Uuids.timeBased()); for (TsKvEntry tsKvEntry : timeseries) { @@ -693,10 +693,13 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { List listUntil3Minutes = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("test", 0L, TimeUnit.MINUTES.toMillis(3), 1000, 10, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS); assertEquals(2, listUntil3Minutes.size()); + assertThat(listUntil3Minutes).containsOnlyOnceElementsOf(List.of( + booleanEntry, stringEntry)); List fullList = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("test", 0L, TimeUnit.MINUTES.toMillis(6), 1000, 10, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS); assertEquals(5, fullList.size()); + assertThat(fullList).containsOnlyOnceElementsOf(timeseries); } private TsKvEntry save(DeviceId deviceId, long ts, long value) throws Exception { diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlTest.java index 82916aad1c..b8970feca3 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/nosql/TimeseriesServiceNoSqlTest.java @@ -62,34 +62,12 @@ public class TimeseriesServiceNoSqlTest extends BaseTimeseriesServiceTest { assertEquals(5, fullList.size()); // check entries after ttl - Thread.sleep(TimeUnit.SECONDS.toMillis(ttlInSec)); + Thread.sleep(TimeUnit.SECONDS.toMillis(ttlInSec + 1)); List listAfterTtl = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("test", 0L, TimeUnit.MINUTES.toMillis(6), 1000, 10, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS); assertEquals(0, listAfterTtl.size()); } - @Test - public void shouldDeleteValueAfterTtl() throws ExecutionException, InterruptedException, TimeoutException { - long longEntryTs = TimeUnit.MINUTES.toMillis(2); - long doubleEntryTs = TimeUnit.MINUTES.toMillis(3); - long ttlInSec = TimeUnit.SECONDS.toSeconds(3); - long longValue = 0L; - TsKvEntry longEntry = new BasicTsKvEntry(longEntryTs, new LongDataEntry("temp", longValue)); - double doubleValue = 20.6; - TsKvEntry doubleEntry = new BasicTsKvEntry(doubleEntryTs, new DoubleDataEntry("temp", doubleValue)); - DeviceId deviceId = new DeviceId(Uuids.timeBased()); - tsService.save(tenantId, deviceId, longEntry).get(MAX_TIMEOUT, TimeUnit.SECONDS); - tsService.save(tenantId, deviceId, Collections.singletonList(doubleEntry), ttlInSec).get(MAX_TIMEOUT, TimeUnit.SECONDS); - - Thread.sleep(TimeUnit.SECONDS.toMillis(ttlInSec)); - List listAfterTtl = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery("temp", 0L, - doubleEntryTs + 1 , 1000, 3, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS); - assertEquals(1, listAfterTtl.size()); - assertTrue(listAfterTtl.get(0).getLongValue().isPresent()); - assertThat(listAfterTtl.get(0).getLongValue().get()).isEqualTo(longValue); - assertFalse(listAfterTtl.get(0).getDoubleValue().isPresent()); - } - @Test public void testNullValuesOfNoneTargetColumn() throws ExecutionException, InterruptedException, TimeoutException { long ts = TimeUnit.MINUTES.toMillis(1); From dec09fc479890cc3d051fa1f7f24854f0e4a7951 Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Tue, 6 Feb 2024 15:21:57 +0200 Subject: [PATCH 3/4] extracted duplicated string to variable --- .../CassandraBaseTimeseriesDao.java | 38 +++++++------------ 1 file changed, 14 insertions(+), 24 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 8d96dd293b..833a18ad39 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -86,6 +86,18 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD public static final String ASC_ORDER = "ASC"; public static final long SECONDS_IN_DAY = TimeUnit.DAYS.toSeconds(1); protected static final List FIXED_PARTITION = List.of(0L); + private static String INSERT_WITH_NULL = INSERT_INTO + ModelConstants.TS_KV_CF + + "(" + ModelConstants.ENTITY_TYPE_COLUMN + + "," + ModelConstants.ENTITY_ID_COLUMN + + "," + ModelConstants.KEY_COLUMN + + "," + ModelConstants.PARTITION_COLUMN + + "," + ModelConstants.TS_COLUMN + + "," + ModelConstants.BOOLEAN_VALUE_COLUMN + + "," + ModelConstants.STRING_VALUE_COLUMN + + "," + ModelConstants.LONG_VALUE_COLUMN + + "," + ModelConstants.DOUBLE_VALUE_COLUMN + + "," + ModelConstants.JSON_VALUE_COLUMN + ")" + + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; private CassandraTsPartitionsCache cassandraTsPartitionsCache; @@ -566,18 +578,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD stmtCreationLock.lock(); try { if (saveWithNullStmt == null) { - saveWithNullStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_CF + - "(" + ModelConstants.ENTITY_TYPE_COLUMN + - "," + ModelConstants.ENTITY_ID_COLUMN + - "," + ModelConstants.KEY_COLUMN + - "," + ModelConstants.PARTITION_COLUMN + - "," + ModelConstants.TS_COLUMN + - "," + ModelConstants.BOOLEAN_VALUE_COLUMN + - "," + ModelConstants.STRING_VALUE_COLUMN + - "," + ModelConstants.LONG_VALUE_COLUMN + - "," + ModelConstants.DOUBLE_VALUE_COLUMN + - "," + ModelConstants.JSON_VALUE_COLUMN + ")" + - " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"); + saveWithNullStmt = prepare(INSERT_WITH_NULL); } } finally { stmtCreationLock.unlock(); @@ -591,18 +592,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD stmtCreationLock.lock(); try { if (saveWithNullWithTtlStmt == null) { - saveWithNullWithTtlStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_CF + - "(" + ModelConstants.ENTITY_TYPE_COLUMN + - "," + ModelConstants.ENTITY_ID_COLUMN + - "," + ModelConstants.KEY_COLUMN + - "," + ModelConstants.PARTITION_COLUMN + - "," + ModelConstants.TS_COLUMN + - "," + ModelConstants.BOOLEAN_VALUE_COLUMN + - "," + ModelConstants.STRING_VALUE_COLUMN + - "," + ModelConstants.LONG_VALUE_COLUMN + - "," + ModelConstants.DOUBLE_VALUE_COLUMN + - "," + ModelConstants.JSON_VALUE_COLUMN + ")" + - " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?) USING TTL ?"); + saveWithNullWithTtlStmt = prepare(INSERT_WITH_NULL + " USING TTL ?"); } } finally { stmtCreationLock.unlock(); From 23545a64838746a4b322bf098defc3346d46a407 Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Tue, 6 Feb 2024 15:28:55 +0200 Subject: [PATCH 4/4] CassandraBaseTimeseriesDao refactoring --- .../server/dao/timeseries/CassandraBaseTimeseriesDao.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 833a18ad39..6b2ab7e07b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -86,7 +86,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD public static final String ASC_ORDER = "ASC"; public static final long SECONDS_IN_DAY = TimeUnit.DAYS.toSeconds(1); protected static final List FIXED_PARTITION = List.of(0L); - private static String INSERT_WITH_NULL = INSERT_INTO + ModelConstants.TS_KV_CF + + protected static final String INSERT_WITH_NULL = INSERT_INTO + ModelConstants.TS_KV_CF + "(" + ModelConstants.ENTITY_TYPE_COLUMN + "," + ModelConstants.ENTITY_ID_COLUMN + "," + ModelConstants.KEY_COLUMN +