Browse Source

Merge pull request #7371 from ViacheslavKlimov/feature/audit-logs-partitioning-ttl

[3.4.2] TTL and table partitioning for audit logs
pull/7425/head
Andrew Shvayka 4 years ago
committed by GitHub
parent
commit
740192b23c
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 74
      application/src/main/data/upgrade/3.4.1/schema_update.sql
  2. 1
      application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java
  3. 17
      application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java
  4. 27
      application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java
  5. 61
      application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java
  6. 6
      application/src/main/resources/thingsboard.yml
  7. 68
      application/src/test/java/org/thingsboard/server/controller/BaseAuditLogControllerTest.java
  8. 5
      application/src/test/resources/application-test.properties
  9. 5
      dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java
  10. 4
      dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java
  11. 73
      dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java
  12. 56
      dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java
  13. 119
      dao/src/main/java/org/thingsboard/server/dao/sql/event/SqlEventCleanupRepository.java
  14. 129
      dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java
  15. 2
      dao/src/main/resources/sql/schema-entities-idx.sql
  16. 4
      dao/src/main/resources/sql/schema-entities.sql

74
application/src/main/data/upgrade/3.4.1/schema_update.sql

@ -0,0 +1,74 @@
--
-- Copyright © 2016-2022 The Thingsboard Authors
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
DO
$$
DECLARE table_partition RECORD;
BEGIN
-- in case of running the upgrade script a second time:
IF NOT (SELECT exists(SELECT FROM pg_tables WHERE tablename = 'old_audit_log')) THEN
ALTER TABLE audit_log RENAME TO old_audit_log;
ALTER INDEX IF EXISTS idx_audit_log_tenant_id_and_created_time RENAME TO idx_old_audit_log_tenant_id_and_created_time;
FOR table_partition IN SELECT tablename AS name, split_part(tablename, '_', 3) AS partition_ts
FROM pg_tables WHERE tablename LIKE 'audit_log_%'
LOOP
EXECUTE format('ALTER TABLE %s RENAME TO old_audit_log_%s', table_partition.name, table_partition.partition_ts);
END LOOP;
ELSE
RAISE NOTICE 'Table old_audit_log already exists, leaving as is';
END IF;
END;
$$;
CREATE TABLE IF NOT EXISTS audit_log (
id uuid NOT NULL,
created_time bigint NOT NULL,
tenant_id uuid,
customer_id uuid,
entity_id uuid,
entity_type varchar(255),
entity_name varchar(255),
user_id uuid,
user_name varchar(255),
action_type varchar(255),
action_data varchar(1000000),
action_status varchar(255),
action_failure_details varchar(1000000)
) PARTITION BY RANGE (created_time);
CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time DESC);
CREATE OR REPLACE PROCEDURE migrate_audit_logs(IN start_time_ms BIGINT, IN end_time_ms BIGINT, IN partition_size_ms BIGINT)
LANGUAGE plpgsql AS
$$
DECLARE
p RECORD;
partition_end_ts BIGINT;
BEGIN
FOR p IN SELECT DISTINCT (created_time - created_time % partition_size_ms) AS partition_ts FROM old_audit_log
WHERE created_time >= start_time_ms AND created_time < end_time_ms
LOOP
partition_end_ts = p.partition_ts + partition_size_ms;
RAISE NOTICE '[audit_log] Partition to create : [%-%]', p.partition_ts, partition_end_ts;
EXECUTE format('CREATE TABLE IF NOT EXISTS audit_log_%s PARTITION OF audit_log ' ||
'FOR VALUES FROM ( %s ) TO ( %s )', p.partition_ts, p.partition_ts, partition_end_ts);
END LOOP;
INSERT INTO audit_log
SELECT * FROM old_audit_log
WHERE created_time >= start_time_ms AND created_time < end_time_ms;
END;
$$;

1
application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java

