Browse Source

added sequence number for attributes

pull/10975/head
YevhenBondarenko 2 years ago
parent
commit
b666f9499a
  1. 4
      application/src/main/java/org/thingsboard/server/controller/DeviceController.java
  2. 2
      application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
  3. 2
      common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
  4. 8
      common/data/src/main/java/org/thingsboard/server/common/data/util/CollectionsUtil.java
  5. 128
      dao/src/main/java/org/thingsboard/server/dao/AbstractSequenceInsertRepository.java
  6. 47
      dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java
  7. 1
      dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueueParams.java
  8. 15
      dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueueWrapper.java
  9. 8
      dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlQueue.java
  10. 6
      dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlQueueElement.java
  11. 226
      dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java
  12. 5
      dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java
  13. 27
      dao/src/main/java/org/thingsboard/server/dao/sql/attributes/SqlAttributesInsertRepository.java
  14. 2
      dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java
  15. 2
      dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java
  16. 2
      dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java
  17. 36
      dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java
  18. 224
      dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java
  19. 8
      dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java
  20. 4
      dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
  21. 2
      dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java
  22. 2
      dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java
  23. 3
      dao/src/main/resources/sql/schema-entities.sql

4
application/src/main/java/org/thingsboard/server/controller/DeviceController.java

@ -20,6 +20,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.ArraySchema;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
@ -30,6 +31,7 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
@ -488,7 +490,7 @@ public class DeviceController extends BaseController {
@RequestMapping(value = "/devices", params = {"deviceIds"}, method = RequestMethod.GET)
@ResponseBody
public List<Device> getDevicesByIds(
@Parameter(description = "A list of devices ids, separated by comma ','")
@Parameter(description = "A list of devices ids, separated by comma ','", array = @ArraySchema(schema = @Schema(type = "string")))
@RequestParam("deviceIds") String[] strDeviceIds) throws ThingsboardException, ExecutionException, InterruptedException {
checkArrayParameter("deviceIds", strDeviceIds);
SecurityUser user = getCurrentUser();

2
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java

@ -280,7 +280,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
@Override
public void saveLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback) {
ListenableFuture<List<Void>> saveFuture = tsService.saveLatest(tenantId, entityId, ts);
ListenableFuture<List<Long>> saveFuture = tsService.saveLatest(tenantId, entityId, ts);
addVoidCallback(saveFuture, callback);
addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts));
}

2
common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java

@ -50,7 +50,7 @@ public interface TimeseriesService {
ListenableFuture<Integer> saveWithoutLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
ListenableFuture<List<Void>> saveLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry);
ListenableFuture<List<Long>> saveLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry);
ListenableFuture<List<TsKvLatestRemovingResult>> remove(TenantId tenantId, EntityId entityId, List<DeleteTsKvQuery> queries);

8
common/data/src/main/java/org/thingsboard/server/common/data/util/CollectionsUtil.java

@ -17,6 +17,7 @@ package org.thingsboard.server.common.data.util;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@ -37,6 +38,13 @@ public class CollectionsUtil {
return b.stream().filter(p -> !a.contains(p)).collect(Collectors.toSet());
}
/**
* Returns new list with elements that are present in list B(new) but absent in list A(old).
*/
public static <T> List<T> diffLists(List<T> a, List<T> b) {
return b.stream().filter(p -> !a.contains(p)).collect(Collectors.toList());
}
public static <T> boolean contains(Collection<T> collection, T element) {
return isNotEmpty(collection) && collection.contains(element);
}

128
dao/src/main/java/org/thingsboard/server/dao/AbstractSequenceInsertRepository.java

@ -0,0 +1,128 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.PreparedStatementCreator;
import org.springframework.jdbc.core.SqlProvider;
import org.springframework.jdbc.support.GeneratedKeyHolder;
import org.springframework.jdbc.support.KeyHolder;
import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public abstract class AbstractSequenceInsertRepository<T> extends AbstractInsertRepository {
public static final String SEQ_NUMBER = "seq_number";
public List<Long> saveOrUpdate(List<T> entities) {
return transactionTemplate.execute(status -> {
List<Long> seqNumbers = new ArrayList<>(entities.size());
KeyHolder keyHolder = new GeneratedKeyHolder();
int[] updateResult = onBatchUpdate(entities, keyHolder);
List<Map<String, Object>> seqNumbersList = keyHolder.getKeyList();
int notUpdatedCount = entities.size() - seqNumbersList.size();
List<Integer> toInsertIndexes = new ArrayList<>(notUpdatedCount);
List<T> insertEntities = new ArrayList<>(notUpdatedCount);
int keyHolderIndex = 0;
for (int i = 0; i < updateResult.length; i++) {
if (updateResult[i] == 0) {
insertEntities.add(entities.get(i));
seqNumbers.add(0L);
toInsertIndexes.add(i);
} else {
seqNumbers.add((Long) seqNumbersList.get(keyHolderIndex).get(SEQ_NUMBER));
keyHolderIndex++;
}
}
if (insertEntities.isEmpty()) {
return seqNumbers;
}
onInsertOrUpdate(insertEntities, keyHolder);
seqNumbersList = keyHolder.getKeyList();
for (int i = 0; i < seqNumbersList.size(); i++) {
seqNumbers.set(toInsertIndexes.get(i), (Long) seqNumbersList.get(i).get(SEQ_NUMBER));
}
return seqNumbers;
});
}
private int[] onBatchUpdate(List<T> entities, KeyHolder keyHolder) {
return jdbcTemplate.batchUpdate(new SequencePreparedStatementCreator(getBatchUpdateQuery()), new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
setOnBatchUpdateValues(ps, i, entities);
}
@Override
public int getBatchSize() {
return entities.size();
}
}, keyHolder);
}
private void onInsertOrUpdate(List<T> insertEntities, KeyHolder keyHolder) {
jdbcTemplate.batchUpdate(new SequencePreparedStatementCreator(getInsertOrUpdateQuery()), new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
setOnInsertOrUpdateValues(ps, i, insertEntities);
}
@Override
public int getBatchSize() {
return insertEntities.size();
}
}, keyHolder);
}
protected abstract void setOnBatchUpdateValues(PreparedStatement ps, int i, List<T> entities) throws SQLException;
protected abstract void setOnInsertOrUpdateValues(PreparedStatement ps, int i, List<T> entities) throws SQLException;
protected abstract String getBatchUpdateQuery();
protected abstract String getInsertOrUpdateQuery();
private record SequencePreparedStatementCreator(String sql) implements PreparedStatementCreator, SqlProvider {
private static final String[] COLUMNS = {SEQ_NUMBER};
@Override
public PreparedStatement createPreparedStatement(Connection con) throws SQLException {
return con.prepareStatement(sql, COLUMNS);
}
@Override
public String getSql() {
return this.sql;
}
}
}

