diff --git a/application/src/main/data/upgrade/3.6.2/schema_update_attribute_kv.sql b/application/src/main/data/upgrade/3.6.2/schema_update_attribute_kv.sql new file mode 100644 index 0000000000..142b3c1844 --- /dev/null +++ b/application/src/main/data/upgrade/3.6.2/schema_update_attribute_kv.sql @@ -0,0 +1,181 @@ +-- +-- Copyright © 2016-2023 The Thingsboard Authors +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + + +-- create new attribute_kv table schema +DO +$$ + BEGIN + -- in case of running the upgrade script a second time: + IF EXISTS(SELECT 1 FROM information_schema.columns WHERE table_name = 'attribute_kv' and column_name='entity_type') THEN + IF EXISTS(SELECT 1 FROM pg_indexes WHERE indexname = 'idx_attribute_kv_by_key_and_last_update_ts') THEN + ALTER INDEX idx_attribute_kv_by_key_and_last_update_ts RENAME TO idx_attribute_kv_by_key_and_last_update_ts_old; + END IF; + IF EXISTS(SELECT 1 FROM pg_constraint WHERE conname = 'attribute_kv_pkey') THEN + ALTER TABLE attribute_kv RENAME CONSTRAINT attribute_kv_pkey TO attribute_kv_pkey_old; + END IF; + ALTER TABLE attribute_kv + RENAME TO attribute_kv_old; + CREATE TABLE IF NOT EXISTS attribute_kv + ( + entity_id uuid, + attribute_type int, + attribute_key int, + bool_v boolean, + str_v varchar(10000000), + long_v bigint, + dbl_v double precision, + json_v json, + last_update_ts bigint, + CONSTRAINT attribute_kv_pkey PRIMARY KEY (entity_id, attribute_type, attribute_key) + ); + END IF; + END; +$$; + +-- create attribute_kv_dictionary table +CREATE TABLE IF NOT EXISTS attribute_kv_dictionary +( + key varchar(255) NOT NULL, + key_id serial UNIQUE, + CONSTRAINT attribute_key_id_pkey PRIMARY KEY (key) +); + +-- create to_attribute_type_id +CREATE OR REPLACE FUNCTION to_attribute_type_id(IN attribute_type varchar, OUT attribute_type_id int) AS +$$ +BEGIN + CASE + WHEN attribute_type = 'CLIENT_SCOPE' THEN + attribute_type_id := 1; + WHEN attribute_type = 'SERVER_SCOPE' THEN + attribute_type_id := 2; + WHEN attribute_type = 'SHARED_SCOPE' THEN + attribute_type_id := 3; + END CASE; +END; +$$ LANGUAGE plpgsql; + + +-- insert keys into attribute_kv_dictionary +DO +$$ +DECLARE + insert_record RECORD; + key_cursor refcursor; +BEGIN + IF EXISTS(SELECT 1 FROM information_schema.tables WHERE table_name = 'attribute_kv_old') THEN + OPEN key_cursor FOR SELECT DISTINCT attribute_key + FROM attribute_kv_old + ORDER BY attribute_key; + LOOP + FETCH key_cursor INTO insert_record; + EXIT WHEN NOT FOUND; + IF NOT EXISTS(SELECT key FROM attribute_kv_dictionary WHERE key = insert_record.attribute_key) THEN + INSERT INTO attribute_kv_dictionary(key) VALUES (insert_record.attribute_key); + END IF; + END LOOP; + CLOSE key_cursor; + END IF; +END; +$$; + +-- create procedure to migrate all rows from attribute_kv_old to attribute_kv +CREATE OR REPLACE PROCEDURE insert_into_attribute_kv(IN path_to_file varchar) + LANGUAGE plpgsql AS +$$ +DECLARE + row_num_old integer; + row_num integer; + attribute_scope_array text[]; +BEGIN + attribute_scope_array := ARRAY['SERVER_SCOPE', 'CLIENT_SCOPE', 'SHARED_SCOPE']; + IF EXISTS(SELECT 1 FROM information_schema.tables WHERE table_name = 'attribute_kv_old') THEN + EXECUTE format('COPY (SELECT records.entity_id AS entity_id, + to_attribute_type_id(records.attribute_type) AS attribute_type, + records.attribute_key AS attribute_key, + records.bool_v AS bool_v, + records.str_v AS str_v, + records.long_v AS long_v, + records.dbl_v AS dbl_v, + records.json_v AS json_v, + records.last_update_ts AS last_update_ts + FROM (SELECT entity_id, + attribute_type, + key_id AS attribute_key, + bool_v, + str_v, + long_v, + dbl_v, + json_v, + last_update_ts + FROM attribute_kv_old INNER JOIN attribute_kv_dictionary ON (attribute_kv_old.attribute_key = attribute_kv_dictionary.key) + WHERE attribute_type= ANY(%L)) AS records) TO %L;', attribute_scope_array, path_to_file); + EXECUTE format('COPY attribute_kv FROM %L', path_to_file); + SELECT COUNT(*) INTO row_num_old FROM attribute_kv_old; + SELECT COUNT(*) INTO row_num FROM attribute_kv; + RAISE NOTICE 'Migrated % of % rows', row_num, row_num_old; + END IF; +EXCEPTION + WHEN others THEN + ROLLBACK; + RAISE EXCEPTION 'Error during COPY: %', SQLERRM; +END +$$; + +CREATE OR REPLACE PROCEDURE recreate_device_info_active_attribute_view() + LANGUAGE plpgsql AS +$$ +BEGIN + DROP VIEW IF EXISTS device_info_active_attribute_view CASCADE; + CREATE OR REPLACE VIEW device_info_active_attribute_view AS + SELECT d.* + , c.title as customer_title + , COALESCE((c.additional_info::json->>'isPublic')::bool, FALSE) as customer_is_public + , d.type as device_profile_name + , COALESCE(da.bool_v, FALSE) as active + FROM device d + LEFT JOIN customer c ON c.id = d.customer_id + LEFT JOIN attribute_kv da ON da.entity_id = d.id AND da.attribute_type = 2 AND da.attribute_key = (select key_id from attribute_kv_dictionary where key = 'active'); +END; +$$; + +CREATE OR REPLACE PROCEDURE recreate_device_info_view() + LANGUAGE plpgsql AS +$$ +BEGIN + DROP VIEW IF EXISTS device_info_view CASCADE; + CREATE OR REPLACE VIEW device_info_view AS SELECT * FROM device_info_active_attribute_view; +END; +$$; + +CREATE OR REPLACE PROCEDURE drop_attribute_kv_old_table() + LANGUAGE plpgsql AS +$$ +DECLARE + row_num integer; +BEGIN + SELECT COUNT(*) INTO row_num FROM attribute_kv; + IF row_num != 0 then + DROP TABLE IF EXISTS attribute_kv_old; + DROP PROCEDURE IF EXISTS insert_into_attribute_kv(IN path_to_file varchar); + ELSE + RAISE EXCEPTION 'Table attribute_kv is empty'; + END IF; + RETURN; +END; +$$; + diff --git a/application/src/main/data/upgrade/3.6.2/schema_update_ttl.sql b/application/src/main/data/upgrade/3.6.2/schema_update_ttl.sql new file mode 100644 index 0000000000..67ec25656e --- /dev/null +++ b/application/src/main/data/upgrade/3.6.2/schema_update_ttl.sql @@ -0,0 +1,87 @@ +-- +-- Copyright © 2016-2023 The Thingsboard Authors +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +CREATE OR REPLACE PROCEDURE cleanup_timeseries_by_ttl(IN null_uuid uuid, + IN system_ttl bigint, INOUT deleted bigint) + LANGUAGE plpgsql AS +$$ +DECLARE +tenant_cursor CURSOR FOR select tenant.id as tenant_id + from tenant; +tenant_id_record uuid; + customer_id_record uuid; + tenant_ttl bigint; + customer_ttl bigint; + deleted_for_entities bigint; + tenant_ttl_ts bigint; + customer_ttl_ts bigint; +BEGIN +OPEN tenant_cursor; +FETCH tenant_cursor INTO tenant_id_record; +WHILE FOUND + LOOP + EXECUTE format( + 'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = (select key_id from attribute_kv_dictionary where key = %L)', + tenant_id_record, 'TTL') INTO tenant_ttl; + if tenant_ttl IS NULL THEN + tenant_ttl := system_ttl; +END IF; + IF tenant_ttl > 0 THEN + tenant_ttl_ts := (EXTRACT(EPOCH FROM current_timestamp) * 1000 - tenant_ttl::bigint * 1000)::bigint; + deleted_for_entities := delete_device_records_from_ts_kv(tenant_id_record, null_uuid, tenant_ttl_ts); + deleted := deleted + deleted_for_entities; + RAISE NOTICE '% telemetry removed for devices where tenant_id = %', deleted_for_entities, tenant_id_record; + deleted_for_entities := delete_asset_records_from_ts_kv(tenant_id_record, null_uuid, tenant_ttl_ts); + deleted := deleted + deleted_for_entities; + RAISE NOTICE '% telemetry removed for assets where tenant_id = %', deleted_for_entities, tenant_id_record; +END IF; +FOR customer_id_record IN +SELECT customer.id AS customer_id FROM customer WHERE customer.tenant_id = tenant_id_record + LOOP + EXECUTE format( + 'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = (select key_id from attribute_kv_dictionary where key = %L)', + customer_id_record, 'TTL') INTO customer_ttl; +IF customer_ttl IS NULL THEN + customer_ttl_ts := tenant_ttl_ts; +ELSE + IF customer_ttl > 0 THEN + customer_ttl_ts := + (EXTRACT(EPOCH FROM current_timestamp) * 1000 - + customer_ttl::bigint * 1000)::bigint; +END IF; +END IF; + IF customer_ttl_ts IS NOT NULL AND customer_ttl_ts > 0 THEN + deleted_for_entities := + delete_customer_records_from_ts_kv(tenant_id_record, customer_id_record, + customer_ttl_ts); + deleted := deleted + deleted_for_entities; + RAISE NOTICE '% telemetry removed for customer with id = % where tenant_id = %', deleted_for_entities, customer_id_record, tenant_id_record; + deleted_for_entities := + delete_device_records_from_ts_kv(tenant_id_record, customer_id_record, + customer_ttl_ts); + deleted := deleted + deleted_for_entities; + RAISE NOTICE '% telemetry removed for devices where tenant_id = % and customer_id = %', deleted_for_entities, tenant_id_record, customer_id_record; + deleted_for_entities := delete_asset_records_from_ts_kv(tenant_id_record, + customer_id_record, + customer_ttl_ts); + deleted := deleted + deleted_for_entities; + RAISE NOTICE '% telemetry removed for assets where tenant_id = % and customer_id = %', deleted_for_entities, tenant_id_record, customer_id_record; +END IF; +END LOOP; +FETCH tenant_cursor INTO tenant_id_record; +END LOOP; +END +$$; \ No newline at end of file diff --git a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java index 3829b3cf45..ac348c1249 100644 --- a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java +++ b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java @@ -265,6 +265,12 @@ public class ThingsboardInstallService { case "3.6.0": log.info("Upgrading ThingsBoard from version 3.6.0 to 3.6.1 ..."); databaseEntitiesUpgradeService.upgradeDatabase("3.6.0"); + case "3.6.2": + log.info("Upgrading ThingsBoard from version 3.6.2 to 3.7.0 ..."); + if (databaseTsUpgradeService != null) { + databaseTsUpgradeService.upgradeDatabase("3.6.2"); + } + databaseEntitiesUpgradeService.upgradeDatabase("3.6.2"); //TODO DON'T FORGET to update switch statement in the CacheCleanupService if you need to clear the cache break; default: diff --git a/application/src/main/java/org/thingsboard/server/service/install/CassandraTsDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/CassandraTsDatabaseUpgradeService.java index 5c7c51eb2c..3e2f29440c 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/CassandraTsDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/CassandraTsDatabaseUpgradeService.java @@ -52,6 +52,7 @@ public class CassandraTsDatabaseUpgradeService extends AbstractCassandraDatabase case "3.1.1": case "3.2.1": case "3.2.2": + case "3.6.2": break; default: throw new RuntimeException("Unable to upgrade Cassandra database, unsupported fromVersion: " + fromVersion); diff --git a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java index c02b7e9cc1..2983dea649 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java @@ -19,12 +19,14 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.SystemUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.EntitySubtype; +import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; @@ -48,6 +50,8 @@ import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import org.thingsboard.server.service.install.sql.SqlDbHelper; import org.thingsboard.server.service.install.update.DefaultDataUpdateService; +import java.io.File; +import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Path; @@ -64,6 +68,8 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; +import static org.thingsboard.server.service.install.AbstractSqlTsDatabaseUpgradeService.PATH_TO_USERS_PUBLIC_FOLDER; +import static org.thingsboard.server.service.install.AbstractSqlTsDatabaseUpgradeService.THINGSBOARD_WINDOWS_UPGRADE_DIR; import static org.thingsboard.server.service.install.DatabaseHelper.ADDITIONAL_INFO; import static org.thingsboard.server.service.install.DatabaseHelper.ASSIGNED_CUSTOMERS; import static org.thingsboard.server.service.install.DatabaseHelper.CONFIGURATION; @@ -89,6 +95,7 @@ import static org.thingsboard.server.service.install.DatabaseHelper.TYPE; public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService { private static final String SCHEMA_UPDATE_SQL = "schema_update.sql"; + private static final String LOAD_ATTRIBUTE_KV_FUNCTIONS_SQL = "schema_update_attribute_kv.sql"; @Value("${spring.datasource.url}") private String dbUrl; @@ -792,11 +799,88 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService log.error("Failed updating schema!!!", e); } break; + case "3.6.2": + try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { + if (isOldSchema(conn, 3006002)) { + log.info("Updating schema ..."); + schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "3.6.2", LOAD_ATTRIBUTE_KV_FUNCTIONS_SQL); + loadSql(schemaUpdateFile, conn); + + Path pathToTempAttributeKvFile; + if (SystemUtils.IS_OS_WINDOWS) { + pathToTempAttributeKvFile = createTempFileWindows("attribute_kv_temp",".sql"); + executeQuery(conn, "call insert_into_attribute_kv('" + pathToTempAttributeKvFile + "')"); + } else { + pathToTempAttributeKvFile = createTempFile("attribute_kv", "attribute_kv_temp.sql"); + executeQuery(conn, "call insert_into_attribute_kv('" + pathToTempAttributeKvFile + "')"); + } + + //recreate device_info_active_attribute_view + executeQuery(conn, "call recreate_device_info_active_attribute_view()"); + executeQuery(conn, "call recreate_device_info_view()"); + + // remove attribute_kv_old + executeQuery(conn, "call drop_attribute_kv_old_table()"); + + //create index for new table attribute_kv + executeQuery(conn, "CREATE INDEX IF NOT EXISTS idx_attribute_kv_by_key_and_last_update_ts ON attribute_kv(entity_id, attribute_key, last_update_ts desc);"); + + // remove temp files + if (pathToTempAttributeKvFile.toFile().exists()) { + boolean deleteTsKvFile = pathToTempAttributeKvFile.toFile().delete(); + if (deleteTsKvFile) { + log.info("Successfully deleted the temp file for attribute_kv table upgrade!"); + } + } + + executeQuery(conn, "UPDATE tb_schema_settings SET schema_version = 3007000;"); + log.info("Schema updated to version 3.7.0."); + } else { + log.info("Skip schema re-update to version 3.7.0. Use env flag 'SKIP_SCHEMA_VERSION_CHECK' to force the re-update."); + } + } catch (Exception e) { + log.error("Failed updating schema!!!", e); + } + break; default: throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion); } } + private static Path createTempFile(String tempDirectoryName, String tempFileName) throws IOException { + Path pathToTempAttributeKvFile; + Path tempDirPath = Files.createTempDirectory(tempDirectoryName); + File tempDirAsFile = tempDirPath.toFile(); + boolean writable = tempDirAsFile.setWritable(true, false); + boolean readable = tempDirAsFile.setReadable(true, false); + boolean executable = tempDirAsFile.setExecutable(true, false); + pathToTempAttributeKvFile = tempDirPath.resolve(tempFileName).toAbsolutePath(); + + if (!(writable && readable && executable)) { + throw new RuntimeException("Failed to grant write permissions for the: " + tempDirPath + "folder!"); + } + return pathToTempAttributeKvFile; + } + + private static Path createTempFileWindows(String prefix, String suffix) throws IOException { + Path pathToTempAttributeKvFile; + log.info("Lookup for environment variable: {} ...", THINGSBOARD_WINDOWS_UPGRADE_DIR); + Path pathToDir; + String thingsboardWindowsUpgradeDir = System.getenv("THINGSBOARD_WINDOWS_UPGRADE_DIR"); + if (StringUtils.isNotEmpty(thingsboardWindowsUpgradeDir)) { + log.info("Environment variable: {} was found!", THINGSBOARD_WINDOWS_UPGRADE_DIR); + pathToDir = Paths.get(thingsboardWindowsUpgradeDir); + } else { + log.info("Failed to lookup environment variable: {}", THINGSBOARD_WINDOWS_UPGRADE_DIR); + pathToDir = Paths.get(PATH_TO_USERS_PUBLIC_FOLDER); + } + log.info("Directory: {} will be used for creation temporary upgrade files!", pathToDir); + + Path attributeKvFile = Files.createTempFile(pathToDir, prefix, suffix); + pathToTempAttributeKvFile = attributeKvFile.toAbsolutePath(); + return pathToTempAttributeKvFile; + } + private void runSchemaUpdateScript(Connection connection, String version) throws Exception { Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", version, SCHEMA_UPDATE_SQL); loadSql(schemaUpdateFile, connection); @@ -811,6 +895,19 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService Thread.sleep(5000); } + private void executeQuery(Connection conn, String query) { + try { + Statement statement = conn.createStatement(); + statement.execute(query); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script + printWarnings(statement); + Thread.sleep(2000); + log.info("Successfully executed query: {}", query); + } catch (InterruptedException | SQLException e) { + log.error("Failed to execute query: {} due to: {}", query, e.getMessage()); + throw new RuntimeException("Failed to execute query:" + query + " due to: ", e); + } + } + protected void printWarnings(Statement statement) throws SQLException { SQLWarning warnings = statement.getWarnings(); if (warnings != null) { diff --git a/application/src/main/java/org/thingsboard/server/service/install/SqlTsDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/SqlTsDatabaseUpgradeService.java index 6400463e11..b61da45631 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/SqlTsDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/SqlTsDatabaseUpgradeService.java @@ -213,6 +213,8 @@ public class SqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSer loadSql(conn, LOAD_DROP_PARTITIONS_FUNCTIONS_SQL, "2.4.3"); } break; + case "3.6.2": + break; default: throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion); } diff --git a/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseUpgradeService.java index 205ec14591..4477bef1c4 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseUpgradeService.java @@ -184,6 +184,11 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr break; case "3.2.2": break; + case "3.6.2": + try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { + loadSql(conn, LOAD_TTL_FUNCTIONS_SQL, "3.6.2"); + } + break; default: throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion); } diff --git a/application/src/main/java/org/thingsboard/server/service/query/DefaultEntityQueryService.java b/application/src/main/java/org/thingsboard/server/service/query/DefaultEntityQueryService.java index 89b4002223..296c992ccf 100644 --- a/application/src/main/java/org/thingsboard/server/service/query/DefaultEntityQueryService.java +++ b/application/src/main/java/org/thingsboard/server/service/query/DefaultEntityQueryService.java @@ -251,14 +251,13 @@ public class DefaultEntityQueryService implements EntityQueryService { } if (isAttributes) { - Map> typesMap = ids.stream().collect(Collectors.groupingBy(EntityId::getEntityType)); - List>> futures = new ArrayList<>(typesMap.size()); - typesMap.forEach((type, entityIds) -> futures.add(dbCallbackExecutor.submit(() -> attributesService.findAllKeysByEntityIds(tenantId, type, entityIds)))); - attributesKeysFuture = Futures.transform(Futures.allAsList(futures), lists -> { - if (CollectionUtils.isEmpty(lists)) { + ListenableFuture> future; + future = dbCallbackExecutor.submit(() -> attributesService.findAllKeysByEntityIds(tenantId, ids)); + attributesKeysFuture = Futures.transform(future, list -> { + if (CollectionUtils.isEmpty(list)) { return Collections.emptyList(); } - return lists.stream().flatMap(List::stream).distinct().sorted().collect(Collectors.toList()); + return list.stream().distinct().sorted().collect(Collectors.toList()); }, dbCallbackExecutor); } else { attributesKeysFuture = null; diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java index 8fab864cc5..de1386d0c6 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java @@ -16,7 +16,6 @@ package org.thingsboard.server.dao.attributes; import com.google.common.util.concurrent.ListenableFuture; -import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -45,6 +44,6 @@ public interface AttributesService { List findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId); - List findAllKeysByEntityIds(TenantId tenantId, EntityType entityType, List entityIds); + List findAllKeysByEntityIds(TenantId tenantId, List entityIds); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/AttributeScope.java b/common/data/src/main/java/org/thingsboard/server/common/data/AttributeScope.java new file mode 100644 index 0000000000..83f5210427 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/AttributeScope.java @@ -0,0 +1,32 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data; + +import lombok.Getter; + +public enum AttributeScope { + + CLIENT_SCOPE(1), + SERVER_SCOPE(2), + SHARED_SCOPE(3); + @Getter + private final int id; + + AttributeScope(int id) { + this.id = id; + } + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeUtils.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeUtils.java index 192d56334d..29c44edd68 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeUtils.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeUtils.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.dao.attributes; +import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.dao.exception.IncorrectParameterException; @@ -27,7 +28,7 @@ public class AttributeUtils { public static void validate(EntityId id, String scope) { Validator.validateId(id.getId(), "Incorrect id " + id); - Validator.validateString(scope, "Incorrect scope " + scope); + Validator.validateEnum(AttributeScope.class, scope, "Incorrect scope " + scope); } public static void validate(List kvEntries, boolean valueNoXssValidation) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java index ffb3b9c229..2f53b0afa7 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java @@ -16,7 +16,7 @@ package org.thingsboard.server.dao.attributes; import com.google.common.util.concurrent.ListenableFuture; -import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -31,17 +31,17 @@ import java.util.Optional; */ public interface AttributesDao { - Optional find(TenantId tenantId, EntityId entityId, String attributeType, String attributeKey); + Optional find(TenantId tenantId, EntityId entityId, AttributeScope attributeScope, String attributeKey); - List find(TenantId tenantId, EntityId entityId, String attributeType, Collection attributeKey); + List find(TenantId tenantId, EntityId entityId, AttributeScope attributeScope, Collection attributeKey); - List findAll(TenantId tenantId, EntityId entityId, String attributeType); + List findAll(TenantId tenantId, EntityId entityId, AttributeScope attributeScope); - ListenableFuture save(TenantId tenantId, EntityId entityId, String attributeType, AttributeKvEntry attribute); + ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope attributeScope, AttributeKvEntry attribute); - List> removeAll(TenantId tenantId, EntityId entityId, String attributeType, List keys); + List> removeAll(TenantId tenantId, EntityId entityId, AttributeScope attributeScope, List keys); List findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId); - List findAllKeysByEntityIds(TenantId tenantId, EntityType entityType, List entityIds); + List findAllKeysByEntityIds(TenantId tenantId, List entityIds); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java index f855c116e2..2defbd9398 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java @@ -22,7 +22,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Service; -import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -57,20 +57,20 @@ public class BaseAttributesService implements AttributesService { public ListenableFuture> find(TenantId tenantId, EntityId entityId, String scope, String attributeKey) { validate(entityId, scope); Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey); - return Futures.immediateFuture(attributesDao.find(tenantId, entityId, scope, attributeKey)); + return Futures.immediateFuture(attributesDao.find(tenantId, entityId, AttributeScope.valueOf(scope), attributeKey)); } @Override public ListenableFuture> find(TenantId tenantId, EntityId entityId, String scope, Collection attributeKeys) { validate(entityId, scope); attributeKeys.forEach(attributeKey -> Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey)); - return Futures.immediateFuture(attributesDao.find(tenantId, entityId, scope, attributeKeys)); + return Futures.immediateFuture(attributesDao.find(tenantId, entityId, AttributeScope.valueOf(scope), attributeKeys)); } @Override public ListenableFuture> findAll(TenantId tenantId, EntityId entityId, String scope) { validate(entityId, scope); - return Futures.immediateFuture(attributesDao.findAll(tenantId, entityId, scope)); + return Futures.immediateFuture(attributesDao.findAll(tenantId, entityId, AttributeScope.valueOf(scope))); } @Override @@ -79,28 +79,28 @@ public class BaseAttributesService implements AttributesService { } @Override - public List findAllKeysByEntityIds(TenantId tenantId, EntityType entityType, List entityIds) { - return attributesDao.findAllKeysByEntityIds(tenantId, entityType, entityIds); + public List findAllKeysByEntityIds(TenantId tenantId, List entityIds) { + return attributesDao.findAllKeysByEntityIds(tenantId, entityIds); } @Override public ListenableFuture save(TenantId tenantId, EntityId entityId, String scope, AttributeKvEntry attribute) { validate(entityId, scope); AttributeUtils.validate(attribute, valueNoXssValidation); - return attributesDao.save(tenantId, entityId, scope, attribute); + return attributesDao.save(tenantId, entityId, AttributeScope.valueOf(scope), attribute); } @Override public ListenableFuture> save(TenantId tenantId, EntityId entityId, String scope, List attributes) { validate(entityId, scope); AttributeUtils.validate(attributes, valueNoXssValidation); - List> saveFutures = attributes.stream().map(attribute -> attributesDao.save(tenantId, entityId, scope, attribute)).collect(Collectors.toList()); + List> saveFutures = attributes.stream().map(attribute -> attributesDao.save(tenantId, entityId, AttributeScope.valueOf(scope), attribute)).collect(Collectors.toList()); return Futures.allAsList(saveFutures); } @Override public ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, String scope, List attributeKeys) { validate(entityId, scope); - return Futures.allAsList(attributesDao.removeAll(tenantId, entityId, scope, attributeKeys)); + return Futures.allAsList(attributesDao.removeAll(tenantId, entityId, AttributeScope.valueOf(scope), attributeKeys)); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java index 322d725ff3..327f6b5706 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java @@ -26,7 +26,7 @@ import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Service; import org.thingsboard.server.cache.TbCacheValueWrapper; import org.thingsboard.server.cache.TbTransactionalCache; -import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; @@ -120,7 +120,7 @@ public class CachedAttributesService implements AttributesService { return cacheExecutor.submit(() -> { var cacheTransaction = cache.newTransactionForKey(attributeCacheKey); try { - Optional result = attributesDao.find(tenantId, entityId, scope, attributeKey); + Optional result = attributesDao.find(tenantId, entityId, AttributeScope.valueOf(scope), attributeKey); cacheTransaction.putIfAbsent(attributeCacheKey, result.orElse(null)); cacheTransaction.commit(); return result; @@ -159,7 +159,7 @@ public class CachedAttributesService implements AttributesService { var cacheTransaction = cache.newTransactionForKeys(notFoundKeys); try { log.trace("[{}][{}] Lookup attributes from db: {}", entityId, scope, notFoundAttributeKeys); - List result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys); + List result = attributesDao.find(tenantId, entityId, AttributeScope.valueOf(scope), notFoundAttributeKeys); for (AttributeKvEntry foundInDbAttribute : result) { AttributeCacheKey attributeCacheKey = new AttributeCacheKey(scope, entityId, foundInDbAttribute.getKey()); cacheTransaction.putIfAbsent(attributeCacheKey, foundInDbAttribute); @@ -198,7 +198,7 @@ public class CachedAttributesService implements AttributesService { @Override public ListenableFuture> findAll(TenantId tenantId, EntityId entityId, String scope) { validate(entityId, scope); - return Futures.immediateFuture(attributesDao.findAll(tenantId, entityId, scope)); + return Futures.immediateFuture(attributesDao.findAll(tenantId, entityId, AttributeScope.valueOf(scope))); } @Override @@ -207,15 +207,15 @@ public class CachedAttributesService implements AttributesService { } @Override - public List findAllKeysByEntityIds(TenantId tenantId, EntityType entityType, List entityIds) { - return attributesDao.findAllKeysByEntityIds(tenantId, entityType, entityIds); + public List findAllKeysByEntityIds(TenantId tenantId, List entityIds) { + return attributesDao.findAllKeysByEntityIds(tenantId, entityIds); } @Override public ListenableFuture save(TenantId tenantId, EntityId entityId, String scope, AttributeKvEntry attribute) { validate(entityId, scope); AttributeUtils.validate(attribute, valueNoXssValidation); - ListenableFuture future = attributesDao.save(tenantId, entityId, scope, attribute); + ListenableFuture future = attributesDao.save(tenantId, entityId, AttributeScope.valueOf(scope), attribute); return Futures.transform(future, key -> evict(entityId, scope, attribute, key), cacheExecutor); } @@ -226,7 +226,7 @@ public class CachedAttributesService implements AttributesService { List> futures = new ArrayList<>(attributes.size()); for (var attribute : attributes) { - ListenableFuture future = attributesDao.save(tenantId, entityId, scope, attribute); + ListenableFuture future = attributesDao.save(tenantId, entityId, AttributeScope.valueOf(scope), attribute); futures.add(Futures.transform(future, key -> evict(entityId, scope, attribute, key), cacheExecutor)); } @@ -243,7 +243,7 @@ public class CachedAttributesService implements AttributesService { @Override public ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, String scope, List attributeKeys) { validate(entityId, scope); - List> futures = attributesDao.removeAll(tenantId, entityId, scope, attributeKeys); + List> futures = attributesDao.removeAll(tenantId, entityId, AttributeScope.valueOf(scope), attributeKeys); return Futures.allAsList(futures.stream().map(future -> Futures.transform(future, key -> { cache.evict(new AttributeCacheKey(scope, entityId, key)); return key; diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvCompositeKey.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvCompositeKey.java index 936a18eeeb..80be913468 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvCompositeKey.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvCompositeKey.java @@ -17,12 +17,9 @@ package org.thingsboard.server.dao.model.sql; import jakarta.persistence.Column; import jakarta.persistence.Embeddable; -import jakarta.persistence.EnumType; -import jakarta.persistence.Enumerated; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; -import org.thingsboard.server.common.data.EntityType; import java.io.Serializable; import java.util.UUID; @@ -30,20 +27,16 @@ import java.util.UUID; import static org.thingsboard.server.dao.model.ModelConstants.ATTRIBUTE_KEY_COLUMN; import static org.thingsboard.server.dao.model.ModelConstants.ATTRIBUTE_TYPE_COLUMN; import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_ID_COLUMN; -import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_TYPE_COLUMN; @Data @AllArgsConstructor @NoArgsConstructor @Embeddable public class AttributeKvCompositeKey implements Serializable { - @Enumerated(EnumType.STRING) - @Column(name = ENTITY_TYPE_COLUMN) - private EntityType entityType; @Column(name = ENTITY_ID_COLUMN, columnDefinition = "uuid") private UUID entityId; @Column(name = ATTRIBUTE_TYPE_COLUMN) - private String attributeType; + private int attributeType; @Column(name = ATTRIBUTE_KEY_COLUMN) - private String attributeKey; + private int attributeKey; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvDictionary.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvDictionary.java new file mode 100644 index 0000000000..1cde08912e --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvDictionary.java @@ -0,0 +1,41 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.model.sql; + +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.Id; +import jakarta.persistence.Table; +import lombok.Data; +import org.hibernate.annotations.Generated; + +import static org.thingsboard.server.dao.model.ModelConstants.KEY_COLUMN; +import static org.thingsboard.server.dao.model.ModelConstants.KEY_ID_COLUMN; + +@Data +@Entity +@Table(name = "attribute_kv_dictionary") +public final class AttributeKvDictionary { + + @Id + @Column(name = KEY_COLUMN) + private String key; + + @Column(name = KEY_ID_COLUMN, unique = true, columnDefinition="int") + @Generated + private int keyId; + +} \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java index bc97c0394f..1bb23aaa89 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java @@ -30,6 +30,7 @@ import jakarta.persistence.Column; import jakarta.persistence.EmbeddedId; import jakarta.persistence.Entity; import jakarta.persistence.Table; +import jakarta.persistence.Transient; import java.io.Serializable; import static org.thingsboard.server.dao.model.ModelConstants.BOOLEAN_VALUE_COLUMN; @@ -65,19 +66,22 @@ public class AttributeKvEntity implements ToData, Serializable @Column(name = LAST_UPDATE_TS_COLUMN) private Long lastUpdateTs; + @Transient + protected String strKey; + @Override public AttributeKvEntry toData() { KvEntry kvEntry = null; if (strValue != null) { - kvEntry = new StringDataEntry(id.getAttributeKey(), strValue); + kvEntry = new StringDataEntry(strKey, strValue); } else if (booleanValue != null) { - kvEntry = new BooleanDataEntry(id.getAttributeKey(), booleanValue); + kvEntry = new BooleanDataEntry(strKey, booleanValue); } else if (doubleValue != null) { - kvEntry = new DoubleDataEntry(id.getAttributeKey(), doubleValue); + kvEntry = new DoubleDataEntry(strKey, doubleValue); } else if (longValue != null) { - kvEntry = new LongDataEntry(id.getAttributeKey(), longValue); + kvEntry = new LongDataEntry(strKey, longValue); } else if (jsonValue != null) { - kvEntry = new JsonDataEntry(id.getAttributeKey(), jsonValue); + kvEntry = new JsonDataEntry(strKey, jsonValue); } return new BaseAttributeKvEntry(kvEntry, lastUpdateTs); diff --git a/dao/src/main/java/org/thingsboard/server/dao/service/Validator.java b/dao/src/main/java/org/thingsboard/server/dao/service/Validator.java index d8c4ffddc9..5bf14d06ae 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/service/Validator.java +++ b/dao/src/main/java/org/thingsboard/server/dao/service/Validator.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.dao.service; +import org.apache.commons.lang3.EnumUtils; import org.apache.commons.lang3.StringUtils; import org.thingsboard.common.util.RegexUtils; import org.thingsboard.server.common.data.id.EntityId; @@ -59,6 +60,19 @@ public class Validator { } } + /** + * This method validate String string. If string is null or can not be cast to enum than throw + * IncorrectParameterException exception + * + * @param val the val + * @param errorMessage the error message for exception + */ + public static > void validateEnum(Class enumClass, String val, String errorMessage) { + if (val == null || !EnumUtils.isValidEnum(enumClass, val)) { + throw new IncorrectParameterException(errorMessage); + } + } + /** * This method validate long value. If value isn't possitive than throw diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvDictionaryRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvDictionaryRepository.java new file mode 100644 index 0000000000..7af7df70ea --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvDictionaryRepository.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.sql.attributes; + +import org.springframework.data.jpa.repository.JpaRepository; +import org.thingsboard.server.dao.model.sql.AttributeKvDictionary; +import org.thingsboard.server.dao.util.SqlTsOrTsLatestAnyDao; + +import java.util.Optional; + +@SqlTsOrTsLatestAnyDao +public interface AttributeKvDictionaryRepository extends JpaRepository { + + Optional findByKeyId(int keyId); + +} \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java index 75b7412985..6b33e45546 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java @@ -43,12 +43,12 @@ public abstract class AttributeKvInsertRepository { private static final String EMPTY_STR = ""; private static final String BATCH_UPDATE = "UPDATE attribute_kv SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ? " + - "WHERE entity_type = ? and entity_id = ? and attribute_type =? and attribute_key = ?;"; + "WHERE entity_id = ? and attribute_type =? and attribute_key = ?;"; private static final String INSERT_OR_UPDATE = - "INSERT INTO attribute_kv (entity_type, entity_id, attribute_type, attribute_key, str_v, long_v, dbl_v, bool_v, json_v, last_update_ts) " + - "VALUES(?, ?, ?, ?, ?, ?, ?, ?, cast(? AS json), ?) " + - "ON CONFLICT (entity_type, entity_id, attribute_type, attribute_key) " + + "INSERT INTO attribute_kv (entity_id, attribute_type, attribute_key, str_v, long_v, dbl_v, bool_v, json_v, last_update_ts) " + + "VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json), ?) " + + "ON CONFLICT (entity_id, attribute_type, attribute_key) " + "DO UPDATE SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ?;"; @Autowired @@ -91,10 +91,9 @@ public abstract class AttributeKvInsertRepository { ps.setString(5, replaceNullChars(kvEntity.getJsonValue())); ps.setLong(6, kvEntity.getLastUpdateTs()); - ps.setString(7, kvEntity.getId().getEntityType().name()); - ps.setObject(8, kvEntity.getId().getEntityId()); - ps.setString(9, kvEntity.getId().getAttributeType()); - ps.setString(10, kvEntity.getId().getAttributeKey()); + ps.setObject(7, kvEntity.getId().getEntityId()); + ps.setInt(8, kvEntity.getId().getAttributeType()); + ps.setInt(9, kvEntity.getId().getAttributeKey()); } @Override @@ -121,43 +120,42 @@ public abstract class AttributeKvInsertRepository { @Override public void setValues(PreparedStatement ps, int i) throws SQLException { AttributeKvEntity kvEntity = insertEntities.get(i); - ps.setString(1, kvEntity.getId().getEntityType().name()); - ps.setObject(2, kvEntity.getId().getEntityId()); - ps.setString(3, kvEntity.getId().getAttributeType()); - ps.setString(4, kvEntity.getId().getAttributeKey()); + ps.setObject(1, kvEntity.getId().getEntityId()); + ps.setInt(2, kvEntity.getId().getAttributeType()); + ps.setInt(3, kvEntity.getId().getAttributeKey()); - ps.setString(5, replaceNullChars(kvEntity.getStrValue())); - ps.setString(11, replaceNullChars(kvEntity.getStrValue())); + ps.setString(4, replaceNullChars(kvEntity.getStrValue())); + ps.setString(10, replaceNullChars(kvEntity.getStrValue())); if (kvEntity.getLongValue() != null) { - ps.setLong(6, kvEntity.getLongValue()); - ps.setLong(12, kvEntity.getLongValue()); + ps.setLong(5, kvEntity.getLongValue()); + ps.setLong(11, kvEntity.getLongValue()); } else { - ps.setNull(6, Types.BIGINT); - ps.setNull(12, Types.BIGINT); + ps.setNull(5, Types.BIGINT); + ps.setNull(11, Types.BIGINT); } if (kvEntity.getDoubleValue() != null) { - ps.setDouble(7, kvEntity.getDoubleValue()); - ps.setDouble(13, kvEntity.getDoubleValue()); + ps.setDouble(6, kvEntity.getDoubleValue()); + ps.setDouble(12, kvEntity.getDoubleValue()); } else { - ps.setNull(7, Types.DOUBLE); - ps.setNull(13, Types.DOUBLE); + ps.setNull(6, Types.DOUBLE); + ps.setNull(12, Types.DOUBLE); } if (kvEntity.getBooleanValue() != null) { - ps.setBoolean(8, kvEntity.getBooleanValue()); - ps.setBoolean(14, kvEntity.getBooleanValue()); + ps.setBoolean(7, kvEntity.getBooleanValue()); + ps.setBoolean(13, kvEntity.getBooleanValue()); } else { - ps.setNull(8, Types.BOOLEAN); - ps.setNull(14, Types.BOOLEAN); + ps.setNull(7, Types.BOOLEAN); + ps.setNull(13, Types.BOOLEAN); } - ps.setString(9, replaceNullChars(kvEntity.getJsonValue())); - ps.setString(15, replaceNullChars(kvEntity.getJsonValue())); + ps.setString(8, replaceNullChars(kvEntity.getJsonValue())); + ps.setString(14, replaceNullChars(kvEntity.getJsonValue())); - ps.setLong(10, kvEntity.getLastUpdateTs()); - ps.setLong(16, kvEntity.getLastUpdateTs()); + ps.setLong(9, kvEntity.getLastUpdateTs()); + ps.setLong(15, kvEntity.getLastUpdateTs()); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java index 6a0cdfabd0..a43be3e49d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java @@ -20,7 +20,6 @@ import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; import org.springframework.transaction.annotation.Transactional; -import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.dao.model.sql.AttributeKvCompositeKey; import org.thingsboard.server.dao.model.sql.AttributeKvEntity; @@ -29,35 +28,31 @@ import java.util.UUID; public interface AttributeKvRepository extends JpaRepository { - @Query("SELECT a FROM AttributeKvEntity a WHERE a.id.entityType = :entityType " + - "AND a.id.entityId = :entityId " + + @Query("SELECT a FROM AttributeKvEntity a WHERE a.id.entityId = :entityId " + "AND a.id.attributeType = :attributeType") - List findAllByEntityTypeAndEntityIdAndAttributeType(@Param("entityType") EntityType entityType, - @Param("entityId") UUID entityId, - @Param("attributeType") String attributeType); + List findAllEntityIdAndAttributeType(@Param("entityId") UUID entityId, + @Param("attributeType") int attributeType); @Transactional @Modifying - @Query("DELETE FROM AttributeKvEntity a WHERE a.id.entityType = :entityType " + - "AND a.id.entityId = :entityId " + + @Query("DELETE FROM AttributeKvEntity a WHERE a.id.entityId = :entityId " + "AND a.id.attributeType = :attributeType " + "AND a.id.attributeKey = :attributeKey") - void delete(@Param("entityType") EntityType entityType, - @Param("entityId") UUID entityId, - @Param("attributeType") String attributeType, - @Param("attributeKey") String attributeKey); + void delete(@Param("entityId") UUID entityId, + @Param("attributeType") int attributeType, + @Param("attributeKey") int attributeKey); @Query(value = "SELECT DISTINCT attribute_key FROM attribute_kv WHERE entity_type = 'DEVICE' " + "AND entity_id in (SELECT id FROM device WHERE tenant_id = :tenantId and device_profile_id = :deviceProfileId limit 100) ORDER BY attribute_key", nativeQuery = true) - List findAllKeysByDeviceProfileId(@Param("tenantId") UUID tenantId, + List findAllKeysByDeviceProfileId(@Param("tenantId") UUID tenantId, @Param("deviceProfileId") UUID deviceProfileId); @Query(value = "SELECT DISTINCT attribute_key FROM attribute_kv WHERE entity_type = 'DEVICE' " + "AND entity_id in (SELECT id FROM device WHERE tenant_id = :tenantId limit 100) ORDER BY attribute_key", nativeQuery = true) - List findAllKeysByTenantId(@Param("tenantId") UUID tenantId); + List findAllKeysByTenantId(@Param("tenantId") UUID tenantId); - @Query(value = "SELECT DISTINCT attribute_key FROM attribute_kv WHERE entity_type = :entityType " + - "AND entity_id in :entityIds ORDER BY attribute_key", nativeQuery = true) - List findAllKeysByEntityIds(@Param("entityType") String entityType, @Param("entityIds") List entityIds); + @Query(value = "SELECT DISTINCT attribute_key FROM attribute_kv WHERE " + + "entity_id in :entityIds ORDER BY attribute_key", nativeQuery = true) + List findAllKeysByEntityIds(@Param("entityIds") List entityIds); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java index 6d195d1cc4..c47cd9dbdf 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java @@ -22,10 +22,12 @@ import com.google.common.util.concurrent.MoreExecutors; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; +import org.hibernate.exception.ConstraintViolationException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.dao.DataIntegrityViolationException; import org.springframework.stereotype.Component; -import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -34,6 +36,7 @@ import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.attributes.AttributesDao; import org.thingsboard.server.dao.model.sql.AttributeKvCompositeKey; +import org.thingsboard.server.dao.model.sql.AttributeKvDictionary; import org.thingsboard.server.dao.model.sql.AttributeKvEntity; import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService; import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent; @@ -46,6 +49,9 @@ import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import java.util.stream.Collectors; @@ -57,6 +63,9 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl @Autowired ScheduledLogExecutorComponent logExecutor; + @Autowired + private AttributeKvDictionaryRepository dictionaryRepository; + @Autowired private AttributeKvRepository attributeKvRepository; @@ -81,6 +90,9 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl @Value("${sql.batch_sort:true}") private boolean batchSortEnabled; + private final ConcurrentMap attributeDictionaryMap = new ConcurrentHashMap<>(); + private static final ReentrantLock attributeCreationLock = new ReentrantLock(); + private TbSqlBlockingQueueWrapper queue; @PostConstruct @@ -98,7 +110,6 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl queue = new TbSqlBlockingQueueWrapper<>(params, hashcodeFunction, batchThreads, statsFactory); queue.init(logExecutor, v -> attributeKvInsertRepository.saveOrUpdate(v), Comparator.comparing((AttributeKvEntity attributeKvEntity) -> attributeKvEntity.getId().getEntityId()) - .thenComparing(attributeKvEntity -> attributeKvEntity.getId().getEntityType().name()) .thenComparing(attributeKvEntity -> attributeKvEntity.getId().getAttributeType()) .thenComparing(attributeKvEntity -> attributeKvEntity.getId().getAttributeKey()) ); @@ -112,81 +123,131 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl } @Override - public Optional find(TenantId tenantId, EntityId entityId, String attributeType, String attributeKey) { + public Optional find(TenantId tenantId, EntityId entityId, AttributeScope attributeScope, String attributeKey) { AttributeKvCompositeKey compositeKey = - getAttributeKvCompositeKey(entityId, attributeType, attributeKey); - return Optional.ofNullable(DaoUtil.getData(attributeKvRepository.findById(compositeKey))); + getAttributeKvCompositeKey(entityId, attributeScope.getId(), getOrSaveKeyId(attributeKey)); + Optional attributeKvEntityOptional = attributeKvRepository.findById(compositeKey); + if (attributeKvEntityOptional.isPresent()) { + AttributeKvEntity attributeKvEntity = attributeKvEntityOptional.get(); + attributeKvEntity.setStrKey(attributeKey); + return Optional.ofNullable(DaoUtil.getData(attributeKvEntity)); + } + return Optional.ofNullable(DaoUtil.getData(attributeKvEntityOptional)); } @Override - public List find(TenantId tenantId, EntityId entityId, String attributeType, Collection attributeKeys) { + public List find(TenantId tenantId, EntityId entityId, AttributeScope attributeScope, Collection attributeKeys) { List compositeKeys = attributeKeys .stream() .map(attributeKey -> - getAttributeKvCompositeKey(entityId, attributeType, attributeKey)) + getAttributeKvCompositeKey(entityId, attributeScope.getId(), getOrSaveKeyId(attributeKey))) .collect(Collectors.toList()); - return DaoUtil.convertDataList(Lists.newArrayList(attributeKvRepository.findAllById(compositeKeys))); + List attributes = attributeKvRepository.findAllById(compositeKeys); + attributes.forEach(attributeKvEntity -> attributeKvEntity.setStrKey(getKey(attributeKvEntity.getId().getAttributeKey()))); + return DaoUtil.convertDataList(Lists.newArrayList(attributes)); } @Override - public List findAll(TenantId tenantId, EntityId entityId, String attributeType) { + public List findAll(TenantId tenantId, EntityId entityId, AttributeScope attributeScope) { + List attributes = attributeKvRepository.findAllEntityIdAndAttributeType( + entityId.getId(), + attributeScope.getId()); + attributes.forEach(attributeKvEntity -> attributeKvEntity.setStrKey(getKey(attributeKvEntity.getId().getAttributeKey()))); return DaoUtil.convertDataList(Lists.newArrayList( - attributeKvRepository.findAllByEntityTypeAndEntityIdAndAttributeType( - entityId.getEntityType(), - entityId.getId(), - attributeType))); + attributes)); } @Override public List findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId) { if (deviceProfileId != null) { - return attributeKvRepository.findAllKeysByDeviceProfileId(tenantId.getId(), deviceProfileId.getId()); + return attributeKvRepository.findAllKeysByDeviceProfileId(tenantId.getId(), deviceProfileId.getId()) + .stream().map(this::getKey).collect(Collectors.toList()); } else { - return attributeKvRepository.findAllKeysByTenantId(tenantId.getId()); + return attributeKvRepository.findAllKeysByTenantId(tenantId.getId()) + .stream().map(this::getKey).collect(Collectors.toList()); } } @Override - public List findAllKeysByEntityIds(TenantId tenantId, EntityType entityType, List entityIds) { + public List findAllKeysByEntityIds(TenantId tenantId, List entityIds) { return attributeKvRepository - .findAllKeysByEntityIds(entityType.name(), entityIds.stream().map(EntityId::getId).collect(Collectors.toList())); + .findAllKeysByEntityIds(entityIds.stream().map(EntityId::getId).collect(Collectors.toList())) + .stream().map(this::getKey).collect(Collectors.toList()); } @Override - public ListenableFuture save(TenantId tenantId, EntityId entityId, String attributeType, AttributeKvEntry attribute) { + public ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope attributeScope, AttributeKvEntry attribute) { AttributeKvEntity entity = new AttributeKvEntity(); - entity.setId(new AttributeKvCompositeKey(entityId.getEntityType(), entityId.getId(), attributeType, attribute.getKey())); + entity.setId(new AttributeKvCompositeKey(entityId.getId(), attributeScope.getId(), getOrSaveKeyId(attribute.getKey()))); entity.setLastUpdateTs(attribute.getLastUpdateTs()); entity.setStrValue(attribute.getStrValue().orElse(null)); entity.setDoubleValue(attribute.getDoubleValue().orElse(null)); entity.setLongValue(attribute.getLongValue().orElse(null)); entity.setBooleanValue(attribute.getBooleanValue().orElse(null)); entity.setJsonValue(attribute.getJsonValue().orElse(null)); - return addToQueue(entity); + return addToQueue(entity, attribute.getKey()); } - private ListenableFuture addToQueue(AttributeKvEntity entity) { - return Futures.transform(queue.add(entity), v -> entity.getId().getAttributeKey(), MoreExecutors.directExecutor()); + private ListenableFuture addToQueue(AttributeKvEntity entity, String key) { + return Futures.transform(queue.add(entity), v -> key, MoreExecutors.directExecutor()); } @Override - public List> removeAll(TenantId tenantId, EntityId entityId, String attributeType, List keys) { + public List> removeAll(TenantId tenantId, EntityId entityId, AttributeScope attributeScope, List keys) { List> futuresList = new ArrayList<>(keys.size()); for (String key : keys) { futuresList.add(service.submit(() -> { - attributeKvRepository.delete(entityId.getEntityType(), entityId.getId(), attributeType, key); + attributeKvRepository.delete(entityId.getId(), attributeScope.getId(), getOrSaveKeyId(key)); return key; })); } return futuresList; } - private AttributeKvCompositeKey getAttributeKvCompositeKey(EntityId entityId, String attributeType, String attributeKey) { + private AttributeKvCompositeKey getAttributeKvCompositeKey(EntityId entityId, Integer attributeType, Integer attributeKey) { return new AttributeKvCompositeKey( - entityId.getEntityType(), entityId.getId(), attributeType, attributeKey); } + + private Integer getOrSaveKeyId(String attributeKey) { + Integer keyId = attributeDictionaryMap.get(attributeKey); + if (keyId == null) { + Optional byIdOptional = dictionaryRepository.findById(attributeKey); + if (byIdOptional.isEmpty()) { + attributeCreationLock.lock(); + try { + byIdOptional = dictionaryRepository.findById(attributeKey); + if (byIdOptional.isEmpty()) { + AttributeKvDictionary attributeKvDictionary = new AttributeKvDictionary(); + attributeKvDictionary.setKey(attributeKey); + try { + AttributeKvDictionary saved = dictionaryRepository.save(attributeKvDictionary); + attributeDictionaryMap.put(saved.getKey(), saved.getKeyId()); + keyId = saved.getKeyId(); + } catch (DataIntegrityViolationException | ConstraintViolationException e) { + byIdOptional = dictionaryRepository.findById(attributeKey); + AttributeKvDictionary dictionary = byIdOptional.orElseThrow(() -> new RuntimeException("Failed to get AttributeKvDictionary entity from DB!")); + attributeDictionaryMap.put(dictionary.getKey(), dictionary.getKeyId()); + keyId = dictionary.getKeyId(); + } + } else { + keyId = byIdOptional.get().getKeyId(); + } + } finally { + attributeCreationLock.unlock(); + } + } else { + keyId = byIdOptional.get().getKeyId(); + attributeDictionaryMap.put(attributeKey, keyId); + } + } + return keyId; + } + private String getKey(Integer attributeKey) { + Optional byKeyId = dictionaryRepository.findByKeyId(attributeKey); + return byKeyId.map(AttributeKvDictionary::getKey).orElse(null); + } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/EntityKeyMapping.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/EntityKeyMapping.java index e9adae29ff..200e93765d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/EntityKeyMapping.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/EntityKeyMapping.java @@ -16,7 +16,7 @@ package org.thingsboard.server.dao.sql.query; import lombok.Data; -import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.query.BooleanFilterPredicate; @@ -262,22 +262,22 @@ public class EntityKeyMapping { String query; if (!entityKey.getType().equals(EntityKeyType.ATTRIBUTE)) { String join = (hasFilter() && hasFilterValues(ctx)) ? "inner join" : "left join"; - query = String.format("%s attribute_kv %s ON %s.entity_id=entities.id AND %s.entity_type=%s AND %s.attribute_key=:%s_key_id ", - join, alias, alias, alias, entityTypeStr, alias, alias); - String scope; + query = String.format("%s attribute_kv %s ON %s.entity_id=entities.id AND %s.attribute_key=(select key_id from attribute_kv_dictionary where key = :%s_key_id) ", + join, alias, alias, alias, alias); + int scope; if (entityKey.getType().equals(EntityKeyType.CLIENT_ATTRIBUTE)) { - scope = DataConstants.CLIENT_SCOPE; + scope = AttributeScope.CLIENT_SCOPE.getId(); } else if (entityKey.getType().equals(EntityKeyType.SHARED_ATTRIBUTE)) { - scope = DataConstants.SHARED_SCOPE; + scope = AttributeScope.SHARED_SCOPE.getId();; } else { - scope = DataConstants.SERVER_SCOPE; + scope = AttributeScope.SERVER_SCOPE.getId();; } - query = String.format("%s AND %s.attribute_type='%s' %s", query, alias, scope, filterQuery); + query = String.format("%s AND %s.attribute_type=%s %s", query, alias, scope, filterQuery); } else { String join = (hasFilter() && hasFilterValues(ctx)) ? "join LATERAL" : "left join LATERAL"; - query = String.format("%s (select * from attribute_kv %s WHERE %s.entity_id=entities.id AND %s.entity_type=%s AND %s.attribute_key=:%s_key_id %s " + + query = String.format("%s (select * from attribute_kv %s WHERE %s.entity_id=entities.id AND %s.attribute_key=(select key_id from attribute_kv_dictionary where key = :%s_key_id) %s " + "ORDER BY %s.last_update_ts DESC limit 1) as %s ON true", - join, alias, alias, alias, entityTypeStr, alias, alias, filterQuery, alias, alias); + join, alias, alias, alias, alias, filterQuery, alias, alias); } return query; } diff --git a/dao/src/main/resources/sql/schema-entities.sql b/dao/src/main/resources/sql/schema-entities.sql index 85b09e2053..f376a777d3 100644 --- a/dao/src/main/resources/sql/schema-entities.sql +++ b/dao/src/main/resources/sql/schema-entities.sql @@ -103,17 +103,22 @@ CREATE TABLE IF NOT EXISTS audit_log ( ) PARTITION BY RANGE (created_time); CREATE TABLE IF NOT EXISTS attribute_kv ( - entity_type varchar(255), entity_id uuid, - attribute_type varchar(255), - attribute_key varchar(255), + attribute_type int, + attribute_key int, bool_v boolean, str_v varchar(10000000), long_v bigint, dbl_v double precision, json_v json, last_update_ts bigint, - CONSTRAINT attribute_kv_pkey PRIMARY KEY (entity_type, entity_id, attribute_type, attribute_key) + CONSTRAINT attribute_kv_pkey PRIMARY KEY (entity_id, attribute_type, attribute_key) +); + +CREATE TABLE IF NOT EXISTS attribute_kv_dictionary ( + key varchar(255) NOT NULL, + key_id serial UNIQUE, + CONSTRAINT attribute_key_id_pkey PRIMARY KEY (key) ); CREATE TABLE IF NOT EXISTS component_descriptor ( diff --git a/dao/src/main/resources/sql/schema-timescale.sql b/dao/src/main/resources/sql/schema-timescale.sql index ed15566268..0a60b64a50 100644 --- a/dao/src/main/resources/sql/schema-timescale.sql +++ b/dao/src/main/resources/sql/schema-timescale.sql @@ -104,7 +104,7 @@ BEGIN WHILE FOUND LOOP EXECUTE format( - 'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = %L', + 'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = (select key_id from attribute_kv_dictionary where key = %L)', tenant_id_record, 'TTL') INTO tenant_ttl; if tenant_ttl IS NULL THEN tenant_ttl := system_ttl; @@ -122,7 +122,7 @@ BEGIN SELECT customer.id AS customer_id FROM customer WHERE customer.tenant_id = tenant_id_record LOOP EXECUTE format( - 'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = %L', + 'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = (select key_id from attribute_kv_dictionary where key = %L)', customer_id_record, 'TTL') INTO customer_ttl; IF customer_ttl IS NULL THEN customer_ttl_ts := tenant_ttl_ts; diff --git a/dao/src/main/resources/sql/schema-ts-psql.sql b/dao/src/main/resources/sql/schema-ts-psql.sql index 8b2b80203e..3f8f380b03 100644 --- a/dao/src/main/resources/sql/schema-ts-psql.sql +++ b/dao/src/main/resources/sql/schema-ts-psql.sql @@ -272,7 +272,7 @@ BEGIN WHILE FOUND LOOP EXECUTE format( - 'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = %L', + 'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = (select key_id from attribute_kv_dictionary where key = %L)', tenant_id_record, 'TTL') INTO tenant_ttl; if tenant_ttl IS NULL THEN tenant_ttl := system_ttl; @@ -290,7 +290,7 @@ BEGIN SELECT customer.id AS customer_id FROM customer WHERE customer.tenant_id = tenant_id_record LOOP EXECUTE format( - 'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = %L', + 'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = (select key_id from attribute_kv_dictionary where key = %L)', customer_id_record, 'TTL') INTO customer_ttl; IF customer_ttl IS NULL THEN customer_ttl_ts := tenant_ttl_ts; diff --git a/dao/src/main/resources/sql/schema-views-and-functions.sql b/dao/src/main/resources/sql/schema-views-and-functions.sql index 1aa5647bde..4583b71e92 100644 --- a/dao/src/main/resources/sql/schema-views-and-functions.sql +++ b/dao/src/main/resources/sql/schema-views-and-functions.sql @@ -23,7 +23,7 @@ SELECT d.* , COALESCE(da.bool_v, FALSE) as active FROM device d LEFT JOIN customer c ON c.id = d.customer_id - LEFT JOIN attribute_kv da ON da.entity_type = 'DEVICE' AND da.entity_id = d.id AND da.attribute_type = 'SERVER_SCOPE' AND da.attribute_key = 'active'; + LEFT JOIN attribute_kv da ON da.entity_id = d.id AND da.attribute_type = 2 AND da.attribute_key = (select key_id from attribute_kv_dictionary where key = 'active'); DROP VIEW IF EXISTS device_info_active_ts_view CASCADE; CREATE OR REPLACE VIEW device_info_active_ts_view AS diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java index ad23b7492b..1c798ea7de 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java @@ -29,6 +29,7 @@ import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; @@ -321,12 +322,13 @@ public class TbDeviceProfileNodeTest { device.setCustomerId(customerId); AttributeKvCompositeKey compositeKey = new AttributeKvCompositeKey( - EntityType.TENANT, deviceId.getId(), "SERVER_SCOPE", "alarmEnabled" + deviceId.getId(), AttributeScope.SERVER_SCOPE.getId(), 10 ); AttributeKvEntity attributeKvEntity = new AttributeKvEntity(); attributeKvEntity.setId(compositeKey); attributeKvEntity.setBooleanValue(Boolean.TRUE); + attributeKvEntity.setStrKey("alarmEnabled"); attributeKvEntity.setLastUpdateTs(System.currentTimeMillis()); AttributeKvEntry entry = attributeKvEntity.toData(); @@ -403,11 +405,12 @@ public class TbDeviceProfileNodeTest { device.setCustomerId(customerId); AttributeKvCompositeKey compositeKey = new AttributeKvCompositeKey( - EntityType.TENANT, tenantId.getId(), "SERVER_SCOPE", "alarmEnabled" + tenantId.getId(), AttributeScope.SERVER_SCOPE.getId(), 10 ); AttributeKvEntity attributeKvEntity = new AttributeKvEntity(); attributeKvEntity.setId(compositeKey); + attributeKvEntity.setStrKey("alarmEnabled"); attributeKvEntity.setBooleanValue(Boolean.TRUE); attributeKvEntity.setLastUpdateTs(System.currentTimeMillis()); @@ -486,11 +489,12 @@ public class TbDeviceProfileNodeTest { DeviceProfileData deviceProfileData = new DeviceProfileData(); AttributeKvCompositeKey compositeKey = new AttributeKvCompositeKey( - EntityType.TENANT, deviceId.getId(), "SERVER_SCOPE", "greaterAttribute" + deviceId.getId(), AttributeScope.SERVER_SCOPE.getId(), 10 ); AttributeKvEntity attributeKvEntity = new AttributeKvEntity(); attributeKvEntity.setId(compositeKey); + attributeKvEntity.setStrKey("greaterAttribute"); attributeKvEntity.setLongValue(30L); attributeKvEntity.setLastUpdateTs(0L); @@ -555,20 +559,22 @@ public class TbDeviceProfileNodeTest { DeviceProfileData deviceProfileData = new DeviceProfileData(); AttributeKvCompositeKey compositeKey = new AttributeKvCompositeKey( - EntityType.TENANT, deviceId.getId(), "SERVER_SCOPE", "greaterAttribute" + deviceId.getId(), AttributeScope.SERVER_SCOPE.getId(), 10 ); AttributeKvEntity attributeKvEntity = new AttributeKvEntity(); attributeKvEntity.setId(compositeKey); + attributeKvEntity.setStrKey("greaterAttribute"); attributeKvEntity.setLongValue(30L); attributeKvEntity.setLastUpdateTs(0L); AttributeKvCompositeKey alarmDelayCompositeKey = new AttributeKvCompositeKey( - EntityType.TENANT, deviceId.getId(), "SERVER_SCOPE", "alarm_delay" + deviceId.getId(), AttributeScope.SERVER_SCOPE.getId(), 11 ); AttributeKvEntity alarmDelayAttributeKvEntity = new AttributeKvEntity(); alarmDelayAttributeKvEntity.setId(alarmDelayCompositeKey); + alarmDelayAttributeKvEntity.setStrKey("alarm_delay"); long alarmDelayInSeconds = 5L; alarmDelayAttributeKvEntity.setLongValue(alarmDelayInSeconds); alarmDelayAttributeKvEntity.setLastUpdateTs(0L); @@ -669,20 +675,22 @@ public class TbDeviceProfileNodeTest { AttributeKvCompositeKey compositeKey = new AttributeKvCompositeKey( - EntityType.TENANT, deviceId.getId(), "SERVER_SCOPE", "greaterAttribute" + deviceId.getId(), AttributeScope.SERVER_SCOPE.getId(), 10 ); AttributeKvEntity attributeKvEntity = new AttributeKvEntity(); attributeKvEntity.setId(compositeKey); + attributeKvEntity.setStrKey("greaterAttribute"); attributeKvEntity.setLongValue(30L); attributeKvEntity.setLastUpdateTs(0L); AttributeKvCompositeKey alarmDelayCompositeKey = new AttributeKvCompositeKey( - EntityType.TENANT, deviceId.getId(), "SERVER_SCOPE", "alarm_delay" + deviceId.getId(), AttributeScope.SERVER_SCOPE.getId(), 11 ); AttributeKvEntity alarmDelayAttributeKvEntity = new AttributeKvEntity(); alarmDelayAttributeKvEntity.setId(alarmDelayCompositeKey); + alarmDelayAttributeKvEntity.setStrKey("alarm_delay"); long alarmDelayInSeconds = 5L; alarmDelayAttributeKvEntity.setLongValue(alarmDelayInSeconds); alarmDelayAttributeKvEntity.setLastUpdateTs(0L); @@ -788,20 +796,22 @@ public class TbDeviceProfileNodeTest { DeviceProfileData deviceProfileData = new DeviceProfileData(); AttributeKvCompositeKey compositeKey = new AttributeKvCompositeKey( - EntityType.TENANT, deviceId.getId(), "SERVER_SCOPE", "greaterAttribute" + deviceId.getId(), AttributeScope.SERVER_SCOPE.getId(), 10 ); AttributeKvEntity attributeKvEntity = new AttributeKvEntity(); attributeKvEntity.setId(compositeKey); + attributeKvEntity.setStrKey("greaterAttribute"); attributeKvEntity.setLongValue(30L); attributeKvEntity.setLastUpdateTs(0L); AttributeKvCompositeKey alarmDelayCompositeKey = new AttributeKvCompositeKey( - EntityType.TENANT, deviceId.getId(), "SERVER_SCOPE", "alarm_delay" + deviceId.getId(), AttributeScope.SERVER_SCOPE.getId(), 11 ); AttributeKvEntity alarmDelayAttributeKvEntity = new AttributeKvEntity(); alarmDelayAttributeKvEntity.setId(alarmDelayCompositeKey); + alarmDelayAttributeKvEntity.setStrKey("alarm_delay"); long alarmRepeating = 2; alarmDelayAttributeKvEntity.setLongValue(alarmRepeating); alarmDelayAttributeKvEntity.setLastUpdateTs(0L); @@ -891,7 +901,7 @@ public class TbDeviceProfileNodeTest { DeviceProfileData deviceProfileData = new DeviceProfileData(); AttributeKvCompositeKey compositeKey = new AttributeKvCompositeKey( - EntityType.TENANT, deviceId.getId(), "SERVER_SCOPE", "greaterAttribute" + deviceId.getId(), AttributeScope.SERVER_SCOPE.getId(), 10 ); Device device = new Device(); @@ -900,15 +910,17 @@ public class TbDeviceProfileNodeTest { AttributeKvEntity attributeKvEntity = new AttributeKvEntity(); attributeKvEntity.setId(compositeKey); + attributeKvEntity.setStrKey("greaterAttribute"); attributeKvEntity.setLongValue(30L); attributeKvEntity.setLastUpdateTs(0L); AttributeKvCompositeKey alarmDelayCompositeKey = new AttributeKvCompositeKey( - EntityType.TENANT, deviceId.getId(), "SERVER_SCOPE", "alarm_delay" + deviceId.getId(), AttributeScope.SERVER_SCOPE.getId(), 11 ); AttributeKvEntity alarmDelayAttributeKvEntity = new AttributeKvEntity(); alarmDelayAttributeKvEntity.setId(alarmDelayCompositeKey); + alarmDelayAttributeKvEntity.setStrKey("alarm_delay"); long repeatingCondition = 2; alarmDelayAttributeKvEntity.setLongValue(repeatingCondition); alarmDelayAttributeKvEntity.setLastUpdateTs(0L); @@ -1012,11 +1024,12 @@ public class TbDeviceProfileNodeTest { device.setCustomerId(customerId); AttributeKvCompositeKey compositeKey = new AttributeKvCompositeKey( - EntityType.TENANT, deviceId.getId(), "SERVER_SCOPE", "greaterAttribute" + deviceId.getId(), AttributeScope.SERVER_SCOPE.getId(), 10 ); AttributeKvEntity attributeKvEntity = new AttributeKvEntity(); attributeKvEntity.setId(compositeKey); + attributeKvEntity.setStrKey("greaterAttribute"); attributeKvEntity.setLongValue(30L); attributeKvEntity.setLastUpdateTs(0L); @@ -1113,11 +1126,12 @@ public class TbDeviceProfileNodeTest { device.setCustomerId(customerId); AttributeKvCompositeKey compositeKey = new AttributeKvCompositeKey( - EntityType.TENANT, deviceId.getId(), "SERVER_SCOPE", "greaterAttribute" + deviceId.getId(), AttributeScope.SERVER_SCOPE.getId(), 10 ); AttributeKvEntity attributeKvEntity = new AttributeKvEntity(); attributeKvEntity.setId(compositeKey); + attributeKvEntity.setStrKey("greaterAttribute"); attributeKvEntity.setLongValue(30L); attributeKvEntity.setLastUpdateTs(0L); @@ -1196,11 +1210,12 @@ public class TbDeviceProfileNodeTest { device.setCustomerId(customerId); AttributeKvCompositeKey compositeKeyActiveSchedule = new AttributeKvCompositeKey( - EntityType.TENANT, deviceId.getId(), "SERVER_SCOPE", "dynamicValueActiveSchedule" + deviceId.getId(), AttributeScope.SERVER_SCOPE.getId(), 10 ); AttributeKvEntity attributeKvEntityActiveSchedule = new AttributeKvEntity(); attributeKvEntityActiveSchedule.setId(compositeKeyActiveSchedule); + attributeKvEntityActiveSchedule.setStrKey("dynamicValueActiveSchedule"); attributeKvEntityActiveSchedule.setJsonValue( "{\"timezone\":\"Europe/Kiev\",\"items\":[{\"enabled\":true,\"dayOfWeek\":1,\"startsOn\":0,\"endsOn\":8.64e+7},{\"enabled\":true,\"dayOfWeek\":2,\"startsOn\":0,\"endsOn\":8.64e+7},{\"enabled\":true,\"dayOfWeek\":3,\"startsOn\":0,\"endsOn\":8.64e+7},{\"enabled\":true,\"dayOfWeek\":4,\"startsOn\":0,\"endsOn\":8.64e+7},{\"enabled\":true,\"dayOfWeek\":5,\"startsOn\":0,\"endsOn\":8.64e+7},{\"enabled\":true,\"dayOfWeek\":6,\"startsOn\":0,\"endsOn\":8.64e+7},{\"enabled\":true,\"dayOfWeek\":7,\"startsOn\":0,\"endsOn\":8.64e+7}],\"dynamicValue\":null}" ); @@ -1280,11 +1295,12 @@ public class TbDeviceProfileNodeTest { device.setCustomerId(customerId); AttributeKvCompositeKey compositeKeyInactiveSchedule = new AttributeKvCompositeKey( - EntityType.TENANT, deviceId.getId(), "SERVER_SCOPE", "dynamicValueInactiveSchedule" + deviceId.getId(), AttributeScope.SERVER_SCOPE.getId(), 10 ); AttributeKvEntity attributeKvEntityInactiveSchedule = new AttributeKvEntity(); attributeKvEntityInactiveSchedule.setId(compositeKeyInactiveSchedule); + attributeKvEntityInactiveSchedule.setStrKey("dynamicValueInactiveSchedule"); attributeKvEntityInactiveSchedule.setJsonValue( "{\"timezone\":\"Europe/Kiev\",\"items\":[{\"enabled\":false,\"dayOfWeek\":1,\"startsOn\":0,\"endsOn\":0},{\"enabled\":false,\"dayOfWeek\":2,\"startsOn\":0,\"endsOn\":0},{\"enabled\":false,\"dayOfWeek\":3,\"startsOn\":0,\"endsOn\":0},{\"enabled\":false,\"dayOfWeek\":4,\"startsOn\":0,\"endsOn\":0},{\"enabled\":false,\"dayOfWeek\":5,\"startsOn\":0,\"endsOn\":0},{\"enabled\":false,\"dayOfWeek\":6,\"startsOn\":0,\"endsOn\":0},{\"enabled\":false,\"dayOfWeek\":7,\"startsOn\":0,\"endsOn\":0}],\"dynamicValue\":null}" ); @@ -1372,11 +1388,12 @@ public class TbDeviceProfileNodeTest { device.setCustomerId(customerId); AttributeKvCompositeKey compositeKey = new AttributeKvCompositeKey( - EntityType.CUSTOMER, deviceId.getId(), "SERVER_SCOPE", "lessAttribute" + deviceId.getId(), AttributeScope.SERVER_SCOPE.getId(), 10 ); AttributeKvEntity attributeKvEntity = new AttributeKvEntity(); attributeKvEntity.setId(compositeKey); + attributeKvEntity.setStrKey("lessAttribute"); attributeKvEntity.setLongValue(30L); attributeKvEntity.setLastUpdateTs(0L); @@ -1447,11 +1464,12 @@ public class TbDeviceProfileNodeTest { DeviceProfileData deviceProfileData = new DeviceProfileData(); AttributeKvCompositeKey compositeKey = new AttributeKvCompositeKey( - EntityType.TENANT, deviceId.getId(), "SERVER_SCOPE", "lessAttribute" + deviceId.getId(), AttributeScope.SERVER_SCOPE.getId(), 10 ); AttributeKvEntity attributeKvEntity = new AttributeKvEntity(); attributeKvEntity.setId(compositeKey); + attributeKvEntity.setStrKey("lessAttribute"); attributeKvEntity.setLongValue(50L); attributeKvEntity.setLastUpdateTs(0L); @@ -1520,7 +1538,7 @@ public class TbDeviceProfileNodeTest { DeviceProfileData deviceProfileData = new DeviceProfileData(); AttributeKvCompositeKey compositeKey = new AttributeKvCompositeKey( - EntityType.TENANT, deviceId.getId(), "SERVER_SCOPE", "tenantAttribute" + deviceId.getId(), AttributeScope.SERVER_SCOPE.getId(), 10 ); Device device = new Device(); @@ -1529,6 +1547,7 @@ public class TbDeviceProfileNodeTest { AttributeKvEntity attributeKvEntity = new AttributeKvEntity(); attributeKvEntity.setId(compositeKey); + attributeKvEntity.setStrKey("tenantAttribute"); attributeKvEntity.setLongValue(100L); attributeKvEntity.setLastUpdateTs(0L); @@ -1605,7 +1624,7 @@ public class TbDeviceProfileNodeTest { DeviceProfileData deviceProfileData = new DeviceProfileData(); AttributeKvCompositeKey compositeKey = new AttributeKvCompositeKey( - EntityType.DEVICE, deviceId.getId(), EntityKeyType.SERVER_ATTRIBUTE.name(), "tenantAttribute" + deviceId.getId(), AttributeScope.SERVER_SCOPE.getId(), 10 ); Device device = new Device(); @@ -1614,6 +1633,7 @@ public class TbDeviceProfileNodeTest { AttributeKvEntity attributeKvEntity = new AttributeKvEntity(); attributeKvEntity.setId(compositeKey); + attributeKvEntity.setStrKey("tenantAttribute"); attributeKvEntity.setLongValue(100L); attributeKvEntity.setLastUpdateTs(0L);