diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java index fedbd48503..8544f60ca0 100644 --- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java +++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java @@ -20,6 +20,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.media.ArraySchema; import io.swagger.v3.oas.annotations.media.Content; import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.responses.ApiResponse; @@ -30,6 +31,7 @@ import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; @@ -488,7 +490,7 @@ public class DeviceController extends BaseController { @RequestMapping(value = "/devices", params = {"deviceIds"}, method = RequestMethod.GET) @ResponseBody public List getDevicesByIds( - @Parameter(description = "A list of devices ids, separated by comma ','") + @Parameter(description = "A list of devices ids, separated by comma ','", array = @ArraySchema(schema = @Schema(type = "string"))) @RequestParam("deviceIds") String[] strDeviceIds) throws ThingsboardException, ExecutionException, InterruptedException { checkArrayParameter("deviceIds", strDeviceIds); SecurityUser user = getCurrentUser(); diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index b19fde2983..3aace70533 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -280,7 +280,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Override public void saveLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List ts, FutureCallback callback) { - ListenableFuture> saveFuture = tsService.saveLatest(tenantId, entityId, ts); + ListenableFuture> saveFuture = tsService.saveLatest(tenantId, entityId, ts); addVoidCallback(saveFuture, callback); addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts)); } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java index ffea217da7..eaba144042 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java @@ -50,7 +50,7 @@ public interface TimeseriesService { ListenableFuture saveWithoutLatest(TenantId tenantId, EntityId entityId, List tsKvEntry, long ttl); - ListenableFuture> saveLatest(TenantId tenantId, EntityId entityId, List tsKvEntry); + ListenableFuture> saveLatest(TenantId tenantId, EntityId entityId, List tsKvEntry); ListenableFuture> remove(TenantId tenantId, EntityId entityId, List queries); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/util/CollectionsUtil.java b/common/data/src/main/java/org/thingsboard/server/common/data/util/CollectionsUtil.java index 4c9006e829..e89f3c1ad8 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/util/CollectionsUtil.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/util/CollectionsUtil.java @@ -17,6 +17,7 @@ package org.thingsboard.server.common.data.util; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -37,6 +38,13 @@ public class CollectionsUtil { return b.stream().filter(p -> !a.contains(p)).collect(Collectors.toSet()); } + /** + * Returns new list with elements that are present in list B(new) but absent in list A(old). + */ + public static List diffLists(List a, List b) { + return b.stream().filter(p -> !a.contains(p)).collect(Collectors.toList()); + } + public static boolean contains(Collection collection, T element) { return isNotEmpty(collection) && collection.contains(element); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/AbstractSequenceInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/AbstractSequenceInsertRepository.java new file mode 100644 index 0000000000..0808f5341f --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/AbstractSequenceInsertRepository.java @@ -0,0 +1,128 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao; + +import org.springframework.jdbc.core.BatchPreparedStatementSetter; +import org.springframework.jdbc.core.PreparedStatementCreator; +import org.springframework.jdbc.core.SqlProvider; +import org.springframework.jdbc.support.GeneratedKeyHolder; +import org.springframework.jdbc.support.KeyHolder; +import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public abstract class AbstractSequenceInsertRepository extends AbstractInsertRepository { + + public static final String SEQ_NUMBER = "seq_number"; + + public List saveOrUpdate(List entities) { + return transactionTemplate.execute(status -> { + List seqNumbers = new ArrayList<>(entities.size()); + + KeyHolder keyHolder = new GeneratedKeyHolder(); + + int[] updateResult = onBatchUpdate(entities, keyHolder); + + List> seqNumbersList = keyHolder.getKeyList(); + + int notUpdatedCount = entities.size() - seqNumbersList.size(); + + List toInsertIndexes = new ArrayList<>(notUpdatedCount); + List insertEntities = new ArrayList<>(notUpdatedCount); + int keyHolderIndex = 0; + for (int i = 0; i < updateResult.length; i++) { + if (updateResult[i] == 0) { + insertEntities.add(entities.get(i)); + seqNumbers.add(0L); + toInsertIndexes.add(i); + } else { + seqNumbers.add((Long) seqNumbersList.get(keyHolderIndex).get(SEQ_NUMBER)); + keyHolderIndex++; + } + } + + if (insertEntities.isEmpty()) { + return seqNumbers; + } + + onInsertOrUpdate(insertEntities, keyHolder); + + seqNumbersList = keyHolder.getKeyList(); + + for (int i = 0; i < seqNumbersList.size(); i++) { + seqNumbers.set(toInsertIndexes.get(i), (Long) seqNumbersList.get(i).get(SEQ_NUMBER)); + } + + return seqNumbers; + }); + } + + private int[] onBatchUpdate(List entities, KeyHolder keyHolder) { + return jdbcTemplate.batchUpdate(new SequencePreparedStatementCreator(getBatchUpdateQuery()), new BatchPreparedStatementSetter() { + @Override + public void setValues(PreparedStatement ps, int i) throws SQLException { + setOnBatchUpdateValues(ps, i, entities); + } + + @Override + public int getBatchSize() { + return entities.size(); + } + }, keyHolder); + } + + private void onInsertOrUpdate(List insertEntities, KeyHolder keyHolder) { + jdbcTemplate.batchUpdate(new SequencePreparedStatementCreator(getInsertOrUpdateQuery()), new BatchPreparedStatementSetter() { + @Override + public void setValues(PreparedStatement ps, int i) throws SQLException { + setOnInsertOrUpdateValues(ps, i, insertEntities); + } + + @Override + public int getBatchSize() { + return insertEntities.size(); + } + }, keyHolder); + } + + protected abstract void setOnBatchUpdateValues(PreparedStatement ps, int i, List entities) throws SQLException; + + protected abstract void setOnInsertOrUpdateValues(PreparedStatement ps, int i, List entities) throws SQLException; + + protected abstract String getBatchUpdateQuery(); + + protected abstract String getInsertOrUpdateQuery(); + + private record SequencePreparedStatementCreator(String sql) implements PreparedStatementCreator, SqlProvider { + + private static final String[] COLUMNS = {SEQ_NUMBER}; + + @Override + public PreparedStatement createPreparedStatement(Connection con) throws SQLException { + return con.prepareStatement(sql, COLUMNS); + } + + @Override + public String getSql() { + return this.sql; + } + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java index 8f580811a1..68221ad0ae 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.common.data.util.CollectionsUtil; import org.thingsboard.server.common.stats.MessagesStats; import java.util.ArrayList; @@ -29,14 +30,13 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.Stream; @Slf4j -public class TbSqlBlockingQueue implements TbSqlQueue { +public class TbSqlBlockingQueue implements TbSqlQueue { - private final BlockingQueue> queue = new LinkedBlockingQueue<>(); + private final BlockingQueue> queue = new LinkedBlockingQueue<>(); private final TbSqlBlockingQueueParams params; private ExecutorService executor; @@ -48,17 +48,17 @@ public class TbSqlBlockingQueue implements TbSqlQueue { } @Override - public void init(ScheduledLogExecutorComponent logExecutor, Consumer> saveFunction, Comparator batchUpdateComparator, int index) { + public void init(ScheduledLogExecutorComponent logExecutor, Function, List> saveFunction, Comparator batchUpdateComparator, Function>, List>> filter, int index) { executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("sql-queue-" + index + "-" + params.getLogName().toLowerCase())); executor.submit(() -> { String logName = params.getLogName(); int batchSize = params.getBatchSize(); long maxDelay = params.getMaxDelay(); - final List> entities = new ArrayList<>(batchSize); + final List> entities = new ArrayList<>(batchSize); while (!Thread.interrupted()) { try { long currentTs = System.currentTimeMillis(); - TbSqlQueueElement attr = queue.poll(maxDelay, TimeUnit.MILLISECONDS); + TbSqlQueueElement attr = queue.poll(maxDelay, TimeUnit.MILLISECONDS); if (attr == null) { continue; } else { @@ -70,12 +70,27 @@ public class TbSqlBlockingQueue implements TbSqlQueue { log.debug("[{}] Going to save {} entities", logName, entities.size()); log.trace("[{}] Going to save entities: {}", logName, entities); } - Stream entitiesStream = entities.stream().map(TbSqlQueueElement::getEntity); - saveFunction.accept( - (params.isBatchSortEnabled() ? entitiesStream.sorted(batchUpdateComparator) : entitiesStream) - .collect(Collectors.toList()) - ); - entities.forEach(v -> v.getFuture().set(null)); + + List> entitiesToSave = filter.apply(entities); + + if (params.isBatchSortEnabled()) { + entitiesToSave = entitiesToSave.stream().sorted((o1, o2) -> batchUpdateComparator.compare(o1.getEntity(), o2.getEntity())).toList(); + } + + List result = saveFunction.apply(entitiesToSave.stream().map(TbSqlQueueElement::getEntity).collect(Collectors.toList())); + + if (params.isWithResponse()) { + for (int i = 0; i < entitiesToSave.size(); i++) { + entitiesToSave.get(i).getFuture().set(result.get(i)); + } + + if (entities.size() > entitiesToSave.size()) { + CollectionsUtil.diffLists(entitiesToSave, entities).forEach(v -> v.getFuture().set(null)); + } + } else { + entities.forEach(v -> v.getFuture().set(null)); + } + stats.incrementSuccessful(entities.size()); if (!fullPack) { long remainingDelay = maxDelay - (System.currentTimeMillis() - currentTs); @@ -104,7 +119,7 @@ public class TbSqlBlockingQueue implements TbSqlQueue { }); logExecutor.scheduleAtFixedRate(() -> { - if (queue.size() > 0 || stats.getTotal() > 0 || stats.getSuccessful() > 0 || stats.getFailed() > 0) { + if (!queue.isEmpty() || stats.getTotal() > 0 || stats.getSuccessful() > 0 || stats.getFailed() > 0) { log.info("Queue-{} [{}] queueSize [{}] totalAdded [{}] totalSaved [{}] totalFailed [{}]", index, params.getLogName(), queue.size(), stats.getTotal(), stats.getSuccessful(), stats.getFailed()); stats.reset(); @@ -120,8 +135,8 @@ public class TbSqlBlockingQueue implements TbSqlQueue { } @Override - public ListenableFuture add(E element) { - SettableFuture future = SettableFuture.create(); + public ListenableFuture add(E element) { + SettableFuture future = SettableFuture.create(); queue.add(new TbSqlQueueElement<>(future, element)); stats.incrementTotal(); return future; diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueueParams.java b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueueParams.java index 56b7ad3af7..42ab6be049 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueueParams.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueueParams.java @@ -30,4 +30,5 @@ public class TbSqlBlockingQueueParams { private final long statsPrintIntervalMs; private final String statsNamePrefix; private final boolean batchSortEnabled; + private final boolean withResponse; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueueWrapper.java b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueueWrapper.java index 9c5bbbc855..3e9620de76 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueueWrapper.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueueWrapper.java @@ -29,10 +29,9 @@ import java.util.function.Function; @Slf4j @Data -public class TbSqlBlockingQueueWrapper { - private final CopyOnWriteArrayList> queues = new CopyOnWriteArrayList<>(); +public class TbSqlBlockingQueueWrapper { + private final CopyOnWriteArrayList> queues = new CopyOnWriteArrayList<>(); private final TbSqlBlockingQueueParams params; - private ScheduledLogExecutorComponent logExecutor; private final Function hashCodeFunction; private final int maxThreads; private final StatsFactory statsFactory; @@ -46,15 +45,19 @@ public class TbSqlBlockingQueueWrapper { * NOTE: you must use all of primary key parts in your comparator */ public void init(ScheduledLogExecutorComponent logExecutor, Consumer> saveFunction, Comparator batchUpdateComparator) { + init(logExecutor, l -> { saveFunction.accept(l); return null; }, batchUpdateComparator, l -> l); + } + + public void init(ScheduledLogExecutorComponent logExecutor, Function, List> saveFunction, Comparator batchUpdateComparator, Function>, List>> filter) { for (int i = 0; i < maxThreads; i++) { MessagesStats stats = statsFactory.createMessagesStats(params.getStatsNamePrefix() + ".queue." + i); - TbSqlBlockingQueue queue = new TbSqlBlockingQueue<>(params, stats); + TbSqlBlockingQueue queue = new TbSqlBlockingQueue<>(params, stats); queues.add(queue); - queue.init(logExecutor, saveFunction, batchUpdateComparator, i); + queue.init(logExecutor, saveFunction, batchUpdateComparator, filter, i); } } - public ListenableFuture add(E element) { + public ListenableFuture add(E element) { int queueIndex = element != null ? (hashCodeFunction.apply(element) & 0x7FFFFFFF) % maxThreads : 0; return queues.get(queueIndex).add(element); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlQueue.java b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlQueue.java index 90b4e0fe67..e1ed8c299c 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlQueue.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlQueue.java @@ -19,13 +19,13 @@ import com.google.common.util.concurrent.ListenableFuture; import java.util.Comparator; import java.util.List; -import java.util.function.Consumer; +import java.util.function.Function; -public interface TbSqlQueue { +public interface TbSqlQueue { - void init(ScheduledLogExecutorComponent logExecutor, Consumer> saveFunction, Comparator batchUpdateComparator, int queueIndex); + void init(ScheduledLogExecutorComponent logExecutor, Function, List> saveFunction, Comparator batchUpdateComparator, Function>, List>> filter, int queueIndex); void destroy(); - ListenableFuture add(E element); + ListenableFuture add(E element); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlQueueElement.java b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlQueueElement.java index 15031be244..016c5c9527 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlQueueElement.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlQueueElement.java @@ -20,13 +20,13 @@ import lombok.Getter; import lombok.ToString; @ToString(exclude = "future") -public final class TbSqlQueueElement { +public final class TbSqlQueueElement { @Getter - private final SettableFuture future; + private final SettableFuture future; @Getter private final E entity; - public TbSqlQueueElement(SettableFuture future, E entity) { + public TbSqlQueueElement(SettableFuture future, E entity) { this.future = future; this.entity = entity; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java index 856b2be381..9e188d39b1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java @@ -15,162 +15,110 @@ */ package org.thingsboard.server.dao.sql.attributes; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.jdbc.core.BatchPreparedStatementSetter; -import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Repository; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.TransactionCallbackWithoutResult; -import org.springframework.transaction.support.TransactionTemplate; +import org.springframework.transaction.annotation.Transactional; +import org.thingsboard.server.dao.AbstractSequenceInsertRepository; import org.thingsboard.server.dao.model.sql.AttributeKvEntity; import org.thingsboard.server.dao.util.SqlDao; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Types; -import java.util.ArrayList; import java.util.List; -import java.util.regex.Pattern; @Repository -@Slf4j +@Transactional @SqlDao -public abstract class AttributeKvInsertRepository { +public class AttributeKvInsertRepository extends AbstractSequenceInsertRepository { - private static final ThreadLocal PATTERN_THREAD_LOCAL = ThreadLocal.withInitial(() -> Pattern.compile(String.valueOf(Character.MIN_VALUE))); - private static final String EMPTY_STR = ""; - - private static final String BATCH_UPDATE = "UPDATE attribute_kv SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ? " + - "WHERE entity_id = ? and attribute_type =? and attribute_key = ?;"; + private static final String BATCH_UPDATE = "UPDATE attribute_kv SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ?, seq_number = nextval('attribute_kv_latest_seq') " + + "WHERE entity_id = ? and attribute_type =? and attribute_key = ? RETURNING seq_number;"; private static final String INSERT_OR_UPDATE = - "INSERT INTO attribute_kv (entity_id, attribute_type, attribute_key, str_v, long_v, dbl_v, bool_v, json_v, last_update_ts) " + - "VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json), ?) " + + "INSERT INTO attribute_kv (entity_id, attribute_type, attribute_key, str_v, long_v, dbl_v, bool_v, json_v, last_update_ts, seq_number) " + + "VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json), ?, nextval('attribute_kv_latest_seq')) " + "ON CONFLICT (entity_id, attribute_type, attribute_key) " + - "DO UPDATE SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ?;"; - - @Autowired - protected JdbcTemplate jdbcTemplate; - - @Autowired - private TransactionTemplate transactionTemplate; - - @Value("${sql.remove_null_chars:true}") - private boolean removeNullChars; - - public void saveOrUpdate(List entities) { - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - @Override - protected void doInTransactionWithoutResult(TransactionStatus status) { - int[] result = jdbcTemplate.batchUpdate(BATCH_UPDATE, new BatchPreparedStatementSetter() { - @Override - public void setValues(PreparedStatement ps, int i) throws SQLException { - AttributeKvEntity kvEntity = entities.get(i); - ps.setString(1, replaceNullChars(kvEntity.getStrValue())); - - if (kvEntity.getLongValue() != null) { - ps.setLong(2, kvEntity.getLongValue()); - } else { - ps.setNull(2, Types.BIGINT); - } - - if (kvEntity.getDoubleValue() != null) { - ps.setDouble(3, kvEntity.getDoubleValue()); - } else { - ps.setNull(3, Types.DOUBLE); - } - - if (kvEntity.getBooleanValue() != null) { - ps.setBoolean(4, kvEntity.getBooleanValue()); - } else { - ps.setNull(4, Types.BOOLEAN); - } - - ps.setString(5, replaceNullChars(kvEntity.getJsonValue())); - - ps.setLong(6, kvEntity.getLastUpdateTs()); - ps.setObject(7, kvEntity.getId().getEntityId()); - ps.setInt(8, kvEntity.getId().getAttributeType()); - ps.setInt(9, kvEntity.getId().getAttributeKey()); - } - - @Override - public int getBatchSize() { - return entities.size(); - } - }); - - int updatedCount = 0; - for (int i = 0; i < result.length; i++) { - if (result[i] == 0) { - updatedCount++; - } - } - - List insertEntities = new ArrayList<>(updatedCount); - for (int i = 0; i < result.length; i++) { - if (result[i] == 0) { - insertEntities.add(entities.get(i)); - } - } - - jdbcTemplate.batchUpdate(INSERT_OR_UPDATE, new BatchPreparedStatementSetter() { - @Override - public void setValues(PreparedStatement ps, int i) throws SQLException { - AttributeKvEntity kvEntity = insertEntities.get(i); - ps.setObject(1, kvEntity.getId().getEntityId()); - ps.setInt(2, kvEntity.getId().getAttributeType()); - ps.setInt(3, kvEntity.getId().getAttributeKey()); - - ps.setString(4, replaceNullChars(kvEntity.getStrValue())); - ps.setString(10, replaceNullChars(kvEntity.getStrValue())); - - if (kvEntity.getLongValue() != null) { - ps.setLong(5, kvEntity.getLongValue()); - ps.setLong(11, kvEntity.getLongValue()); - } else { - ps.setNull(5, Types.BIGINT); - ps.setNull(11, Types.BIGINT); - } - - if (kvEntity.getDoubleValue() != null) { - ps.setDouble(6, kvEntity.getDoubleValue()); - ps.setDouble(12, kvEntity.getDoubleValue()); - } else { - ps.setNull(6, Types.DOUBLE); - ps.setNull(12, Types.DOUBLE); - } - - if (kvEntity.getBooleanValue() != null) { - ps.setBoolean(7, kvEntity.getBooleanValue()); - ps.setBoolean(13, kvEntity.getBooleanValue()); - } else { - ps.setNull(7, Types.BOOLEAN); - ps.setNull(13, Types.BOOLEAN); - } - - ps.setString(8, replaceNullChars(kvEntity.getJsonValue())); - ps.setString(14, replaceNullChars(kvEntity.getJsonValue())); - - ps.setLong(9, kvEntity.getLastUpdateTs()); - ps.setLong(15, kvEntity.getLastUpdateTs()); - } - - @Override - public int getBatchSize() { - return insertEntities.size(); - } - }); - } - }); + "DO UPDATE SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ?, seq_number = nextval('attribute_kv_latest_seq') RETURNING seq_number;"; + + @Override + protected void setOnBatchUpdateValues(PreparedStatement ps, int i, List entities) throws SQLException { + AttributeKvEntity kvEntity = entities.get(i); + ps.setString(1, replaceNullChars(kvEntity.getStrValue())); + + if (kvEntity.getLongValue() != null) { + ps.setLong(2, kvEntity.getLongValue()); + } else { + ps.setNull(2, Types.BIGINT); + } + + if (kvEntity.getDoubleValue() != null) { + ps.setDouble(3, kvEntity.getDoubleValue()); + } else { + ps.setNull(3, Types.DOUBLE); + } + + if (kvEntity.getBooleanValue() != null) { + ps.setBoolean(4, kvEntity.getBooleanValue()); + } else { + ps.setNull(4, Types.BOOLEAN); + } + + ps.setString(5, replaceNullChars(kvEntity.getJsonValue())); + + ps.setLong(6, kvEntity.getLastUpdateTs()); + ps.setObject(7, kvEntity.getId().getEntityId()); + ps.setInt(8, kvEntity.getId().getAttributeType()); + ps.setInt(9, kvEntity.getId().getAttributeKey()); } - private String replaceNullChars(String strValue) { - if (removeNullChars && strValue != null) { - return PATTERN_THREAD_LOCAL.get().matcher(strValue).replaceAll(EMPTY_STR); + @Override + protected void setOnInsertOrUpdateValues(PreparedStatement ps, int i, List insertEntities) throws SQLException { + AttributeKvEntity kvEntity = insertEntities.get(i); + ps.setObject(1, kvEntity.getId().getEntityId()); + ps.setInt(2, kvEntity.getId().getAttributeType()); + ps.setInt(3, kvEntity.getId().getAttributeKey()); + + ps.setString(4, replaceNullChars(kvEntity.getStrValue())); + ps.setString(10, replaceNullChars(kvEntity.getStrValue())); + + if (kvEntity.getLongValue() != null) { + ps.setLong(5, kvEntity.getLongValue()); + ps.setLong(11, kvEntity.getLongValue()); + } else { + ps.setNull(5, Types.BIGINT); + ps.setNull(11, Types.BIGINT); } - return strValue; + + if (kvEntity.getDoubleValue() != null) { + ps.setDouble(6, kvEntity.getDoubleValue()); + ps.setDouble(12, kvEntity.getDoubleValue()); + } else { + ps.setNull(6, Types.DOUBLE); + ps.setNull(12, Types.DOUBLE); + } + + if (kvEntity.getBooleanValue() != null) { + ps.setBoolean(7, kvEntity.getBooleanValue()); + ps.setBoolean(13, kvEntity.getBooleanValue()); + } else { + ps.setNull(7, Types.BOOLEAN); + ps.setNull(13, Types.BOOLEAN); + } + + ps.setString(8, replaceNullChars(kvEntity.getJsonValue())); + ps.setString(14, replaceNullChars(kvEntity.getJsonValue())); + + ps.setLong(9, kvEntity.getLastUpdateTs()); + ps.setLong(15, kvEntity.getLastUpdateTs()); + } + + @Override + protected String getBatchUpdateQuery() { + return BATCH_UPDATE; + } + + @Override + protected String getInsertOrUpdateQuery() { + return INSERT_OR_UPDATE; } } \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java index 320e0e67fb..d8b4a9fedf 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java @@ -88,7 +88,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl @Value("${sql.batch_sort:true}") private boolean batchSortEnabled; - private TbSqlBlockingQueueWrapper queue; + private TbSqlBlockingQueueWrapper queue; @PostConstruct private void init() { @@ -99,6 +99,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl .statsPrintIntervalMs(statsPrintIntervalMs) .statsNamePrefix("attributes") .batchSortEnabled(batchSortEnabled) + .withResponse(true) .build(); Function hashcodeFunction = entity -> entity.getId().getEntityId().hashCode(); @@ -106,7 +107,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl queue.init(logExecutor, v -> attributeKvInsertRepository.saveOrUpdate(v), Comparator.comparing((AttributeKvEntity attributeKvEntity) -> attributeKvEntity.getId().getEntityId()) .thenComparing(attributeKvEntity -> attributeKvEntity.getId().getAttributeType()) - .thenComparing(attributeKvEntity -> attributeKvEntity.getId().getAttributeKey()) + .thenComparing(attributeKvEntity -> attributeKvEntity.getId().getAttributeKey()), l -> l ); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/SqlAttributesInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/SqlAttributesInsertRepository.java deleted file mode 100644 index a99f5a24e7..0000000000 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/SqlAttributesInsertRepository.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Copyright © 2016-2024 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.dao.sql.attributes; - -import org.springframework.stereotype.Repository; -import org.springframework.transaction.annotation.Transactional; -import org.thingsboard.server.dao.util.SqlDao; - -@Repository -@Transactional -@SqlDao -public class SqlAttributesInsertRepository extends AttributeKvInsertRepository { - -} \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java index 1c64973d94..d9d2282fca 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java @@ -90,7 +90,7 @@ public class JpaBaseEdgeEventDao extends JpaPartitionedAbstractDao queue; + private TbSqlBlockingQueueWrapper queue; @Override protected Class getEntityClass() { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java index afd6608774..33e7567664 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java @@ -110,7 +110,7 @@ public class JpaBaseEventDao implements EventDao { @Value("${sql.batch_sort:true}") private boolean batchSortEnabled; - private TbSqlBlockingQueueWrapper queue; + private TbSqlBlockingQueueWrapper queue; private final Map> repositories = new ConcurrentHashMap<>(); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java index 9a52f3a192..f3b7b96c87 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java @@ -60,7 +60,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq @Autowired protected InsertTsRepository insertRepository; - protected TbSqlBlockingQueueWrapper tsQueue; + protected TbSqlBlockingQueueWrapper tsQueue; @Autowired private StatsFactory statsFactory; diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java index 425bb10a0e..513414b8ac 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java @@ -46,6 +46,7 @@ import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity; import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper; +import org.thingsboard.server.dao.sql.TbSqlQueueElement; import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository; import org.thingsboard.server.dao.sqlts.latest.SearchTsKvLatestRepository; import org.thingsboard.server.dao.sqlts.latest.TsKvLatestRepository; @@ -81,7 +82,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme @Autowired private InsertLatestTsRepository insertLatestTsRepository; - private TbSqlBlockingQueueWrapper tsLatestQueue; + private TbSqlBlockingQueueWrapper tsLatestQueue; @Value("${sql.ts_latest.batch_size:1000}") private int tsLatestBatchSize; @@ -115,25 +116,26 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme .maxDelay(tsLatestMaxDelay) .statsPrintIntervalMs(tsLatestStatsPrintIntervalMs) .statsNamePrefix("ts.latest") - .batchSortEnabled(false) + .batchSortEnabled(batchSortEnabled) + .withResponse(true) .build(); java.util.function.Function hashcodeFunction = entity -> entity.getEntityId().hashCode(); tsLatestQueue = new TbSqlBlockingQueueWrapper<>(tsLatestParams, hashcodeFunction, tsLatestBatchThreads, statsFactory); - tsLatestQueue.init(logExecutor, v -> { - Map trueLatest = new HashMap<>(); - v.forEach(ts -> { - TsKey key = new TsKey(ts.getEntityId(), ts.getKey()); - trueLatest.merge(key, ts, (oldTs, newTs) -> oldTs.getTs() <= newTs.getTs() ? newTs : oldTs); - }); - List latestEntities = new ArrayList<>(trueLatest.values()); - if (batchSortEnabled) { - latestEntities.sort(Comparator.comparing((Function) AbstractTsKvEntity::getEntityId) - .thenComparingInt(AbstractTsKvEntity::getKey)); - } - insertLatestTsRepository.saveOrUpdate(latestEntities); - }, (l, r) -> 0); + tsLatestQueue.init(logExecutor, + v -> insertLatestTsRepository.saveOrUpdate(v), + Comparator.comparing((Function) AbstractTsKvEntity::getEntityId) + .thenComparingInt(AbstractTsKvEntity::getKey), + v -> { + Map> trueLatest = new HashMap<>(); + v.forEach(element -> { + var entity = element.getEntity(); + TsKey key = new TsKey(entity.getEntityId(), entity.getKey()); + trueLatest.merge(key, element, (oldElement, newElement) -> oldElement.getEntity().getTs() <= newElement.getEntity().getTs() ? newElement : oldElement); + }); + return new ArrayList<>(trueLatest.values()); + }); } @PreDestroy @@ -144,7 +146,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme } @Override - public ListenableFuture saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { + public ListenableFuture saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { return getSaveLatestFuture(entityId, tsKvEntry); } @@ -247,7 +249,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme searchTsKvLatestRepository.findAllByEntityId(entityId.getId())))); } - protected ListenableFuture getSaveLatestFuture(EntityId entityId, TsKvEntry tsKvEntry) { + protected ListenableFuture getSaveLatestFuture(EntityId entityId, TsKvEntry tsKvEntry) { TsKvLatestEntity latestEntity = new TsKvLatestEntity(); latestEntity.setEntityId(entityId.getId()); latestEntity.setTs(tsKvEntry.getTs()); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java index 48b54c2281..5b953dfe59 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java @@ -17,33 +17,25 @@ package org.thingsboard.server.dao.sqlts.insert.latest.sql; import jakarta.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Value; -import org.springframework.jdbc.core.BatchPreparedStatementSetter; -import org.springframework.jdbc.core.PreparedStatementCreator; -import org.springframework.jdbc.core.SqlProvider; -import org.springframework.jdbc.support.GeneratedKeyHolder; -import org.springframework.jdbc.support.KeyHolder; import org.springframework.stereotype.Repository; import org.springframework.transaction.annotation.Transactional; +import org.thingsboard.server.dao.AbstractSequenceInsertRepository; import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity; -import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository; import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository; import org.thingsboard.server.dao.util.SqlDao; import org.thingsboard.server.dao.util.SqlTsLatestAnyDao; -import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Types; -import java.util.ArrayList; import java.util.List; -import java.util.Map; @SqlTsLatestAnyDao @Repository @Transactional @SqlDao -public class SqlLatestInsertTsRepository extends AbstractInsertRepository implements InsertLatestTsRepository { +public class SqlLatestInsertTsRepository extends AbstractSequenceInsertRepository implements InsertLatestTsRepository { @Value("${sql.ts_latest.update_by_latest_ts:true}") private Boolean updateByLatestTs; @@ -61,8 +53,6 @@ public class SqlLatestInsertTsRepository extends AbstractInsertRepository implem private static final String RETURNING = " RETURNING seq_number"; - private static final String SEQ_NUMBER = "seq_number"; - private String batchUpdateQuery; private String insertOrUpdateQuery; @@ -73,161 +63,89 @@ public class SqlLatestInsertTsRepository extends AbstractInsertRepository implem } @Override - public List saveOrUpdate(List entities) { - return transactionTemplate.execute(status -> { - List seqNumbers = new ArrayList<>(entities.size()); + protected void setOnBatchUpdateValues(PreparedStatement ps, int i, List entities) throws SQLException { + TsKvLatestEntity tsKvLatestEntity = entities.get(i); + ps.setLong(1, tsKvLatestEntity.getTs()); + + if (tsKvLatestEntity.getBooleanValue() != null) { + ps.setBoolean(2, tsKvLatestEntity.getBooleanValue()); + } else { + ps.setNull(2, Types.BOOLEAN); + } - KeyHolder keyHolder = new GeneratedKeyHolder(); + ps.setString(3, replaceNullChars(tsKvLatestEntity.getStrValue())); - int[] updateResult = onBatchUpdate(entities, keyHolder); + if (tsKvLatestEntity.getLongValue() != null) { + ps.setLong(4, tsKvLatestEntity.getLongValue()); + } else { + ps.setNull(4, Types.BIGINT); + } - List> seqNumbersList = keyHolder.getKeyList(); + if (tsKvLatestEntity.getDoubleValue() != null) { + ps.setDouble(5, tsKvLatestEntity.getDoubleValue()); + } else { + ps.setNull(5, Types.DOUBLE); + } - int notUpdatedCount = entities.size() - seqNumbersList.size(); + ps.setString(6, replaceNullChars(tsKvLatestEntity.getJsonValue())); - List toInsertIndexes = new ArrayList<>(notUpdatedCount); - List insertEntities = new ArrayList<>(notUpdatedCount); - int keyHolderIndex = 0; - for (int i = 0; i < updateResult.length; i++) { - if (updateResult[i] == 0) { - insertEntities.add(entities.get(i)); - seqNumbers.add(0L); - toInsertIndexes.add(i); - } else { - seqNumbers.add((Long) seqNumbersList.get(keyHolderIndex).get(SEQ_NUMBER)); - keyHolderIndex++; - } - } + ps.setObject(7, tsKvLatestEntity.getEntityId()); + ps.setInt(8, tsKvLatestEntity.getKey()); + if (updateByLatestTs) { + ps.setLong(9, tsKvLatestEntity.getTs()); + } + } - if (insertEntities.isEmpty()) { - return seqNumbers; - } + @Override + protected void setOnInsertOrUpdateValues(PreparedStatement ps, int i, List insertEntities) throws SQLException { + TsKvLatestEntity tsKvLatestEntity = insertEntities.get(i); + ps.setObject(1, tsKvLatestEntity.getEntityId()); + ps.setInt(2, tsKvLatestEntity.getKey()); + + ps.setLong(3, tsKvLatestEntity.getTs()); + ps.setLong(9, tsKvLatestEntity.getTs()); + if (updateByLatestTs) { + ps.setLong(15, tsKvLatestEntity.getTs()); + } - onInsertOrUpdate(insertEntities, keyHolder); + if (tsKvLatestEntity.getBooleanValue() != null) { + ps.setBoolean(4, tsKvLatestEntity.getBooleanValue()); + ps.setBoolean(10, tsKvLatestEntity.getBooleanValue()); + } else { + ps.setNull(4, Types.BOOLEAN); + ps.setNull(10, Types.BOOLEAN); + } - seqNumbersList = keyHolder.getKeyList(); + ps.setString(5, replaceNullChars(tsKvLatestEntity.getStrValue())); + ps.setString(11, replaceNullChars(tsKvLatestEntity.getStrValue())); - for (int i = 0; i < seqNumbersList.size(); i++) { - seqNumbers.set(toInsertIndexes.get(i), (Long) seqNumbersList.get(i).get(SEQ_NUMBER)); - } + if (tsKvLatestEntity.getLongValue() != null) { + ps.setLong(6, tsKvLatestEntity.getLongValue()); + ps.setLong(12, tsKvLatestEntity.getLongValue()); + } else { + ps.setNull(6, Types.BIGINT); + ps.setNull(12, Types.BIGINT); + } - return seqNumbers; - }); - } + if (tsKvLatestEntity.getDoubleValue() != null) { + ps.setDouble(7, tsKvLatestEntity.getDoubleValue()); + ps.setDouble(13, tsKvLatestEntity.getDoubleValue()); + } else { + ps.setNull(7, Types.DOUBLE); + ps.setNull(13, Types.DOUBLE); + } - private int[] onBatchUpdate(List entities, KeyHolder keyHolder) { - return jdbcTemplate.batchUpdate(new SimplePreparedStatementCreator(batchUpdateQuery), new BatchPreparedStatementSetter() { - @Override - public void setValues(PreparedStatement ps, int i) throws SQLException { - TsKvLatestEntity tsKvLatestEntity = entities.get(i); - ps.setLong(1, tsKvLatestEntity.getTs()); - - if (tsKvLatestEntity.getBooleanValue() != null) { - ps.setBoolean(2, tsKvLatestEntity.getBooleanValue()); - } else { - ps.setNull(2, Types.BOOLEAN); - } - - ps.setString(3, replaceNullChars(tsKvLatestEntity.getStrValue())); - - if (tsKvLatestEntity.getLongValue() != null) { - ps.setLong(4, tsKvLatestEntity.getLongValue()); - } else { - ps.setNull(4, Types.BIGINT); - } - - if (tsKvLatestEntity.getDoubleValue() != null) { - ps.setDouble(5, tsKvLatestEntity.getDoubleValue()); - } else { - ps.setNull(5, Types.DOUBLE); - } - - ps.setString(6, replaceNullChars(tsKvLatestEntity.getJsonValue())); - - ps.setObject(7, tsKvLatestEntity.getEntityId()); - ps.setInt(8, tsKvLatestEntity.getKey()); - if (updateByLatestTs) { - ps.setLong(9, tsKvLatestEntity.getTs()); - } - } - - @Override - public int getBatchSize() { - return entities.size(); - } - }, keyHolder); + ps.setString(8, replaceNullChars(tsKvLatestEntity.getJsonValue())); + ps.setString(14, replaceNullChars(tsKvLatestEntity.getJsonValue())); } - private void onInsertOrUpdate(List insertEntities, KeyHolder keyHolder) { - jdbcTemplate.batchUpdate(new SimplePreparedStatementCreator(insertOrUpdateQuery), new BatchPreparedStatementSetter() { - @Override - public void setValues(PreparedStatement ps, int i) throws SQLException { - TsKvLatestEntity tsKvLatestEntity = insertEntities.get(i); - ps.setObject(1, tsKvLatestEntity.getEntityId()); - ps.setInt(2, tsKvLatestEntity.getKey()); - - ps.setLong(3, tsKvLatestEntity.getTs()); - ps.setLong(9, tsKvLatestEntity.getTs()); - if (updateByLatestTs) { - ps.setLong(15, tsKvLatestEntity.getTs()); - } - - if (tsKvLatestEntity.getBooleanValue() != null) { - ps.setBoolean(4, tsKvLatestEntity.getBooleanValue()); - ps.setBoolean(10, tsKvLatestEntity.getBooleanValue()); - } else { - ps.setNull(4, Types.BOOLEAN); - ps.setNull(10, Types.BOOLEAN); - } - - ps.setString(5, replaceNullChars(tsKvLatestEntity.getStrValue())); - ps.setString(11, replaceNullChars(tsKvLatestEntity.getStrValue())); - - if (tsKvLatestEntity.getLongValue() != null) { - ps.setLong(6, tsKvLatestEntity.getLongValue()); - ps.setLong(12, tsKvLatestEntity.getLongValue()); - } else { - ps.setNull(6, Types.BIGINT); - ps.setNull(12, Types.BIGINT); - } - - if (tsKvLatestEntity.getDoubleValue() != null) { - ps.setDouble(7, tsKvLatestEntity.getDoubleValue()); - ps.setDouble(13, tsKvLatestEntity.getDoubleValue()); - } else { - ps.setNull(7, Types.DOUBLE); - ps.setNull(13, Types.DOUBLE); - } - - ps.setString(8, replaceNullChars(tsKvLatestEntity.getJsonValue())); - ps.setString(14, replaceNullChars(tsKvLatestEntity.getJsonValue())); - } - - @Override - public int getBatchSize() { - return insertEntities.size(); - } - }, keyHolder); + @Override + protected String getBatchUpdateQuery() { + return batchUpdateQuery; } - private static class SimplePreparedStatementCreator implements PreparedStatementCreator, SqlProvider { - - private static final String[] COLUMNS = {SEQ_NUMBER}; - private final String sql; - - - public SimplePreparedStatementCreator(String sql) { - this.sql = sql; - } - - @Override - public PreparedStatement createPreparedStatement(Connection con) throws SQLException { - return con.prepareStatement(sql, COLUMNS); - } - - @Override - public String getSql() { - return this.sql; - } + @Override + protected String getInsertOrUpdateQuery() { + return insertOrUpdateQuery; } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java index c8bff8ceb9..cd9f397bb3 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java @@ -18,8 +18,9 @@ package org.thingsboard.server.dao.sqlts.timescale; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Sort; @@ -38,7 +39,6 @@ import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.dictionary.KeyDictionaryDao; import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity; -import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper; import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao; @@ -47,8 +47,6 @@ import org.thingsboard.server.dao.timeseries.TimeseriesDao; import org.thingsboard.server.dao.util.TimeUtils; import org.thingsboard.server.dao.util.TimescaleDBTsDao; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -77,7 +75,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements @Autowired protected KeyDictionaryDao keyDictionaryDao; - protected TbSqlBlockingQueueWrapper tsQueue; + protected TbSqlBlockingQueueWrapper tsQueue; @PostConstruct protected void init() { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index 60056b2b8f..f9c4218d64 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -187,8 +187,8 @@ public class BaseTimeseriesService implements TimeseriesService { } @Override - public ListenableFuture> saveLatest(TenantId tenantId, EntityId entityId, List tsKvEntries) { - List> futures = new ArrayList<>(tsKvEntries.size()); + public ListenableFuture> saveLatest(TenantId tenantId, EntityId entityId, List tsKvEntries) { + List> futures = new ArrayList<>(tsKvEntries.size()); for (TsKvEntry tsKvEntry : tsKvEntries) { futures.add(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry)); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java index 7a5904eb6b..f774f18dc7 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java @@ -100,7 +100,7 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes } @Override - public ListenableFuture saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { + public ListenableFuture saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { BoundStatementBuilder stmtBuilder = new BoundStatementBuilder(getLatestStmt().bind()); stmtBuilder.setString(0, entityId.getEntityType().name()) .setUuid(1, entityId.getId()) diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java index d339f49e11..9f62fd033a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java @@ -42,7 +42,7 @@ public interface TimeseriesLatestDao { ListenableFuture> findAllLatest(TenantId tenantId, EntityId entityId); - ListenableFuture saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry); + ListenableFuture saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry); ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query); diff --git a/dao/src/main/resources/sql/schema-entities.sql b/dao/src/main/resources/sql/schema-entities.sql index b92c27574c..a26262970c 100644 --- a/dao/src/main/resources/sql/schema-entities.sql +++ b/dao/src/main/resources/sql/schema-entities.sql @@ -102,6 +102,8 @@ CREATE TABLE IF NOT EXISTS audit_log ( action_failure_details varchar(1000000) ) PARTITION BY RANGE (created_time); +CREATE SEQUENCE IF NOT EXISTS attribute_kv_latest_seq cache 1000; + CREATE TABLE IF NOT EXISTS attribute_kv ( entity_id uuid, attribute_type int, @@ -112,6 +114,7 @@ CREATE TABLE IF NOT EXISTS attribute_kv ( dbl_v double precision, json_v json, last_update_ts bigint, + seq_number bigint, CONSTRAINT attribute_kv_pkey PRIMARY KEY (entity_id, attribute_type, attribute_key) );