47
dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java

@ -19,6 +19,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.util.CollectionsUtil;
import org.thingsboard.server.common.stats.MessagesStats;
import java.util.ArrayList;
@ -29,14 +30,13 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Slf4j
public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
public class TbSqlBlockingQueue<E, R> implements TbSqlQueue<E, R> {
private final BlockingQueue<TbSqlQueueElement<E>> queue = new LinkedBlockingQueue<>();
private final BlockingQueue<TbSqlQueueElement<E, R>> queue = new LinkedBlockingQueue<>();
private final TbSqlBlockingQueueParams params;
private ExecutorService executor;
@ -48,17 +48,17 @@ public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
}
@Override
public void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction, Comparator<E> batchUpdateComparator, int index) {
public void init(ScheduledLogExecutorComponent logExecutor, Function<List<E>, List<R>> saveFunction, Comparator<E> batchUpdateComparator, Function<List<TbSqlQueueElement<E, R>>, List<TbSqlQueueElement<E, R>>> filter, int index) {
executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("sql-queue-" + index + "-" + params.getLogName().toLowerCase()));
executor.submit(() -> {
String logName = params.getLogName();
int batchSize = params.getBatchSize();
long maxDelay = params.getMaxDelay();
final List<TbSqlQueueElement<E>> entities = new ArrayList<>(batchSize);
final List<TbSqlQueueElement<E, R>> entities = new ArrayList<>(batchSize);
while (!Thread.interrupted()) {
try {
long currentTs = System.currentTimeMillis();
TbSqlQueueElement<E> attr = queue.poll(maxDelay, TimeUnit.MILLISECONDS);
TbSqlQueueElement<E, R> attr = queue.poll(maxDelay, TimeUnit.MILLISECONDS);
if (attr == null) {
continue;
} else {
@ -70,12 +70,27 @@ public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
log.debug("[{}] Going to save {} entities", logName, entities.size());
log.trace("[{}] Going to save entities: {}", logName, entities);
}
Stream<E> entitiesStream = entities.stream().map(TbSqlQueueElement::getEntity);
saveFunction.accept(
(params.isBatchSortEnabled() ? entitiesStream.sorted(batchUpdateComparator) : entitiesStream)
.collect(Collectors.toList())
);
entities.forEach(v -> v.getFuture().set(null));
List<TbSqlQueueElement<E, R>> entitiesToSave = filter.apply(entities);
if (params.isBatchSortEnabled()) {
entitiesToSave = entitiesToSave.stream().sorted((o1, o2) -> batchUpdateComparator.compare(o1.getEntity(), o2.getEntity())).toList();
}
List<R> result = saveFunction.apply(entitiesToSave.stream().map(TbSqlQueueElement::getEntity).collect(Collectors.toList()));
if (params.isWithResponse()) {
for (int i = 0; i < entitiesToSave.size(); i++) {
entitiesToSave.get(i).getFuture().set(result.get(i));
}
if (entities.size() > entitiesToSave.size()) {
CollectionsUtil.diffLists(entitiesToSave, entities).forEach(v -> v.getFuture().set(null));
}
} else {
entities.forEach(v -> v.getFuture().set(null));
}
stats.incrementSuccessful(entities.size());
if (!fullPack) {
long remainingDelay = maxDelay - (System.currentTimeMillis() - currentTs);
@ -104,7 +119,7 @@ public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
});
logExecutor.scheduleAtFixedRate(() -> {
if (queue.size() > 0 || stats.getTotal() > 0 || stats.getSuccessful() > 0 || stats.getFailed() > 0) {
if (!queue.isEmpty() || stats.getTotal() > 0 || stats.getSuccessful() > 0 || stats.getFailed() > 0) {
log.info("Queue-{} [{}] queueSize [{}] totalAdded [{}] totalSaved [{}] totalFailed [{}]", index,
params.getLogName(), queue.size(), stats.getTotal(), stats.getSuccessful(), stats.getFailed());
stats.reset();
@ -120,8 +135,8 @@ public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
}
@Override
public ListenableFuture<Void> add(E element) {
SettableFuture<Void> future = SettableFuture.create();
public ListenableFuture<R> add(E element) {
SettableFuture<R> future = SettableFuture.create();
queue.add(new TbSqlQueueElement<>(future, element));
stats.incrementTotal();
return future;

1
dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueueParams.java

@ -30,4 +30,5 @@ public class TbSqlBlockingQueueParams {
private final long statsPrintIntervalMs;
private final String statsNamePrefix;
private final boolean batchSortEnabled;
private final boolean withResponse;
}

15
dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueueWrapper.java

@ -29,10 +29,9 @@ import java.util.function.Function;
@Slf4j
@Data
public class TbSqlBlockingQueueWrapper<E> {
private final CopyOnWriteArrayList<TbSqlBlockingQueue<E>> queues = new CopyOnWriteArrayList<>();
public class TbSqlBlockingQueueWrapper<E, R> {
private final CopyOnWriteArrayList<TbSqlBlockingQueue<E, R>> queues = new CopyOnWriteArrayList<>();
private final TbSqlBlockingQueueParams params;
private ScheduledLogExecutorComponent logExecutor;
private final Function<E, Integer> hashCodeFunction;
private final int maxThreads;
private final StatsFactory statsFactory;
@ -46,15 +45,19 @@ public class TbSqlBlockingQueueWrapper<E> {
* NOTE: you must use all of primary key parts in your comparator
*/
public void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction, Comparator<E> batchUpdateComparator) {
init(logExecutor, l -> { saveFunction.accept(l); return null; }, batchUpdateComparator, l -> l);
}
public void init(ScheduledLogExecutorComponent logExecutor, Function<List<E>, List<R>> saveFunction, Comparator<E> batchUpdateComparator, Function<List<TbSqlQueueElement<E, R>>, List<TbSqlQueueElement<E, R>>> filter) {
for (int i = 0; i < maxThreads; i++) {
MessagesStats stats = statsFactory.createMessagesStats(params.getStatsNamePrefix() + ".queue." + i);
TbSqlBlockingQueue<E> queue = new TbSqlBlockingQueue<>(params, stats);
TbSqlBlockingQueue<E, R> queue = new TbSqlBlockingQueue<>(params, stats);
queues.add(queue);
queue.init(logExecutor, saveFunction, batchUpdateComparator, i);
queue.init(logExecutor, saveFunction, batchUpdateComparator, filter, i);
}
}
public ListenableFuture<Void> add(E element) {
public ListenableFuture<R> add(E element) {
int queueIndex = element != null ? (hashCodeFunction.apply(element) & 0x7FFFFFFF) % maxThreads : 0;
return queues.get(queueIndex).add(element);
}

8
dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlQueue.java

@ -19,13 +19,13 @@ import com.google.common.util.concurrent.ListenableFuture;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
public interface TbSqlQueue<E> {
public interface TbSqlQueue<E, R> {
void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction, Comparator<E> batchUpdateComparator, int queueIndex);
void init(ScheduledLogExecutorComponent logExecutor, Function<List<E>, List<R>> saveFunction, Comparator<E> batchUpdateComparator, Function<List<TbSqlQueueElement<E, R>>, List<TbSqlQueueElement<E, R>>> filter, int queueIndex);
void destroy();
ListenableFuture<Void> add(E element);
ListenableFuture<R> add(E element);
}

6
dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlQueueElement.java

@ -20,13 +20,13 @@ import lombok.Getter;
import lombok.ToString;
@ToString(exclude = "future")
public final class TbSqlQueueElement<E> {
public final class TbSqlQueueElement<E, R> {
@Getter
private final SettableFuture<Void> future;
private final SettableFuture<R> future;
@Getter
private final E entity;
public TbSqlQueueElement(SettableFuture<Void> future, E entity) {
public TbSqlQueueElement(SettableFuture<R> future, E entity) {
this.future = future;
this.entity = entity;
}

226
dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java

@ -15,162 +15,110 @@
*/
package org.thingsboard.server.dao.sql.attributes;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.dao.AbstractSequenceInsertRepository;
import org.thingsboard.server.dao.model.sql.AttributeKvEntity;
import org.thingsboard.server.dao.util.SqlDao;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
@Repository
@Slf4j
@Transactional
@SqlDao
public abstract class AttributeKvInsertRepository {
public class AttributeKvInsertRepository extends AbstractSequenceInsertRepository<AttributeKvEntity> {
private static final ThreadLocal<Pattern> PATTERN_THREAD_LOCAL = ThreadLocal.withInitial(() -> Pattern.compile(String.valueOf(Character.MIN_VALUE)));
private static final String EMPTY_STR = "";
private static final String BATCH_UPDATE = "UPDATE attribute_kv SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ? " +
"WHERE entity_id = ? and attribute_type =? and attribute_key = ?;";
private static final String BATCH_UPDATE = "UPDATE attribute_kv SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ?, seq_number = nextval('attribute_kv_latest_seq') " +
"WHERE entity_id = ? and attribute_type =? and attribute_key = ? RETURNING seq_number;";
private static final String INSERT_OR_UPDATE =
"INSERT INTO attribute_kv (entity_id, attribute_type, attribute_key, str_v, long_v, dbl_v, bool_v, json_v, last_update_ts) " +
"VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json), ?) " +
"INSERT INTO attribute_kv (entity_id, attribute_type, attribute_key, str_v, long_v, dbl_v, bool_v, json_v, last_update_ts, seq_number) " +
"VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json), ?, nextval('attribute_kv_latest_seq')) " +
"ON CONFLICT (entity_id, attribute_type, attribute_key) " +
"DO UPDATE SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ?;";
@Autowired
protected JdbcTemplate jdbcTemplate;
@Autowired
private TransactionTemplate transactionTemplate;
@Value("${sql.remove_null_chars:true}")
private boolean removeNullChars;
public void saveOrUpdate(List<AttributeKvEntity> entities) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
int[] result = jdbcTemplate.batchUpdate(BATCH_UPDATE, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
AttributeKvEntity kvEntity = entities.get(i);
ps.setString(1, replaceNullChars(kvEntity.getStrValue()));
if (kvEntity.getLongValue() != null) {
ps.setLong(2, kvEntity.getLongValue());
} else {
ps.setNull(2, Types.BIGINT);
}
if (kvEntity.getDoubleValue() != null) {
ps.setDouble(3, kvEntity.getDoubleValue());
} else {
ps.setNull(3, Types.DOUBLE);
}
if (kvEntity.getBooleanValue() != null) {
ps.setBoolean(4, kvEntity.getBooleanValue());
} else {
ps.setNull(4, Types.BOOLEAN);
}
ps.setString(5, replaceNullChars(kvEntity.getJsonValue()));
ps.setLong(6, kvEntity.getLastUpdateTs());
ps.setObject(7, kvEntity.getId().getEntityId());
ps.setInt(8, kvEntity.getId().getAttributeType());
ps.setInt(9, kvEntity.getId().getAttributeKey());
}
@Override
public int getBatchSize() {
return entities.size();
}
});
int updatedCount = 0;
for (int i = 0; i < result.length; i++) {
if (result[i] == 0) {
updatedCount++;
}
}
List<AttributeKvEntity> insertEntities = new ArrayList<>(updatedCount);
for (int i = 0; i < result.length; i++) {
if (result[i] == 0) {
insertEntities.add(entities.get(i));
}
}
jdbcTemplate.batchUpdate(INSERT_OR_UPDATE, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
AttributeKvEntity kvEntity = insertEntities.get(i);
ps.setObject(1, kvEntity.getId().getEntityId());
ps.setInt(2, kvEntity.getId().getAttributeType());
ps.setInt(3, kvEntity.getId().getAttributeKey());
ps.setString(4, replaceNullChars(kvEntity.getStrValue()));
ps.setString(10, replaceNullChars(kvEntity.getStrValue()));
if (kvEntity.getLongValue() != null) {
ps.setLong(5, kvEntity.getLongValue());
ps.setLong(11, kvEntity.getLongValue());
} else {
ps.setNull(5, Types.BIGINT);
ps.setNull(11, Types.BIGINT);
}
if (kvEntity.getDoubleValue() != null) {
ps.setDouble(6, kvEntity.getDoubleValue());
ps.setDouble(12, kvEntity.getDoubleValue());
} else {
ps.setNull(6, Types.DOUBLE);
ps.setNull(12, Types.DOUBLE);
}
if (kvEntity.getBooleanValue() != null) {
ps.setBoolean(7, kvEntity.getBooleanValue());
ps.setBoolean(13, kvEntity.getBooleanValue());
} else {
ps.setNull(7, Types.BOOLEAN);
ps.setNull(13, Types.BOOLEAN);
}
ps.setString(8, replaceNullChars(kvEntity.getJsonValue()));
ps.setString(14, replaceNullChars(kvEntity.getJsonValue()));
ps.setLong(9, kvEntity.getLastUpdateTs());
ps.setLong(15, kvEntity.getLastUpdateTs());
}
@Override
public int getBatchSize() {
return insertEntities.size();
}
});
}
});
"DO UPDATE SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ?, seq_number = nextval('attribute_kv_latest_seq') RETURNING seq_number;";
@Override
protected void setOnBatchUpdateValues(PreparedStatement ps, int i, List<AttributeKvEntity> entities) throws SQLException {
AttributeKvEntity kvEntity = entities.get(i);
ps.setString(1, replaceNullChars(kvEntity.getStrValue()));
if (kvEntity.getLongValue() != null) {
ps.setLong(2, kvEntity.getLongValue());
} else {
ps.setNull(2, Types.BIGINT);
}
if (kvEntity.getDoubleValue() != null) {
ps.setDouble(3, kvEntity.getDoubleValue());
} else {
ps.setNull(3, Types.DOUBLE);
}
if (kvEntity.getBooleanValue() != null) {
ps.setBoolean(4, kvEntity.getBooleanValue());
} else {
ps.setNull(4, Types.BOOLEAN);
}
ps.setString(5, replaceNullChars(kvEntity.getJsonValue()));
ps.setLong(6, kvEntity.getLastUpdateTs());
ps.setObject(7, kvEntity.getId().getEntityId());
ps.setInt(8, kvEntity.getId().getAttributeType());
ps.setInt(9, kvEntity.getId().getAttributeKey());
}
private String replaceNullChars(String strValue) {
if (removeNullChars && strValue != null) {
return PATTERN_THREAD_LOCAL.get().matcher(strValue).replaceAll(EMPTY_STR);
@Override
protected void setOnInsertOrUpdateValues(PreparedStatement ps, int i, List<AttributeKvEntity> insertEntities) throws SQLException {
AttributeKvEntity kvEntity = insertEntities.get(i);
ps.setObject(1, kvEntity.getId().getEntityId());
ps.setInt(2, kvEntity.getId().getAttributeType());
ps.setInt(3, kvEntity.getId().getAttributeKey());
ps.setString(4, replaceNullChars(kvEntity.getStrValue()));
ps.setString(10, replaceNullChars(kvEntity.getStrValue()));
if (kvEntity.getLongValue() != null) {
ps.setLong(5, kvEntity.getLongValue());
ps.setLong(11, kvEntity.getLongValue());
} else {
ps.setNull(5, Types.BIGINT);
ps.setNull(11, Types.BIGINT);
}
return strValue;
if (kvEntity.getDoubleValue() != null) {
ps.setDouble(6, kvEntity.getDoubleValue());
ps.setDouble(12, kvEntity.getDoubleValue());
} else {
ps.setNull(6, Types.DOUBLE);
ps.setNull(12, Types.DOUBLE);
}
if (kvEntity.getBooleanValue() != null) {
ps.setBoolean(7, kvEntity.getBooleanValue());
ps.setBoolean(13, kvEntity.getBooleanValue());
} else {
ps.setNull(7, Types.BOOLEAN);
ps.setNull(13, Types.BOOLEAN);
}
ps.setString(8, replaceNullChars(kvEntity.getJsonValue()));
ps.setString(14, replaceNullChars(kvEntity.getJsonValue()));
ps.setLong(9, kvEntity.getLastUpdateTs());
ps.setLong(15, kvEntity.getLastUpdateTs());
}
@Override
protected String getBatchUpdateQuery() {
return BATCH_UPDATE;
}
@Override
protected String getInsertOrUpdateQuery() {
return INSERT_OR_UPDATE;
}
}

5
dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java

@ -88,7 +88,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
@Value("${sql.batch_sort:true}")
private boolean batchSortEnabled;
private TbSqlBlockingQueueWrapper<AttributeKvEntity> queue;
private TbSqlBlockingQueueWrapper<AttributeKvEntity, Long> queue;
@PostConstruct
private void init() {
@ -99,6 +99,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
.statsPrintIntervalMs(statsPrintIntervalMs)
.statsNamePrefix("attributes")
.batchSortEnabled(batchSortEnabled)
.withResponse(true)
.build();
Function<AttributeKvEntity, Integer> hashcodeFunction = entity -> entity.getId().getEntityId().hashCode();
@ -106,7 +107,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
queue.init(logExecutor, v -> attributeKvInsertRepository.saveOrUpdate(v),
Comparator.comparing((AttributeKvEntity attributeKvEntity) -> attributeKvEntity.getId().getEntityId())
.thenComparing(attributeKvEntity -> attributeKvEntity.getId().getAttributeType())
.thenComparing(attributeKvEntity -> attributeKvEntity.getId().getAttributeKey())
.thenComparing(attributeKvEntity -> attributeKvEntity.getId().getAttributeKey()), l -> l
);
}

27
dao/src/main/java/org/thingsboard/server/dao/sql/attributes/SqlAttributesInsertRepository.java

@ -1,27 +0,0 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sql.attributes;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.dao.util.SqlDao;
@Repository
@Transactional
@SqlDao
public class SqlAttributesInsertRepository extends AttributeKvInsertRepository {
}

2
dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java

@ -90,7 +90,7 @@ public class JpaBaseEdgeEventDao extends JpaPartitionedAbstractDao<EdgeEventEnti
private static final String TABLE_NAME = ModelConstants.EDGE_EVENT_TABLE_NAME;
private TbSqlBlockingQueueWrapper<EdgeEventEntity> queue;
private TbSqlBlockingQueueWrapper<EdgeEventEntity, Void> queue;
@Override
protected Class<EdgeEventEntity> getEntityClass() {

2
dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java

@ -110,7 +110,7 @@ public class JpaBaseEventDao implements EventDao {
@Value("${sql.batch_sort:true}")
private boolean batchSortEnabled;
private TbSqlBlockingQueueWrapper<Event> queue;
private TbSqlBlockingQueueWrapper<Event, Void> queue;
private final Map<EventType, EventRepository<?, ?>> repositories = new ConcurrentHashMap<>();

2
dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java

@ -60,7 +60,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
@Autowired
protected InsertTsRepository<TsKvEntity> insertRepository;
protected TbSqlBlockingQueueWrapper<TsKvEntity> tsQueue;
protected TbSqlBlockingQueueWrapper<TsKvEntity, Void> tsQueue;
@Autowired
private StatsFactory statsFactory;

36
dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java

@ -46,6 +46,7 @@ import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;
import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
import org.thingsboard.server.dao.sql.TbSqlQueueElement;
import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository;
import org.thingsboard.server.dao.sqlts.latest.SearchTsKvLatestRepository;
import org.thingsboard.server.dao.sqlts.latest.TsKvLatestRepository;
@ -81,7 +82,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
@Autowired
private InsertLatestTsRepository insertLatestTsRepository;
private TbSqlBlockingQueueWrapper<TsKvLatestEntity> tsLatestQueue;
private TbSqlBlockingQueueWrapper<TsKvLatestEntity, Long> tsLatestQueue;
@Value("${sql.ts_latest.batch_size:1000}")
private int tsLatestBatchSize;
@ -115,25 +116,26 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
.maxDelay(tsLatestMaxDelay)
.statsPrintIntervalMs(tsLatestStatsPrintIntervalMs)
.statsNamePrefix("ts.latest")
.batchSortEnabled(false)
.batchSortEnabled(batchSortEnabled)
.withResponse(true)
.build();
java.util.function.Function<TsKvLatestEntity, Integer> hashcodeFunction = entity -> entity.getEntityId().hashCode();
tsLatestQueue = new TbSqlBlockingQueueWrapper<>(tsLatestParams, hashcodeFunction, tsLatestBatchThreads, statsFactory);
tsLatestQueue.init(logExecutor, v -> {
Map<TsKey, TsKvLatestEntity> trueLatest = new HashMap<>();
v.forEach(ts -> {
TsKey key = new TsKey(ts.getEntityId(), ts.getKey());
trueLatest.merge(key, ts, (oldTs, newTs) -> oldTs.getTs() <= newTs.getTs() ? newTs : oldTs);
});
List<TsKvLatestEntity> latestEntities = new ArrayList<>(trueLatest.values());
if (batchSortEnabled) {
latestEntities.sort(Comparator.comparing((Function<TsKvLatestEntity, UUID>) AbstractTsKvEntity::getEntityId)
.thenComparingInt(AbstractTsKvEntity::getKey));
}
insertLatestTsRepository.saveOrUpdate(latestEntities);
}, (l, r) -> 0);
tsLatestQueue.init(logExecutor,
v -> insertLatestTsRepository.saveOrUpdate(v),
Comparator.comparing((Function<TsKvLatestEntity, UUID>) AbstractTsKvEntity::getEntityId)
.thenComparingInt(AbstractTsKvEntity::getKey),
v -> {
Map<TsKey, TbSqlQueueElement<TsKvLatestEntity, Long>> trueLatest = new HashMap<>();
v.forEach(element -> {
var entity = element.getEntity();
TsKey key = new TsKey(entity.getEntityId(), entity.getKey());
trueLatest.merge(key, element, (oldElement, newElement) -> oldElement.getEntity().getTs() <= newElement.getEntity().getTs() ? newElement : oldElement);
});
return new ArrayList<>(trueLatest.values());
});
}
@PreDestroy
@ -144,7 +146,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
}
@Override
public ListenableFuture<Void> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
public ListenableFuture<Long> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
return getSaveLatestFuture(entityId, tsKvEntry);
}
@ -247,7 +249,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
searchTsKvLatestRepository.findAllByEntityId(entityId.getId()))));
}
protected ListenableFuture<Void> getSaveLatestFuture(EntityId entityId, TsKvEntry tsKvEntry) {
protected ListenableFuture<Long> getSaveLatestFuture(EntityId entityId, TsKvEntry tsKvEntry) {
TsKvLatestEntity latestEntity = new TsKvLatestEntity();
latestEntity.setEntityId(entityId.getId());
latestEntity.setTs(tsKvEntry.getTs());

224
dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java

@ -17,33 +17,25 @@ package org.thingsboard.server.dao.sqlts.insert.latest.sql;
import jakarta.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.PreparedStatementCreator;
import org.springframework.jdbc.core.SqlProvider;
import org.springframework.jdbc.support.GeneratedKeyHolder;
import org.springframework.jdbc.support.KeyHolder;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.dao.AbstractSequenceInsertRepository;
import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;
import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository;
import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository;
import org.thingsboard.server.dao.util.SqlDao;
import org.thingsboard.server.dao.util.SqlTsLatestAnyDao;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@SqlTsLatestAnyDao
@Repository
@Transactional
@SqlDao
public class SqlLatestInsertTsRepository extends AbstractInsertRepository implements InsertLatestTsRepository {
public class SqlLatestInsertTsRepository extends AbstractSequenceInsertRepository<TsKvLatestEntity> implements InsertLatestTsRepository {
@Value("${sql.ts_latest.update_by_latest_ts:true}")
private Boolean updateByLatestTs;
@ -61,8 +53,6 @@ public class SqlLatestInsertTsRepository extends AbstractInsertRepository implem
private static final String RETURNING = " RETURNING seq_number";
private static final String SEQ_NUMBER = "seq_number";
private String batchUpdateQuery;
private String insertOrUpdateQuery;
@ -73,161 +63,89 @@ public class SqlLatestInsertTsRepository extends AbstractInsertRepository implem
}
@Override
public List<Long> saveOrUpdate(List<TsKvLatestEntity> entities) {
return transactionTemplate.execute(status -> {
List<Long> seqNumbers = new ArrayList<>(entities.size());
protected void setOnBatchUpdateValues(PreparedStatement ps, int i, List<TsKvLatestEntity> entities) throws SQLException {
TsKvLatestEntity tsKvLatestEntity = entities.get(i);
ps.setLong(1, tsKvLatestEntity.getTs());
if (tsKvLatestEntity.getBooleanValue() != null) {
ps.setBoolean(2, tsKvLatestEntity.getBooleanValue());
} else {
ps.setNull(2, Types.BOOLEAN);
}
KeyHolder keyHolder = new GeneratedKeyHolder();
ps.setString(3, replaceNullChars(tsKvLatestEntity.getStrValue()));
int[] updateResult = onBatchUpdate(entities, keyHolder);
if (tsKvLatestEntity.getLongValue() != null) {
ps.setLong(4, tsKvLatestEntity.getLongValue());
} else {
ps.setNull(4, Types.BIGINT);
}
List<Map<String, Object>> seqNumbersList = keyHolder.getKeyList();
if (tsKvLatestEntity.getDoubleValue() != null) {
ps.setDouble(5, tsKvLatestEntity.getDoubleValue());
} else {
ps.setNull(5, Types.DOUBLE);
}
int notUpdatedCount = entities.size() - seqNumbersList.size();
ps.setString(6, replaceNullChars(tsKvLatestEntity.getJsonValue()));
List<Integer> toInsertIndexes = new ArrayList<>(notUpdatedCount);
List<TsKvLatestEntity> insertEntities = new ArrayList<>(notUpdatedCount);
int keyHolderIndex = 0;
for (int i = 0; i < updateResult.length; i++) {
if (updateResult[i] == 0) {
insertEntities.add(entities.get(i));
seqNumbers.add(0L);
toInsertIndexes.add(i);
} else {
seqNumbers.add((Long) seqNumbersList.get(keyHolderIndex).get(SEQ_NUMBER));
keyHolderIndex++;
}
}
ps.setObject(7, tsKvLatestEntity.getEntityId());
ps.setInt(8, tsKvLatestEntity.getKey());
if (updateByLatestTs) {
ps.setLong(9, tsKvLatestEntity.getTs());
}
}
if (insertEntities.isEmpty()) {
return seqNumbers;
}
@Override
protected void setOnInsertOrUpdateValues(PreparedStatement ps, int i, List<TsKvLatestEntity> insertEntities) throws SQLException {
TsKvLatestEntity tsKvLatestEntity = insertEntities.get(i);
ps.setObject(1, tsKvLatestEntity.getEntityId());
ps.setInt(2, tsKvLatestEntity.getKey());
ps.setLong(3, tsKvLatestEntity.getTs());
ps.setLong(9, tsKvLatestEntity.getTs());
if (updateByLatestTs) {
ps.setLong(15, tsKvLatestEntity.getTs());
}
onInsertOrUpdate(insertEntities, keyHolder);
if (tsKvLatestEntity.getBooleanValue() != null) {
ps.setBoolean(4, tsKvLatestEntity.getBooleanValue());
ps.setBoolean(10, tsKvLatestEntity.getBooleanValue());
} else {
ps.setNull(4, Types.BOOLEAN);
ps.setNull(10, Types.BOOLEAN);
}
seqNumbersList = keyHolder.getKeyList();
ps.setString(5, replaceNullChars(tsKvLatestEntity.getStrValue()));
ps.setString(11, replaceNullChars(tsKvLatestEntity.getStrValue()));
for (int i = 0; i < seqNumbersList.size(); i++) {
seqNumbers.set(toInsertIndexes.get(i), (Long) seqNumbersList.get(i).get(SEQ_NUMBER));
}
if (tsKvLatestEntity.getLongValue() != null) {
ps.setLong(6, tsKvLatestEntity.getLongValue());
ps.setLong(12, tsKvLatestEntity.getLongValue());
} else {
ps.setNull(6, Types.BIGINT);
ps.setNull(12, Types.BIGINT);
}
return seqNumbers;
});
}
if (tsKvLatestEntity.getDoubleValue() != null) {
ps.setDouble(7, tsKvLatestEntity.getDoubleValue());
ps.setDouble(13, tsKvLatestEntity.getDoubleValue());
} else {
ps.setNull(7, Types.DOUBLE);
ps.setNull(13, Types.DOUBLE);
}
private int[] onBatchUpdate(List<TsKvLatestEntity> entities, KeyHolder keyHolder) {
return jdbcTemplate.batchUpdate(new SimplePreparedStatementCreator(batchUpdateQuery), new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
TsKvLatestEntity tsKvLatestEntity = entities.get(i);
ps.setLong(1, tsKvLatestEntity.getTs());
if (tsKvLatestEntity.getBooleanValue() != null) {
ps.setBoolean(2, tsKvLatestEntity.getBooleanValue());
} else {
ps.setNull(2, Types.BOOLEAN);
}
ps.setString(3, replaceNullChars(tsKvLatestEntity.getStrValue()));
if (tsKvLatestEntity.getLongValue() != null) {
ps.setLong(4, tsKvLatestEntity.getLongValue());
} else {
ps.setNull(4, Types.BIGINT);
}
if (tsKvLatestEntity.getDoubleValue() != null) {
ps.setDouble(5, tsKvLatestEntity.getDoubleValue());
} else {
ps.setNull(5, Types.DOUBLE);
}
ps.setString(6, replaceNullChars(tsKvLatestEntity.getJsonValue()));
ps.setObject(7, tsKvLatestEntity.getEntityId());
ps.setInt(8, tsKvLatestEntity.getKey());
if (updateByLatestTs) {
ps.setLong(9, tsKvLatestEntity.getTs());
}
}
@Override
public int getBatchSize() {
return entities.size();
}
}, keyHolder);
ps.setString(8, replaceNullChars(tsKvLatestEntity.getJsonValue()));
ps.setString(14, replaceNullChars(tsKvLatestEntity.getJsonValue()));
}
private void onInsertOrUpdate(List<TsKvLatestEntity> insertEntities, KeyHolder keyHolder) {
jdbcTemplate.batchUpdate(new SimplePreparedStatementCreator(insertOrUpdateQuery), new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
TsKvLatestEntity tsKvLatestEntity = insertEntities.get(i);
ps.setObject(1, tsKvLatestEntity.getEntityId());
ps.setInt(2, tsKvLatestEntity.getKey());
ps.setLong(3, tsKvLatestEntity.getTs());
ps.setLong(9, tsKvLatestEntity.getTs());
if (updateByLatestTs) {
ps.setLong(15, tsKvLatestEntity.getTs());
}
if (tsKvLatestEntity.getBooleanValue() != null) {
ps.setBoolean(4, tsKvLatestEntity.getBooleanValue());
ps.setBoolean(10, tsKvLatestEntity.getBooleanValue());
} else {
ps.setNull(4, Types.BOOLEAN);
ps.setNull(10, Types.BOOLEAN);
}
ps.setString(5, replaceNullChars(tsKvLatestEntity.getStrValue()));
ps.setString(11, replaceNullChars(tsKvLatestEntity.getStrValue()));
if (tsKvLatestEntity.getLongValue() != null) {
ps.setLong(6, tsKvLatestEntity.getLongValue());
ps.setLong(12, tsKvLatestEntity.getLongValue());
} else {
ps.setNull(6, Types.BIGINT);
ps.setNull(12, Types.BIGINT);
}
if (tsKvLatestEntity.getDoubleValue() != null) {
ps.setDouble(7, tsKvLatestEntity.getDoubleValue());
ps.setDouble(13, tsKvLatestEntity.getDoubleValue());
} else {
ps.setNull(7, Types.DOUBLE);
ps.setNull(13, Types.DOUBLE);
}
ps.setString(8, replaceNullChars(tsKvLatestEntity.getJsonValue()));
ps.setString(14, replaceNullChars(tsKvLatestEntity.getJsonValue()));
}
@Override
public int getBatchSize() {
return insertEntities.size();
}
}, keyHolder);
@Override
protected String getBatchUpdateQuery() {
return batchUpdateQuery;
}
private static class SimplePreparedStatementCreator implements PreparedStatementCreator, SqlProvider {
private static final String[] COLUMNS = {SEQ_NUMBER};
private final String sql;
public SimplePreparedStatementCreator(String sql) {
this.sql = sql;
}
@Override
public PreparedStatement createPreparedStatement(Connection con) throws SQLException {
return con.prepareStatement(sql, COLUMNS);
}
@Override
public String getSql() {
return this.sql;
}
@Override
protected String getInsertOrUpdateQuery() {
return insertOrUpdateQuery;
}
}

8
dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java

@ -18,8 +18,9 @@ package org.thingsboard.server.dao.sqlts.timescale;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
@ -38,7 +39,6 @@ import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.dictionary.KeyDictionaryDao;
import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;
import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao;
@ -47,8 +47,6 @@ import org.thingsboard.server.dao.timeseries.TimeseriesDao;
import org.thingsboard.server.dao.util.TimeUtils;
import org.thingsboard.server.dao.util.TimescaleDBTsDao;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@ -77,7 +75,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
@Autowired
protected KeyDictionaryDao keyDictionaryDao;
protected TbSqlBlockingQueueWrapper<TimescaleTsKvEntity> tsQueue;
protected TbSqlBlockingQueueWrapper<TimescaleTsKvEntity, Void> tsQueue;
@PostConstruct
protected void init() {

4
dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java

@ -187,8 +187,8 @@ public class BaseTimeseriesService implements TimeseriesService {
}
@Override
public ListenableFuture<List<Void>> saveLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries) {
List<ListenableFuture<Void>> futures = new ArrayList<>(tsKvEntries.size());
public ListenableFuture<List<Long>> saveLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries) {
List<ListenableFuture<Long>> futures = new ArrayList<>(tsKvEntries.size());
for (TsKvEntry tsKvEntry : tsKvEntries) {
futures.add(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry));
}

2
dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java

@ -100,7 +100,7 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes
}
@Override
public ListenableFuture<Void> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
public ListenableFuture<Long> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
BoundStatementBuilder stmtBuilder = new BoundStatementBuilder(getLatestStmt().bind());
stmtBuilder.setString(0, entityId.getEntityType().name())
.setUuid(1, entityId.getId())

2
dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java

@ -42,7 +42,7 @@ public interface TimeseriesLatestDao {
ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId);
ListenableFuture<Void> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry);
ListenableFuture<Long> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry);
ListenableFuture<TsKvLatestRemovingResult> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query);

3
dao/src/main/resources/sql/schema-entities.sql

@ -102,6 +102,8 @@ CREATE TABLE IF NOT EXISTS audit_log (
action_failure_details varchar(1000000)
) PARTITION BY RANGE (created_time);
CREATE SEQUENCE IF NOT EXISTS attribute_kv_latest_seq cache 1000;
CREATE TABLE IF NOT EXISTS attribute_kv (
entity_id uuid,
attribute_type int,
@ -112,6 +114,7 @@ CREATE TABLE IF NOT EXISTS attribute_kv (
dbl_v double precision,
json_v json,
last_update_ts bigint,
seq_number bigint,
CONSTRAINT attribute_kv_pkey PRIMARY KEY (entity_id, attribute_type, attribute_key)
);

Loading…
Cancel
Save