diff --git a/application/src/main/data/upgrade/2.4.3/schema_update_psql_drop_partitions.sql b/application/src/main/data/upgrade/2.4.3/schema_update_psql_drop_partitions.sql index fcc5c6f232..9d336e0330 100644 --- a/application/src/main/data/upgrade/2.4.3/schema_update_psql_drop_partitions.sql +++ b/application/src/main/data/upgrade/2.4.3/schema_update_psql_drop_partitions.sql @@ -18,17 +18,18 @@ CREATE OR REPLACE PROCEDURE drop_partitions_by_max_ttl(IN partition_type varchar LANGUAGE plpgsql AS $$ DECLARE - max_tenant_ttl bigint; - max_customer_ttl bigint; - max_ttl bigint; - date timestamp; - partition_by_max_ttl_date varchar; - partition_month varchar; - partition_day varchar; - partition_year varchar; - partition varchar; - partition_to_delete varchar; - + max_tenant_ttl bigint; + max_customer_ttl bigint; + max_ttl bigint; + date timestamp; + partition_by_max_ttl_date varchar; + partition_by_max_ttl_month varchar; + partition_by_max_ttl_day varchar; + partition_by_max_ttl_year varchar; + partition varchar; + partition_year integer; + partition_month integer; + partition_day integer; BEGIN SELECT max(attribute_kv.long_v) @@ -45,53 +46,138 @@ BEGIN if max_ttl IS NOT NULL AND max_ttl > 0 THEN date := to_timestamp(EXTRACT(EPOCH FROM current_timestamp) - max_ttl); partition_by_max_ttl_date := get_partition_by_max_ttl_date(partition_type, date); + RAISE NOTICE 'Date by max ttl: %', date; RAISE NOTICE 'Partition by max ttl: %', partition_by_max_ttl_date; IF partition_by_max_ttl_date IS NOT NULL THEN CASE WHEN partition_type = 'DAYS' THEN - partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); - partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4); - partition_day := SPLIT_PART(partition_by_max_ttl_date, '_', 5); + partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); + partition_by_max_ttl_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4); + partition_by_max_ttl_day := SPLIT_PART(partition_by_max_ttl_date, '_', 5); WHEN partition_type = 'MONTHS' THEN - partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); - partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4); + partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); + partition_by_max_ttl_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4); ELSE - partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); + partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); END CASE; - FOR partition IN SELECT tablename - FROM pg_tables - WHERE schemaname = 'public' - AND tablename like 'ts_kv_' || '%' - AND tablename != 'ts_kv_latest' - AND tablename != 'ts_kv_dictionary' - AND tablename != 'ts_kv_indefinite' - LOOP - IF partition != partition_by_max_ttl_date THEN - IF partition_year IS NOT NULL THEN - IF SPLIT_PART(partition, '_', 3)::integer < partition_year::integer THEN - partition_to_delete := partition; - ELSE - IF partition_month IS NOT NULL THEN - IF SPLIT_PART(partition, '_', 4)::integer < partition_month::integer THEN - partition_to_delete := partition; + IF partition_by_max_ttl_year IS NULL THEN + RAISE NOTICE 'Failed to remove partitions by max ttl date due to partition_by_max_ttl_year is null!'; + ELSE + IF partition_type = 'YEARS' THEN + FOR partition IN SELECT tablename + FROM pg_tables + WHERE schemaname = 'public' + AND tablename like 'ts_kv_' || '%' + AND tablename != 'ts_kv_latest' + AND tablename != 'ts_kv_dictionary' + AND tablename != 'ts_kv_indefinite' + AND tablename != partition_by_max_ttl_date + LOOP + partition_year := SPLIT_PART(partition, '_', 3)::integer; + IF partition_year < partition_by_max_ttl_year::integer THEN + RAISE NOTICE 'Partition to delete by max ttl: %', partition; + EXECUTE format('DROP TABLE IF EXISTS %I', partition); + deleted := deleted + 1; + END IF; + END LOOP; + ELSE + IF partition_type = 'MONTHS' THEN + IF partition_by_max_ttl_month IS NULL THEN + RAISE NOTICE 'Failed to remove months partitions by max ttl date due to partition_by_max_ttl_month is null!'; + ELSE + FOR partition IN SELECT tablename + FROM pg_tables + WHERE schemaname = 'public' + AND tablename like 'ts_kv_' || '%' + AND tablename != 'ts_kv_latest' + AND tablename != 'ts_kv_dictionary' + AND tablename != 'ts_kv_indefinite' + AND tablename != partition_by_max_ttl_date + LOOP + partition_year := SPLIT_PART(partition, '_', 3)::integer; + IF partition_year > partition_by_max_ttl_year::integer THEN + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition; + CONTINUE; ELSE - IF partition_day IS NOT NULL THEN - IF SPLIT_PART(partition, '_', 5)::integer < partition_day::integer THEN - partition_to_delete := partition; + IF partition_year < partition_by_max_ttl_year::integer THEN + RAISE NOTICE 'Partition to delete by max ttl: %', partition; + EXECUTE format('DROP TABLE IF EXISTS %I', partition); + deleted := deleted + 1; + ELSE + partition_month := SPLIT_PART(partition, '_', 4)::integer; + IF partition_year = partition_by_max_ttl_year::integer THEN + IF partition_month >= partition_by_max_ttl_month::integer THEN + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition; + CONTINUE; + ELSE + RAISE NOTICE 'Partition to delete by max ttl: %', partition; + EXECUTE format('DROP TABLE IF EXISTS %I', partition); + deleted := deleted + 1; + END IF; END IF; END IF; END IF; + END LOOP; + END IF; + ELSE + IF partition_type = 'DAYS' THEN + IF partition_by_max_ttl_month IS NULL THEN + RAISE NOTICE 'Failed to remove days partitions by max ttl date due to partition_by_max_ttl_month is null!'; + ELSE + IF partition_by_max_ttl_day IS NULL THEN + RAISE NOTICE 'Failed to remove days partitions by max ttl date due to partition_by_max_ttl_day is null!'; + ELSE + FOR partition IN SELECT tablename + FROM pg_tables + WHERE schemaname = 'public' + AND tablename like 'ts_kv_' || '%' + AND tablename != 'ts_kv_latest' + AND tablename != 'ts_kv_dictionary' + AND tablename != 'ts_kv_indefinite' + AND tablename != partition_by_max_ttl_date + LOOP + partition_year := SPLIT_PART(partition, '_', 3)::integer; + IF partition_year > partition_by_max_ttl_year::integer THEN + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition; + CONTINUE; + ELSE + IF partition_year < partition_by_max_ttl_year::integer THEN + RAISE NOTICE 'Partition to delete by max ttl: %', partition; + EXECUTE format('DROP TABLE IF EXISTS %I', partition); + deleted := deleted + 1; + ELSE + partition_month := SPLIT_PART(partition, '_', 4)::integer; + IF partition_month > partition_by_max_ttl_month::integer THEN + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition; + CONTINUE; + ELSE + IF partition_month < partition_by_max_ttl_month::integer THEN + RAISE NOTICE 'Partition to delete by max ttl: %', partition; + EXECUTE format('DROP TABLE IF EXISTS %I', partition); + deleted := deleted + 1; + ELSE + partition_day := SPLIT_PART(partition, '_', 5)::integer; + IF partition_day >= partition_by_max_ttl_day::integer THEN + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition; + CONTINUE; + ELSE + IF partition_day < partition_by_max_ttl_day::integer THEN + RAISE NOTICE 'Partition to delete by max ttl: %', partition; + EXECUTE format('DROP TABLE IF EXISTS %I', partition); + deleted := deleted + 1; + END IF; + END IF; + END IF; + END IF; + END IF; + END IF; + END LOOP; END IF; END IF; END IF; - IF partition_to_delete IS NOT NULL THEN - RAISE NOTICE 'Partition to delete by max ttl: %', partition_to_delete; - EXECUTE format('DROP TABLE IF EXISTS %I', partition_to_delete); - partition_to_delete := NULL; - deleted := deleted + 1; - END IF; END IF; - END LOOP; + END IF; + END IF; END IF; END IF; END @@ -107,8 +193,6 @@ BEGIN partition := 'ts_kv_' || to_char(date, 'yyyy') || '_' || to_char(date, 'MM'); WHEN partition_type = 'YEARS' THEN partition := 'ts_kv_' || to_char(date, 'yyyy'); - WHEN partition_type = 'INDEFINITE' THEN - partition := NULL; ELSE partition := NULL; END CASE; diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java index 05799fc643..d0ed543b76 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java @@ -15,8 +15,13 @@ */ package org.thingsboard.server.service.ttl; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.queue.discovery.PartitionService; import java.sql.Connection; import java.sql.DriverManager; @@ -27,43 +32,12 @@ import java.sql.Statement; @Slf4j +@RequiredArgsConstructor public abstract class AbstractCleanUpService { - @Value("${spring.datasource.url}") - protected String dbUrl; + private final PartitionService partitionService; - @Value("${spring.datasource.username}") - protected String dbUserName; - - @Value("${spring.datasource.password}") - protected String dbPassword; - - protected long executeQuery(Connection conn, String query) throws SQLException { - try (Statement statement = conn.createStatement(); ResultSet resultSet = statement.executeQuery(query)) { - if (log.isDebugEnabled()) { - getWarnings(statement); - } - resultSet.next(); - return resultSet.getLong(1); - } - } - - protected void getWarnings(Statement statement) throws SQLException { - SQLWarning warnings = statement.getWarnings(); - if (warnings != null) { - log.debug("{}", warnings.getMessage()); - SQLWarning nextWarning = warnings.getNextWarning(); - while (nextWarning != null) { - log.debug("{}", nextWarning.getMessage()); - nextWarning = nextWarning.getNextWarning(); - } - } + protected boolean isSystemTenantPartitionMine(){ + return partitionService.resolve(ServiceType.TB_CORE, TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID).isMyPartition(); } - - protected abstract void doCleanUp(Connection connection) throws SQLException; - - protected Connection getConnection() throws SQLException { - return DriverManager.getConnection(dbUrl, dbUserName, dbPassword); - } - } diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/alarms/AlarmsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/AlarmsCleanUpService.java similarity index 99% rename from application/src/main/java/org/thingsboard/server/service/ttl/alarms/AlarmsCleanUpService.java rename to application/src/main/java/org/thingsboard/server/service/ttl/AlarmsCleanUpService.java index 3b76a6cbca..051a6c92b2 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/alarms/AlarmsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/AlarmsCleanUpService.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.ttl.alarms; +package org.thingsboard.server.service.ttl; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -52,6 +52,7 @@ import java.util.concurrent.TimeUnit; @Slf4j @RequiredArgsConstructor public class AlarmsCleanUpService { + @Value("${sql.ttl.alarms.removal_batch_size}") private Integer removalBatchSize; diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/edge/EdgeEventsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/EdgeEventsCleanUpService.java similarity index 64% rename from application/src/main/java/org/thingsboard/server/service/ttl/edge/EdgeEventsCleanUpService.java rename to application/src/main/java/org/thingsboard/server/service/ttl/EdgeEventsCleanUpService.java index e93a82c7eb..3cdd1f71bd 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/edge/EdgeEventsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/EdgeEventsCleanUpService.java @@ -13,20 +13,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.ttl.edge; +package org.thingsboard.server.service.ttl; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; -import org.thingsboard.server.dao.util.PsqlDao; +import org.thingsboard.server.dao.edge.EdgeService; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.ttl.AbstractCleanUpService; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; - -@PsqlDao +@TbCoreComponent @Slf4j @Service public class EdgeEventsCleanUpService extends AbstractCleanUpService { @@ -37,20 +35,18 @@ public class EdgeEventsCleanUpService extends AbstractCleanUpService { @Value("${sql.ttl.edge_events.enabled}") private boolean ttlTaskExecutionEnabled; + private final EdgeService edgeService; + + public EdgeEventsCleanUpService(PartitionService partitionService, EdgeService edgeService) { + super(partitionService); + this.edgeService = edgeService; + } + @Scheduled(initialDelayString = "${sql.ttl.edge_events.execution_interval_ms}", fixedDelayString = "${sql.ttl.edge_events.execution_interval_ms}") public void cleanUp() { - if (ttlTaskExecutionEnabled) { - try (Connection conn = getConnection()) { - doCleanUp(conn); - } catch (SQLException e) { - log.error("SQLException occurred during TTL task execution ", e); - } + if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) { + edgeService.cleanupEvents(ttl); } } - @Override - protected void doCleanUp(Connection connection) throws SQLException { - long totalEdgeEventsRemoved = executeQuery(connection, "call cleanup_edge_events_by_ttl(" + ttl + ", 0);"); - log.info("Total edge events removed by TTL: [{}]", totalEdgeEventsRemoved); - } } diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/events/EventsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/EventsCleanUpService.java similarity index 65% rename from application/src/main/java/org/thingsboard/server/service/ttl/events/EventsCleanUpService.java rename to application/src/main/java/org/thingsboard/server/service/ttl/EventsCleanUpService.java index 407c88261f..a51910b7ed 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/events/EventsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/EventsCleanUpService.java @@ -13,20 +13,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.ttl.events; +package org.thingsboard.server.service.ttl; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; -import org.thingsboard.server.dao.util.PsqlDao; +import org.thingsboard.server.dao.event.EventService; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.ttl.AbstractCleanUpService; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; - -@PsqlDao +@TbCoreComponent @Slf4j @Service public class EventsCleanUpService extends AbstractCleanUpService { @@ -40,20 +38,18 @@ public class EventsCleanUpService extends AbstractCleanUpService { @Value("${sql.ttl.events.enabled}") private boolean ttlTaskExecutionEnabled; + private final EventService eventService; + + public EventsCleanUpService(PartitionService partitionService, EventService eventService) { + super(partitionService); + this.eventService = eventService; + } + @Scheduled(initialDelayString = "${sql.ttl.events.execution_interval_ms}", fixedDelayString = "${sql.ttl.events.execution_interval_ms}") public void cleanUp() { - if (ttlTaskExecutionEnabled) { - try (Connection conn = getConnection()) { - doCleanUp(conn); - } catch (SQLException e) { - log.error("SQLException occurred during TTL task execution ", e); - } + if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) { + eventService.cleanupEvents(ttl, debugTtl); } } - @Override - protected void doCleanUp(Connection connection) throws SQLException { - long totalEventsRemoved = executeQuery(connection, "call cleanup_events_by_ttl(" + ttl + ", " + debugTtl + ", 0);"); - log.info("Total events removed by TTL: [{}]", totalEventsRemoved); - } } \ No newline at end of file diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/AbstractTimeseriesCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/TimeseriesCleanUpService.java similarity index 61% rename from application/src/main/java/org/thingsboard/server/service/ttl/timeseries/AbstractTimeseriesCleanUpService.java rename to application/src/main/java/org/thingsboard/server/service/ttl/TimeseriesCleanUpService.java index ee2d437a22..55c746b580 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/AbstractTimeseriesCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/TimeseriesCleanUpService.java @@ -13,19 +13,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.ttl.timeseries; +package org.thingsboard.server.service.ttl; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import org.thingsboard.server.dao.timeseries.TimeseriesService; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.ttl.AbstractCleanUpService; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; - +@TbCoreComponent @Slf4j -public abstract class AbstractTimeseriesCleanUpService extends AbstractCleanUpService { +@Service +public class TimeseriesCleanUpService extends AbstractCleanUpService { @Value("${sql.ttl.ts.ts_key_value_ttl}") protected long systemTtl; @@ -33,14 +35,17 @@ public abstract class AbstractTimeseriesCleanUpService extends AbstractCleanUpSe @Value("${sql.ttl.ts.enabled}") private boolean ttlTaskExecutionEnabled; + private final TimeseriesService timeseriesService; + + public TimeseriesCleanUpService(PartitionService partitionService, TimeseriesService timeseriesService) { + super(partitionService); + this.timeseriesService = timeseriesService; + } + @Scheduled(initialDelayString = "${sql.ttl.ts.execution_interval_ms}", fixedDelayString = "${sql.ttl.ts.execution_interval_ms}") public void cleanUp() { - if (ttlTaskExecutionEnabled) { - try (Connection conn = getConnection()) { - doCleanUp(conn); - } catch (SQLException e) { - log.error("SQLException occurred during TTL task execution ", e); - } + if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) { + timeseriesService.cleanup(systemTtl); } } diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/PsqlTimeseriesCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/PsqlTimeseriesCleanUpService.java deleted file mode 100644 index 3197f0cbb0..0000000000 --- a/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/PsqlTimeseriesCleanUpService.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Copyright © 2016-2021 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.service.ttl.timeseries; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Service; -import org.thingsboard.server.dao.model.ModelConstants; -import org.thingsboard.server.dao.util.PsqlDao; -import org.thingsboard.server.dao.util.SqlTsDao; - -import java.sql.Connection; -import java.sql.SQLException; - -@SqlTsDao -@PsqlDao -@Service -@Slf4j -public class PsqlTimeseriesCleanUpService extends AbstractTimeseriesCleanUpService { - - @Value("${sql.postgres.ts_key_value_partitioning}") - private String partitionType; - - @Override - protected void doCleanUp(Connection connection) throws SQLException { - long totalPartitionsRemoved = executeQuery(connection, "call drop_partitions_by_max_ttl('" + partitionType + "'," + systemTtl + ", 0);"); - log.info("Total partitions removed by TTL: [{}]", totalPartitionsRemoved); - long totalEntitiesTelemetryRemoved = executeQuery(connection, "call cleanup_timeseries_by_ttl('" + ModelConstants.NULL_UUID + "'," + systemTtl + ", 0);"); - log.info("Total telemetry removed stats by TTL for entities: [{}]", totalEntitiesTelemetryRemoved); - } -} \ No newline at end of file diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/TimescaleTimeseriesCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/TimescaleTimeseriesCleanUpService.java deleted file mode 100644 index 0ed61ef97c..0000000000 --- a/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/TimescaleTimeseriesCleanUpService.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Copyright © 2016-2021 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.service.ttl.timeseries; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; -import org.thingsboard.server.dao.model.ModelConstants; -import org.thingsboard.server.dao.util.TimescaleDBTsDao; - -import java.sql.Connection; -import java.sql.SQLException; - -@TimescaleDBTsDao -@Service -@Slf4j -public class TimescaleTimeseriesCleanUpService extends AbstractTimeseriesCleanUpService { - - @Override - protected void doCleanUp(Connection connection) throws SQLException { - long totalEntitiesTelemetryRemoved = executeQuery(connection, "call cleanup_timeseries_by_ttl('" + ModelConstants.NULL_UUID + "'," + systemTtl + ", 0);"); - log.info("Total telemetry removed stats by TTL for entities: [{}]", totalEntitiesTelemetryRemoved); - } -} diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java index a7b9145c01..53472fd261 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java @@ -93,4 +93,6 @@ public interface EdgeService { Object activateInstance(String licenseSecret, String releaseDate); String findMissingToRelatedRuleChains(TenantId tenantId, EdgeId edgeId); + + void cleanupEvents(long ttl); } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/event/EventService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/event/EventService.java index ea25568375..db1c77697e 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/event/EventService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/event/EventService.java @@ -46,4 +46,6 @@ public interface EventService { void removeEvents(TenantId tenantId, EntityId entityId); + void cleanupEvents(long ttl, long debugTtl); + } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java index 6fcb5ca2ca..b1a2541fc7 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java @@ -52,4 +52,6 @@ public interface TimeseriesService { List findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId); List findAllKeysByEntityIds(TenantId tenantId, List entityIds); + + void cleanup(long systemTtl); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeDao.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeDao.java index d254c5ee9a..60ef98a801 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeDao.java @@ -176,4 +176,10 @@ public interface EdgeDao extends Dao { * @return the list of rule chain objects */ ListenableFuture> findEdgesByTenantIdAndDashboardId(UUID tenantId, UUID dashboardId); + + /** + * Executes stored procedure to cleanup old edge events. + * @param ttl the ttl for edge events in seconds + */ + void cleanupEvents(long ttl); } \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java index 2ff7d94f43..0089853969 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java @@ -627,6 +627,11 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic return result.toString(); } + @Override + public void cleanupEvents(long ttl) { + edgeDao.cleanupEvents(ttl); + } + private List findEdgeRuleChains(TenantId tenantId, EdgeId edgeId) { List result = new ArrayList<>(); PageLink pageLink = new PageLink(DEFAULT_LIMIT); diff --git a/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java b/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java index 91bfa954fd..2785df90df 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java @@ -131,6 +131,11 @@ public class BaseEventService implements EventService { } while (eventPageData.hasNext()); } + @Override + public void cleanupEvents(long ttl, long debugTtl) { + eventDao.cleanupEvents(ttl, debugTtl); + } + private DataValidator eventValidator = new DataValidator() { @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/event/EventDao.java b/dao/src/main/java/org/thingsboard/server/dao/event/EventDao.java index ba4e86c95e..ceacbffd50 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/event/EventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/event/EventDao.java @@ -102,4 +102,10 @@ public interface EventDao extends Dao { */ List findLatestEvents(UUID tenantId, EntityId entityId, String eventType, int limit); + /** + * Executes stored procedure to cleanup old events. Uses separate ttl for debug and other events. + * @param otherEventsTtl the ttl for events in seconds + * @param debugEventsTtl the ttl for debug events in seconds + */ + void cleanupEvents(long otherEventsTtl, long debugEventsTtl); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java index 4431356690..fcd679383f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java @@ -15,11 +15,33 @@ */ package org.thingsboard.server.dao.sql; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import javax.sql.DataSource; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.Statement; + +@Slf4j public abstract class JpaAbstractDaoListeningExecutorService { @Autowired protected JpaExecutorService service; + @Autowired + protected DataSource dataSource; + + protected void printWarnings(Statement statement) throws SQLException { + SQLWarning warnings = statement.getWarnings(); + if (warnings != null) { + log.debug("{}", warnings.getMessage()); + SQLWarning nextWarning = warnings.getNextWarning(); + while (nextWarning != null) { + log.debug("{}", nextWarning.getMessage()); + nextWarning = nextWarning.getNextWarning(); + } + } + } + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaEdgeDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaEdgeDao.java index f17196fe93..2249de109f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaEdgeDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaEdgeDao.java @@ -40,6 +40,10 @@ import org.thingsboard.server.dao.model.sql.EdgeInfoEntity; import org.thingsboard.server.dao.relation.RelationDao; import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -194,6 +198,24 @@ public class JpaEdgeDao extends JpaAbstractSearchTextDao imple return transformFromRelationToEdge(tenantId, relations); } + @Override + public void cleanupEvents(long ttl) { + log.info("Going to cleanup old edge events using ttl: {}s", ttl); + try (Connection connection = dataSource.getConnection(); + PreparedStatement stmt = connection.prepareStatement("call cleanup_edge_events_by_ttl(?,?)")) { + stmt.setLong(1, ttl); + stmt.setLong(2, 0); + stmt.execute(); + printWarnings(stmt); + try (ResultSet resultSet = stmt.getResultSet()) { + resultSet.next(); + log.info("Total edge events removed by TTL: [{}]", resultSet.getLong(1)); + } + } catch (SQLException e) { + log.error("SQLException occurred during edge events TTL task execution ", e); + } + } + private ListenableFuture> transformFromRelationToEdge(UUID tenantId, ListenableFuture> relations) { return Futures.transformAsync(relations, input -> { List> edgeFutures = new ArrayList<>(input.size()); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java index 44848ec515..ee09bb7a90 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java @@ -27,7 +27,6 @@ import org.thingsboard.server.common.data.Event; import org.thingsboard.server.common.data.event.DebugEvent; import org.thingsboard.server.common.data.event.ErrorEventFilter; import org.thingsboard.server.common.data.event.EventFilter; -import org.thingsboard.server.common.data.event.EventType; import org.thingsboard.server.common.data.event.LifeCycleEventFilter; import org.thingsboard.server.common.data.event.StatisticsEventFilter; import org.thingsboard.server.common.data.id.EntityId; @@ -40,6 +39,10 @@ import org.thingsboard.server.dao.event.EventDao; import org.thingsboard.server.dao.model.sql.EventEntity; import org.thingsboard.server.dao.sql.JpaAbstractDao; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -256,6 +259,25 @@ public class JpaBaseEventDao extends JpaAbstractDao implemen return DaoUtil.convertDataList(latest); } + @Override + public void cleanupEvents(long otherEventsTtl, long debugEventsTtl) { + log.info("Going to cleanup old events using debug events ttl: {}s and other events ttl: {}s", debugEventsTtl, otherEventsTtl); + try (Connection connection = dataSource.getConnection(); + PreparedStatement stmt = connection.prepareStatement("call cleanup_events_by_ttl(?,?,?)")) { + stmt.setLong(1, otherEventsTtl); + stmt.setLong(2, debugEventsTtl); + stmt.setLong(3, 0); + stmt.execute(); + printWarnings(stmt); + try (ResultSet resultSet = stmt.getResultSet()){ + resultSet.next(); + log.info("Total events removed by TTL: [{}]", resultSet.getLong(1)); + } + } catch (SQLException e) { + log.error("SQLException occurred during events TTL task execution ", e); + } + } + public Optional save(EventEntity entity, boolean ifNotExists) { log.debug("Save event [{}] ", entity); if (entity.getTenantId() == null) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java index f6a6b56be5..eb12984753 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java @@ -25,9 +25,14 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent; import javax.annotation.Nullable; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; import java.util.List; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -62,6 +67,24 @@ public abstract class AbstractSqlTimeseriesDao extends BaseAbstractSqlTimeseries @Value("${sql.ttl.ts.ts_key_value_ttl:0}") private long systemTtl; + public void cleanup(long systemTtl) { + log.info("Going to cleanup old timeseries data using ttl: {}s", systemTtl); + try (Connection connection = dataSource.getConnection(); + PreparedStatement stmt = connection.prepareStatement("call cleanup_timeseries_by_ttl(?,?,?)")) { + stmt.setObject(1, ModelConstants.NULL_UUID); + stmt.setLong(2, systemTtl); + stmt.setLong(3, 0); + stmt.execute(); + printWarnings(stmt); + try (ResultSet resultSet = stmt.getResultSet()) { + resultSet.next(); + log.info("Total telemetry removed stats by TTL for entities: [{}]", resultSet.getLong(1)); + } + } catch (SQLException e) { + log.error("SQLException occurred during timeseries TTL task execution ", e); + } + } + protected ListenableFuture> processFindAllAsync(TenantId tenantId, EntityId entityId, List queries) { List>> futures = queries .stream() diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/hsql/JpaHsqlTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/hsql/JpaHsqlTimeseriesDao.java index c01d91ec61..c8241714f4 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/hsql/JpaHsqlTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/hsql/JpaHsqlTimeseriesDao.java @@ -54,4 +54,9 @@ public class JpaHsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa return Futures.transform(tsQueue.add(entity), v -> dataPointDays, MoreExecutors.directExecutor()); } + @Override + public void cleanup(long systemTtl) { + + } + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/JpaPsqlTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/JpaPsqlTimeseriesDao.java index 64c074fd40..c23e615548 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/JpaPsqlTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/JpaPsqlTimeseriesDao.java @@ -35,6 +35,10 @@ import org.thingsboard.server.dao.timeseries.SqlTsPartitionDate; import org.thingsboard.server.dao.util.PsqlDao; import org.thingsboard.server.dao.util.SqlTsDao; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; @@ -62,6 +66,7 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa @Value("${sql.postgres.ts_key_value_partitioning:MONTHS}") private String partitioning; + @Override protected void init() { super.init(); @@ -93,6 +98,30 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa return Futures.transform(tsQueue.add(entity), v -> dataPointDays, MoreExecutors.directExecutor()); } + @Override + public void cleanup(long systemTtl) { + cleanupPartitions(systemTtl); + super.cleanup(systemTtl); + } + + private void cleanupPartitions(long systemTtl) { + log.info("Going to cleanup old timeseries data partitions using partition type: {} and ttl: {}s", partitioning, systemTtl); + try (Connection connection = dataSource.getConnection(); + PreparedStatement stmt = connection.prepareStatement("call drop_partitions_by_max_ttl(?,?,?)")) { + stmt.setString(1, partitioning); + stmt.setLong(2, systemTtl); + stmt.setLong(3, 0); + stmt.execute(); + printWarnings(stmt); + try (ResultSet resultSet = stmt.getResultSet()) { + resultSet.next(); + log.info("Total partitions removed by TTL: [{}]", resultSet.getLong(1)); + } + } catch (SQLException e) { + log.error("SQLException occurred during TTL task execution ", e); + } + } + private void savePartitionIfNotExist(long ts) { if (!tsFormat.equals(SqlTsPartitionDate.INDEFINITE) && ts >= 0) { LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java index 7f798ddb4b..31f3407fbf 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java @@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.dao.DaoUtil; +import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; @@ -45,6 +46,9 @@ import org.thingsboard.server.dao.util.TimescaleDBTsDao; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import java.sql.CallableStatement; +import java.sql.SQLException; +import java.sql.Types; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.function.Function; @@ -156,6 +160,11 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements } } + @Override + public void cleanup(long systemTtl) { + super.cleanup(systemTtl); + } + private ListenableFuture> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) { String strKey = query.getKey(); Integer keyId = getOrSaveKeyId(strKey); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index 701c67a648..fb15af723a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -126,6 +126,11 @@ public class BaseTimeseriesService implements TimeseriesService { return timeseriesLatestDao.findAllKeysByEntityIds(tenantId, entityIds); } + @Override + public void cleanup(long systemTtl) { + timeseriesDao.cleanup(systemTtl); + } + @Override public ListenableFuture save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { validate(entityId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 240d5a0b88..ce653e2e6e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -288,6 +288,11 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD } } + @Override + public void cleanup(long systemTtl) { + //Cleanup by TTL is native for Cassandra + } + private ListenableFuture> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { long minPartition = toPartitionTs(query.getStartTs()); long maxPartition = toPartitionTs(query.getEndTs()); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java index 3b3eb4ee0a..e9af5f0b75 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java @@ -38,4 +38,6 @@ public interface TimeseriesDao { ListenableFuture remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query); ListenableFuture removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query); + + void cleanup(long systemTtl); } diff --git a/dao/src/main/resources/sql/schema-ts-psql.sql b/dao/src/main/resources/sql/schema-ts-psql.sql index 5683cc0a17..2744ff5a07 100644 --- a/dao/src/main/resources/sql/schema-ts-psql.sql +++ b/dao/src/main/resources/sql/schema-ts-psql.sql @@ -38,17 +38,18 @@ CREATE OR REPLACE PROCEDURE drop_partitions_by_max_ttl(IN partition_type varchar LANGUAGE plpgsql AS $$ DECLARE - max_tenant_ttl bigint; - max_customer_ttl bigint; - max_ttl bigint; - date timestamp; - partition_by_max_ttl_date varchar; - partition_month varchar; - partition_day varchar; - partition_year varchar; - partition varchar; - partition_to_delete varchar; - + max_tenant_ttl bigint; + max_customer_ttl bigint; + max_ttl bigint; + date timestamp; + partition_by_max_ttl_date varchar; + partition_by_max_ttl_month varchar; + partition_by_max_ttl_day varchar; + partition_by_max_ttl_year varchar; + partition varchar; + partition_year integer; + partition_month integer; + partition_day integer; BEGIN SELECT max(attribute_kv.long_v) @@ -65,53 +66,138 @@ BEGIN if max_ttl IS NOT NULL AND max_ttl > 0 THEN date := to_timestamp(EXTRACT(EPOCH FROM current_timestamp) - max_ttl); partition_by_max_ttl_date := get_partition_by_max_ttl_date(partition_type, date); + RAISE NOTICE 'Date by max ttl: %', date; RAISE NOTICE 'Partition by max ttl: %', partition_by_max_ttl_date; IF partition_by_max_ttl_date IS NOT NULL THEN CASE WHEN partition_type = 'DAYS' THEN - partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); - partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4); - partition_day := SPLIT_PART(partition_by_max_ttl_date, '_', 5); + partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); + partition_by_max_ttl_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4); + partition_by_max_ttl_day := SPLIT_PART(partition_by_max_ttl_date, '_', 5); WHEN partition_type = 'MONTHS' THEN - partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); - partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4); + partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); + partition_by_max_ttl_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4); ELSE - partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); + partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); END CASE; - FOR partition IN SELECT tablename - FROM pg_tables - WHERE schemaname = 'public' - AND tablename like 'ts_kv_' || '%' - AND tablename != 'ts_kv_latest' - AND tablename != 'ts_kv_dictionary' - AND tablename != 'ts_kv_indefinite' - LOOP - IF partition != partition_by_max_ttl_date THEN - IF partition_year IS NOT NULL THEN - IF SPLIT_PART(partition, '_', 3)::integer < partition_year::integer THEN - partition_to_delete := partition; - ELSE - IF partition_month IS NOT NULL THEN - IF SPLIT_PART(partition, '_', 4)::integer < partition_month::integer THEN - partition_to_delete := partition; + IF partition_by_max_ttl_year IS NULL THEN + RAISE NOTICE 'Failed to remove partitions by max ttl date due to partition_by_max_ttl_year is null!'; + ELSE + IF partition_type = 'YEARS' THEN + FOR partition IN SELECT tablename + FROM pg_tables + WHERE schemaname = 'public' + AND tablename like 'ts_kv_' || '%' + AND tablename != 'ts_kv_latest' + AND tablename != 'ts_kv_dictionary' + AND tablename != 'ts_kv_indefinite' + AND tablename != partition_by_max_ttl_date + LOOP + partition_year := SPLIT_PART(partition, '_', 3)::integer; + IF partition_year < partition_by_max_ttl_year::integer THEN + RAISE NOTICE 'Partition to delete by max ttl: %', partition; + EXECUTE format('DROP TABLE IF EXISTS %I', partition); + deleted := deleted + 1; + END IF; + END LOOP; + ELSE + IF partition_type = 'MONTHS' THEN + IF partition_by_max_ttl_month IS NULL THEN + RAISE NOTICE 'Failed to remove months partitions by max ttl date due to partition_by_max_ttl_month is null!'; + ELSE + FOR partition IN SELECT tablename + FROM pg_tables + WHERE schemaname = 'public' + AND tablename like 'ts_kv_' || '%' + AND tablename != 'ts_kv_latest' + AND tablename != 'ts_kv_dictionary' + AND tablename != 'ts_kv_indefinite' + AND tablename != partition_by_max_ttl_date + LOOP + partition_year := SPLIT_PART(partition, '_', 3)::integer; + IF partition_year > partition_by_max_ttl_year::integer THEN + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition; + CONTINUE; ELSE - IF partition_day IS NOT NULL THEN - IF SPLIT_PART(partition, '_', 5)::integer < partition_day::integer THEN - partition_to_delete := partition; + IF partition_year < partition_by_max_ttl_year::integer THEN + RAISE NOTICE 'Partition to delete by max ttl: %', partition; + EXECUTE format('DROP TABLE IF EXISTS %I', partition); + deleted := deleted + 1; + ELSE + partition_month := SPLIT_PART(partition, '_', 4)::integer; + IF partition_year = partition_by_max_ttl_year::integer THEN + IF partition_month >= partition_by_max_ttl_month::integer THEN + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition; + CONTINUE; + ELSE + RAISE NOTICE 'Partition to delete by max ttl: %', partition; + EXECUTE format('DROP TABLE IF EXISTS %I', partition); + deleted := deleted + 1; + END IF; END IF; END IF; END IF; + END LOOP; + END IF; + ELSE + IF partition_type = 'DAYS' THEN + IF partition_by_max_ttl_month IS NULL THEN + RAISE NOTICE 'Failed to remove days partitions by max ttl date due to partition_by_max_ttl_month is null!'; + ELSE + IF partition_by_max_ttl_day IS NULL THEN + RAISE NOTICE 'Failed to remove days partitions by max ttl date due to partition_by_max_ttl_day is null!'; + ELSE + FOR partition IN SELECT tablename + FROM pg_tables + WHERE schemaname = 'public' + AND tablename like 'ts_kv_' || '%' + AND tablename != 'ts_kv_latest' + AND tablename != 'ts_kv_dictionary' + AND tablename != 'ts_kv_indefinite' + AND tablename != partition_by_max_ttl_date + LOOP + partition_year := SPLIT_PART(partition, '_', 3)::integer; + IF partition_year > partition_by_max_ttl_year::integer THEN + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition; + CONTINUE; + ELSE + IF partition_year < partition_by_max_ttl_year::integer THEN + RAISE NOTICE 'Partition to delete by max ttl: %', partition; + EXECUTE format('DROP TABLE IF EXISTS %I', partition); + deleted := deleted + 1; + ELSE + partition_month := SPLIT_PART(partition, '_', 4)::integer; + IF partition_month > partition_by_max_ttl_month::integer THEN + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition; + CONTINUE; + ELSE + IF partition_month < partition_by_max_ttl_month::integer THEN + RAISE NOTICE 'Partition to delete by max ttl: %', partition; + EXECUTE format('DROP TABLE IF EXISTS %I', partition); + deleted := deleted + 1; + ELSE + partition_day := SPLIT_PART(partition, '_', 5)::integer; + IF partition_day >= partition_by_max_ttl_day::integer THEN + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition; + CONTINUE; + ELSE + IF partition_day < partition_by_max_ttl_day::integer THEN + RAISE NOTICE 'Partition to delete by max ttl: %', partition; + EXECUTE format('DROP TABLE IF EXISTS %I', partition); + deleted := deleted + 1; + END IF; + END IF; + END IF; + END IF; + END IF; + END IF; + END LOOP; END IF; END IF; END IF; - IF partition_to_delete IS NOT NULL THEN - RAISE NOTICE 'Partition to delete by max ttl: %', partition_to_delete; - EXECUTE format('DROP TABLE IF EXISTS %I', partition_to_delete); - partition_to_delete := NULL; - deleted := deleted + 1; - END IF; END IF; - END LOOP; + END IF; + END IF; END IF; END IF; END @@ -127,8 +213,6 @@ BEGIN partition := 'ts_kv_' || to_char(date, 'yyyy') || '_' || to_char(date, 'MM'); WHEN partition_type = 'YEARS' THEN partition := 'ts_kv_' || to_char(date, 'yyyy'); - WHEN partition_type = 'INDEFINITE' THEN - partition := NULL; ELSE partition := NULL; END CASE;