@ -228,6 +228,7 @@ public class ThingsboardInstallService {
case "3.4.1":
log.info("Upgrading ThingsBoard from version 3.4.1 to 3.4.2 ...");
databaseEntitiesUpgradeService.upgradeDatabase("3.4.1");
dataUpdateService.updateData("3.4.1");
log.info("Updating system data...");
systemDataLoaderService.updateSystemWidgets();
break;

17
application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java

@ -15,8 +15,6 @@
*/
package org.thingsboard.server.service.install;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
@ -25,10 +23,7 @@ import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.QueueId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
@ -37,8 +32,6 @@ import org.thingsboard.server.common.data.queue.ProcessingStrategyType;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.data.queue.SubmitStrategy;
import org.thingsboard.server.common.data.queue.SubmitStrategyType;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration;
import org.thingsboard.server.dao.asset.AssetProfileService;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.dashboard.DashboardService;
@ -63,11 +56,8 @@ import java.sql.SQLException;
import java.sql.SQLSyntaxErrorException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.thingsboard.server.service.install.DatabaseHelper.ADDITIONAL_INFO;
import static org.thingsboard.server.service.install.DatabaseHelper.ASSIGNED_CUSTOMERS;
@ -620,8 +610,8 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
case "3.4.1":
try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
log.info("Updating schema ...");
runSchemaUpdateScript(conn, "3.4.1");
if (isOldSchema(conn, 3004001)) {
try {
conn.createStatement().execute("ALTER TABLE asset ADD COLUMN asset_profile_id uuid");
} catch (Exception e) {
@ -669,6 +659,11 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
}
}
private void runSchemaUpdateScript(Connection connection, String version) throws Exception {
Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", version, SCHEMA_UPDATE_SQL);
loadSql(schemaUpdateFile, connection);
}
private void loadSql(Path sqlFile, Connection conn) throws Exception {
String sql = new String(Files.readAllBytes(sqlFile), Charset.forName("UTF-8"));
Statement st = conn.createStatement();

27
application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java

@ -63,6 +63,7 @@ import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.alarm.AlarmDao;
import org.thingsboard.server.dao.audit.AuditLogDao;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.event.EventService;
@ -138,6 +139,9 @@ public class DefaultDataUpdateService implements DataUpdateService {
@Autowired
private EventService eventService;
@Autowired
private AuditLogDao auditLogDao;
@Override
public void updateData(String fromVersion) throws Exception {
switch (fromVersion) {
@ -170,12 +174,22 @@ public class DefaultDataUpdateService implements DataUpdateService {
rateLimitsUpdater.updateEntities();
break;
case "3.4.0":
String skipEventsMigration = System.getenv("TB_SKIP_EVENTS_MIGRATION");
if (skipEventsMigration == null || skipEventsMigration.equalsIgnoreCase("false")) {
boolean skipEventsMigration = getEnv("TB_SKIP_EVENTS_MIGRATION", false);
if (!skipEventsMigration) {
log.info("Updating data from version 3.4.0 to 3.4.1 ...");
eventService.migrateEvents();
}
break;
case "3.4.1":
boolean skipAuditLogsMigration = getEnv("TB_SKIP_AUDIT_LOGS_MIGRATION", false);
if (!skipAuditLogsMigration) {
log.info("Updating data from version 3.4.1 to 3.4.2 ...");
log.info("Starting audit logs migration. Can be skipped with TB_SKIP_AUDIT_LOGS_MIGRATION env variable set to true");
auditLogDao.migrateAuditLogs();
} else {
log.info("Skipping audit logs migration");
}
break;
default:
throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion);
}
@ -645,4 +659,13 @@ public class DefaultDataUpdateService implements DataUpdateService {
return mainQueueConfiguration;
}
private boolean getEnv(String name, boolean defaultValue) {
String env = System.getenv(name);
if (env == null) {
return defaultValue;
} else {
return Boolean.parseBoolean(env);
}
}
}

61
application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java

@ -0,0 +1,61 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.ttl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.thingsboard.server.dao.audit.AuditLogDao;
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
import org.thingsboard.server.queue.discovery.PartitionService;
import java.util.concurrent.TimeUnit;
import static org.thingsboard.server.dao.model.ModelConstants.AUDIT_LOG_COLUMN_FAMILY_NAME;
@Service
@ConditionalOnExpression("${sql.ttl.audit_logs.enabled:true} && ${sql.ttl.audit_logs.ttl:0} > 0")
@Slf4j
public class AuditLogsCleanUpService extends AbstractCleanUpService {
private final AuditLogDao auditLogDao;
private final SqlPartitioningRepository partitioningRepository;
@Value("${sql.ttl.audit_logs.ttl:0}")
private long ttlInSec;
@Value("${sql.audit_logs.partition_size:168}")
private int partitionSizeInHours;
public AuditLogsCleanUpService(PartitionService partitionService, AuditLogDao auditLogDao, SqlPartitioningRepository partitioningRepository) {
super(partitionService);
this.auditLogDao = auditLogDao;
this.partitioningRepository = partitioningRepository;
}
@Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.audit_logs.checking_interval_ms})}",
fixedDelayString = "${sql.ttl.audit_logs.checking_interval_ms}")
public void cleanUp() {
long auditLogsExpTime = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(ttlInSec);
if (isSystemTenantPartitionMine()) {
auditLogDao.cleanUpAuditLogs(auditLogsExpTime);
} else {
partitioningRepository.cleanupPartitionsCache(AUDIT_LOG_COLUMN_FAMILY_NAME, auditLogsExpTime, TimeUnit.HOURS.toMillis(partitionSizeInHours));
}
}
}

