From d85737b84b636d5e4b92dd727bd53af500002e47 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Mon, 3 Oct 2022 18:36:52 +0300 Subject: [PATCH 1/7] TTL and table partitioning for audit logs --- .../main/data/upgrade/3.4.1/schema_update.sql | 68 +++++++++ .../install/ThingsboardInstallService.java | 4 + .../install/SqlDatabaseUpgradeService.java | 32 +++-- .../update/DefaultDataUpdateService.java | 24 +++- .../service/ttl/AuditLogsCleanUpService.java | 63 +++++++++ application/src/main/resources/logback.xml | 1 + .../src/main/resources/thingsboard.yml | 6 + .../common/util/ThrowingConsumer.java | 22 +++ .../server/dao/audit/AuditLogDao.java | 5 + ...paAbstractDaoListeningExecutorService.java | 4 + .../server/dao/sql/audit/JpaAuditLogDao.java | 63 ++++++++- .../server/dao/sql/event/JpaBaseEventDao.java | 56 ++------ .../sql/event/SqlEventCleanupRepository.java | 119 ++-------------- .../insert/sql/SqlPartitioningRepository.java | 130 +++++++++++++++++- .../resources/sql/schema-entities-idx.sql | 2 +- .../main/resources/sql/schema-entities.sql | 4 +- 16 files changed, 430 insertions(+), 173 deletions(-) create mode 100644 application/src/main/data/upgrade/3.4.1/schema_update.sql create mode 100644 application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java create mode 100644 common/util/src/main/java/org/thingsboard/common/util/ThrowingConsumer.java diff --git a/application/src/main/data/upgrade/3.4.1/schema_update.sql b/application/src/main/data/upgrade/3.4.1/schema_update.sql new file mode 100644 index 0000000000..1b75a1c1e1 --- /dev/null +++ b/application/src/main/data/upgrade/3.4.1/schema_update.sql @@ -0,0 +1,68 @@ +-- +-- Copyright © 2016-2022 The Thingsboard Authors +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +CREATE TABLE IF NOT EXISTS tmp_audit_log ( + id uuid NOT NULL, + created_time bigint NOT NULL, + tenant_id uuid, + customer_id uuid, + entity_id uuid, + entity_type varchar(255), + entity_name varchar(255), + user_id uuid, + user_name varchar(255), + action_type varchar(255), + action_data varchar(1000000), + action_status varchar(255), + action_failure_details varchar(1000000) +) PARTITION BY RANGE (created_time); +CREATE INDEX IF NOT EXISTS idx_tmp_audit_log_tenant_id_and_created_time ON tmp_audit_log(tenant_id, created_time DESC); + +CREATE OR REPLACE PROCEDURE rename_old_audit_logs_partitions() + LANGUAGE plpgsql AS +$$ +DECLARE + table_partition RECORD; +BEGIN + FOR table_partition IN SELECT tablename AS name, split_part(tablename, '_', 3) AS partition_ts + FROM pg_tables WHERE tablename LIKE 'audit_log_%' + LOOP + EXECUTE format('ALTER TABLE %s RENAME TO old_audit_log_%s', table_partition.name, table_partition.partition_ts); + END LOOP; +END; +$$; + +CREATE OR REPLACE PROCEDURE migrate_audit_logs(IN start_time_ms BIGINT, IN end_time_ms BIGINT, IN partition_size_ms BIGINT) + LANGUAGE plpgsql AS +$$ +DECLARE + p RECORD; + partition_end_ts BIGINT; +BEGIN + FOR p IN SELECT DISTINCT (created_time - created_time % partition_size_ms) AS partition_ts FROM audit_log + WHERE created_time >= start_time_ms AND created_time < end_time_ms + LOOP + partition_end_ts = p.partition_ts + partition_size_ms; + RAISE NOTICE '[audit_log] Partition to create : [%-%]', p.partition_ts, partition_end_ts; + EXECUTE format('CREATE TABLE IF NOT EXISTS audit_log_%s PARTITION OF tmp_audit_log ' || + 'FOR VALUES FROM ( %s ) TO ( %s )', p.partition_ts, p.partition_ts, partition_end_ts); + END LOOP; + + INSERT INTO tmp_audit_log + SELECT * FROM audit_log + WHERE created_time >= start_time_ms AND created_time < end_time_ms; +END; +$$; diff --git a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java index 2c3251834f..33a6d4bdd1 100644 --- a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java +++ b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java @@ -225,6 +225,10 @@ public class ThingsboardInstallService { log.info("Upgrading ThingsBoard from version 3.4.0 to 3.4.1 ..."); databaseEntitiesUpgradeService.upgradeDatabase("3.4.0"); dataUpdateService.updateData("3.4.0"); + case "3.4.1": + log.info("Upgrading ThingsBoard from version 3.4.1 to 3.4.2 ..."); + databaseEntitiesUpgradeService.upgradeDatabase("3.4.1"); + dataUpdateService.updateData("3.4.1"); log.info("Updating system data..."); systemDataLoaderService.updateSystemWidgets(); break; diff --git a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java index ea976b2dec..c3b697f002 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java @@ -15,8 +15,6 @@ */ package org.thingsboard.server.service.install; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -24,11 +22,9 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.ThrowingConsumer; import org.thingsboard.server.common.data.EntitySubtype; -import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.Tenant; -import org.thingsboard.server.common.data.TenantProfile; -import org.thingsboard.server.common.data.id.QueueId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; @@ -37,8 +33,6 @@ import org.thingsboard.server.common.data.queue.ProcessingStrategyType; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.data.queue.SubmitStrategy; import org.thingsboard.server.common.data.queue.SubmitStrategyType; -import org.thingsboard.server.common.data.rule.RuleNode; -import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration; import org.thingsboard.server.dao.dashboard.DashboardService; import org.thingsboard.server.dao.device.DeviceProfileService; import org.thingsboard.server.dao.device.DeviceService; @@ -61,11 +55,8 @@ import java.sql.SQLException; import java.sql.SQLSyntaxErrorException; import java.sql.SQLWarning; import java.sql.Statement; -import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import static org.thingsboard.server.service.install.DatabaseHelper.ADDITIONAL_INFO; import static org.thingsboard.server.service.install.DatabaseHelper.ASSIGNED_CUSTOMERS; @@ -609,11 +600,32 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService log.error("Failed updating schema!!!", e); } break; + case "3.4.1": + execute(connection -> { + log.info("Updating schema ..."); + runSchemaUpdateScript(connection, "3.4.1"); + connection.createStatement().execute("UPDATE tb_schema_settings SET schema_version = 3004002;"); + log.info("Schema updated."); + }); + break; default: throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion); } } + private void execute(ThrowingConsumer function) { + try (Connection connection = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { + function.accept(connection); + } catch (Exception e) { + log.error("Failed to update schema!", e); + } + } + + private void runSchemaUpdateScript(Connection connection, String version) throws Exception { + Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", version, SCHEMA_UPDATE_SQL); + loadSql(schemaUpdateFile, connection); + } + private void loadSql(Path sqlFile, Connection conn) throws Exception { String sql = new String(Files.readAllBytes(sqlFile), Charset.forName("UTF-8")); Statement st = conn.createStatement(); diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java index d02f50b37b..b307489de6 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java @@ -63,6 +63,7 @@ import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.alarm.AlarmDao; +import org.thingsboard.server.dao.audit.AuditLogDao; import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.event.EventService; @@ -138,6 +139,9 @@ public class DefaultDataUpdateService implements DataUpdateService { @Autowired private EventService eventService; + @Autowired + private AuditLogDao auditLogDao; + @Override public void updateData(String fromVersion) throws Exception { switch (fromVersion) { @@ -170,12 +174,19 @@ public class DefaultDataUpdateService implements DataUpdateService { rateLimitsUpdater.updateEntities(); break; case "3.4.0": - String skipEventsMigration = System.getenv("TB_SKIP_EVENTS_MIGRATION"); - if (skipEventsMigration == null || skipEventsMigration.equalsIgnoreCase("false")) { + boolean skipEventsMigration = getEnv("TB_SKIP_EVENTS_MIGRATION", false); + if (!skipEventsMigration) { log.info("Updating data from version 3.4.0 to 3.4.1 ..."); eventService.migrateEvents(); } break; + case "3.4.1": + boolean skipAuditLogsMigration = getEnv("TB_SKIP_AUDIT_LOGS_MIGRATION", false); + if (!skipAuditLogsMigration) { + log.info("Updating data from version 3.4.1 to 3.4.2 ..."); + auditLogDao.migrateAuditLogs(); + } + break; default: throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion); } @@ -645,4 +656,13 @@ public class DefaultDataUpdateService implements DataUpdateService { return mainQueueConfiguration; } + private boolean getEnv(String name, boolean defaultValue) { + String env = System.getenv(name); + if (env == null) { + return defaultValue; + } else { + return Boolean.parseBoolean(env); + } + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java new file mode 100644 index 0000000000..9df77be0d2 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java @@ -0,0 +1,63 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.ttl; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import org.thingsboard.server.dao.audit.AuditLogDao; +import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository; +import org.thingsboard.server.queue.discovery.PartitionService; + +import java.util.concurrent.TimeUnit; + +import static org.thingsboard.server.dao.model.ModelConstants.AUDIT_LOG_COLUMN_FAMILY_NAME; + +@Service +@ConditionalOnProperty(name = "sql.ttl.audit_logs.enabled", havingValue = "true") +@ConditionalOnExpression("${sql.ttl.audit_logs.ttl:0} > 0") +@Slf4j +public class AuditLogsCleanUpService extends AbstractCleanUpService { + + private final AuditLogDao auditLogDao; + private final SqlPartitioningRepository partitioningRepository; + + @Value("${sql.ttl.audit_logs.ttl:0}") + private long ttlInSec; + @Value("${sql.audit_logs.partition_size:168}") + private int partitionSizeInHours; + + public AuditLogsCleanUpService(PartitionService partitionService, AuditLogDao auditLogDao, SqlPartitioningRepository partitioningRepository) { + super(partitionService); + this.auditLogDao = auditLogDao; + this.partitioningRepository = partitioningRepository; + } + + @Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.audit_logs.execution_interval_ms})}", + fixedDelayString = "${sql.ttl.audit_logs.execution_interval_ms}") + public void cleanUp() { + long auditLogsExpTime = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(ttlInSec); + if (isSystemTenantPartitionMine()) { + auditLogDao.cleanUpAuditLogs(auditLogsExpTime); + } else { + partitioningRepository.cleanupPartitionsCache(AUDIT_LOG_COLUMN_FAMILY_NAME, auditLogsExpTime, TimeUnit.HOURS.toMillis(partitionSizeInHours)); + } + } + +} diff --git a/application/src/main/resources/logback.xml b/application/src/main/resources/logback.xml index 941bd7d278..0bf081b35a 100644 --- a/application/src/main/resources/logback.xml +++ b/application/src/main/resources/logback.xml @@ -28,6 +28,7 @@ + diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 9edd5d0ed5..da76dae28a 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -265,6 +265,8 @@ sql: batch_size: "${SQL_EDGE_EVENTS_BATCH_SIZE:1000}" batch_max_delay: "${SQL_EDGE_EVENTS_BATCH_MAX_DELAY_MS:100}" stats_print_interval_ms: "${SQL_EDGE_EVENTS_BATCH_STATS_PRINT_MS:10000}" + audit_logs: + partition_size: "${SQL_AUDIT_LOGS_PARTITION_SIZE_HOURS:168}" # Default value - 1 week # Specify whether to sort entities before batch update. Should be enabled for cluster mode to avoid deadlocks batch_sort: "${SQL_BATCH_SORT:false}" # Specify whether to remove null characters from strValue of attributes and timeseries before insert @@ -301,6 +303,10 @@ sql: rpc: enabled: "${SQL_TTL_RPC_ENABLED:true}" checking_interval: "${SQL_RPC_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours + audit_logs: + enabled: "${SQL_TTL_AUDIT_LOGS_ENABLED:true}" + ttl: "${SQL_TTL_AUDIT_LOGS_SECS:0}" # Disabled by default. Accuracy of the cleanup depends on the sql.audit_logs.partition_size + execution_interval_ms: "${SQL_TTL_AUDIT_LOGS_EXECUTION_INTERVAL_MS:86400000}" # Default value - 1 day relations: max_level: "${SQL_RELATIONS_MAX_LEVEL:50}" # //This value has to be reasonable small to prevent infinite recursion as early as possible diff --git a/common/util/src/main/java/org/thingsboard/common/util/ThrowingConsumer.java b/common/util/src/main/java/org/thingsboard/common/util/ThrowingConsumer.java new file mode 100644 index 0000000000..ad067e48c6 --- /dev/null +++ b/common/util/src/main/java/org/thingsboard/common/util/ThrowingConsumer.java @@ -0,0 +1,22 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.common.util; + +public interface ThrowingConsumer { + + void accept(T t) throws Exception; + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java b/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java index 939bc654b6..3f9b1ca01a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java @@ -39,4 +39,9 @@ public interface AuditLogDao extends Dao { PageData findAuditLogsByTenantIdAndUserId(UUID tenantId, UserId userId, List actionTypes, TimePageLink pageLink); PageData findAuditLogsByTenantId(UUID tenantId, List actionTypes, TimePageLink pageLink); + + void cleanUpAuditLogs(long expTime); + + void migrateAuditLogs(); + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java index d81e1a1323..b23be1a9cf 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sql; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.JdbcTemplate; import javax.sql.DataSource; import java.sql.SQLException; @@ -32,6 +33,9 @@ public abstract class JpaAbstractDaoListeningExecutorService { @Autowired protected DataSource dataSource; + @Autowired + protected JdbcTemplate jdbcTemplate; + protected void printWarnings(Statement statement) throws SQLException { SQLWarning warnings = statement.getWarnings(); if (warnings != null) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java index 9c72bec397..94e9e6718f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java @@ -16,8 +16,11 @@ package org.thingsboard.server.dao.sql.audit; import com.google.common.util.concurrent.ListenableFuture; -import org.springframework.beans.factory.annotation.Autowired; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.audit.AuditLog; @@ -28,18 +31,31 @@ import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.audit.AuditLogDao; +import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.model.sql.AuditLogEntity; import org.thingsboard.server.dao.sql.JpaAbstractDao; +import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository; import java.util.List; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.TimeUnit; @Component +@RequiredArgsConstructor +@Slf4j public class JpaAuditLogDao extends JpaAbstractDao implements AuditLogDao { - @Autowired - private AuditLogRepository auditLogRepository; + private final AuditLogRepository auditLogRepository; + private final SqlPartitioningRepository partitioningRepository; + private final JdbcTemplate jdbcTemplate; + + @Value("${sql.audit_logs.partition_size:168}") + private int partitionSizeInHours; + @Value("${sql.ttl.audit_logs.ttl:0}") + private long ttlInSec; + + private static final String TABLE_NAME = ModelConstants.AUDIT_LOG_COLUMN_FAMILY_NAME; @Override protected Class getEntityClass() { @@ -54,6 +70,7 @@ public class JpaAuditLogDao extends JpaAbstractDao imp @Override public ListenableFuture saveByTenantId(AuditLog auditLog) { return service.submit(() -> { + partitioningRepository.createPartitionIfNotExists(TABLE_NAME, auditLog.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours)); save(auditLog.getTenantId(), auditLog); return null; }); @@ -113,4 +130,44 @@ public class JpaAuditLogDao extends JpaAbstractDao imp actionTypes, DaoUtil.toPageable(pageLink))); } + + @Override + public void cleanUpAuditLogs(long expTime) { + partitioningRepository.dropPartitionsBefore(TABLE_NAME, expTime, TimeUnit.HOURS.toMillis(partitionSizeInHours)); + } + + @Override + public void migrateAuditLogs() { + long startTime = ttlInSec > 0 ? System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(ttlInSec) : 1480982400000L; + + long currentTime = System.currentTimeMillis(); + var partitionStepInMs = TimeUnit.HOURS.toMillis(partitionSizeInHours); + long numberOfPartitions = (currentTime - startTime) / partitionStepInMs; + + if (numberOfPartitions > 1000) { + String error = "Please adjust your audit logs partitioning configuration. Configuration with partition size " + + "of " + partitionSizeInHours + " hours and corresponding TTL will use " + numberOfPartitions + " " + + "(> 1000) partitions which is not recommended!"; + log.error(error); + throw new RuntimeException(error); + } + + jdbcTemplate.execute("CALL rename_old_audit_logs_partitions()"); + while (startTime < currentTime) { + var endTime = startTime + partitionStepInMs; + log.info("Migrating audit logs for time period: {} - {}", startTime, endTime); + callMigrationFunction(startTime, endTime, partitionStepInMs); + startTime = endTime; + } + log.info("Audit logs migration finished"); + + jdbcTemplate.execute("DROP TABLE IF EXISTS audit_log"); + jdbcTemplate.execute("ALTER TABLE tmp_audit_log RENAME TO audit_log"); + jdbcTemplate.execute("ALTER INDEX IF EXISTS idx_tmp_audit_log_tenant_id_and_created_time RENAME TO idx_audit_log_tenant_id_and_created_time"); + } + + private void callMigrationFunction(long startTime, long endTime, long partitionSizeInMs) { + jdbcTemplate.update("CALL migrate_audit_logs(?, ?, ?)", startTime, endTime, partitionSizeInMs); + } + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java index a9a3e25db2..902184eb13 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java @@ -18,10 +18,8 @@ package org.thingsboard.server.dao.sql.event; import com.datastax.oss.driver.api.core.uuid.Uuids; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; -import org.hibernate.exception.ConstraintViolationException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.dao.DataIntegrityViolationException; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.event.ErrorEventFilter; @@ -43,7 +41,6 @@ import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper; import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository; -import org.thingsboard.server.dao.timeseries.SqlPartition; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -53,7 +50,6 @@ import java.util.Map; import java.util.Objects; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; /** @@ -63,9 +59,6 @@ import java.util.function.Function; @Component public class JpaBaseEventDao implements EventDao { - private final Map> partitionsByEventType = new ConcurrentHashMap<>(); - private static final ReentrantLock partitionCreationLock = new ReentrantLock(); - @Autowired private EventPartitionConfiguration partitionConfiguration; @@ -120,9 +113,6 @@ public class JpaBaseEventDao implements EventDao { @PostConstruct private void init() { - for (EventType eventType : EventType.values()) { - partitionsByEventType.put(eventType, new ConcurrentHashMap<>()); - } TbSqlBlockingQueueParams params = TbSqlBlockingQueueParams.builder() .logName("Events") .batchSize(batchSize) @@ -163,42 +153,11 @@ public class JpaBaseEventDao implements EventDao { event.setCreatedTime(System.currentTimeMillis()); } } - savePartitionIfNotExist(event); + partitioningRepository.createPartitionIfNotExists(event.getType().getTable(), event.getCreatedTime(), + partitionConfiguration.getPartitionSizeInMs(event.getType())); return queue.add(event); } - private void savePartitionIfNotExist(Event event) { - EventType type = event.getType(); - var partitionsMap = partitionsByEventType.get(type); - var partitionDuration = partitionConfiguration.getPartitionSizeInMs(type); - long partitionStartTs = event.getCreatedTime() - (event.getCreatedTime() % partitionDuration); - if (partitionsMap.get(partitionStartTs) == null) { - savePartition(partitionsMap, new SqlPartition(type.getTable(), partitionStartTs, partitionStartTs + partitionDuration, Long.toString(partitionStartTs))); - } - } - - private void savePartition(Map partitionsMap, SqlPartition sqlPartition) { - if (!partitionsMap.containsKey(sqlPartition.getStart())) { - partitionCreationLock.lock(); - try { - log.trace("Saving partition: {}", sqlPartition); - partitioningRepository.save(sqlPartition); - log.trace("Adding partition to map: {}", sqlPartition); - partitionsMap.put(sqlPartition.getStart(), sqlPartition); - } catch (DataIntegrityViolationException ex) { - log.trace("Error occurred during partition save:", ex); - if (ex.getCause() instanceof ConstraintViolationException) { - log.warn("Saving partition [{}] rejected. Event data will save to the DEFAULT partition.", sqlPartition.getPartitionDate()); - partitionsMap.put(sqlPartition.getStart(), sqlPartition); - } else { - throw new RuntimeException(ex); - } - } finally { - partitionCreationLock.unlock(); - } - } - } - @Override public PageData findEvents(UUID tenantId, UUID entityId, EventType eventType, TimePageLink pageLink) { return DaoUtil.toPageData(getEventRepository(eventType).findEvents(tenantId, entityId, pageLink.getStartTime(), pageLink.getEndTime(), DaoUtil.toPageable(pageLink, EventEntity.eventColumnMap))); @@ -436,23 +395,24 @@ public class JpaBaseEventDao implements EventDao { log.info("Going to cleanup regular events with exp time: {}", regularEventExpTs); if (cleanupDb) { eventCleanupRepository.cleanupEvents(regularEventExpTs, false); + } else { + cleanupPartitionsCache(regularEventExpTs, false); } - cleanupPartitions(regularEventExpTs, false); } if (debugEventExpTs > 0) { log.info("Going to cleanup debug events with exp time: {}", debugEventExpTs); if (cleanupDb) { eventCleanupRepository.cleanupEvents(debugEventExpTs, true); + } else { + cleanupPartitionsCache(debugEventExpTs, true); } - cleanupPartitions(debugEventExpTs, true); } } - private void cleanupPartitions(long expTime, boolean isDebug) { + private void cleanupPartitionsCache(long expTime, boolean isDebug) { for (EventType eventType : EventType.values()) { if (eventType.isDebug() == isDebug) { - Map partitions = partitionsByEventType.get(eventType); - partitions.keySet().removeIf(startTs -> startTs + partitionConfiguration.getPartitionSizeInMs(eventType) < expTime); + partitioningRepository.cleanupPartitionsCache(eventType.getTable(), expTime, partitionConfiguration.getPartitionSizeInMs(eventType)); } } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/event/SqlEventCleanupRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/event/SqlEventCleanupRepository.java index 53786c448b..57073d34c5 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/event/SqlEventCleanupRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/event/SqlEventCleanupRepository.java @@ -17,16 +17,12 @@ package org.thingsboard.server.dao.sql.event; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.DataAccessException; import org.springframework.stereotype.Repository; import org.thingsboard.server.common.data.event.EventType; import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService; +import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.TimeUnit; @@ -34,13 +30,10 @@ import java.util.concurrent.TimeUnit; @Repository public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorService implements EventCleanupRepository { - private static final String SELECT_PARTITIONS_STMT = "SELECT tablename from pg_tables WHERE schemaname = 'public' and tablename like concat(?, '_%')"; - private static final int PSQL_VERSION_14 = 140000; - @Autowired private EventPartitionConfiguration partitionConfiguration; - - private volatile Integer currentServerVersion; + @Autowired + private SqlPartitioningRepository partitioningRepository; @Override public void cleanupEvents(long eventExpTime, boolean debug) { @@ -59,16 +52,13 @@ public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorSe callMigrateFunctionByPartitions("regular", "migrate_regular_events", regularEventTs, partitionConfiguration.getRegularPartitionSizeInHours()); callMigrateFunctionByPartitions("debug", "migrate_debug_events", debugEventTs, partitionConfiguration.getDebugPartitionSizeInHours()); - try (Connection connection = dataSource.getConnection(); - PreparedStatement dropFunction1 = connection.prepareStatement("DROP PROCEDURE IF EXISTS migrate_regular_events(bigint, bigint, int)"); - PreparedStatement dropFunction2 = connection.prepareStatement("DROP PROCEDURE IF EXISTS migrate_debug_events(bigint, bigint, int)"); - PreparedStatement dropTable = connection.prepareStatement("DROP TABLE IF EXISTS event")) { - dropFunction1.execute(); - dropFunction2.execute(); - dropTable.execute(); - } catch (SQLException e) { - log.error("SQLException occurred during drop of the `events` table", e); - throw new RuntimeException(e); + try { + jdbcTemplate.execute("DROP PROCEDURE IF EXISTS migrate_regular_events(bigint, bigint, int)"); + jdbcTemplate.execute("DROP PROCEDURE IF EXISTS migrate_debug_events(bigint, bigint, int)"); + jdbcTemplate.execute("DROP TABLE IF EXISTS event"); + } catch (DataAccessException e) { + log.error("Error occurred during drop of the `events` table", e); + throw e; } } @@ -94,13 +84,9 @@ public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorSe } private void callMigrateFunction(String functionName, long startTs, long endTs, int partitionSizeInHours) { - try (Connection connection = dataSource.getConnection(); - PreparedStatement stmt = connection.prepareStatement("call " + functionName + "(?,?,?)")) { - stmt.setLong(1, startTs); - stmt.setLong(2, endTs); - stmt.setInt(3, partitionSizeInHours); - stmt.execute(); - } catch (SQLException e) { + try { + jdbcTemplate.update("CALL " + functionName + "(?, ?, ?)", startTs, endTs, partitionSizeInHours); + } catch (DataAccessException e) { if (e.getMessage() == null || !e.getMessage().contains("relation \"event\" does not exist")) { log.error("[{}] SQLException occurred during execution of {} with parameters {} and {}", functionName, startTs, partitionSizeInHours, e); throw new RuntimeException(e); @@ -109,82 +95,7 @@ public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorSe } private void cleanupEvents(EventType eventType, long eventExpTime) { - var partitionDuration = partitionConfiguration.getPartitionSizeInMs(eventType); - List partitions = fetchPartitions(eventType); - for (var partitionTs : partitions) { - var partitionEndTs = partitionTs + partitionDuration; - if (partitionEndTs < eventExpTime) { - log.info("[{}] Detaching expired partition: [{}-{}]", eventType, partitionTs, partitionEndTs); - if (detachAndDropPartition(eventType, partitionTs)) { - log.info("[{}] Detached expired partition: {}", eventType, partitionTs); - } - } else { - log.debug("[{}] Skip valid partition: {}", eventType, partitionTs); - } - } - } - - private List fetchPartitions(EventType eventType) { - List partitions = new ArrayList<>(); - try (Connection connection = dataSource.getConnection(); - PreparedStatement stmt = connection.prepareStatement(SELECT_PARTITIONS_STMT)) { - stmt.setString(1, eventType.getTable()); - stmt.execute(); - try (ResultSet resultSet = stmt.getResultSet()) { - while (resultSet.next()) { - String partitionTableName = resultSet.getString(1); - String partitionTsStr = partitionTableName.substring(eventType.getTable().length() + 1); - try { - partitions.add(Long.parseLong(partitionTsStr)); - } catch (NumberFormatException nfe) { - log.warn("Failed to parse table name: {}", partitionTableName); - } - } - } - } catch (SQLException e) { - log.error("SQLException occurred during events TTL task execution ", e); - } - return partitions; - } - - private boolean detachAndDropPartition(EventType eventType, long partitionTs) { - String tablePartition = eventType.getTable() + "_" + partitionTs; - String detachPsqlStmtStr = "ALTER TABLE " + eventType.getTable() + " DETACH PARTITION " + tablePartition; - if (getCurrentServerVersion() >= PSQL_VERSION_14) { - detachPsqlStmtStr += " CONCURRENTLY"; - } - - String dropStmtStr = "DROP TABLE " + tablePartition; - try (Connection connection = dataSource.getConnection(); - PreparedStatement detachStmt = connection.prepareStatement(detachPsqlStmtStr); - PreparedStatement dropStmt = connection.prepareStatement(dropStmtStr)) { - detachStmt.execute(); - dropStmt.execute(); - return true; - } catch (SQLException e) { - log.error("[{}] SQLException occurred during detach and drop of the partition: {}", eventType, partitionTs, e); - } - return false; - } - - private synchronized int getCurrentServerVersion() { - if (currentServerVersion == null) { - try (Connection connection = dataSource.getConnection(); - PreparedStatement versionStmt = connection.prepareStatement("SELECT current_setting('server_version_num')")) { - versionStmt.execute(); - try (ResultSet resultSet = versionStmt.getResultSet()) { - while (resultSet.next()) { - currentServerVersion = resultSet.getInt(1); - } - } - } catch (SQLException e) { - log.warn("SQLException occurred during fetch of the server version", e); - } - if (currentServerVersion == null) { - currentServerVersion = 0; - } - } - return currentServerVersion; + partitioningRepository.dropPartitionsBefore(eventType.getTable(), eventExpTime, partitionConfiguration.getPartitionSizeInMs(eventType)); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java index 805e5e4f4f..988b03f7fd 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java @@ -15,23 +15,147 @@ */ package org.thingsboard.server.dao.sqlts.insert.sql; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.hibernate.exception.ConstraintViolationException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.DataAccessException; +import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Repository; import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.dao.timeseries.SqlPartition; -import org.thingsboard.server.dao.util.SqlTsDao; import javax.persistence.EntityManager; import javax.persistence.PersistenceContext; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; @Repository -@Transactional +@Slf4j public class SqlPartitioningRepository { - + // todo: check INSTALL , compare to events @PersistenceContext private EntityManager entityManager; + @Autowired + private JdbcTemplate jdbcTemplate; + + private static final String SELECT_PARTITIONS_STMT = "SELECT tablename from pg_tables WHERE schemaname = 'public' and tablename like concat(?, '_%')"; + + private static final int PSQL_VERSION_14 = 140000; + private volatile Integer currentServerVersion; + + private final Map> tablesPartitions = new ConcurrentHashMap<>(); + private final ReentrantLock partitionCreationLock = new ReentrantLock(); + + @Transactional public void save(SqlPartition partition) { entityManager.createNativeQuery(partition.getQuery()).executeUpdate(); } + @Transactional + public void createPartitionIfNotExists(String table, long entityTs, long partitionDurationMs) { + long partitionStartTs = calculatePartitionStartTime(entityTs, partitionDurationMs); + Map partitions = tablesPartitions.computeIfAbsent(table, t -> new ConcurrentHashMap<>()); + if (!partitions.containsKey(partitionStartTs)) { + SqlPartition partition = new SqlPartition(table, partitionStartTs, partitionStartTs + partitionDurationMs, Long.toString(partitionStartTs)); + partitionCreationLock.lock(); + try { + log.trace("Saving partition: {}", partition); + save(partition); + log.trace("Adding partition to map: {}", partition); + partitions.put(partition.getStart(), partition); + } catch (Exception ex) { // fixme: check + log.trace("Error occurred during partition save:", ex); +// if (ExceptionUtils.indexOfThrowable(ex, ConstraintViolationException.class) >= 0) { +// log.warn("Saving partition [{}] rejected. Data will be saved to the DEFAULT partition.", partition.getPartitionDate()); +// partitions.put(partition.getStart(), partition); +// } else { +// throw ex; +// } + } finally { + partitionCreationLock.unlock(); + } + } + } + + public void dropPartitionsBefore(String table, long ts, long partitionDurationMs) { + List partitions = fetchPartitions(table); + for (Long partitionStartTime : partitions) { + long partitionEndTime = partitionStartTime + partitionDurationMs; + if (partitionEndTime < ts) { + log.info("[{}] Detaching expired partition: [{}-{}]", table, partitionStartTime, partitionEndTime); + boolean success = detachAndDropPartition(table, partitionStartTime); + if (success) { + log.info("[{}] Detached expired partition: {}", table, partitionStartTime); + } + } else { + log.debug("[{}] Skipping valid partition: {}", table, partitionStartTime); + } + } + } + + public void cleanupPartitionsCache(String table, long expTime, long partitionDurationMs) { + Map partitions = tablesPartitions.get(table); + if (partitions == null) return; + partitions.keySet().removeIf(startTime -> (startTime + partitionDurationMs) < expTime); + } + + private boolean detachAndDropPartition(String table, long partitionTs) { + Map cachedPartitions = tablesPartitions.get(table); + if (cachedPartitions != null) cachedPartitions.remove(partitionTs); + + String tablePartition = table + "_" + partitionTs; + String detachPsqlStmtStr = "ALTER TABLE " + table + " DETACH PARTITION " + tablePartition; + if (getCurrentServerVersion() >= PSQL_VERSION_14) { + detachPsqlStmtStr += " CONCURRENTLY"; + } + + String dropStmtStr = "DROP TABLE " + tablePartition; + try { + jdbcTemplate.execute(detachPsqlStmtStr); + jdbcTemplate.execute(dropStmtStr); + return true; + } catch (DataAccessException e) { + log.error("[{}] Error occurred trying to detach and drop the partition {} ", table, partitionTs, e); + } + return false; + } + + private List fetchPartitions(String table) { + // todo: test + List partitions = new ArrayList<>(); + List partitionsTables = jdbcTemplate.queryForList(SELECT_PARTITIONS_STMT, new Object[]{table}, String.class); + for (String partitionTableName : partitionsTables) { + String partitionTsStr = partitionTableName.substring(table.length() + 1); + try { + partitions.add(Long.parseLong(partitionTsStr)); + } catch (NumberFormatException nfe) { + log.warn("Failed to parse table name: {}", partitionTableName); + } + } + return partitions; + } + + private long calculatePartitionStartTime(long ts, long partitionDuration) { + return ts - (ts % partitionDuration); + } + + private synchronized int getCurrentServerVersion() { + if (currentServerVersion == null) { + try { + currentServerVersion = jdbcTemplate.queryForObject("SELECT current_setting('server_version_num')", Integer.class); + } catch (Exception e) { + log.warn("Error occurred during fetch of the server version", e); + } + if (currentServerVersion == null) { + currentServerVersion = 0; + } + } + return currentServerVersion; + } + } diff --git a/dao/src/main/resources/sql/schema-entities-idx.sql b/dao/src/main/resources/sql/schema-entities-idx.sql index 80a5b03b92..34862e5af3 100644 --- a/dao/src/main/resources/sql/schema-entities-idx.sql +++ b/dao/src/main/resources/sql/schema-entities-idx.sql @@ -48,7 +48,7 @@ CREATE INDEX IF NOT EXISTS idx_asset_type ON asset(tenant_id, type); CREATE INDEX IF NOT EXISTS idx_attribute_kv_by_key_and_last_update_ts ON attribute_kv(entity_id, attribute_key, last_update_ts desc); -CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time); +CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time DESC); CREATE INDEX IF NOT EXISTS idx_rpc_tenant_id_device_id ON rpc(tenant_id, device_id); diff --git a/dao/src/main/resources/sql/schema-entities.sql b/dao/src/main/resources/sql/schema-entities.sql index e93221508e..8892f4e3f0 100644 --- a/dao/src/main/resources/sql/schema-entities.sql +++ b/dao/src/main/resources/sql/schema-entities.sql @@ -89,7 +89,7 @@ CREATE TABLE IF NOT EXISTS asset ( ); CREATE TABLE IF NOT EXISTS audit_log ( - id uuid NOT NULL CONSTRAINT audit_log_pkey PRIMARY KEY, + id uuid NOT NULL, created_time bigint NOT NULL, tenant_id uuid, customer_id uuid, @@ -102,7 +102,7 @@ CREATE TABLE IF NOT EXISTS audit_log ( action_data varchar(1000000), action_status varchar(255), action_failure_details varchar(1000000) -); +) PARTITION BY RANGE (created_time); CREATE TABLE IF NOT EXISTS attribute_kv ( entity_type varchar(255), From c7c8ea0105c109e9a1d2076750838df3399c6a6d Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 4 Oct 2022 10:45:43 +0300 Subject: [PATCH 2/7] Fix partition overlap error handling in SqlPartitioningRepository --- .../main/data/upgrade/3.4.1/schema_update.sql | 46 +++++++++++-------- application/src/main/resources/logback.xml | 1 - .../server/dao/sql/audit/JpaAuditLogDao.java | 5 +- .../insert/sql/SqlPartitioningRepository.java | 22 ++++----- 4 files changed, 37 insertions(+), 37 deletions(-) diff --git a/application/src/main/data/upgrade/3.4.1/schema_update.sql b/application/src/main/data/upgrade/3.4.1/schema_update.sql index 1b75a1c1e1..79cde8167f 100644 --- a/application/src/main/data/upgrade/3.4.1/schema_update.sql +++ b/application/src/main/data/upgrade/3.4.1/schema_update.sql @@ -14,7 +14,27 @@ -- limitations under the License. -- -CREATE TABLE IF NOT EXISTS tmp_audit_log ( +DO +$$ + DECLARE table_partition RECORD; + BEGIN + -- in case of running the upgrade script a second time: + IF NOT (SELECT exists(SELECT FROM pg_tables WHERE tablename = 'old_audit_log')) THEN + ALTER TABLE audit_log RENAME TO old_audit_log; + ALTER INDEX IF EXISTS idx_audit_log_tenant_id_and_created_time RENAME TO idx_old_audit_log_tenant_id_and_created_time; + + FOR table_partition IN SELECT tablename AS name, split_part(tablename, '_', 3) AS partition_ts + FROM pg_tables WHERE tablename LIKE 'audit_log_%' + LOOP + EXECUTE format('ALTER TABLE %s RENAME TO old_audit_log_%s', table_partition.name, table_partition.partition_ts); + END LOOP; + ELSE + RAISE NOTICE 'Table old_audit_log already exists, leaving as is'; + END IF; + END; +$$; + +CREATE TABLE IF NOT EXISTS audit_log ( id uuid NOT NULL, created_time bigint NOT NULL, tenant_id uuid, @@ -29,21 +49,7 @@ CREATE TABLE IF NOT EXISTS tmp_audit_log ( action_status varchar(255), action_failure_details varchar(1000000) ) PARTITION BY RANGE (created_time); -CREATE INDEX IF NOT EXISTS idx_tmp_audit_log_tenant_id_and_created_time ON tmp_audit_log(tenant_id, created_time DESC); - -CREATE OR REPLACE PROCEDURE rename_old_audit_logs_partitions() - LANGUAGE plpgsql AS -$$ -DECLARE - table_partition RECORD; -BEGIN - FOR table_partition IN SELECT tablename AS name, split_part(tablename, '_', 3) AS partition_ts - FROM pg_tables WHERE tablename LIKE 'audit_log_%' - LOOP - EXECUTE format('ALTER TABLE %s RENAME TO old_audit_log_%s', table_partition.name, table_partition.partition_ts); - END LOOP; -END; -$$; +CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time DESC); CREATE OR REPLACE PROCEDURE migrate_audit_logs(IN start_time_ms BIGINT, IN end_time_ms BIGINT, IN partition_size_ms BIGINT) LANGUAGE plpgsql AS @@ -52,17 +58,17 @@ DECLARE p RECORD; partition_end_ts BIGINT; BEGIN - FOR p IN SELECT DISTINCT (created_time - created_time % partition_size_ms) AS partition_ts FROM audit_log + FOR p IN SELECT DISTINCT (created_time - created_time % partition_size_ms) AS partition_ts FROM old_audit_log WHERE created_time >= start_time_ms AND created_time < end_time_ms LOOP partition_end_ts = p.partition_ts + partition_size_ms; RAISE NOTICE '[audit_log] Partition to create : [%-%]', p.partition_ts, partition_end_ts; - EXECUTE format('CREATE TABLE IF NOT EXISTS audit_log_%s PARTITION OF tmp_audit_log ' || + EXECUTE format('CREATE TABLE IF NOT EXISTS audit_log_%s PARTITION OF audit_log ' || 'FOR VALUES FROM ( %s ) TO ( %s )', p.partition_ts, p.partition_ts, partition_end_ts); END LOOP; - INSERT INTO tmp_audit_log - SELECT * FROM audit_log + INSERT INTO audit_log + SELECT * FROM old_audit_log WHERE created_time >= start_time_ms AND created_time < end_time_ms; END; $$; diff --git a/application/src/main/resources/logback.xml b/application/src/main/resources/logback.xml index 0bf081b35a..941bd7d278 100644 --- a/application/src/main/resources/logback.xml +++ b/application/src/main/resources/logback.xml @@ -28,7 +28,6 @@ - diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java index a3ee6ea843..365dd82686 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java @@ -154,7 +154,6 @@ public class JpaAuditLogDao extends JpaAbstractDao imp throw new RuntimeException(error); } - jdbcTemplate.execute("CALL rename_old_audit_logs_partitions()"); while (startTime < currentTime) { var endTime = startTime + partitionStepInMs; log.info("Migrating audit logs for time period: {} - {}", startTime, endTime); @@ -163,9 +162,7 @@ public class JpaAuditLogDao extends JpaAbstractDao imp } log.info("Audit logs migration finished"); - jdbcTemplate.execute("DROP TABLE IF EXISTS audit_log"); - jdbcTemplate.execute("ALTER TABLE tmp_audit_log RENAME TO audit_log"); - jdbcTemplate.execute("ALTER INDEX IF EXISTS idx_tmp_audit_log_tenant_id_and_created_time RENAME TO idx_audit_log_tenant_id_and_created_time"); + jdbcTemplate.execute("DROP TABLE IF EXISTS old_audit_log"); } private void callMigrationFunction(long startTime, long endTime, long partitionSizeInMs) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java index 94097c590a..8083f17a5a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java @@ -17,7 +17,6 @@ package org.thingsboard.server.dao.sqlts.insert.sql; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.hibernate.exception.ConstraintViolationException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.JdbcTemplate; @@ -68,17 +67,16 @@ public class SqlPartitioningRepository { save(partition); log.trace("Adding partition to map: {}", partition); partitions.put(partition.getStart(), partition); - } catch (Exception ex) { // fixme: check - log.trace("Error occurred during partition save:", ex); - // todo: if partitions накладаються, потестити (ConstraintViolationException) - // todo: SKIP_MIGRATION тільки для того щоб не переносити данні, таблицю треба створити partitioned. - // fixme: update script -// if (ExceptionUtils.indexOfThrowable(ex, ConstraintViolationException.class) >= 0) { -// log.warn("Saving partition [{}] rejected. Data will be saved to the DEFAULT partition.", partition.getPartitionDate()); -// partitions.put(partition.getStart(), partition); -// } else { -// throw ex; -// } + } catch (RuntimeException e) { + log.trace("Error occurred during partition save:", e); + String msg = ExceptionUtils.getRootCauseMessage(e); + if (msg.contains("would overlap partition")) { + log.warn("Couldn't save {} partition for {}, data will be saved to the default partition. SQL error: {}", + partition.getPartitionDate(), table, msg); + partitions.put(partition.getStart(), partition); + } else { + throw e; + } } finally { partitionCreationLock.unlock(); } From 9e2e5f809e413624b50bfabeae9b8c94fb90f7dd Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 4 Oct 2022 11:03:10 +0300 Subject: [PATCH 3/7] Remove unneeded todos --- .../server/dao/sqlts/insert/sql/SqlPartitioningRepository.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java index 8083f17a5a..b194a666d2 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java @@ -35,7 +35,7 @@ import java.util.concurrent.locks.ReentrantLock; @Repository @Slf4j public class SqlPartitioningRepository { - // todo: check INSTALL , compare to events + @PersistenceContext private EntityManager entityManager; @@ -127,7 +127,6 @@ public class SqlPartitioningRepository { } private List fetchPartitions(String table) { - // todo: test List partitions = new ArrayList<>(); List partitionsTables = jdbcTemplate.queryForList(SELECT_PARTITIONS_STMT, new Object[]{table}, String.class); for (String partitionTableName : partitionsTables) { From 8694611f7a5a3fc07a1bd5c3440e58c6c4e075bc Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 4 Oct 2022 11:19:38 +0300 Subject: [PATCH 4/7] Fix save method of JpaAuditLogDao --- .../server/dao/sql/audit/JpaAuditLogDao.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java index 365dd82686..1eefb21add 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.dao.sql.audit; +import com.datastax.oss.driver.api.core.uuid.Uuids; import com.google.common.util.concurrent.ListenableFuture; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -24,8 +25,10 @@ import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.audit.AuditLog; +import org.thingsboard.server.common.data.id.AuditLogId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.TimePageLink; @@ -72,12 +75,22 @@ public class JpaAuditLogDao extends JpaAbstractDao imp @Override public ListenableFuture saveByTenantId(AuditLog auditLog) { return service.submit(() -> { - partitioningRepository.createPartitionIfNotExists(TABLE_NAME, auditLog.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours)); save(auditLog.getTenantId(), auditLog); return null; }); } + @Override + public AuditLog save(TenantId tenantId, AuditLog auditLog) { + if (auditLog.getId() == null) { + UUID uuid = Uuids.timeBased(); + auditLog.setId(new AuditLogId(uuid)); + auditLog.setCreatedTime(Uuids.unixTimestamp(uuid)); + } + partitioningRepository.createPartitionIfNotExists(TABLE_NAME, auditLog.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours)); + return super.save(tenantId, auditLog); + } + @Override public PageData findAuditLogsByTenantIdAndEntityId(UUID tenantId, EntityId entityId, List actionTypes, TimePageLink pageLink) { return DaoUtil.toPageData( From 7eea5a6cda7a6922f8874b31d68c1eafd60a0f03 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 4 Oct 2022 13:07:21 +0300 Subject: [PATCH 5/7] Minor refactoring --- .../service/install/update/DefaultDataUpdateService.java | 3 +++ .../server/service/ttl/AuditLogsCleanUpService.java | 4 ++-- application/src/main/resources/thingsboard.yml | 2 +- .../dao/sqlts/insert/sql/SqlPartitioningRepository.java | 1 + 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java index b307489de6..a8cf5374a2 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java @@ -184,7 +184,10 @@ public class DefaultDataUpdateService implements DataUpdateService { boolean skipAuditLogsMigration = getEnv("TB_SKIP_AUDIT_LOGS_MIGRATION", false); if (!skipAuditLogsMigration) { log.info("Updating data from version 3.4.1 to 3.4.2 ..."); + log.info("Starting audit logs migration. Can be skipped with TB_SKIP_AUDIT_LOGS_MIGRATION env variable set to true"); auditLogDao.migrateAuditLogs(); + } else { + log.info("Skipping audit logs migration"); } break; default: diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java index 9df77be0d2..54c4f6523e 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java @@ -49,8 +49,8 @@ public class AuditLogsCleanUpService extends AbstractCleanUpService { this.partitioningRepository = partitioningRepository; } - @Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.audit_logs.execution_interval_ms})}", - fixedDelayString = "${sql.ttl.audit_logs.execution_interval_ms}") + @Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.audit_logs.checking_interval_ms})}", + fixedDelayString = "${sql.ttl.audit_logs.checking_interval_ms}") public void cleanUp() { long auditLogsExpTime = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(ttlInSec); if (isSystemTenantPartitionMine()) { diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 96438470a0..cf13376946 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -308,7 +308,7 @@ sql: audit_logs: enabled: "${SQL_TTL_AUDIT_LOGS_ENABLED:true}" ttl: "${SQL_TTL_AUDIT_LOGS_SECS:0}" # Disabled by default. Accuracy of the cleanup depends on the sql.audit_logs.partition_size - execution_interval_ms: "${SQL_TTL_AUDIT_LOGS_EXECUTION_INTERVAL_MS:86400000}" # Default value - 1 day + checking_interval_ms: "${SQL_TTL_AUDIT_LOGS_CHECKING_INTERVAL_MS:86400000}" # Default value - 1 day relations: max_level: "${SQL_RELATIONS_MAX_LEVEL:50}" # //This value has to be reasonable small to prevent infinite recursion as early as possible diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java index b194a666d2..67b451ec2a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java @@ -63,6 +63,7 @@ public class SqlPartitioningRepository { SqlPartition partition = new SqlPartition(table, partitionStartTs, partitionStartTs + partitionDurationMs, Long.toString(partitionStartTs)); partitionCreationLock.lock(); try { + if (partitions.containsKey(partitionStartTs)) return; log.trace("Saving partition: {}", partition); save(partition); log.trace("Adding partition to map: {}", partition); From b4300b35bfae3298237ec577b364c3fea1102672 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 4 Oct 2022 15:03:05 +0300 Subject: [PATCH 6/7] Add tests for audit logs partitions --- .../BaseAuditLogControllerTest.java | 68 +++++++++++++++++++ .../resources/application-test.properties | 5 +- .../insert/sql/SqlPartitioningRepository.java | 4 +- 3 files changed, 74 insertions(+), 3 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/controller/BaseAuditLogControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/BaseAuditLogControllerTest.java index 5369d4d30a..c46f7d9c46 100644 --- a/application/src/test/java/org/thingsboard/server/controller/BaseAuditLogControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/BaseAuditLogControllerTest.java @@ -20,18 +20,33 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.mock.mockito.SpyBean; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.audit.AuditLog; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.security.Authority; +import org.thingsboard.server.dao.audit.AuditLogDao; import org.thingsboard.server.dao.model.ModelConstants; +import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository; +import org.thingsboard.server.service.ttl.AuditLogsCleanUpService; +import java.time.LocalDate; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; public abstract class BaseAuditLogControllerTest extends AbstractControllerTest { @@ -39,6 +54,18 @@ public abstract class BaseAuditLogControllerTest extends AbstractControllerTest private Tenant savedTenant; private User tenantAdmin; + @Autowired + private AuditLogDao auditLogDao; + @SpyBean + private SqlPartitioningRepository partitioningRepository; + @Autowired + private AuditLogsCleanUpService auditLogsCleanUpService; + + @Value("#{${sql.audit_logs.partition_size} * 60 * 60 * 1000}") + private long partitionDurationInMs; + @Value("${sql.ttl.audit_logs.ttl}") + private long auditLogsTtlInSec; + @Before public void beforeTest() throws Exception { loginSysAdmin(); @@ -145,4 +172,45 @@ public abstract class BaseAuditLogControllerTest extends AbstractControllerTest Assert.assertEquals(179, loadedAuditLogs.size()); } + + @Test + public void whenSavingNewAuditLog_thenCheckAndCreatePartitionIfNotExists() { + reset(partitioningRepository); + AuditLog auditLog = createAuditLog(ActionType.LOGIN, tenantAdminUserId); + verify(partitioningRepository).createPartitionIfNotExists(eq("audit_log"), eq(auditLog.getCreatedTime()), eq(partitionDurationInMs)); + + List partitions = partitioningRepository.fetchPartitions("audit_log"); + assertThat(partitions).singleElement().satisfies(partitionStartTs -> { + assertThat(partitionStartTs).isEqualTo(partitioningRepository.calculatePartitionStartTime(auditLog.getCreatedTime(), partitionDurationInMs)); + }); + } + + @Test + public void whenCleaningUpAuditLogsByTtl_thenDropOldPartitions() { + long oldAuditLogTs = LocalDate.of(2020, 10, 1).atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli(); + long partitionStartTs = partitioningRepository.calculatePartitionStartTime(oldAuditLogTs, partitionDurationInMs); + partitioningRepository.createPartitionIfNotExists("audit_log", oldAuditLogTs, partitionDurationInMs); + List partitions = partitioningRepository.fetchPartitions("audit_log"); + assertThat(partitions).contains(partitionStartTs); + + auditLogsCleanUpService.cleanUp(); + partitions = partitioningRepository.fetchPartitions("audit_log"); + assertThat(partitions).doesNotContain(partitionStartTs); + assertThat(partitions).allSatisfy(partitionsStart -> { + long partitionEndTs = partitionsStart + partitionDurationInMs; + assertThat(partitionEndTs).isGreaterThan(System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(auditLogsTtlInSec)); + }); + } + + private AuditLog createAuditLog(ActionType actionType, EntityId entityId) { + AuditLog auditLog = new AuditLog(); + auditLog.setTenantId(tenantId); + auditLog.setCustomerId(null); + auditLog.setUserId(tenantAdminUserId); + auditLog.setEntityId(entityId); + auditLog.setUserName(tenantAdmin.getEmail()); + auditLog.setActionType(actionType); + return auditLogDao.save(tenantId, auditLog); + } + } diff --git a/application/src/test/resources/application-test.properties b/application/src/test/resources/application-test.properties index 279d1e99be..eacd1733d3 100644 --- a/application/src/test/resources/application-test.properties +++ b/application/src/test/resources/application-test.properties @@ -56,4 +56,7 @@ queue.rule-engine.queues[2].processing-strategy.retries=1 queue.rule-engine.queues[2].processing-strategy.pause-between-retries=0 queue.rule-engine.queues[2].processing-strategy.max-pause-between-retries=0 -usage.stats.report.enabled=false \ No newline at end of file +usage.stats.report.enabled=false + +sql.audit_logs.partition_size=24 +sql.ttl.audit_logs.ttl=2592000 \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java index 67b451ec2a..9b58358e3f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java @@ -127,7 +127,7 @@ public class SqlPartitioningRepository { return false; } - private List fetchPartitions(String table) { + public List fetchPartitions(String table) { List partitions = new ArrayList<>(); List partitionsTables = jdbcTemplate.queryForList(SELECT_PARTITIONS_STMT, new Object[]{table}, String.class); for (String partitionTableName : partitionsTables) { @@ -141,7 +141,7 @@ public class SqlPartitioningRepository { return partitions; } - private long calculatePartitionStartTime(long ts, long partitionDuration) { + public long calculatePartitionStartTime(long ts, long partitionDuration) { return ts - (ts % partitionDuration); } From d2f67d33d176785f4d7414920983ae75e0469a58 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Wed, 12 Oct 2022 10:12:29 +0300 Subject: [PATCH 7/7] Minor refactoring for AuditLogsCleanUpService and SqlPartitioningRepository --- .../server/service/ttl/AuditLogsCleanUpService.java | 4 +--- .../dao/sqlts/insert/sql/SqlPartitioningRepository.java | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java index 54c4f6523e..11b1751c0d 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java @@ -18,7 +18,6 @@ package org.thingsboard.server.service.ttl; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.thingsboard.server.dao.audit.AuditLogDao; @@ -30,8 +29,7 @@ import java.util.concurrent.TimeUnit; import static org.thingsboard.server.dao.model.ModelConstants.AUDIT_LOG_COLUMN_FAMILY_NAME; @Service -@ConditionalOnProperty(name = "sql.ttl.audit_logs.enabled", havingValue = "true") -@ConditionalOnExpression("${sql.ttl.audit_logs.ttl:0} > 0") +@ConditionalOnExpression("${sql.ttl.audit_logs.enabled:true} && ${sql.ttl.audit_logs.ttl:0} > 0") @Slf4j public class AuditLogsCleanUpService extends AbstractCleanUpService { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java index 9b58358e3f..87a62d356f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java @@ -129,7 +129,7 @@ public class SqlPartitioningRepository { public List fetchPartitions(String table) { List partitions = new ArrayList<>(); - List partitionsTables = jdbcTemplate.queryForList(SELECT_PARTITIONS_STMT, new Object[]{table}, String.class); + List partitionsTables = jdbcTemplate.queryForList(SELECT_PARTITIONS_STMT, String.class, table); for (String partitionTableName : partitionsTables) { String partitionTsStr = partitionTableName.substring(table.length() + 1); try {