diff --git a/application/src/main/data/upgrade/3.4.0/schema_update.sql b/application/src/main/data/upgrade/3.4.0/schema_update.sql index 2b4b5af42d..caac509d17 100644 --- a/application/src/main/data/upgrade/3.4.0/schema_update.sql +++ b/application/src/main/data/upgrade/3.4.0/schema_update.sql @@ -88,9 +88,19 @@ CREATE INDEX IF NOT EXISTS idx_lc_event_main CREATE INDEX IF NOT EXISTS idx_error_event_main ON error_event (tenant_id ASC, entity_id ASC, ts DESC NULLS LAST) WITH (FILLFACTOR=95); +CREATE OR REPLACE FUNCTION to_safe_json(p_json text) RETURNS json +LANGUAGE plpgsql AS +$$ +BEGIN + return REPLACE(p_json, '\u0000', '' )::json; +EXCEPTION + WHEN OTHERS THEN + return '{}'::json; +END; +$$; -- Useful to migrate old events to the new table structure; -CREATE OR REPLACE PROCEDURE migrate_regular_events(IN start_ts_in_ms bigint, IN partition_size_in_hours int) +CREATE OR REPLACE PROCEDURE migrate_regular_events(IN start_ts_in_ms bigint, IN end_ts_in_ms bigint, IN partition_size_in_hours int) LANGUAGE plpgsql AS $$ DECLARE @@ -100,7 +110,7 @@ DECLARE BEGIN partition_size_in_ms = partition_size_in_hours * 3600 * 1000; - FOR p IN SELECT DISTINCT event_type as event_type, (created_time - created_time % partition_size_in_ms) as partition_ts FROM event e WHERE e.event_type in ('STATS', 'LC_EVENT', 'ERROR') and ts > start_ts_in_ms + FOR p IN SELECT DISTINCT event_type as event_type, (created_time - created_time % partition_size_in_ms) as partition_ts FROM event e WHERE e.event_type in ('STATS', 'LC_EVENT', 'ERROR') and ts >= start_ts_in_ms and ts < end_ts_in_ms LOOP IF p.event_type = 'STATS' THEN table_name := 'stats_event'; @@ -121,9 +131,10 @@ BEGIN body::json ->> 'server', (body::json ->> 'messagesProcessed')::bigint, (body::json ->> 'errorsOccurred')::bigint - FROM event - WHERE ts > start_ts_in_ms - AND event_type = 'STATS' + FROM + (select id, tenant_id, ts, entity_id, to_safe_json(body) as body + FROM event WHERE ts >= start_ts_in_ms and ts < end_ts_in_ms AND event_type = 'STATS' AND to_safe_json(body) ->> 'server' IS NOT NULL + ) safe_event ON CONFLICT DO NOTHING; INSERT INTO lc_event @@ -135,9 +146,10 @@ BEGIN body::json ->> 'event', (body::json ->> 'success')::boolean, body::json ->> 'error' - FROM event - WHERE ts > start_ts_in_ms - AND event_type = 'LC_EVENT' + FROM + (select id, tenant_id, ts, entity_id, to_safe_json(body) as body + FROM event WHERE ts >= start_ts_in_ms and ts < end_ts_in_ms AND event_type = 'LC_EVENT' AND to_safe_json(body) ->> 'server' IS NOT NULL + ) safe_event ON CONFLICT DO NOTHING; INSERT INTO error_event @@ -148,16 +160,17 @@ BEGIN body::json ->> 'server', body::json ->> 'method', body::json ->> 'error' - FROM event - WHERE ts > start_ts_in_ms - AND event_type = 'ERROR' + FROM + (select id, tenant_id, ts, entity_id, to_safe_json(body) as body + FROM event WHERE ts >= start_ts_in_ms and ts < end_ts_in_ms AND event_type = 'ERROR' AND to_safe_json(body) ->> 'server' IS NOT NULL + ) safe_event ON CONFLICT DO NOTHING; END $$; -- Useful to migrate old debug events to the new table structure; -CREATE OR REPLACE PROCEDURE migrate_debug_events(IN start_ts_in_ms bigint, IN partition_size_in_hours int) +CREATE OR REPLACE PROCEDURE migrate_debug_events(IN start_ts_in_ms bigint, IN end_ts_in_ms bigint, IN partition_size_in_hours int) LANGUAGE plpgsql AS $$ DECLARE @@ -167,7 +180,7 @@ DECLARE BEGIN partition_size_in_ms = partition_size_in_hours * 3600 * 1000; - FOR p IN SELECT DISTINCT event_type as event_type, (created_time - created_time % partition_size_in_ms) as partition_ts FROM event e WHERE e.event_type in ('DEBUG_RULE_NODE', 'DEBUG_RULE_CHAIN') and ts > start_ts_in_ms + FOR p IN SELECT DISTINCT event_type as event_type, (created_time - created_time % partition_size_in_ms) as partition_ts FROM event e WHERE e.event_type in ('DEBUG_RULE_NODE', 'DEBUG_RULE_CHAIN') and ts >= start_ts_in_ms and ts < end_ts_in_ms LOOP IF p.event_type = 'DEBUG_RULE_NODE' THEN table_name := 'rule_node_debug_event'; @@ -183,20 +196,21 @@ BEGIN tenant_id, ts, entity_id, - body::json ->> 'server', - body::json ->> 'type', - (body::json ->> 'entityId')::uuid, - body::json ->> 'entityName', - (body::json ->> 'msgId')::uuid, - body::json ->> 'msgType', - body::json ->> 'dataType', - body::json ->> 'relationType', - body::json ->> 'data', - body::json ->> 'metadata', - body::json ->> 'error' - FROM event - WHERE ts > start_ts_in_ms - AND event_type = 'DEBUG_RULE_NODE' + body ->> 'server', + body ->> 'type', + (body ->> 'entityId')::uuid, + body ->> 'entityName', + (body ->> 'msgId')::uuid, + body ->> 'msgType', + body ->> 'dataType', + body ->> 'relationType', + body ->> 'data', + body ->> 'metadata', + body ->> 'error' + FROM + (select id, tenant_id, ts, entity_id, to_safe_json(body) as body + FROM event WHERE ts >= start_ts_in_ms and ts < end_ts_in_ms AND event_type = 'DEBUG_RULE_NODE' AND to_safe_json(body) ->> 'server' IS NOT NULL + ) safe_event ON CONFLICT DO NOTHING; INSERT INTO rule_chain_debug_event @@ -204,12 +218,13 @@ BEGIN tenant_id, ts, entity_id, - body::json ->> 'server', - body::json ->> 'message', - body::json ->> 'error' - FROM event - WHERE ts > start_ts_in_ms - AND event_type = 'DEBUG_RULE_CHAIN' + body ->> 'server', + body ->> 'message', + body ->> 'error' + FROM + (select id, tenant_id, ts, entity_id, to_safe_json(body) as body + FROM event WHERE ts >= start_ts_in_ms and ts < end_ts_in_ms AND event_type = 'DEBUG_RULE_CHAIN' AND to_safe_json(body) ->> 'server' IS NOT NULL + ) safe_event ON CONFLICT DO NOTHING; END $$; diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/event/SqlEventCleanupRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/event/SqlEventCleanupRepository.java index 9a4a7f13b5..53786c448b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/event/SqlEventCleanupRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/event/SqlEventCleanupRepository.java @@ -27,6 +27,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; @Slf4j @@ -52,11 +53,15 @@ public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorSe @Override public void migrateEvents(long regularEventTs, long debugEventTs) { - callMigrateFunction("migrate_regular_events", regularEventTs, partitionConfiguration.getRegularPartitionSizeInHours()); - callMigrateFunction("migrate_debug_events", debugEventTs, partitionConfiguration.getDebugPartitionSizeInHours()); + regularEventTs = Math.max(regularEventTs, 1480982400000L); + debugEventTs = Math.max(debugEventTs, 1480982400000L); + + callMigrateFunctionByPartitions("regular", "migrate_regular_events", regularEventTs, partitionConfiguration.getRegularPartitionSizeInHours()); + callMigrateFunctionByPartitions("debug", "migrate_debug_events", debugEventTs, partitionConfiguration.getDebugPartitionSizeInHours()); + try (Connection connection = dataSource.getConnection(); - PreparedStatement dropFunction1 = connection.prepareStatement("DROP PROCEDURE IF EXISTS migrate_regular_events"); - PreparedStatement dropFunction2 = connection.prepareStatement("DROP PROCEDURE IF EXISTS migrate_debug_events"); + PreparedStatement dropFunction1 = connection.prepareStatement("DROP PROCEDURE IF EXISTS migrate_regular_events(bigint, bigint, int)"); + PreparedStatement dropFunction2 = connection.prepareStatement("DROP PROCEDURE IF EXISTS migrate_debug_events(bigint, bigint, int)"); PreparedStatement dropTable = connection.prepareStatement("DROP TABLE IF EXISTS event")) { dropFunction1.execute(); dropFunction2.execute(); @@ -67,11 +72,33 @@ public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorSe } } - private void callMigrateFunction(String functionName, long startTs, int partitionSizeInHours) { + private void callMigrateFunctionByPartitions(String logTag, String functionName, long startTs, int partitionSizeInHours) { + long currentTs = System.currentTimeMillis(); + var regularPartitionStepInMs = TimeUnit.HOURS.toMillis(partitionSizeInHours); + long numberOfPartitions = (currentTs - startTs) / regularPartitionStepInMs; + if (numberOfPartitions > 1000) { + log.error("Please adjust your {} events partitioning configuration. " + + "Configuration with partition size of {} hours and corresponding TTL will use {} (>1000) partitions which is not recommended!", + logTag, partitionSizeInHours, numberOfPartitions); + throw new RuntimeException("Please adjust your " + logTag + " events partitioning configuration. " + + "Configuration with partition size of " + partitionSizeInHours + " hours and corresponding TTL will use " + + +numberOfPartitions + " (>1000) partitions which is not recommended!"); + } + while (startTs < currentTs) { + var endTs = startTs + regularPartitionStepInMs; + log.info("Migrate {} events for time period: [{},{}]", logTag, startTs, endTs); + callMigrateFunction(functionName, startTs, startTs + regularPartitionStepInMs, partitionSizeInHours); + startTs = endTs; + } + log.info("Migrate {} events done.", logTag); + } + + private void callMigrateFunction(String functionName, long startTs, long endTs, int partitionSizeInHours) { try (Connection connection = dataSource.getConnection(); - PreparedStatement stmt = connection.prepareStatement("call " + functionName + "(?,?)")) { + PreparedStatement stmt = connection.prepareStatement("call " + functionName + "(?,?,?)")) { stmt.setLong(1, startTs); - stmt.setInt(2, partitionSizeInHours); + stmt.setLong(2, endTs); + stmt.setInt(3, partitionSizeInHours); stmt.execute(); } catch (SQLException e) { if (e.getMessage() == null || !e.getMessage().contains("relation \"event\" does not exist")) {