From bfdd52cefdc6bf781b31f99e6990d59383749f8e Mon Sep 17 00:00:00 2001 From: Dmytro Shvaika Date: Tue, 1 Sep 2020 15:47:06 +0300 Subject: [PATCH 1/3] fix drop partitions by max ttl procedure --- .../schema_update_psql_drop_partitions.sql | 1 + .../install/ThingsboardInstallService.java | 5 +++++ .../CassandraTsDatabaseUpgradeService.java | 1 + .../install/PsqlTsDatabaseUpgradeService.java | 6 ++++++ .../TimescaleTsDatabaseUpgradeService.java | 2 ++ .../service/ttl/AbstractCleanUpService.java | 18 +++++++----------- .../ttl/events/EventsCleanUpService.java | 2 +- .../PsqlTimeseriesCleanUpService.java | 3 ++- .../TimescaleTimeseriesCleanUpService.java | 3 ++- dao/src/main/resources/sql/schema-ts-psql.sql | 1 + 10 files changed, 28 insertions(+), 14 deletions(-) diff --git a/application/src/main/data/upgrade/2.4.3/schema_update_psql_drop_partitions.sql b/application/src/main/data/upgrade/2.4.3/schema_update_psql_drop_partitions.sql index 0916c241a1..41e1cfbb7a 100644 --- a/application/src/main/data/upgrade/2.4.3/schema_update_psql_drop_partitions.sql +++ b/application/src/main/data/upgrade/2.4.3/schema_update_psql_drop_partitions.sql @@ -64,6 +64,7 @@ BEGIN AND tablename like 'ts_kv_' || '%' AND tablename != 'ts_kv_latest' AND tablename != 'ts_kv_dictionary' + AND tablename != 'ts_kv_indefinite' LOOP IF partition != partition_by_max_ttl_date THEN IF partition_year IS NOT NULL THEN diff --git a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java index e281c0958e..01ad8a29e9 100644 --- a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java +++ b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java @@ -146,6 +146,11 @@ public class ThingsboardInstallService { databaseTsUpgradeService.upgradeDatabase("2.5.0"); } + case "2.5.4": + log.info("Upgrading ThingsBoard from version 2.5.4 to 2.5.5 ..."); + if (databaseTsUpgradeService != null) { + databaseTsUpgradeService.upgradeDatabase("2.5.4"); + } log.info("Updating system data..."); diff --git a/application/src/main/java/org/thingsboard/server/service/install/CassandraTsDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/CassandraTsDatabaseUpgradeService.java index 103e8090d9..07b8522323 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/CassandraTsDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/CassandraTsDatabaseUpgradeService.java @@ -49,6 +49,7 @@ public class CassandraTsDatabaseUpgradeService extends AbstractCassandraDatabase log.info("Schema updated."); break; case "2.5.0": + case "2.5.4": break; default: throw new RuntimeException("Unable to upgrade Cassandra database, unsupported fromVersion: " + fromVersion); diff --git a/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java index 7a8174af16..396f84664a 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java @@ -195,6 +195,12 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe executeQuery(conn, "UPDATE tb_schema_settings SET schema_version = 2005001"); } break; + case "2.5.4": + try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { + log.info("Load Drop Partitions functions ..."); + loadSql(conn, LOAD_DROP_PARTITIONS_FUNCTIONS_SQL); + } + break; default: throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion); } diff --git a/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseUpgradeService.java index d8f7ea61f9..a929a51fb5 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseUpgradeService.java @@ -177,6 +177,8 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr executeQuery(conn, "UPDATE tb_schema_settings SET schema_version = 2005001"); } break; + case "2.5.4": + break; default: throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion); } diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java index 4fc4df0048..e81788958d 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java @@ -38,19 +38,15 @@ public abstract class AbstractCleanUpService { @Value("${spring.datasource.password}") protected String dbPassword; - protected long executeQuery(Connection conn, String query) { - long removed = 0L; - try { - Statement statement = conn.createStatement(); + protected long executeQuery(Connection conn, String query) throws SQLException { + try (Statement statement = conn.createStatement()) { ResultSet resultSet = statement.executeQuery(query); - getWarnings(statement); + if (log.isDebugEnabled()) { + getWarnings(statement); + } resultSet.next(); - removed = resultSet.getLong(1); - log.debug("Successfully executed query: {}", query); - } catch (SQLException e) { - log.debug("Failed to execute query: {} due to: {}", query, e.getMessage()); + return resultSet.getLong(1); } - return removed; } protected void getWarnings(Statement statement) throws SQLException { @@ -65,6 +61,6 @@ public abstract class AbstractCleanUpService { } } - protected abstract void doCleanUp(Connection connection); + protected abstract void doCleanUp(Connection connection) throws SQLException; } diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/events/EventsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/events/EventsCleanUpService.java index 5b094c5c0e..ca52bca7e0 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/events/EventsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/events/EventsCleanUpService.java @@ -54,7 +54,7 @@ public class EventsCleanUpService extends AbstractCleanUpService { } @Override - protected void doCleanUp(Connection connection) { + protected void doCleanUp(Connection connection) throws SQLException { long totalEventsRemoved = executeQuery(connection, "call cleanup_events_by_ttl(" + ttl + ", " + debugTtl + ", 0);"); log.info("Total events removed by TTL: [{}]", totalEventsRemoved); } diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/PsqlTimeseriesCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/PsqlTimeseriesCleanUpService.java index cd403ee3b8..2464ab4677 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/PsqlTimeseriesCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/PsqlTimeseriesCleanUpService.java @@ -22,6 +22,7 @@ import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.util.PsqlTsDao; import java.sql.Connection; +import java.sql.SQLException; @PsqlTsDao @Service @@ -32,7 +33,7 @@ public class PsqlTimeseriesCleanUpService extends AbstractTimeseriesCleanUpServi private String partitionType; @Override - protected void doCleanUp(Connection connection) { + protected void doCleanUp(Connection connection) throws SQLException { long totalPartitionsRemoved = executeQuery(connection, "call drop_partitions_by_max_ttl('" + partitionType + "'," + systemTtl + ", 0);"); log.info("Total partitions removed by TTL: [{}]", totalPartitionsRemoved); long totalEntitiesTelemetryRemoved = executeQuery(connection, "call cleanup_timeseries_by_ttl('" + ModelConstants.NULL_UUID_STR + "'," + systemTtl + ", 0);"); diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/TimescaleTimeseriesCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/TimescaleTimeseriesCleanUpService.java index f5898b9b20..8bdeea46ae 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/TimescaleTimeseriesCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/TimescaleTimeseriesCleanUpService.java @@ -21,6 +21,7 @@ import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.util.TimescaleDBTsDao; import java.sql.Connection; +import java.sql.SQLException; @TimescaleDBTsDao @Service @@ -28,7 +29,7 @@ import java.sql.Connection; public class TimescaleTimeseriesCleanUpService extends AbstractTimeseriesCleanUpService { @Override - protected void doCleanUp(Connection connection) { + protected void doCleanUp(Connection connection) throws SQLException { long totalEntitiesTelemetryRemoved = executeQuery(connection, "call cleanup_timeseries_by_ttl('" + ModelConstants.NULL_UUID_STR + "'," + systemTtl + ", 0);"); log.info("Total telemetry removed stats by TTL for entities: [{}]", totalEntitiesTelemetryRemoved); } diff --git a/dao/src/main/resources/sql/schema-ts-psql.sql b/dao/src/main/resources/sql/schema-ts-psql.sql index 28420a8957..80f111869d 100644 --- a/dao/src/main/resources/sql/schema-ts-psql.sql +++ b/dao/src/main/resources/sql/schema-ts-psql.sql @@ -105,6 +105,7 @@ BEGIN AND tablename like 'ts_kv_' || '%' AND tablename != 'ts_kv_latest' AND tablename != 'ts_kv_dictionary' + AND tablename != 'ts_kv_indefinite' LOOP IF partition != partition_by_max_ttl_date THEN IF partition_year IS NOT NULL THEN From 8bdbda914eb18ec66d64a714d3b8899d7872e778 Mon Sep 17 00:00:00 2001 From: dshvaika Date: Mon, 14 Sep 2020 11:25:28 +0300 Subject: [PATCH 2/3] added resultSet to try with resources --- .../thingsboard/server/service/ttl/AbstractCleanUpService.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java index e81788958d..41f90c642c 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java @@ -39,8 +39,7 @@ public abstract class AbstractCleanUpService { protected String dbPassword; protected long executeQuery(Connection conn, String query) throws SQLException { - try (Statement statement = conn.createStatement()) { - ResultSet resultSet = statement.executeQuery(query); + try (Statement statement = conn.createStatement(); ResultSet resultSet = statement.executeQuery(query)) { if (log.isDebugEnabled()) { getWarnings(statement); } From e3957ea2aa20b8c59435e6d2b435721e930f0510 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 15 Sep 2020 18:29:54 +0300 Subject: [PATCH 3/3] MqttTransportHandler improvements --- .../server/transport/mqtt/MqttTransportHandler.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index d62ab6a7ee..e3861a51b2 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -342,7 +342,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private MqttMessage createUnSubAckMessage(int msgId) { MqttFixedHeader mqttFixedHeader = - new MqttFixedHeader(UNSUBACK, false, AT_LEAST_ONCE, false, 0); + new MqttFixedHeader(UNSUBACK, false, AT_MOST_ONCE, false, 0); MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId); return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader); } @@ -445,7 +445,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private static MqttSubAckMessage createSubAckMessage(Integer msgId, List grantedQoSList) { MqttFixedHeader mqttFixedHeader = - new MqttFixedHeader(SUBACK, false, AT_LEAST_ONCE, false, 0); + new MqttFixedHeader(SUBACK, false, AT_MOST_ONCE, false, 0); MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId); MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(grantedQoSList); return new MqttSubAckMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttSubAckPayload); @@ -457,7 +457,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement public static MqttPubAckMessage createMqttPubAckMsg(int requestId) { MqttFixedHeader mqttFixedHeader = - new MqttFixedHeader(PUBACK, false, AT_LEAST_ONCE, false, 0); + new MqttFixedHeader(PUBACK, false, AT_MOST_ONCE, false, 0); MqttMessageIdVariableHeader mqttMsgIdVariableHeader = MqttMessageIdVariableHeader.from(requestId); return new MqttPubAckMessage(mqttFixedHeader, mqttMsgIdVariableHeader);