6
application/src/main/resources/thingsboard.yml

@ -265,6 +265,8 @@ sql:
batch_size: "${SQL_EDGE_EVENTS_BATCH_SIZE:1000}"
batch_max_delay: "${SQL_EDGE_EVENTS_BATCH_MAX_DELAY_MS:100}"
stats_print_interval_ms: "${SQL_EDGE_EVENTS_BATCH_STATS_PRINT_MS:10000}"
audit_logs:
partition_size: "${SQL_AUDIT_LOGS_PARTITION_SIZE_HOURS:168}" # Default value - 1 week
# Specify whether to sort entities before batch update. Should be enabled for cluster mode to avoid deadlocks
batch_sort: "${SQL_BATCH_SORT:false}"
# Specify whether to remove null characters from strValue of attributes and timeseries before insert
@ -303,6 +305,10 @@ sql:
rpc:
enabled: "${SQL_TTL_RPC_ENABLED:true}"
checking_interval: "${SQL_RPC_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours
audit_logs:
enabled: "${SQL_TTL_AUDIT_LOGS_ENABLED:true}"
ttl: "${SQL_TTL_AUDIT_LOGS_SECS:0}" # Disabled by default. Accuracy of the cleanup depends on the sql.audit_logs.partition_size
checking_interval_ms: "${SQL_TTL_AUDIT_LOGS_CHECKING_INTERVAL_MS:86400000}" # Default value - 1 day
relations:
max_level: "${SQL_RELATIONS_MAX_LEVEL:50}" # //This value has to be reasonable small to prevent infinite recursion as early as possible

68
application/src/test/java/org/thingsboard/server/controller/BaseAuditLogControllerTest.java

@ -20,18 +20,33 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.audit.AuditLog;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.dao.audit.AuditLogDao;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
import org.thingsboard.server.service.ttl.AuditLogsCleanUpService;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
public abstract class BaseAuditLogControllerTest extends AbstractControllerTest {
@ -39,6 +54,18 @@ public abstract class BaseAuditLogControllerTest extends AbstractControllerTest
private Tenant savedTenant;
private User tenantAdmin;
@Autowired
private AuditLogDao auditLogDao;
@SpyBean
private SqlPartitioningRepository partitioningRepository;
@Autowired
private AuditLogsCleanUpService auditLogsCleanUpService;
@Value("#{${sql.audit_logs.partition_size} * 60 * 60 * 1000}")
private long partitionDurationInMs;
@Value("${sql.ttl.audit_logs.ttl}")
private long auditLogsTtlInSec;
@Before
public void beforeTest() throws Exception {
loginSysAdmin();
@ -145,4 +172,45 @@ public abstract class BaseAuditLogControllerTest extends AbstractControllerTest
Assert.assertEquals(179, loadedAuditLogs.size());
}
@Test
public void whenSavingNewAuditLog_thenCheckAndCreatePartitionIfNotExists() {
reset(partitioningRepository);
AuditLog auditLog = createAuditLog(ActionType.LOGIN, tenantAdminUserId);
verify(partitioningRepository).createPartitionIfNotExists(eq("audit_log"), eq(auditLog.getCreatedTime()), eq(partitionDurationInMs));
List<Long> partitions = partitioningRepository.fetchPartitions("audit_log");
assertThat(partitions).singleElement().satisfies(partitionStartTs -> {
assertThat(partitionStartTs).isEqualTo(partitioningRepository.calculatePartitionStartTime(auditLog.getCreatedTime(), partitionDurationInMs));
});
}
@Test
public void whenCleaningUpAuditLogsByTtl_thenDropOldPartitions() {
long oldAuditLogTs = LocalDate.of(2020, 10, 1).atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli();
long partitionStartTs = partitioningRepository.calculatePartitionStartTime(oldAuditLogTs, partitionDurationInMs);
partitioningRepository.createPartitionIfNotExists("audit_log", oldAuditLogTs, partitionDurationInMs);
List<Long> partitions = partitioningRepository.fetchPartitions("audit_log");
assertThat(partitions).contains(partitionStartTs);
auditLogsCleanUpService.cleanUp();
partitions = partitioningRepository.fetchPartitions("audit_log");
assertThat(partitions).doesNotContain(partitionStartTs);
assertThat(partitions).allSatisfy(partitionsStart -> {
long partitionEndTs = partitionsStart + partitionDurationInMs;
assertThat(partitionEndTs).isGreaterThan(System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(auditLogsTtlInSec));
});
}
private AuditLog createAuditLog(ActionType actionType, EntityId entityId) {
AuditLog auditLog = new AuditLog();
auditLog.setTenantId(tenantId);
auditLog.setCustomerId(null);
auditLog.setUserId(tenantAdminUserId);
auditLog.setEntityId(entityId);
auditLog.setUserName(tenantAdmin.getEmail());
auditLog.setActionType(actionType);
return auditLogDao.save(tenantId, auditLog);
}
}

