Browse Source

Upgrade improvements

pull/7001/head
Andrii Shvaika 4 years ago
parent
commit
302f512be8
  1. 81
      application/src/main/data/upgrade/3.4.0/schema_update.sql
  2. 41
      dao/src/main/java/org/thingsboard/server/dao/sql/event/SqlEventCleanupRepository.java

81
application/src/main/data/upgrade/3.4.0/schema_update.sql

@ -88,9 +88,19 @@ CREATE INDEX IF NOT EXISTS idx_lc_event_main
CREATE INDEX IF NOT EXISTS idx_error_event_main
ON error_event (tenant_id ASC, entity_id ASC, ts DESC NULLS LAST) WITH (FILLFACTOR=95);
CREATE OR REPLACE FUNCTION to_safe_json(p_json text) RETURNS json
LANGUAGE plpgsql AS
$$
BEGIN
return REPLACE(p_json, '\u0000', '' )::json;
EXCEPTION
WHEN OTHERS THEN
return '{}'::json;
END;
$$;
-- Useful to migrate old events to the new table structure;
CREATE OR REPLACE PROCEDURE migrate_regular_events(IN start_ts_in_ms bigint, IN partition_size_in_hours int)
CREATE OR REPLACE PROCEDURE migrate_regular_events(IN start_ts_in_ms bigint, IN end_ts_in_ms bigint, IN partition_size_in_hours int)
LANGUAGE plpgsql AS
$$
DECLARE
@ -100,7 +110,7 @@ DECLARE
BEGIN
partition_size_in_ms = partition_size_in_hours * 3600 * 1000;
FOR p IN SELECT DISTINCT event_type as event_type, (created_time - created_time % partition_size_in_ms) as partition_ts FROM event e WHERE e.event_type in ('STATS', 'LC_EVENT', 'ERROR') and ts > start_ts_in_ms
FOR p IN SELECT DISTINCT event_type as event_type, (created_time - created_time % partition_size_in_ms) as partition_ts FROM event e WHERE e.event_type in ('STATS', 'LC_EVENT', 'ERROR') and ts >= start_ts_in_ms and ts < end_ts_in_ms
LOOP
IF p.event_type = 'STATS' THEN
table_name := 'stats_event';
@ -121,9 +131,10 @@ BEGIN
body::json ->> 'server',
(body::json ->> 'messagesProcessed')::bigint,
(body::json ->> 'errorsOccurred')::bigint
FROM event
WHERE ts > start_ts_in_ms
AND event_type = 'STATS'
FROM
(select id, tenant_id, ts, entity_id, to_safe_json(body) as body
FROM event WHERE ts >= start_ts_in_ms and ts < end_ts_in_ms AND event_type = 'STATS' AND to_safe_json(body) ->> 'server' IS NOT NULL
) safe_event
ON CONFLICT DO NOTHING;
INSERT INTO lc_event
@ -135,9 +146,10 @@ BEGIN
body::json ->> 'event',
(body::json ->> 'success')::boolean,
body::json ->> 'error'
FROM event
WHERE ts > start_ts_in_ms
AND event_type = 'LC_EVENT'
FROM
(select id, tenant_id, ts, entity_id, to_safe_json(body) as body
FROM event WHERE ts >= start_ts_in_ms and ts < end_ts_in_ms AND event_type = 'LC_EVENT' AND to_safe_json(body) ->> 'server' IS NOT NULL
) safe_event
ON CONFLICT DO NOTHING;
INSERT INTO error_event
@ -148,16 +160,17 @@ BEGIN
body::json ->> 'server',
body::json ->> 'method',
body::json ->> 'error'
FROM event
WHERE ts > start_ts_in_ms
AND event_type = 'ERROR'
FROM
(select id, tenant_id, ts, entity_id, to_safe_json(body) as body
FROM event WHERE ts >= start_ts_in_ms and ts < end_ts_in_ms AND event_type = 'ERROR' AND to_safe_json(body) ->> 'server' IS NOT NULL
) safe_event
ON CONFLICT DO NOTHING;
END
$$;
-- Useful to migrate old debug events to the new table structure;
CREATE OR REPLACE PROCEDURE migrate_debug_events(IN start_ts_in_ms bigint, IN partition_size_in_hours int)
CREATE OR REPLACE PROCEDURE migrate_debug_events(IN start_ts_in_ms bigint, IN end_ts_in_ms bigint, IN partition_size_in_hours int)
LANGUAGE plpgsql AS
$$
DECLARE
@ -167,7 +180,7 @@ DECLARE
BEGIN
partition_size_in_ms = partition_size_in_hours * 3600 * 1000;
FOR p IN SELECT DISTINCT event_type as event_type, (created_time - created_time % partition_size_in_ms) as partition_ts FROM event e WHERE e.event_type in ('DEBUG_RULE_NODE', 'DEBUG_RULE_CHAIN') and ts > start_ts_in_ms
FOR p IN SELECT DISTINCT event_type as event_type, (created_time - created_time % partition_size_in_ms) as partition_ts FROM event e WHERE e.event_type in ('DEBUG_RULE_NODE', 'DEBUG_RULE_CHAIN') and ts >= start_ts_in_ms and ts < end_ts_in_ms
LOOP
IF p.event_type = 'DEBUG_RULE_NODE' THEN
table_name := 'rule_node_debug_event';
@ -183,20 +196,21 @@ BEGIN
tenant_id,
ts,
entity_id,
body::json ->> 'server',
body::json ->> 'type',
(body::json ->> 'entityId')::uuid,
body::json ->> 'entityName',
(body::json ->> 'msgId')::uuid,
body::json ->> 'msgType',
body::json ->> 'dataType',
body::json ->> 'relationType',
body::json ->> 'data',
body::json ->> 'metadata',
body::json ->> 'error'
FROM event
WHERE ts > start_ts_in_ms
AND event_type = 'DEBUG_RULE_NODE'
body ->> 'server',
body ->> 'type',
(body ->> 'entityId')::uuid,
body ->> 'entityName',
(body ->> 'msgId')::uuid,
body ->> 'msgType',
body ->> 'dataType',
body ->> 'relationType',
body ->> 'data',
body ->> 'metadata',
body ->> 'error'
FROM
(select id, tenant_id, ts, entity_id, to_safe_json(body) as body
FROM event WHERE ts >= start_ts_in_ms and ts < end_ts_in_ms AND event_type = 'DEBUG_RULE_NODE' AND to_safe_json(body) ->> 'server' IS NOT NULL
) safe_event
ON CONFLICT DO NOTHING;
INSERT INTO rule_chain_debug_event
@ -204,12 +218,13 @@ BEGIN
tenant_id,
ts,
entity_id,
body::json ->> 'server',
body::json ->> 'message',
body::json ->> 'error'
FROM event
WHERE ts > start_ts_in_ms
AND event_type = 'DEBUG_RULE_CHAIN'
body ->> 'server',
body ->> 'message',
body ->> 'error'
FROM
(select id, tenant_id, ts, entity_id, to_safe_json(body) as body
FROM event WHERE ts >= start_ts_in_ms and ts < end_ts_in_ms AND event_type = 'DEBUG_RULE_CHAIN' AND to_safe_json(body) ->> 'server' IS NOT NULL
) safe_event
ON CONFLICT DO NOTHING;
END
$$;

