diff --git a/application/src/main/data/upgrade/3.4.1/schema_update.sql b/application/src/main/data/upgrade/3.4.1/schema_update.sql new file mode 100644 index 0000000000..79cde8167f --- /dev/null +++ b/application/src/main/data/upgrade/3.4.1/schema_update.sql @@ -0,0 +1,74 @@ +-- +-- Copyright © 2016-2022 The Thingsboard Authors +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +DO +$$ + DECLARE table_partition RECORD; + BEGIN + -- in case of running the upgrade script a second time: + IF NOT (SELECT exists(SELECT FROM pg_tables WHERE tablename = 'old_audit_log')) THEN + ALTER TABLE audit_log RENAME TO old_audit_log; + ALTER INDEX IF EXISTS idx_audit_log_tenant_id_and_created_time RENAME TO idx_old_audit_log_tenant_id_and_created_time; + + FOR table_partition IN SELECT tablename AS name, split_part(tablename, '_', 3) AS partition_ts + FROM pg_tables WHERE tablename LIKE 'audit_log_%' + LOOP + EXECUTE format('ALTER TABLE %s RENAME TO old_audit_log_%s', table_partition.name, table_partition.partition_ts); + END LOOP; + ELSE + RAISE NOTICE 'Table old_audit_log already exists, leaving as is'; + END IF; + END; +$$; + +CREATE TABLE IF NOT EXISTS audit_log ( + id uuid NOT NULL, + created_time bigint NOT NULL, + tenant_id uuid, + customer_id uuid, + entity_id uuid, + entity_type varchar(255), + entity_name varchar(255), + user_id uuid, + user_name varchar(255), + action_type varchar(255), + action_data varchar(1000000), + action_status varchar(255), + action_failure_details varchar(1000000) +) PARTITION BY RANGE (created_time); +CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time DESC); + +CREATE OR REPLACE PROCEDURE migrate_audit_logs(IN start_time_ms BIGINT, IN end_time_ms BIGINT, IN partition_size_ms BIGINT) + LANGUAGE plpgsql AS +$$ +DECLARE + p RECORD; + partition_end_ts BIGINT; +BEGIN + FOR p IN SELECT DISTINCT (created_time - created_time % partition_size_ms) AS partition_ts FROM old_audit_log + WHERE created_time >= start_time_ms AND created_time < end_time_ms + LOOP + partition_end_ts = p.partition_ts + partition_size_ms; + RAISE NOTICE '[audit_log] Partition to create : [%-%]', p.partition_ts, partition_end_ts; + EXECUTE format('CREATE TABLE IF NOT EXISTS audit_log_%s PARTITION OF audit_log ' || + 'FOR VALUES FROM ( %s ) TO ( %s )', p.partition_ts, p.partition_ts, partition_end_ts); + END LOOP; + + INSERT INTO audit_log + SELECT * FROM old_audit_log + WHERE created_time >= start_time_ms AND created_time < end_time_ms; +END; +$$; diff --git a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java index c1f352d8eb..33a6d4bdd1 100644 --- a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java +++ b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java @@ -228,6 +228,7 @@ public class ThingsboardInstallService { case "3.4.1": log.info("Upgrading ThingsBoard from version 3.4.1 to 3.4.2 ..."); databaseEntitiesUpgradeService.upgradeDatabase("3.4.1"); + dataUpdateService.updateData("3.4.1"); log.info("Updating system data..."); systemDataLoaderService.updateSystemWidgets(); break; diff --git a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java index c5c81bfc91..092726d0c8 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java @@ -15,8 +15,6 @@ */ package org.thingsboard.server.service.install; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -25,10 +23,7 @@ import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.EntitySubtype; -import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.Tenant; -import org.thingsboard.server.common.data.TenantProfile; -import org.thingsboard.server.common.data.id.QueueId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; @@ -37,8 +32,6 @@ import org.thingsboard.server.common.data.queue.ProcessingStrategyType; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.data.queue.SubmitStrategy; import org.thingsboard.server.common.data.queue.SubmitStrategyType; -import org.thingsboard.server.common.data.rule.RuleNode; -import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration; import org.thingsboard.server.dao.asset.AssetProfileService; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.dashboard.DashboardService; @@ -63,11 +56,8 @@ import java.sql.SQLException; import java.sql.SQLSyntaxErrorException; import java.sql.SQLWarning; import java.sql.Statement; -import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import static org.thingsboard.server.service.install.DatabaseHelper.ADDITIONAL_INFO; import static org.thingsboard.server.service.install.DatabaseHelper.ASSIGNED_CUSTOMERS; @@ -620,8 +610,8 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService case "3.4.1": try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { log.info("Updating schema ..."); + runSchemaUpdateScript(conn, "3.4.1"); if (isOldSchema(conn, 3004001)) { - try { conn.createStatement().execute("ALTER TABLE asset ADD COLUMN asset_profile_id uuid"); } catch (Exception e) { @@ -669,6 +659,11 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService } } + private void runSchemaUpdateScript(Connection connection, String version) throws Exception { + Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", version, SCHEMA_UPDATE_SQL); + loadSql(schemaUpdateFile, connection); + } + private void loadSql(Path sqlFile, Connection conn) throws Exception { String sql = new String(Files.readAllBytes(sqlFile), Charset.forName("UTF-8")); Statement st = conn.createStatement(); diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java index d02f50b37b..a8cf5374a2 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java @@ -63,6 +63,7 @@ import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.alarm.AlarmDao; +import org.thingsboard.server.dao.audit.AuditLogDao; import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.event.EventService; @@ -138,6 +139,9 @@ public class DefaultDataUpdateService implements DataUpdateService { @Autowired private EventService eventService; + @Autowired + private AuditLogDao auditLogDao; + @Override public void updateData(String fromVersion) throws Exception { switch (fromVersion) { @@ -170,12 +174,22 @@ public class DefaultDataUpdateService implements DataUpdateService { rateLimitsUpdater.updateEntities(); break; case "3.4.0": - String skipEventsMigration = System.getenv("TB_SKIP_EVENTS_MIGRATION"); - if (skipEventsMigration == null || skipEventsMigration.equalsIgnoreCase("false")) { + boolean skipEventsMigration = getEnv("TB_SKIP_EVENTS_MIGRATION", false); + if (!skipEventsMigration) { log.info("Updating data from version 3.4.0 to 3.4.1 ..."); eventService.migrateEvents(); } break; + case "3.4.1": + boolean skipAuditLogsMigration = getEnv("TB_SKIP_AUDIT_LOGS_MIGRATION", false); + if (!skipAuditLogsMigration) { + log.info("Updating data from version 3.4.1 to 3.4.2 ..."); + log.info("Starting audit logs migration. Can be skipped with TB_SKIP_AUDIT_LOGS_MIGRATION env variable set to true"); + auditLogDao.migrateAuditLogs(); + } else { + log.info("Skipping audit logs migration"); + } + break; default: throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion); } @@ -645,4 +659,13 @@ public class DefaultDataUpdateService implements DataUpdateService { return mainQueueConfiguration; } + private boolean getEnv(String name, boolean defaultValue) { + String env = System.getenv(name); + if (env == null) { + return defaultValue; + } else { + return Boolean.parseBoolean(env); + } + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java new file mode 100644 index 0000000000..11b1751c0d --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java @@ -0,0 +1,61 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.ttl; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import org.thingsboard.server.dao.audit.AuditLogDao; +import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository; +import org.thingsboard.server.queue.discovery.PartitionService; + +import java.util.concurrent.TimeUnit; + +import static org.thingsboard.server.dao.model.ModelConstants.AUDIT_LOG_COLUMN_FAMILY_NAME; + +@Service +@ConditionalOnExpression("${sql.ttl.audit_logs.enabled:true} && ${sql.ttl.audit_logs.ttl:0} > 0") +@Slf4j +public class AuditLogsCleanUpService extends AbstractCleanUpService { + + private final AuditLogDao auditLogDao; + private final SqlPartitioningRepository partitioningRepository; + + @Value("${sql.ttl.audit_logs.ttl:0}") + private long ttlInSec; + @Value("${sql.audit_logs.partition_size:168}") + private int partitionSizeInHours; + + public AuditLogsCleanUpService(PartitionService partitionService, AuditLogDao auditLogDao, SqlPartitioningRepository partitioningRepository) { + super(partitionService); + this.auditLogDao = auditLogDao; + this.partitioningRepository = partitioningRepository; + } + + @Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.audit_logs.checking_interval_ms})}", + fixedDelayString = "${sql.ttl.audit_logs.checking_interval_ms}") + public void cleanUp() { + long auditLogsExpTime = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(ttlInSec); + if (isSystemTenantPartitionMine()) { + auditLogDao.cleanUpAuditLogs(auditLogsExpTime); + } else { + partitioningRepository.cleanupPartitionsCache(AUDIT_LOG_COLUMN_FAMILY_NAME, auditLogsExpTime, TimeUnit.HOURS.toMillis(partitionSizeInHours)); + } + } + +} diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 57e06b6563..f5ac40afd5 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -265,6 +265,8 @@ sql: batch_size: "${SQL_EDGE_EVENTS_BATCH_SIZE:1000}" batch_max_delay: "${SQL_EDGE_EVENTS_BATCH_MAX_DELAY_MS:100}" stats_print_interval_ms: "${SQL_EDGE_EVENTS_BATCH_STATS_PRINT_MS:10000}" + audit_logs: + partition_size: "${SQL_AUDIT_LOGS_PARTITION_SIZE_HOURS:168}" # Default value - 1 week # Specify whether to sort entities before batch update. Should be enabled for cluster mode to avoid deadlocks batch_sort: "${SQL_BATCH_SORT:false}" # Specify whether to remove null characters from strValue of attributes and timeseries before insert @@ -303,6 +305,10 @@ sql: rpc: enabled: "${SQL_TTL_RPC_ENABLED:true}" checking_interval: "${SQL_RPC_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours + audit_logs: + enabled: "${SQL_TTL_AUDIT_LOGS_ENABLED:true}" + ttl: "${SQL_TTL_AUDIT_LOGS_SECS:0}" # Disabled by default. Accuracy of the cleanup depends on the sql.audit_logs.partition_size + checking_interval_ms: "${SQL_TTL_AUDIT_LOGS_CHECKING_INTERVAL_MS:86400000}" # Default value - 1 day relations: max_level: "${SQL_RELATIONS_MAX_LEVEL:50}" # //This value has to be reasonable small to prevent infinite recursion as early as possible diff --git a/application/src/test/java/org/thingsboard/server/controller/BaseAuditLogControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/BaseAuditLogControllerTest.java index 5369d4d30a..c46f7d9c46 100644 --- a/application/src/test/java/org/thingsboard/server/controller/BaseAuditLogControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/BaseAuditLogControllerTest.java @@ -20,18 +20,33 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.mock.mockito.SpyBean; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.audit.AuditLog; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.security.Authority; +import org.thingsboard.server.dao.audit.AuditLogDao; import org.thingsboard.server.dao.model.ModelConstants; +import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository; +import org.thingsboard.server.service.ttl.AuditLogsCleanUpService; +import java.time.LocalDate; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; public abstract class BaseAuditLogControllerTest extends AbstractControllerTest { @@ -39,6 +54,18 @@ public abstract class BaseAuditLogControllerTest extends AbstractControllerTest private Tenant savedTenant; private User tenantAdmin; + @Autowired + private AuditLogDao auditLogDao; + @SpyBean + private SqlPartitioningRepository partitioningRepository; + @Autowired + private AuditLogsCleanUpService auditLogsCleanUpService; + + @Value("#{${sql.audit_logs.partition_size} * 60 * 60 * 1000}") + private long partitionDurationInMs; + @Value("${sql.ttl.audit_logs.ttl}") + private long auditLogsTtlInSec; + @Before public void beforeTest() throws Exception { loginSysAdmin(); @@ -145,4 +172,45 @@ public abstract class BaseAuditLogControllerTest extends AbstractControllerTest Assert.assertEquals(179, loadedAuditLogs.size()); } + + @Test + public void whenSavingNewAuditLog_thenCheckAndCreatePartitionIfNotExists() { + reset(partitioningRepository); + AuditLog auditLog = createAuditLog(ActionType.LOGIN, tenantAdminUserId); + verify(partitioningRepository).createPartitionIfNotExists(eq("audit_log"), eq(auditLog.getCreatedTime()), eq(partitionDurationInMs)); + + List partitions = partitioningRepository.fetchPartitions("audit_log"); + assertThat(partitions).singleElement().satisfies(partitionStartTs -> { + assertThat(partitionStartTs).isEqualTo(partitioningRepository.calculatePartitionStartTime(auditLog.getCreatedTime(), partitionDurationInMs)); + }); + } + + @Test + public void whenCleaningUpAuditLogsByTtl_thenDropOldPartitions() { + long oldAuditLogTs = LocalDate.of(2020, 10, 1).atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli(); + long partitionStartTs = partitioningRepository.calculatePartitionStartTime(oldAuditLogTs, partitionDurationInMs); + partitioningRepository.createPartitionIfNotExists("audit_log", oldAuditLogTs, partitionDurationInMs); + List partitions = partitioningRepository.fetchPartitions("audit_log"); + assertThat(partitions).contains(partitionStartTs); + + auditLogsCleanUpService.cleanUp(); + partitions = partitioningRepository.fetchPartitions("audit_log"); + assertThat(partitions).doesNotContain(partitionStartTs); + assertThat(partitions).allSatisfy(partitionsStart -> { + long partitionEndTs = partitionsStart + partitionDurationInMs; + assertThat(partitionEndTs).isGreaterThan(System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(auditLogsTtlInSec)); + }); + } + + private AuditLog createAuditLog(ActionType actionType, EntityId entityId) { + AuditLog auditLog = new AuditLog(); + auditLog.setTenantId(tenantId); + auditLog.setCustomerId(null); + auditLog.setUserId(tenantAdminUserId); + auditLog.setEntityId(entityId); + auditLog.setUserName(tenantAdmin.getEmail()); + auditLog.setActionType(actionType); + return auditLogDao.save(tenantId, auditLog); + } + } diff --git a/application/src/test/resources/application-test.properties b/application/src/test/resources/application-test.properties index 279d1e99be..eacd1733d3 100644 --- a/application/src/test/resources/application-test.properties +++ b/application/src/test/resources/application-test.properties @@ -56,4 +56,7 @@ queue.rule-engine.queues[2].processing-strategy.retries=1 queue.rule-engine.queues[2].processing-strategy.pause-between-retries=0 queue.rule-engine.queues[2].processing-strategy.max-pause-between-retries=0 -usage.stats.report.enabled=false \ No newline at end of file +usage.stats.report.enabled=false + +sql.audit_logs.partition_size=24 +sql.ttl.audit_logs.ttl=2592000 \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java b/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java index 939bc654b6..3f9b1ca01a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java @@ -39,4 +39,9 @@ public interface AuditLogDao extends Dao { PageData findAuditLogsByTenantIdAndUserId(UUID tenantId, UserId userId, List actionTypes, TimePageLink pageLink); PageData findAuditLogsByTenantId(UUID tenantId, List actionTypes, TimePageLink pageLink); + + void cleanUpAuditLogs(long expTime); + + void migrateAuditLogs(); + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java index d81e1a1323..b23be1a9cf 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sql; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.JdbcTemplate; import javax.sql.DataSource; import java.sql.SQLException; @@ -32,6 +33,9 @@ public abstract class JpaAbstractDaoListeningExecutorService { @Autowired protected DataSource dataSource; + @Autowired + protected JdbcTemplate jdbcTemplate; + protected void printWarnings(Statement statement) throws SQLException { SQLWarning warnings = statement.getWarnings(); if (warnings != null) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java index 5d701a9c4d..1eefb21add 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java @@ -15,33 +15,52 @@ */ package org.thingsboard.server.dao.sql.audit; +import com.datastax.oss.driver.api.core.uuid.Uuids; import com.google.common.util.concurrent.ListenableFuture; -import org.springframework.beans.factory.annotation.Autowired; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.audit.AuditLog; +import org.thingsboard.server.common.data.id.AuditLogId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.audit.AuditLogDao; +import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.model.sql.AuditLogEntity; import org.thingsboard.server.dao.sql.JpaAbstractDao; import org.thingsboard.server.dao.util.SqlDao; +import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository; import java.util.List; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.TimeUnit; @Component @SqlDao +@RequiredArgsConstructor +@Slf4j public class JpaAuditLogDao extends JpaAbstractDao implements AuditLogDao { - @Autowired - private AuditLogRepository auditLogRepository; + private final AuditLogRepository auditLogRepository; + private final SqlPartitioningRepository partitioningRepository; + private final JdbcTemplate jdbcTemplate; + + @Value("${sql.audit_logs.partition_size:168}") + private int partitionSizeInHours; + @Value("${sql.ttl.audit_logs.ttl:0}") + private long ttlInSec; + + private static final String TABLE_NAME = ModelConstants.AUDIT_LOG_COLUMN_FAMILY_NAME; @Override protected Class getEntityClass() { @@ -61,6 +80,17 @@ public class JpaAuditLogDao extends JpaAbstractDao imp }); } + @Override + public AuditLog save(TenantId tenantId, AuditLog auditLog) { + if (auditLog.getId() == null) { + UUID uuid = Uuids.timeBased(); + auditLog.setId(new AuditLogId(uuid)); + auditLog.setCreatedTime(Uuids.unixTimestamp(uuid)); + } + partitioningRepository.createPartitionIfNotExists(TABLE_NAME, auditLog.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours)); + return super.save(tenantId, auditLog); + } + @Override public PageData findAuditLogsByTenantIdAndEntityId(UUID tenantId, EntityId entityId, List actionTypes, TimePageLink pageLink) { return DaoUtil.toPageData( @@ -115,4 +145,41 @@ public class JpaAuditLogDao extends JpaAbstractDao imp actionTypes, DaoUtil.toPageable(pageLink))); } + + @Override + public void cleanUpAuditLogs(long expTime) { + partitioningRepository.dropPartitionsBefore(TABLE_NAME, expTime, TimeUnit.HOURS.toMillis(partitionSizeInHours)); + } + + @Override + public void migrateAuditLogs() { + long startTime = ttlInSec > 0 ? System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(ttlInSec) : 1480982400000L; + + long currentTime = System.currentTimeMillis(); + var partitionStepInMs = TimeUnit.HOURS.toMillis(partitionSizeInHours); + long numberOfPartitions = (currentTime - startTime) / partitionStepInMs; + + if (numberOfPartitions > 1000) { + String error = "Please adjust your audit logs partitioning configuration. Configuration with partition size " + + "of " + partitionSizeInHours + " hours and corresponding TTL will use " + numberOfPartitions + " " + + "(> 1000) partitions which is not recommended!"; + log.error(error); + throw new RuntimeException(error); + } + + while (startTime < currentTime) { + var endTime = startTime + partitionStepInMs; + log.info("Migrating audit logs for time period: {} - {}", startTime, endTime); + callMigrationFunction(startTime, endTime, partitionStepInMs); + startTime = endTime; + } + log.info("Audit logs migration finished"); + + jdbcTemplate.execute("DROP TABLE IF EXISTS old_audit_log"); + } + + private void callMigrationFunction(long startTime, long endTime, long partitionSizeInMs) { + jdbcTemplate.update("CALL migrate_audit_logs(?, ?, ?)", startTime, endTime, partitionSizeInMs); + } + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java index 543b2aa30b..62ee11b828 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java @@ -18,10 +18,8 @@ package org.thingsboard.server.dao.sql.event; import com.datastax.oss.driver.api.core.uuid.Uuids; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; -import org.hibernate.exception.ConstraintViolationException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.dao.DataIntegrityViolationException; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.event.ErrorEventFilter; @@ -43,7 +41,6 @@ import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper; import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository; -import org.thingsboard.server.dao.timeseries.SqlPartition; import org.thingsboard.server.dao.util.SqlDao; import javax.annotation.PostConstruct; @@ -54,7 +51,6 @@ import java.util.Map; import java.util.Objects; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; /** @@ -65,9 +61,6 @@ import java.util.function.Function; @SqlDao public class JpaBaseEventDao implements EventDao { - private final Map> partitionsByEventType = new ConcurrentHashMap<>(); - private static final ReentrantLock partitionCreationLock = new ReentrantLock(); - @Autowired private EventPartitionConfiguration partitionConfiguration; @@ -122,9 +115,6 @@ public class JpaBaseEventDao implements EventDao { @PostConstruct private void init() { - for (EventType eventType : EventType.values()) { - partitionsByEventType.put(eventType, new ConcurrentHashMap<>()); - } TbSqlBlockingQueueParams params = TbSqlBlockingQueueParams.builder() .logName("Events") .batchSize(batchSize) @@ -165,42 +155,11 @@ public class JpaBaseEventDao implements EventDao { event.setCreatedTime(System.currentTimeMillis()); } } - savePartitionIfNotExist(event); + partitioningRepository.createPartitionIfNotExists(event.getType().getTable(), event.getCreatedTime(), + partitionConfiguration.getPartitionSizeInMs(event.getType())); return queue.add(event); } - private void savePartitionIfNotExist(Event event) { - EventType type = event.getType(); - var partitionsMap = partitionsByEventType.get(type); - var partitionDuration = partitionConfiguration.getPartitionSizeInMs(type); - long partitionStartTs = event.getCreatedTime() - (event.getCreatedTime() % partitionDuration); - if (partitionsMap.get(partitionStartTs) == null) { - savePartition(partitionsMap, new SqlPartition(type.getTable(), partitionStartTs, partitionStartTs + partitionDuration, Long.toString(partitionStartTs))); - } - } - - private void savePartition(Map partitionsMap, SqlPartition sqlPartition) { - if (!partitionsMap.containsKey(sqlPartition.getStart())) { - partitionCreationLock.lock(); - try { - log.trace("Saving partition: {}", sqlPartition); - partitioningRepository.save(sqlPartition); - log.trace("Adding partition to map: {}", sqlPartition); - partitionsMap.put(sqlPartition.getStart(), sqlPartition); - } catch (DataIntegrityViolationException ex) { - log.trace("Error occurred during partition save:", ex); - if (ex.getCause() instanceof ConstraintViolationException) { - log.warn("Saving partition [{}] rejected. Event data will save to the DEFAULT partition.", sqlPartition.getPartitionDate()); - partitionsMap.put(sqlPartition.getStart(), sqlPartition); - } else { - throw new RuntimeException(ex); - } - } finally { - partitionCreationLock.unlock(); - } - } - } - @Override public PageData findEvents(UUID tenantId, UUID entityId, EventType eventType, TimePageLink pageLink) { return DaoUtil.toPageData(getEventRepository(eventType).findEvents(tenantId, entityId, pageLink.getStartTime(), pageLink.getEndTime(), DaoUtil.toPageable(pageLink, EventEntity.eventColumnMap))); @@ -438,23 +397,24 @@ public class JpaBaseEventDao implements EventDao { log.info("Going to cleanup regular events with exp time: {}", regularEventExpTs); if (cleanupDb) { eventCleanupRepository.cleanupEvents(regularEventExpTs, false); + } else { + cleanupPartitionsCache(regularEventExpTs, false); } - cleanupPartitions(regularEventExpTs, false); } if (debugEventExpTs > 0) { log.info("Going to cleanup debug events with exp time: {}", debugEventExpTs); if (cleanupDb) { eventCleanupRepository.cleanupEvents(debugEventExpTs, true); + } else { + cleanupPartitionsCache(debugEventExpTs, true); } - cleanupPartitions(debugEventExpTs, true); } } - private void cleanupPartitions(long expTime, boolean isDebug) { + private void cleanupPartitionsCache(long expTime, boolean isDebug) { for (EventType eventType : EventType.values()) { if (eventType.isDebug() == isDebug) { - Map partitions = partitionsByEventType.get(eventType); - partitions.keySet().removeIf(startTs -> startTs + partitionConfiguration.getPartitionSizeInMs(eventType) < expTime); + partitioningRepository.cleanupPartitionsCache(eventType.getTable(), expTime, partitionConfiguration.getPartitionSizeInMs(eventType)); } } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/event/SqlEventCleanupRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/event/SqlEventCleanupRepository.java index 53786c448b..57073d34c5 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/event/SqlEventCleanupRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/event/SqlEventCleanupRepository.java @@ -17,16 +17,12 @@ package org.thingsboard.server.dao.sql.event; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.DataAccessException; import org.springframework.stereotype.Repository; import org.thingsboard.server.common.data.event.EventType; import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService; +import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.TimeUnit; @@ -34,13 +30,10 @@ import java.util.concurrent.TimeUnit; @Repository public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorService implements EventCleanupRepository { - private static final String SELECT_PARTITIONS_STMT = "SELECT tablename from pg_tables WHERE schemaname = 'public' and tablename like concat(?, '_%')"; - private static final int PSQL_VERSION_14 = 140000; - @Autowired private EventPartitionConfiguration partitionConfiguration; - - private volatile Integer currentServerVersion; + @Autowired + private SqlPartitioningRepository partitioningRepository; @Override public void cleanupEvents(long eventExpTime, boolean debug) { @@ -59,16 +52,13 @@ public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorSe callMigrateFunctionByPartitions("regular", "migrate_regular_events", regularEventTs, partitionConfiguration.getRegularPartitionSizeInHours()); callMigrateFunctionByPartitions("debug", "migrate_debug_events", debugEventTs, partitionConfiguration.getDebugPartitionSizeInHours()); - try (Connection connection = dataSource.getConnection(); - PreparedStatement dropFunction1 = connection.prepareStatement("DROP PROCEDURE IF EXISTS migrate_regular_events(bigint, bigint, int)"); - PreparedStatement dropFunction2 = connection.prepareStatement("DROP PROCEDURE IF EXISTS migrate_debug_events(bigint, bigint, int)"); - PreparedStatement dropTable = connection.prepareStatement("DROP TABLE IF EXISTS event")) { - dropFunction1.execute(); - dropFunction2.execute(); - dropTable.execute(); - } catch (SQLException e) { - log.error("SQLException occurred during drop of the `events` table", e); - throw new RuntimeException(e); + try { + jdbcTemplate.execute("DROP PROCEDURE IF EXISTS migrate_regular_events(bigint, bigint, int)"); + jdbcTemplate.execute("DROP PROCEDURE IF EXISTS migrate_debug_events(bigint, bigint, int)"); + jdbcTemplate.execute("DROP TABLE IF EXISTS event"); + } catch (DataAccessException e) { + log.error("Error occurred during drop of the `events` table", e); + throw e; } } @@ -94,13 +84,9 @@ public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorSe } private void callMigrateFunction(String functionName, long startTs, long endTs, int partitionSizeInHours) { - try (Connection connection = dataSource.getConnection(); - PreparedStatement stmt = connection.prepareStatement("call " + functionName + "(?,?,?)")) { - stmt.setLong(1, startTs); - stmt.setLong(2, endTs); - stmt.setInt(3, partitionSizeInHours); - stmt.execute(); - } catch (SQLException e) { + try { + jdbcTemplate.update("CALL " + functionName + "(?, ?, ?)", startTs, endTs, partitionSizeInHours); + } catch (DataAccessException e) { if (e.getMessage() == null || !e.getMessage().contains("relation \"event\" does not exist")) { log.error("[{}] SQLException occurred during execution of {} with parameters {} and {}", functionName, startTs, partitionSizeInHours, e); throw new RuntimeException(e); @@ -109,82 +95,7 @@ public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorSe } private void cleanupEvents(EventType eventType, long eventExpTime) { - var partitionDuration = partitionConfiguration.getPartitionSizeInMs(eventType); - List partitions = fetchPartitions(eventType); - for (var partitionTs : partitions) { - var partitionEndTs = partitionTs + partitionDuration; - if (partitionEndTs < eventExpTime) { - log.info("[{}] Detaching expired partition: [{}-{}]", eventType, partitionTs, partitionEndTs); - if (detachAndDropPartition(eventType, partitionTs)) { - log.info("[{}] Detached expired partition: {}", eventType, partitionTs); - } - } else { - log.debug("[{}] Skip valid partition: {}", eventType, partitionTs); - } - } - } - - private List fetchPartitions(EventType eventType) { - List partitions = new ArrayList<>(); - try (Connection connection = dataSource.getConnection(); - PreparedStatement stmt = connection.prepareStatement(SELECT_PARTITIONS_STMT)) { - stmt.setString(1, eventType.getTable()); - stmt.execute(); - try (ResultSet resultSet = stmt.getResultSet()) { - while (resultSet.next()) { - String partitionTableName = resultSet.getString(1); - String partitionTsStr = partitionTableName.substring(eventType.getTable().length() + 1); - try { - partitions.add(Long.parseLong(partitionTsStr)); - } catch (NumberFormatException nfe) { - log.warn("Failed to parse table name: {}", partitionTableName); - } - } - } - } catch (SQLException e) { - log.error("SQLException occurred during events TTL task execution ", e); - } - return partitions; - } - - private boolean detachAndDropPartition(EventType eventType, long partitionTs) { - String tablePartition = eventType.getTable() + "_" + partitionTs; - String detachPsqlStmtStr = "ALTER TABLE " + eventType.getTable() + " DETACH PARTITION " + tablePartition; - if (getCurrentServerVersion() >= PSQL_VERSION_14) { - detachPsqlStmtStr += " CONCURRENTLY"; - } - - String dropStmtStr = "DROP TABLE " + tablePartition; - try (Connection connection = dataSource.getConnection(); - PreparedStatement detachStmt = connection.prepareStatement(detachPsqlStmtStr); - PreparedStatement dropStmt = connection.prepareStatement(dropStmtStr)) { - detachStmt.execute(); - dropStmt.execute(); - return true; - } catch (SQLException e) { - log.error("[{}] SQLException occurred during detach and drop of the partition: {}", eventType, partitionTs, e); - } - return false; - } - - private synchronized int getCurrentServerVersion() { - if (currentServerVersion == null) { - try (Connection connection = dataSource.getConnection(); - PreparedStatement versionStmt = connection.prepareStatement("SELECT current_setting('server_version_num')")) { - versionStmt.execute(); - try (ResultSet resultSet = versionStmt.getResultSet()) { - while (resultSet.next()) { - currentServerVersion = resultSet.getInt(1); - } - } - } catch (SQLException e) { - log.warn("SQLException occurred during fetch of the server version", e); - } - if (currentServerVersion == null) { - currentServerVersion = 0; - } - } - return currentServerVersion; + partitioningRepository.dropPartitionsBefore(eventType.getTable(), eventExpTime, partitionConfiguration.getPartitionSizeInMs(eventType)); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java index 805e5e4f4f..87a62d356f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java @@ -15,23 +15,148 @@ */ package org.thingsboard.server.dao.sqlts.insert.sql; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.DataAccessException; +import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Repository; import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.dao.timeseries.SqlPartition; -import org.thingsboard.server.dao.util.SqlTsDao; import javax.persistence.EntityManager; import javax.persistence.PersistenceContext; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; @Repository -@Transactional +@Slf4j public class SqlPartitioningRepository { @PersistenceContext private EntityManager entityManager; + @Autowired + private JdbcTemplate jdbcTemplate; + + private static final String SELECT_PARTITIONS_STMT = "SELECT tablename from pg_tables WHERE schemaname = 'public' and tablename like concat(?, '_%')"; + + private static final int PSQL_VERSION_14 = 140000; + private volatile Integer currentServerVersion; + + private final Map> tablesPartitions = new ConcurrentHashMap<>(); + private final ReentrantLock partitionCreationLock = new ReentrantLock(); + + @Transactional public void save(SqlPartition partition) { entityManager.createNativeQuery(partition.getQuery()).executeUpdate(); } + @Transactional + public void createPartitionIfNotExists(String table, long entityTs, long partitionDurationMs) { + long partitionStartTs = calculatePartitionStartTime(entityTs, partitionDurationMs); + Map partitions = tablesPartitions.computeIfAbsent(table, t -> new ConcurrentHashMap<>()); + if (!partitions.containsKey(partitionStartTs)) { + SqlPartition partition = new SqlPartition(table, partitionStartTs, partitionStartTs + partitionDurationMs, Long.toString(partitionStartTs)); + partitionCreationLock.lock(); + try { + if (partitions.containsKey(partitionStartTs)) return; + log.trace("Saving partition: {}", partition); + save(partition); + log.trace("Adding partition to map: {}", partition); + partitions.put(partition.getStart(), partition); + } catch (RuntimeException e) { + log.trace("Error occurred during partition save:", e); + String msg = ExceptionUtils.getRootCauseMessage(e); + if (msg.contains("would overlap partition")) { + log.warn("Couldn't save {} partition for {}, data will be saved to the default partition. SQL error: {}", + partition.getPartitionDate(), table, msg); + partitions.put(partition.getStart(), partition); + } else { + throw e; + } + } finally { + partitionCreationLock.unlock(); + } + } + } + + public void dropPartitionsBefore(String table, long ts, long partitionDurationMs) { + List partitions = fetchPartitions(table); + for (Long partitionStartTime : partitions) { + long partitionEndTime = partitionStartTime + partitionDurationMs; + if (partitionEndTime < ts) { + log.info("[{}] Detaching expired partition: [{}-{}]", table, partitionStartTime, partitionEndTime); + boolean success = detachAndDropPartition(table, partitionStartTime); + if (success) { + log.info("[{}] Detached expired partition: {}", table, partitionStartTime); + } + } else { + log.debug("[{}] Skipping valid partition: {}", table, partitionStartTime); + } + } + } + + public void cleanupPartitionsCache(String table, long expTime, long partitionDurationMs) { + Map partitions = tablesPartitions.get(table); + if (partitions == null) return; + partitions.keySet().removeIf(startTime -> (startTime + partitionDurationMs) < expTime); + } + + private boolean detachAndDropPartition(String table, long partitionTs) { + Map cachedPartitions = tablesPartitions.get(table); + if (cachedPartitions != null) cachedPartitions.remove(partitionTs); + + String tablePartition = table + "_" + partitionTs; + String detachPsqlStmtStr = "ALTER TABLE " + table + " DETACH PARTITION " + tablePartition; + if (getCurrentServerVersion() >= PSQL_VERSION_14) { + detachPsqlStmtStr += " CONCURRENTLY"; + } + + String dropStmtStr = "DROP TABLE " + tablePartition; + try { + jdbcTemplate.execute(detachPsqlStmtStr); + jdbcTemplate.execute(dropStmtStr); + return true; + } catch (DataAccessException e) { + log.error("[{}] Error occurred trying to detach and drop the partition {} ", table, partitionTs, e); + } + return false; + } + + public List fetchPartitions(String table) { + List partitions = new ArrayList<>(); + List partitionsTables = jdbcTemplate.queryForList(SELECT_PARTITIONS_STMT, String.class, table); + for (String partitionTableName : partitionsTables) { + String partitionTsStr = partitionTableName.substring(table.length() + 1); + try { + partitions.add(Long.parseLong(partitionTsStr)); + } catch (NumberFormatException nfe) { + log.warn("Failed to parse table name: {}", partitionTableName); + } + } + return partitions; + } + + public long calculatePartitionStartTime(long ts, long partitionDuration) { + return ts - (ts % partitionDuration); + } + + private synchronized int getCurrentServerVersion() { + if (currentServerVersion == null) { + try { + currentServerVersion = jdbcTemplate.queryForObject("SELECT current_setting('server_version_num')", Integer.class); + } catch (Exception e) { + log.warn("Error occurred during fetch of the server version", e); + } + if (currentServerVersion == null) { + currentServerVersion = 0; + } + } + return currentServerVersion; + } + } diff --git a/dao/src/main/resources/sql/schema-entities-idx.sql b/dao/src/main/resources/sql/schema-entities-idx.sql index 80a5b03b92..34862e5af3 100644 --- a/dao/src/main/resources/sql/schema-entities-idx.sql +++ b/dao/src/main/resources/sql/schema-entities-idx.sql @@ -48,7 +48,7 @@ CREATE INDEX IF NOT EXISTS idx_asset_type ON asset(tenant_id, type); CREATE INDEX IF NOT EXISTS idx_attribute_kv_by_key_and_last_update_ts ON attribute_kv(entity_id, attribute_key, last_update_ts desc); -CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time); +CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time DESC); CREATE INDEX IF NOT EXISTS idx_rpc_tenant_id_device_id ON rpc(tenant_id, device_id); diff --git a/dao/src/main/resources/sql/schema-entities.sql b/dao/src/main/resources/sql/schema-entities.sql index dd26724b46..51df863ae5 100644 --- a/dao/src/main/resources/sql/schema-entities.sql +++ b/dao/src/main/resources/sql/schema-entities.sql @@ -74,7 +74,7 @@ CREATE TABLE IF NOT EXISTS entity_alarm ( ); CREATE TABLE IF NOT EXISTS audit_log ( - id uuid NOT NULL CONSTRAINT audit_log_pkey PRIMARY KEY, + id uuid NOT NULL, created_time bigint NOT NULL, tenant_id uuid, customer_id uuid, @@ -87,7 +87,7 @@ CREATE TABLE IF NOT EXISTS audit_log ( action_data varchar(1000000), action_status varchar(255), action_failure_details varchar(1000000) -); +) PARTITION BY RANGE (created_time); CREATE TABLE IF NOT EXISTS attribute_kv ( entity_type varchar(255),