5
application/src/test/resources/application-test.properties

@ -56,4 +56,7 @@ queue.rule-engine.queues[2].processing-strategy.retries=1
queue.rule-engine.queues[2].processing-strategy.pause-between-retries=0
queue.rule-engine.queues[2].processing-strategy.max-pause-between-retries=0
usage.stats.report.enabled=false
usage.stats.report.enabled=false
sql.audit_logs.partition_size=24
sql.ttl.audit_logs.ttl=2592000

5
dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java

@ -39,4 +39,9 @@ public interface AuditLogDao extends Dao<AuditLog> {
PageData<AuditLog> findAuditLogsByTenantIdAndUserId(UUID tenantId, UserId userId, List<ActionType> actionTypes, TimePageLink pageLink);
PageData<AuditLog> findAuditLogsByTenantId(UUID tenantId, List<ActionType> actionTypes, TimePageLink pageLink);
void cleanUpAuditLogs(long expTime);
void migrateAuditLogs();
}

4
dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java

@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sql;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import javax.sql.DataSource;
import java.sql.SQLException;
@ -32,6 +33,9 @@ public abstract class JpaAbstractDaoListeningExecutorService {
@Autowired
protected DataSource dataSource;
@Autowired
protected JdbcTemplate jdbcTemplate;
protected void printWarnings(Statement statement) throws SQLException {
SQLWarning warnings = statement.getWarnings();
if (warnings != null) {

73
dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java

@ -15,33 +15,52 @@
*/
package org.thingsboard.server.dao.sql.audit;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.google.common.util.concurrent.ListenableFuture;
import org.springframework.beans.factory.annotation.Autowired;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.audit.AuditLog;
import org.thingsboard.server.common.data.id.AuditLogId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.audit.AuditLogDao;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.model.sql.AuditLogEntity;
import org.thingsboard.server.dao.sql.JpaAbstractDao;
import org.thingsboard.server.dao.util.SqlDao;
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@Component
@SqlDao
@RequiredArgsConstructor
@Slf4j
public class JpaAuditLogDao extends JpaAbstractDao<AuditLogEntity, AuditLog> implements AuditLogDao {
@Autowired
private AuditLogRepository auditLogRepository;
private final AuditLogRepository auditLogRepository;
private final SqlPartitioningRepository partitioningRepository;
private final JdbcTemplate jdbcTemplate;
@Value("${sql.audit_logs.partition_size:168}")
private int partitionSizeInHours;
@Value("${sql.ttl.audit_logs.ttl:0}")
private long ttlInSec;
private static final String TABLE_NAME = ModelConstants.AUDIT_LOG_COLUMN_FAMILY_NAME;
@Override
protected Class<AuditLogEntity> getEntityClass() {
@ -61,6 +80,17 @@ public class JpaAuditLogDao extends JpaAbstractDao<AuditLogEntity, AuditLog> imp
});
}
@Override
public AuditLog save(TenantId tenantId, AuditLog auditLog) {
if (auditLog.getId() == null) {
UUID uuid = Uuids.timeBased();
auditLog.setId(new AuditLogId(uuid));
auditLog.setCreatedTime(Uuids.unixTimestamp(uuid));
}
partitioningRepository.createPartitionIfNotExists(TABLE_NAME, auditLog.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
return super.save(tenantId, auditLog);
}
@Override
public PageData<AuditLog> findAuditLogsByTenantIdAndEntityId(UUID tenantId, EntityId entityId, List<ActionType> actionTypes, TimePageLink pageLink) {
return DaoUtil.toPageData(
@ -115,4 +145,41 @@ public class JpaAuditLogDao extends JpaAbstractDao<AuditLogEntity, AuditLog> imp
actionTypes,
DaoUtil.toPageable(pageLink)));
}
@Override
public void cleanUpAuditLogs(long expTime) {
partitioningRepository.dropPartitionsBefore(TABLE_NAME, expTime, TimeUnit.HOURS.toMillis(partitionSizeInHours));
}
@Override
public void migrateAuditLogs() {
long startTime = ttlInSec > 0 ? System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(ttlInSec) : 1480982400000L;
long currentTime = System.currentTimeMillis();
var partitionStepInMs = TimeUnit.HOURS.toMillis(partitionSizeInHours);
long numberOfPartitions = (currentTime - startTime) / partitionStepInMs;
if (numberOfPartitions > 1000) {
String error = "Please adjust your audit logs partitioning configuration. Configuration with partition size " +
"of " + partitionSizeInHours + " hours and corresponding TTL will use " + numberOfPartitions + " " +
"(> 1000) partitions which is not recommended!";
log.error(error);
throw new RuntimeException(error);
}
while (startTime < currentTime) {
var endTime = startTime + partitionStepInMs;
log.info("Migrating audit logs for time period: {} - {}", startTime, endTime);
callMigrationFunction(startTime, endTime, partitionStepInMs);
startTime = endTime;
}
log.info("Audit logs migration finished");
jdbcTemplate.execute("DROP TABLE IF EXISTS old_audit_log");
}
private void callMigrationFunction(long startTime, long endTime, long partitionSizeInMs) {
jdbcTemplate.update("CALL migrate_audit_logs(?, ?, ?)", startTime, endTime, partitionSizeInMs);
}
}

56
dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java

@ -18,10 +18,8 @@ package org.thingsboard.server.dao.sql.event;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.hibernate.exception.ConstraintViolationException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.event.ErrorEventFilter;
@ -43,7 +41,6 @@ import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
import org.thingsboard.server.dao.timeseries.SqlPartition;
import org.thingsboard.server.dao.util.SqlDao;
import javax.annotation.PostConstruct;
@ -54,7 +51,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
/**
@ -65,9 +61,6 @@ import java.util.function.Function;
@SqlDao
public class JpaBaseEventDao implements EventDao {
private final Map<EventType, Map<Long, SqlPartition>> partitionsByEventType = new ConcurrentHashMap<>();
private static final ReentrantLock partitionCreationLock = new ReentrantLock();
@Autowired
private EventPartitionConfiguration partitionConfiguration;
@ -122,9 +115,6 @@ public class JpaBaseEventDao implements EventDao {
@PostConstruct
private void init() {
for (EventType eventType : EventType.values()) {
partitionsByEventType.put(eventType, new ConcurrentHashMap<>());
}
TbSqlBlockingQueueParams params = TbSqlBlockingQueueParams.builder()
.logName("Events")
.batchSize(batchSize)
@ -165,42 +155,11 @@ public class JpaBaseEventDao implements EventDao {
event.setCreatedTime(System.currentTimeMillis());
}
}
savePartitionIfNotExist(event);
partitioningRepository.createPartitionIfNotExists(event.getType().getTable(), event.getCreatedTime(),
partitionConfiguration.getPartitionSizeInMs(event.getType()));
return queue.add(event);
}
private void savePartitionIfNotExist(Event event) {
EventType type = event.getType();
var partitionsMap = partitionsByEventType.get(type);
var partitionDuration = partitionConfiguration.getPartitionSizeInMs(type);
long partitionStartTs = event.getCreatedTime() - (event.getCreatedTime() % partitionDuration);
if (partitionsMap.get(partitionStartTs) == null) {
savePartition(partitionsMap, new SqlPartition(type.getTable(), partitionStartTs, partitionStartTs + partitionDuration, Long.toString(partitionStartTs)));
}
}
private void savePartition(Map<Long, SqlPartition> partitionsMap, SqlPartition sqlPartition) {
if (!partitionsMap.containsKey(sqlPartition.getStart())) {
partitionCreationLock.lock();
try {
log.trace("Saving partition: {}", sqlPartition);
partitioningRepository.save(sqlPartition);
log.trace("Adding partition to map: {}", sqlPartition);
partitionsMap.put(sqlPartition.getStart(), sqlPartition);
} catch (DataIntegrityViolationException ex) {
log.trace("Error occurred during partition save:", ex);
if (ex.getCause() instanceof ConstraintViolationException) {
log.warn("Saving partition [{}] rejected. Event data will save to the DEFAULT partition.", sqlPartition.getPartitionDate());
partitionsMap.put(sqlPartition.getStart(), sqlPartition);
} else {
throw new RuntimeException(ex);
}
} finally {
partitionCreationLock.unlock();
}
}
}
@Override
public PageData<? extends Event> findEvents(UUID tenantId, UUID entityId, EventType eventType, TimePageLink pageLink) {
return DaoUtil.toPageData(getEventRepository(eventType).findEvents(tenantId, entityId, pageLink.getStartTime(), pageLink.getEndTime(), DaoUtil.toPageable(pageLink, EventEntity.eventColumnMap)));
@ -438,23 +397,24 @@ public class JpaBaseEventDao implements EventDao {
log.info("Going to cleanup regular events with exp time: {}", regularEventExpTs);
if (cleanupDb) {
eventCleanupRepository.cleanupEvents(regularEventExpTs, false);
} else {
cleanupPartitionsCache(regularEventExpTs, false);
}
cleanupPartitions(regularEventExpTs, false);
}
if (debugEventExpTs > 0) {
log.info("Going to cleanup debug events with exp time: {}", debugEventExpTs);
if (cleanupDb) {
eventCleanupRepository.cleanupEvents(debugEventExpTs, true);
} else {
cleanupPartitionsCache(debugEventExpTs, true);
}
cleanupPartitions(debugEventExpTs, true);
}
}
private void cleanupPartitions(long expTime, boolean isDebug) {
private void cleanupPartitionsCache(long expTime, boolean isDebug) {
for (EventType eventType : EventType.values()) {
if (eventType.isDebug() == isDebug) {
Map<Long, SqlPartition> partitions = partitionsByEventType.get(eventType);
partitions.keySet().removeIf(startTs -> startTs + partitionConfiguration.getPartitionSizeInMs(eventType) < expTime);
partitioningRepository.cleanupPartitionsCache(eventType.getTable(), expTime, partitionConfiguration.getPartitionSizeInMs(eventType));
}
}
}

119
dao/src/main/java/org/thingsboard/server/dao/sql/event/SqlEventCleanupRepository.java

@ -17,16 +17,12 @@ package org.thingsboard.server.dao.sql.event;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.stereotype.Repository;
import org.thingsboard.server.common.data.event.EventType;
import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -34,13 +30,10 @@ import java.util.concurrent.TimeUnit;
@Repository
public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorService implements EventCleanupRepository {
private static final String SELECT_PARTITIONS_STMT = "SELECT tablename from pg_tables WHERE schemaname = 'public' and tablename like concat(?, '_%')";
private static final int PSQL_VERSION_14 = 140000;
@Autowired
private EventPartitionConfiguration partitionConfiguration;
private volatile Integer currentServerVersion;
@Autowired
private SqlPartitioningRepository partitioningRepository;
@Override
public void cleanupEvents(long eventExpTime, boolean debug) {
@ -59,16 +52,13 @@ public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorSe
callMigrateFunctionByPartitions("regular", "migrate_regular_events", regularEventTs, partitionConfiguration.getRegularPartitionSizeInHours());
callMigrateFunctionByPartitions("debug", "migrate_debug_events", debugEventTs, partitionConfiguration.getDebugPartitionSizeInHours());
try (Connection connection = dataSource.getConnection();
PreparedStatement dropFunction1 = connection.prepareStatement("DROP PROCEDURE IF EXISTS migrate_regular_events(bigint, bigint, int)");
PreparedStatement dropFunction2 = connection.prepareStatement("DROP PROCEDURE IF EXISTS migrate_debug_events(bigint, bigint, int)");
PreparedStatement dropTable = connection.prepareStatement("DROP TABLE IF EXISTS event")) {
dropFunction1.execute();
dropFunction2.execute();
dropTable.execute();
} catch (SQLException e) {
log.error("SQLException occurred during drop of the `events` table", e);
throw new RuntimeException(e);
try {
jdbcTemplate.execute("DROP PROCEDURE IF EXISTS migrate_regular_events(bigint, bigint, int)");
jdbcTemplate.execute("DROP PROCEDURE IF EXISTS migrate_debug_events(bigint, bigint, int)");
jdbcTemplate.execute("DROP TABLE IF EXISTS event");
} catch (DataAccessException e) {
log.error("Error occurred during drop of the `events` table", e);
throw e;
}
}
@ -94,13 +84,9 @@ public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorSe
}
private void callMigrateFunction(String functionName, long startTs, long endTs, int partitionSizeInHours) {
try (Connection connection = dataSource.getConnection();
PreparedStatement stmt = connection.prepareStatement("call " + functionName + "(?,?,?)")) {
stmt.setLong(1, startTs);
stmt.setLong(2, endTs);
stmt.setInt(3, partitionSizeInHours);
stmt.execute();
} catch (SQLException e) {
try {
jdbcTemplate.update("CALL " + functionName + "(?, ?, ?)", startTs, endTs, partitionSizeInHours);
} catch (DataAccessException e) {
if (e.getMessage() == null || !e.getMessage().contains("relation \"event\" does not exist")) {
log.error("[{}] SQLException occurred during execution of {} with parameters {} and {}", functionName, startTs, partitionSizeInHours, e);
throw new RuntimeException(e);
@ -109,82 +95,7 @@ public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorSe
}
private void cleanupEvents(EventType eventType, long eventExpTime) {
var partitionDuration = partitionConfiguration.getPartitionSizeInMs(eventType);
List<Long> partitions = fetchPartitions(eventType);
for (var partitionTs : partitions) {
var partitionEndTs = partitionTs + partitionDuration;
if (partitionEndTs < eventExpTime) {
log.info("[{}] Detaching expired partition: [{}-{}]", eventType, partitionTs, partitionEndTs);
if (detachAndDropPartition(eventType, partitionTs)) {
log.info("[{}] Detached expired partition: {}", eventType, partitionTs);
}
} else {
log.debug("[{}] Skip valid partition: {}", eventType, partitionTs);
}
}
}
private List<Long> fetchPartitions(EventType eventType) {
List<Long> partitions = new ArrayList<>();
try (Connection connection = dataSource.getConnection();
PreparedStatement stmt = connection.prepareStatement(SELECT_PARTITIONS_STMT)) {
stmt.setString(1, eventType.getTable());
stmt.execute();
try (ResultSet resultSet = stmt.getResultSet()) {
while (resultSet.next()) {
String partitionTableName = resultSet.getString(1);
String partitionTsStr = partitionTableName.substring(eventType.getTable().length() + 1);
try {
partitions.add(Long.parseLong(partitionTsStr));
} catch (NumberFormatException nfe) {
log.warn("Failed to parse table name: {}", partitionTableName);
}
}
}
} catch (SQLException e) {
log.error("SQLException occurred during events TTL task execution ", e);
}
return partitions;
}
private boolean detachAndDropPartition(EventType eventType, long partitionTs) {
String tablePartition = eventType.getTable() + "_" + partitionTs;
String detachPsqlStmtStr = "ALTER TABLE " + eventType.getTable() + " DETACH PARTITION " + tablePartition;
if (getCurrentServerVersion() >= PSQL_VERSION_14) {
detachPsqlStmtStr += " CONCURRENTLY";
}
String dropStmtStr = "DROP TABLE " + tablePartition;
try (Connection connection = dataSource.getConnection();
PreparedStatement detachStmt = connection.prepareStatement(detachPsqlStmtStr);
PreparedStatement dropStmt = connection.prepareStatement(dropStmtStr)) {
detachStmt.execute();
dropStmt.execute();
return true;
} catch (SQLException e) {
log.error("[{}] SQLException occurred during detach and drop of the partition: {}", eventType, partitionTs, e);
}
return false;
}
private synchronized int getCurrentServerVersion() {
if (currentServerVersion == null) {
try (Connection connection = dataSource.getConnection();
PreparedStatement versionStmt = connection.prepareStatement("SELECT current_setting('server_version_num')")) {
versionStmt.execute();
try (ResultSet resultSet = versionStmt.getResultSet()) {
while (resultSet.next()) {
currentServerVersion = resultSet.getInt(1);
}
}
} catch (SQLException e) {
log.warn("SQLException occurred during fetch of the server version", e);
}
if (currentServerVersion == null) {
currentServerVersion = 0;
}
}
return currentServerVersion;
partitioningRepository.dropPartitionsBefore(eventType.getTable(), eventExpTime, partitionConfiguration.getPartitionSizeInMs(eventType));
}
}

129
dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java

@ -15,23 +15,148 @@
*/
package org.thingsboard.server.dao.sqlts.insert.sql;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.dao.timeseries.SqlPartition;
import org.thingsboard.server.dao.util.SqlTsDao;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
@Repository
@Transactional
@Slf4j
public class SqlPartitioningRepository {
@PersistenceContext
private EntityManager entityManager;
@Autowired
private JdbcTemplate jdbcTemplate;
private static final String SELECT_PARTITIONS_STMT = "SELECT tablename from pg_tables WHERE schemaname = 'public' and tablename like concat(?, '_%')";
private static final int PSQL_VERSION_14 = 140000;
private volatile Integer currentServerVersion;
private final Map<String, Map<Long, SqlPartition>> tablesPartitions = new ConcurrentHashMap<>();
private final ReentrantLock partitionCreationLock = new ReentrantLock();
@Transactional
public void save(SqlPartition partition) {
entityManager.createNativeQuery(partition.getQuery()).executeUpdate();
}
@Transactional
public void createPartitionIfNotExists(String table, long entityTs, long partitionDurationMs) {
long partitionStartTs = calculatePartitionStartTime(entityTs, partitionDurationMs);
Map<Long, SqlPartition> partitions = tablesPartitions.computeIfAbsent(table, t -> new ConcurrentHashMap<>());
if (!partitions.containsKey(partitionStartTs)) {
SqlPartition partition = new SqlPartition(table, partitionStartTs, partitionStartTs + partitionDurationMs, Long.toString(partitionStartTs));
partitionCreationLock.lock();
try {
if (partitions.containsKey(partitionStartTs)) return;
log.trace("Saving partition: {}", partition);
save(partition);
log.trace("Adding partition to map: {}", partition);
partitions.put(partition.getStart(), partition);
} catch (RuntimeException e) {
log.trace("Error occurred during partition save:", e);
String msg = ExceptionUtils.getRootCauseMessage(e);
if (msg.contains("would overlap partition")) {
log.warn("Couldn't save {} partition for {}, data will be saved to the default partition. SQL error: {}",
partition.getPartitionDate(), table, msg);
partitions.put(partition.getStart(), partition);
} else {
throw e;
}
} finally {
partitionCreationLock.unlock();
}
}
}
public void dropPartitionsBefore(String table, long ts, long partitionDurationMs) {
List<Long> partitions = fetchPartitions(table);
for (Long partitionStartTime : partitions) {
long partitionEndTime = partitionStartTime + partitionDurationMs;
if (partitionEndTime < ts) {
log.info("[{}] Detaching expired partition: [{}-{}]", table, partitionStartTime, partitionEndTime);
boolean success = detachAndDropPartition(table, partitionStartTime);
if (success) {
log.info("[{}] Detached expired partition: {}", table, partitionStartTime);
}
} else {
log.debug("[{}] Skipping valid partition: {}", table, partitionStartTime);
}
}
}
public void cleanupPartitionsCache(String table, long expTime, long partitionDurationMs) {
Map<Long, SqlPartition> partitions = tablesPartitions.get(table);
if (partitions == null) return;
partitions.keySet().removeIf(startTime -> (startTime + partitionDurationMs) < expTime);
}
private boolean detachAndDropPartition(String table, long partitionTs) {
Map<Long, SqlPartition> cachedPartitions = tablesPartitions.get(table);
if (cachedPartitions != null) cachedPartitions.remove(partitionTs);
String tablePartition = table + "_" + partitionTs;
String detachPsqlStmtStr = "ALTER TABLE " + table + " DETACH PARTITION " + tablePartition;
if (getCurrentServerVersion() >= PSQL_VERSION_14) {
detachPsqlStmtStr += " CONCURRENTLY";
}
String dropStmtStr = "DROP TABLE " + tablePartition;
try {
jdbcTemplate.execute(detachPsqlStmtStr);
jdbcTemplate.execute(dropStmtStr);
return true;
} catch (DataAccessException e) {
log.error("[{}] Error occurred trying to detach and drop the partition {} ", table, partitionTs, e);
}
return false;
}
public List<Long> fetchPartitions(String table) {
List<Long> partitions = new ArrayList<>();
List<String> partitionsTables = jdbcTemplate.queryForList(SELECT_PARTITIONS_STMT, String.class, table);
for (String partitionTableName : partitionsTables) {
String partitionTsStr = partitionTableName.substring(table.length() + 1);
try {
partitions.add(Long.parseLong(partitionTsStr));
} catch (NumberFormatException nfe) {
log.warn("Failed to parse table name: {}", partitionTableName);
}
}
return partitions;
}
public long calculatePartitionStartTime(long ts, long partitionDuration) {
return ts - (ts % partitionDuration);
}
private synchronized int getCurrentServerVersion() {
if (currentServerVersion == null) {
try {
currentServerVersion = jdbcTemplate.queryForObject("SELECT current_setting('server_version_num')", Integer.class);
} catch (Exception e) {
log.warn("Error occurred during fetch of the server version", e);
}
if (currentServerVersion == null) {
currentServerVersion = 0;
}
}
return currentServerVersion;
}
}

2
dao/src/main/resources/sql/schema-entities-idx.sql

@ -48,7 +48,7 @@ CREATE INDEX IF NOT EXISTS idx_asset_type ON asset(tenant_id, type);
CREATE INDEX IF NOT EXISTS idx_attribute_kv_by_key_and_last_update_ts ON attribute_kv(entity_id, attribute_key, last_update_ts desc);
CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time);
CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time DESC);
CREATE INDEX IF NOT EXISTS idx_rpc_tenant_id_device_id ON rpc(tenant_id, device_id);

4
dao/src/main/resources/sql/schema-entities.sql

@ -74,7 +74,7 @@ CREATE TABLE IF NOT EXISTS entity_alarm (
);
CREATE TABLE IF NOT EXISTS audit_log (
id uuid NOT NULL CONSTRAINT audit_log_pkey PRIMARY KEY,
id uuid NOT NULL,
created_time bigint NOT NULL,
tenant_id uuid,
customer_id uuid,
@ -87,7 +87,7 @@ CREATE TABLE IF NOT EXISTS audit_log (
action_data varchar(1000000),
action_status varchar(255),
action_failure_details varchar(1000000)
);
) PARTITION BY RANGE (created_time);
CREATE TABLE IF NOT EXISTS attribute_kv (
entity_type varchar(255),

Loading…
Cancel
Save