41
dao/src/main/java/org/thingsboard/server/dao/sql/event/SqlEventCleanupRepository.java

@ -27,6 +27,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Slf4j
@ -52,11 +53,15 @@ public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorSe
@Override
public void migrateEvents(long regularEventTs, long debugEventTs) {
callMigrateFunction("migrate_regular_events", regularEventTs, partitionConfiguration.getRegularPartitionSizeInHours());
callMigrateFunction("migrate_debug_events", debugEventTs, partitionConfiguration.getDebugPartitionSizeInHours());
regularEventTs = Math.max(regularEventTs, 1480982400000L);
debugEventTs = Math.max(debugEventTs, 1480982400000L);
callMigrateFunctionByPartitions("regular", "migrate_regular_events", regularEventTs, partitionConfiguration.getRegularPartitionSizeInHours());
callMigrateFunctionByPartitions("debug", "migrate_debug_events", debugEventTs, partitionConfiguration.getDebugPartitionSizeInHours());
try (Connection connection = dataSource.getConnection();
PreparedStatement dropFunction1 = connection.prepareStatement("DROP PROCEDURE IF EXISTS migrate_regular_events");
PreparedStatement dropFunction2 = connection.prepareStatement("DROP PROCEDURE IF EXISTS migrate_debug_events");
PreparedStatement dropFunction1 = connection.prepareStatement("DROP PROCEDURE IF EXISTS migrate_regular_events(bigint, bigint, int)");
PreparedStatement dropFunction2 = connection.prepareStatement("DROP PROCEDURE IF EXISTS migrate_debug_events(bigint, bigint, int)");
PreparedStatement dropTable = connection.prepareStatement("DROP TABLE IF EXISTS event")) {
dropFunction1.execute();
dropFunction2.execute();
@ -67,11 +72,33 @@ public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorSe
}
}
private void callMigrateFunction(String functionName, long startTs, int partitionSizeInHours) {
private void callMigrateFunctionByPartitions(String logTag, String functionName, long startTs, int partitionSizeInHours) {
long currentTs = System.currentTimeMillis();
var regularPartitionStepInMs = TimeUnit.HOURS.toMillis(partitionSizeInHours);
long numberOfPartitions = (currentTs - startTs) / regularPartitionStepInMs;
if (numberOfPartitions > 1000) {
log.error("Please adjust your {} events partitioning configuration. " +
"Configuration with partition size of {} hours and corresponding TTL will use {} (>1000) partitions which is not recommended!",
logTag, partitionSizeInHours, numberOfPartitions);
throw new RuntimeException("Please adjust your " + logTag + " events partitioning configuration. " +
"Configuration with partition size of " + partitionSizeInHours + " hours and corresponding TTL will use " +
+numberOfPartitions + " (>1000) partitions which is not recommended!");
}
while (startTs < currentTs) {
var endTs = startTs + regularPartitionStepInMs;
log.info("Migrate {} events for time period: [{},{}]", logTag, startTs, endTs);
callMigrateFunction(functionName, startTs, startTs + regularPartitionStepInMs, partitionSizeInHours);
startTs = endTs;
}
log.info("Migrate {} events done.", logTag);
}
private void callMigrateFunction(String functionName, long startTs, long endTs, int partitionSizeInHours) {
try (Connection connection = dataSource.getConnection();
PreparedStatement stmt = connection.prepareStatement("call " + functionName + "(?,?)")) {
PreparedStatement stmt = connection.prepareStatement("call " + functionName + "(?,?,?)")) {
stmt.setLong(1, startTs);
stmt.setInt(2, partitionSizeInHours);
stmt.setLong(2, endTs);
stmt.setInt(3, partitionSizeInHours);
stmt.execute();
} catch (SQLException e) {
if (e.getMessage() == null || !e.getMessage().contains("relation \"event\" does not exist")) {

Loading…
Cancel
Save