diff --git a/application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json b/application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json index 9adeb4f49e..e663ff779d 100644 --- a/application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json +++ b/application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json @@ -33,8 +33,13 @@ }, "type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", "name": "Save Timeseries", + "configurationVersion": 1, "configuration": { - "defaultTTL": 0 + "defaultTTL": 0, + "useServerTs": false, + "persistenceSettings": { + "type": "ON_EVERY_MESSAGE" + } }, "externalId": null }, @@ -185,4 +190,4 @@ ], "ruleChainConnections": null } -} \ No newline at end of file +} diff --git a/application/src/main/data/json/tenant/device_profile/rule_chain_template.json b/application/src/main/data/json/tenant/device_profile/rule_chain_template.json index bce88d62b0..325e01003f 100644 --- a/application/src/main/data/json/tenant/device_profile/rule_chain_template.json +++ b/application/src/main/data/json/tenant/device_profile/rule_chain_template.json @@ -19,8 +19,13 @@ }, "type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", "name": "Save Timeseries", + "configurationVersion": 1, "configuration": { - "defaultTTL": 0 + "defaultTTL": 0, + "useServerTs": false, + "persistenceSettings": { + "type": "ON_EVERY_MESSAGE" + } } }, { @@ -134,4 +139,4 @@ ], "ruleChainConnections": null } -} \ No newline at end of file +} diff --git a/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json b/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json index ee38849c1b..4dc202d740 100644 --- a/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json +++ b/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json @@ -18,8 +18,13 @@ }, "type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", "name": "Save Timeseries", + "configurationVersion": 1, "configuration": { - "defaultTTL": 0 + "defaultTTL": 0, + "useServerTs": false, + "persistenceSettings": { + "type": "ON_EVERY_MESSAGE" + } } }, { diff --git a/application/src/main/data/upgrade/basic/schema_update.sql b/application/src/main/data/upgrade/basic/schema_update.sql index 64bbbaca05..5d66056701 100644 --- a/application/src/main/data/upgrade/basic/schema_update.sql +++ b/application/src/main/data/upgrade/basic/schema_update.sql @@ -14,198 +14,50 @@ -- limitations under the License. -- -ALTER TABLE user_credentials ADD COLUMN IF NOT EXISTS last_login_ts BIGINT; -UPDATE user_credentials c SET last_login_ts = (SELECT (additional_info::json ->> 'lastLoginTs')::bigint FROM tb_user u WHERE u.id = c.user_id) - WHERE last_login_ts IS NULL; +-- UPDATE SAVE TIME SERIES NODES START -ALTER TABLE user_credentials ADD COLUMN IF NOT EXISTS failed_login_attempts INT; -UPDATE user_credentials c SET failed_login_attempts = (SELECT (additional_info::json ->> 'failedLoginAttempts')::int FROM tb_user u WHERE u.id = c.user_id) - WHERE failed_login_attempts IS NULL; - -UPDATE tb_user SET additional_info = (additional_info::jsonb - 'lastLoginTs' - 'failedLoginAttempts' - 'userCredentialsEnabled')::text - WHERE additional_info IS NOT NULL AND additional_info != 'null' AND jsonb_typeof(additional_info::jsonb) = 'object'; - --- UPDATE RULE NODE DEBUG MODE TO DEBUG STRATEGY START - -ALTER TABLE rule_node ADD COLUMN IF NOT EXISTS debug_settings varchar(1024) DEFAULT null; -DO -$$ - BEGIN - IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'rule_node' AND column_name = 'debug_mode') - THEN - UPDATE rule_node SET debug_settings = '{"failuresEnabled": true, "allEnabledUntil": ' || cast((extract(epoch from now()) + 900) * 1000 as bigint) || '}' WHERE debug_mode = true; -- 15 minutes according to thingsboard.yml default settings. - ALTER TABLE rule_node DROP COLUMN debug_mode; - END IF; - END -$$; - --- UPDATE RULE NODE DEBUG MODE TO DEBUG STRATEGY END - - --- CREATE MOBILE APP BUNDLES FROM EXISTING APPS - -CREATE TABLE IF NOT EXISTS mobile_app_bundle ( - id uuid NOT NULL CONSTRAINT mobile_app_bundle_pkey PRIMARY KEY, - created_time bigint NOT NULL, - tenant_id uuid, - title varchar(255), - description varchar(1024), - android_app_id uuid UNIQUE, - ios_app_id uuid UNIQUE, - layout_config varchar(16384), - oauth2_enabled boolean, - CONSTRAINT fk_android_app_id FOREIGN KEY (android_app_id) REFERENCES mobile_app(id) ON DELETE SET NULL, - CONSTRAINT fk_ios_app_id FOREIGN KEY (ios_app_id) REFERENCES mobile_app(id) ON DELETE SET NULL -); -CREATE INDEX IF NOT EXISTS mobile_app_bundle_tenant_id ON mobile_app_bundle(tenant_id); - -ALTER TABLE mobile_app ADD COLUMN IF NOT EXISTS platform_type varchar(32), - ADD COLUMN IF NOT EXISTS status varchar(32), - ADD COLUMN IF NOT EXISTS version_info varchar(100000), - ADD COLUMN IF NOT EXISTS store_info varchar(16384), - DROP CONSTRAINT IF EXISTS mobile_app_pkg_name_key, - DROP CONSTRAINT IF EXISTS mobile_app_unq_key; - --- rename mobile_app_oauth2_client to mobile_app_bundle_oauth2_client -DO -$$ - BEGIN - -- in case of running the upgrade script a second time - IF EXISTS(SELECT 1 FROM information_schema.tables WHERE table_name = 'mobile_app_oauth2_client') THEN - ALTER TABLE mobile_app_oauth2_client RENAME TO mobile_app_bundle_oauth2_client; - ALTER TABLE mobile_app_bundle_oauth2_client DROP CONSTRAINT IF EXISTS fk_domain; - ALTER TABLE mobile_app_bundle_oauth2_client RENAME COLUMN mobile_app_id TO mobile_app_bundle_id; - END IF; - END; -$$; - - -CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; - --- duplicate each mobile app and create mobile app bundle for the pair of android and ios app -DO -$$ - DECLARE - generatedBundleId uuid; - iosAppId uuid; - mobileAppRecord RECORD; +DO $$ BEGIN - -- in case of running the upgrade script a second time - IF EXISTS(SELECT 1 FROM information_schema.columns WHERE table_name = 'mobile_app' and column_name = 'oauth2_enabled') THEN - UPDATE mobile_app SET platform_type = 'ANDROID' WHERE platform_type IS NULL; - UPDATE mobile_app SET status = 'DRAFT' WHERE mobile_app.status IS NULL; - FOR mobileAppRecord IN SELECT * FROM mobile_app - LOOP - -- duplicate app for iOS platform type - iosAppId := uuid_generate_v4(); - INSERT INTO mobile_app(id, created_time, tenant_id, pkg_name, app_secret, platform_type, status) - VALUES (iosAppId, mobileAppRecord.created_time, mobileAppRecord.tenant_id, mobileAppRecord.pkg_name, mobileAppRecord.app_secret, 'IOS', mobileAppRecord.status) - ON CONFLICT DO NOTHING; - -- create bundle for android and iOS app - generatedBundleId := uuid_generate_v4(); - INSERT INTO mobile_app_bundle(id, created_time, tenant_id, title, android_app_id, ios_app_id, oauth2_enabled) - VALUES (generatedBundleId, mobileAppRecord.created_time, mobileAppRecord.tenant_id, - mobileAppRecord.pkg_name || ' (autogenerated)', mobileAppRecord.id, iosAppId, mobileAppRecord.oauth2_enabled) - ON CONFLICT DO NOTHING; - UPDATE mobile_app_bundle_oauth2_client SET mobile_app_bundle_id = generatedBundleId WHERE mobile_app_bundle_id = mobileAppRecord.id; - END LOOP; - END IF; - IF NOT EXISTS(SELECT 1 FROM pg_constraint WHERE conname = 'fk_mobile_app_bundle_oauth2_client_bundle_id') THEN - ALTER TABLE mobile_app_bundle_oauth2_client ADD CONSTRAINT fk_mobile_app_bundle_oauth2_client_bundle_id - FOREIGN KEY (mobile_app_bundle_id) REFERENCES mobile_app_bundle(id) ON DELETE CASCADE; - END IF; - ALTER TABLE mobile_app DROP COLUMN IF EXISTS oauth2_enabled; - IF NOT EXISTS(SELECT 1 FROM pg_constraint WHERE conname = 'mobile_app_pkg_name_platform_unq_key') THEN - ALTER TABLE mobile_app ADD CONSTRAINT mobile_app_pkg_name_platform_unq_key UNIQUE (pkg_name, platform_type); - END IF; - END; -$$; + -- Check if the rule_node table exists + IF EXISTS ( + SELECT 1 + FROM information_schema.tables + WHERE table_name = 'rule_node' + ) THEN + + UPDATE rule_node + SET configuration = ( + (configuration::jsonb - 'skipLatestPersistence') + || jsonb_build_object( + 'persistenceSettings', jsonb_build_object( + 'type', 'ADVANCED', + 'timeseries', jsonb_build_object('type', 'ON_EVERY_MESSAGE'), + 'latest', jsonb_build_object('type', 'SKIP'), + 'webSockets', jsonb_build_object('type', 'ON_EVERY_MESSAGE') + ) + ) + )::text, + configuration_version = 1 + WHERE type = 'org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode' + AND configuration_version = 0 + AND configuration::jsonb ->> 'skipLatestPersistence' = 'true'; + + UPDATE rule_node + SET configuration = ( + (configuration::jsonb - 'skipLatestPersistence') + || jsonb_build_object( + 'persistenceSettings', jsonb_build_object( + 'type', 'ON_EVERY_MESSAGE' + ) + ) + )::text, + configuration_version = 1 + WHERE type = 'org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode' + AND configuration_version = 0 + AND (configuration::jsonb ->> 'skipLatestPersistence' != 'true' OR configuration::jsonb ->> 'skipLatestPersistence' IS NULL); -ALTER TABLE IF EXISTS mobile_app_settings RENAME TO qr_code_settings; -ALTER TABLE qr_code_settings ADD COLUMN IF NOT EXISTS mobile_app_bundle_id uuid, - ADD COLUMN IF NOT EXISTS android_enabled boolean, - ADD COLUMN IF NOT EXISTS ios_enabled boolean; - --- migrate mobile apps from qr code settings to mobile_app, create mobile app bundle for the pair of apps -DO -$$ - DECLARE - androidPkgName varchar; - iosPkgName varchar; - androidAppId uuid; - iosAppId uuid; - generatedBundleId uuid; - qrCodeRecord RECORD; - BEGIN - -- in case of running the upgrade script a second time - IF EXISTS(SELECT 1 FROM information_schema.columns WHERE table_name = 'qr_code_settings' AND column_name = 'android_config') THEN - FOR qrCodeRecord IN SELECT * FROM qr_code_settings - LOOP - generatedBundleId := NULL; - -- migrate android config - IF (qrCodeRecord.android_config::jsonb ->> 'appPackage' IS NOT NULL) THEN - androidPkgName := qrCodeRecord.android_config::jsonb ->> 'appPackage'; - SELECT id into androidAppId FROM mobile_app WHERE pkg_name = androidPkgName AND platform_type = 'ANDROID'; - IF androidAppId IS NULL THEN - androidAppId := uuid_generate_v4(); - INSERT INTO mobile_app(id, created_time, tenant_id, pkg_name, platform_type, status, store_info) - VALUES (androidAppId, (extract(epoch from now()) * 1000), qrCodeRecord.tenant_id, - androidPkgName, 'ANDROID', 'DRAFT', qrCodeRecord.android_config::jsonb - 'appPackage' - 'enabled'); - generatedBundleId := uuid_generate_v4(); - INSERT INTO mobile_app_bundle(id, created_time, tenant_id, title, android_app_id) - VALUES (generatedBundleId, (extract(epoch from now()) * 1000), qrCodeRecord.tenant_id, androidPkgName || ' (autogenerated)', androidAppId); - UPDATE qr_code_settings SET mobile_app_bundle_id = generatedBundleId; - ELSE - UPDATE mobile_app SET store_info = qrCodeRecord.android_config::jsonb - 'appPackage' - 'enabled' WHERE id = androidAppId; - UPDATE qr_code_settings SET mobile_app_bundle_id = (SELECT id FROM mobile_app_bundle WHERE mobile_app_bundle.android_app_id = androidAppId); - END IF; - END IF; - UPDATE qr_code_settings SET android_enabled = (qrCodeRecord.android_config::jsonb ->> 'enabled')::boolean WHERE id = qrCodeRecord.id; - - -- migrate ios config - IF (qrCodeRecord.ios_config::jsonb ->> 'appId' IS NOT NULL) THEN - iosPkgName := substring(qrCodeRecord.ios_config::jsonb ->> 'appId', strpos(qrCodeRecord.ios_config::jsonb ->> 'appId', '.') + 1); - SELECT id INTO iosAppId FROM mobile_app WHERE pkg_name = iosPkgName AND platform_type = 'IOS'; - IF iosAppId IS NULL THEN - iosAppId := uuid_generate_v4(); - INSERT INTO mobile_app(id, created_time, tenant_id, pkg_name, platform_type, status, store_info) - VALUES (iosAppId, (extract(epoch from now()) * 1000), qrCodeRecord.tenant_id, - iosPkgName, 'IOS', 'DRAFT', qrCodeRecord.ios_config::jsonb - 'enabled'); - IF generatedBundleId IS NULL THEN - generatedBundleId := uuid_generate_v4(); - INSERT INTO mobile_app_bundle(id, created_time, tenant_id, title, ios_app_id) - VALUES (generatedBundleId, (extract(epoch from now()) * 1000), qrCodeRecord.tenant_id, iosPkgName || ' (autogenerated)', iosAppId); - UPDATE qr_code_settings SET mobile_app_bundle_id = generatedBundleId; - ELSE - UPDATE mobile_app_bundle SET ios_app_id = iosAppId WHERE id = generatedBundleId; - END IF; - ELSE - UPDATE qr_code_settings SET mobile_app_bundle_id = (SELECT id FROM mobile_app_bundle WHERE mobile_app_bundle.ios_app_id = iosAppId); - UPDATE mobile_app SET store_info = qrCodeRecord.ios_config::jsonb - 'enabled' WHERE id = iosAppId; - END IF; - END IF; - UPDATE qr_code_settings SET ios_enabled = (qrCodeRecord.ios_config::jsonb -> 'enabled')::boolean WHERE id = qrCodeRecord.id; - END LOOP; - ALTER TABLE qr_code_settings RENAME CONSTRAINT mobile_app_settings_tenant_id_unq_key TO qr_code_settings_tenant_id_unq_key; - ALTER TABLE qr_code_settings RENAME CONSTRAINT mobile_app_settings_pkey TO qr_code_settings_pkey; - END IF; - ALTER TABLE qr_code_settings DROP COLUMN IF EXISTS android_config, DROP COLUMN IF EXISTS ios_config; - END; -$$; - --- update constraint name -DO -$$ - BEGIN - ALTER TABLE domain DROP CONSTRAINT IF EXISTS domain_unq_key; - IF NOT EXISTS(SELECT 1 FROM pg_constraint WHERE conname = 'domain_name_key') THEN - ALTER TABLE domain ADD CONSTRAINT domain_name_key UNIQUE (name); END IF; END; $$; --- UPDATE RESOURCE JS_MODULE SUB TYPE START - -UPDATE resource SET resource_sub_type = 'EXTENSION' WHERE resource_type = 'JS_MODULE' AND resource_sub_type IS NULL; - --- UPDATE RESOURCE JS_MODULE SUB TYPE END +-- UPDATE SAVE TIME SERIES NODES END diff --git a/application/src/main/java/org/thingsboard/server/config/SwaggerConfiguration.java b/application/src/main/java/org/thingsboard/server/config/SwaggerConfiguration.java index c32533bd04..0f3f529cb7 100644 --- a/application/src/main/java/org/thingsboard/server/config/SwaggerConfiguration.java +++ b/application/src/main/java/org/thingsboard/server/config/SwaggerConfiguration.java @@ -84,6 +84,7 @@ import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE; public class SwaggerConfiguration { public static final String LOGIN_ENDPOINT = "/api/auth/login"; + public static final String REFRESH_TOKEN_ENDPOINT = "/api/auth/token"; private static final ApiResponses loginResponses = loginResponses(); private static final ApiResponses defaultErrorResponses = defaultErrorResponses(false); @@ -150,6 +151,7 @@ public class SwaggerConfiguration { .info(info); addDefaultSchemas(openApi); addLoginOperation(openApi); + addRefreshTokenOperation(openApi); return openApi; } @@ -210,6 +212,29 @@ public class SwaggerConfiguration { openAPI.path(LOGIN_ENDPOINT, pathItem); } + private void addRefreshTokenOperation(OpenAPI openAPI) { + var operation = new Operation(); + operation.summary("Refresh user JWT token data"); + operation.description(""" + Method to refresh JWT token. Provide a valid refresh token to get a new JWT token. + + The response contains a new token that can be used for authorization. + + `X-Authorization: Bearer $JWT_TOKEN_VALUE`"""); + + var requestBody = new RequestBody().description("Refresh token request") + .content(new Content().addMediaType(APPLICATION_JSON_VALUE, + new MediaType().schema(new Schema().addProperty("refreshToken", new Schema<>().type("string"))))); + + operation.requestBody(requestBody); + + operation.responses(loginResponses); + + operation.addTagsItem("login-endpoint"); + var pathItem = new PathItem().post(operation); + openAPI.path(REFRESH_TOKEN_ENDPOINT, pathItem); + } + @Bean public GroupedOpenApi groupedApi(SpringDocParameterNameDiscoverer localSpringDocParameterNameDiscoverer) { return GroupedOpenApi.builder() diff --git a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java index 622c0ab2dc..ea4c059222 100644 --- a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java +++ b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java @@ -35,8 +35,6 @@ import org.thingsboard.server.service.install.migrate.TsLatestMigrateService; import org.thingsboard.server.service.install.update.CacheCleanupService; import org.thingsboard.server.service.install.update.DataUpdateService; -import static org.thingsboard.server.service.install.update.DefaultDataUpdateService.getEnv; - @Service @Profile("install") @Slf4j @@ -99,8 +97,6 @@ public class ThingsboardInstallService { if ("cassandra-latest-to-postgres".equals(upgradeFromVersion)) { log.info("Migrating ThingsBoard latest timeseries data from cassandra to SQL database ..."); latestMigrateService.migrate(); - } else if (upgradeFromVersion.equals("3.9.0-resources")) { - installScripts.updateResourcesUsage(); } else { // TODO DON'T FORGET to update SUPPORTED_VERSIONS_FROM in DefaultDatabaseSchemaSettingsService databaseSchemaVersionService.validateSchemaSettings(); @@ -118,25 +114,16 @@ public class ThingsboardInstallService { entityDatabaseSchemaService.createOrUpdateDeviceInfoView(persistToTelemetry); // Creates missing indexes. entityDatabaseSchemaService.createDatabaseIndexes(); - // Runs upgrade scripts that are not possible in plain SQL. + // TODO: cleanup update code after each release - if (!getEnv("SKIP_RESOURCES_USAGE_MIGRATION", false)) { - installScripts.setUpdateResourcesUsage(true); - } else { - log.info("Skipping resources usage migration. Run the upgrade with fromVersion as '3.9.0-resources' to migrate"); - } - if (installScripts.isUpdateResourcesUsage()) { - installScripts.updateResourcesUsage(); - } + + // Runs upgrade scripts that are not possible in plain SQL. dataUpdateService.updateData(); log.info("Updating system data..."); dataUpdateService.upgradeRuleNodes(); systemDataLoaderService.loadSystemWidgets(); installScripts.loadSystemLwm2mResources(); installScripts.loadSystemImagesAndResources(); - if (installScripts.isUpdateImages()) { - installScripts.updateImages(); - } databaseSchemaVersionService.updateSchemaVersion(); } log.info("Upgrade finished successfully!"); diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/dashboard/DashboardSyncService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/dashboard/DashboardSyncService.java index 21eec71388..2adb6822dc 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/dashboard/DashboardSyncService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/dashboard/DashboardSyncService.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.entitiy.dashboard; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; @@ -30,6 +31,7 @@ import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.util.AfterStartUp; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.install.ProjectInfo; import org.thingsboard.server.service.sync.GitSyncService; import org.thingsboard.server.service.sync.vc.GitRepository.FileType; import org.thingsboard.server.service.sync.vc.GitRepository.RepoFile; @@ -51,10 +53,11 @@ public class DashboardSyncService { private final ImageService imageService; private final WidgetsBundleService widgetsBundleService; private final PartitionService partitionService; + private final ProjectInfo projectInfo; @Value("${transport.gateway.dashboard.sync.repository_url:}") private String repoUrl; - @Value("${transport.gateway.dashboard.sync.branch:main}") + @Value("${transport.gateway.dashboard.sync.branch:}") private String branch; @Value("${transport.gateway.dashboard.sync.fetch_frequency:24}") private int fetchFrequencyHours; @@ -64,6 +67,9 @@ public class DashboardSyncService { @AfterStartUp(order = AfterStartUp.REGULAR_SERVICE) public void init() throws Exception { + if (StringUtils.isBlank(branch)) { + branch = "release/" + projectInfo.getProjectVersion(); + } gitSyncService.registerSync(REPO_KEY, repoUrl, branch, TimeUnit.HOURS.toMillis(fetchFrequencyHours), this::update); } diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/entityview/DefaultTbEntityViewService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/entityview/DefaultTbEntityViewService.java index 686e85cb1e..2c384aa493 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/entityview/DefaultTbEntityViewService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/entityview/DefaultTbEntityViewService.java @@ -348,7 +348,7 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen .tenantId(entityView.getTenantId()) .entityId(entityId) .entries(latestValues) - .onlyLatest(true) + .strategy(TimeseriesSaveRequest.Strategy.LATEST_AND_WS) .callback(new FutureCallback() { @Override public void onSuccess(@Nullable Void tmp) { diff --git a/application/src/main/java/org/thingsboard/server/service/install/DefaultDatabaseSchemaSettingsService.java b/application/src/main/java/org/thingsboard/server/service/install/DefaultDatabaseSchemaSettingsService.java index f1d9259e91..3b21685d22 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/DefaultDatabaseSchemaSettingsService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/DefaultDatabaseSchemaSettingsService.java @@ -17,9 +17,6 @@ package org.thingsboard.server.service.install; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.info.BuildProperties; import org.springframework.context.annotation.Profile; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Service; @@ -33,37 +30,29 @@ import java.util.List; @RequiredArgsConstructor public class DefaultDatabaseSchemaSettingsService implements DatabaseSchemaSettingsService { - private static final String CURRENT_PRODUCT = "CE"; // This list should include all versions which are compatible for the upgrade. // The compatibility cycle usually breaks when we have some scripts written in Java that may not work after new release. - private static final List SUPPORTED_VERSIONS_FOR_UPGRADE = List.of("3.8.0", "3.8.1"); + private static final List SUPPORTED_VERSIONS_FOR_UPGRADE = List.of("3.9.0"); - private final BuildProperties buildProperties; + private final ProjectInfo projectInfo; private final JdbcTemplate jdbcTemplate; - @Value("${install.upgrade.from_version:}") - private String upgradeFromVersion; - private String packageSchemaVersion; private String schemaVersionFromDb; @Override public void validateSchemaSettings() { - //TODO: remove after release (3.9.0) - createProductIfNotExists(); - - String dbSchemaVersion = getDbSchemaVersion(); - if (DefaultDataUpdateService.getEnv("SKIP_SCHEMA_VERSION_CHECK", false)) { log.info("Skipped DB schema version check due to SKIP_SCHEMA_VERSION_CHECK set to 'true'."); return; } String product = getProductFromDb(); - if (!CURRENT_PRODUCT.equals(product)) { - onSchemaSettingsError(String.format("Upgrade failed: can't upgrade ThingsBoard %s database using ThingsBoard %s.", product, CURRENT_PRODUCT)); + if (!projectInfo.getProductType().equals(product)) { + onSchemaSettingsError(String.format("Upgrade failed: can't upgrade ThingsBoard %s database using ThingsBoard %s.", product, projectInfo.getProductType())); } + String dbSchemaVersion = getDbSchemaVersion(); if (dbSchemaVersion.equals(getPackageSchemaVersion())) { onSchemaSettingsError("Upgrade failed: database already upgraded to current version. You can set SKIP_SCHEMA_VERSION_CHECK to 'true' if force re-upgrade needed."); } @@ -75,19 +64,11 @@ public class DefaultDatabaseSchemaSettingsService implements DatabaseSchemaSetti } } - @Deprecated(forRemoval = true, since = "3.9.0") - private void createProductIfNotExists() { - boolean isCommunityEdition = jdbcTemplate.queryForList( - "SELECT 1 FROM information_schema.tables WHERE table_name = 'integration'", Integer.class).isEmpty(); - String product = isCommunityEdition ? "CE" : "PE"; - jdbcTemplate.execute("ALTER TABLE tb_schema_settings ADD COLUMN IF NOT EXISTS product varchar(2) DEFAULT '" + product + "'"); - } - @Override public void createSchemaSettings() { Long schemaVersion = getSchemaVersionFromDb(); if (schemaVersion == null) { - jdbcTemplate.execute("INSERT INTO tb_schema_settings (schema_version, product) VALUES (" + getPackageSchemaVersionForDb() + ", '" + CURRENT_PRODUCT + "')"); + jdbcTemplate.execute("INSERT INTO tb_schema_settings (schema_version, product) VALUES (" + getPackageSchemaVersionForDb() + ", '" + projectInfo.getProductType() + "')"); } } @@ -99,7 +80,7 @@ public class DefaultDatabaseSchemaSettingsService implements DatabaseSchemaSetti @Override public String getPackageSchemaVersion() { if (packageSchemaVersion == null) { - packageSchemaVersion = buildProperties.getVersion().replaceAll("[^\\d.]", ""); + packageSchemaVersion = projectInfo.getProjectVersion(); } return packageSchemaVersion; } @@ -107,15 +88,6 @@ public class DefaultDatabaseSchemaSettingsService implements DatabaseSchemaSetti @Override public String getDbSchemaVersion() { if (schemaVersionFromDb == null) { - if (StringUtils.isNotBlank(upgradeFromVersion)) { - /* - * TODO - Remove after the release of 3.9.0: - * This a temporary workaround due to the issue that schema version in the - * tb_schema_settings was set as 3.6.4 during the install of 3.8.1. - * */ - schemaVersionFromDb = upgradeFromVersion; - return schemaVersionFromDb; - } Long version = getSchemaVersionFromDb(); if (version == null) { onSchemaSettingsError("Upgrade failed: the database schema version is missing."); diff --git a/application/src/main/java/org/thingsboard/server/service/install/InstallScripts.java b/application/src/main/java/org/thingsboard/server/service/install/InstallScripts.java index a6ccb85e11..b230be0d57 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/InstallScripts.java +++ b/application/src/main/java/org/thingsboard/server/service/install/InstallScripts.java @@ -17,8 +17,6 @@ package org.thingsboard.server.service.install; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import lombok.Getter; -import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -120,11 +118,6 @@ public class InstallScripts { @Autowired private ResourcesUpdater resourcesUpdater; - @Getter @Setter - private boolean updateImages = false; - - @Getter @Setter - private boolean updateResourcesUsage = false; @Autowired private ImageService imageService; @@ -395,14 +388,6 @@ public class InstallScripts { } } - public void updateImages() { - resourcesUpdater.updateWidgetsBundlesImages(); - resourcesUpdater.updateWidgetTypesImages(); - resourcesUpdater.updateDashboardsImages(); - resourcesUpdater.updateDeviceProfilesImages(); - resourcesUpdater.updateAssetProfilesImages(); - } - public void loadSystemImagesAndResources() { log.info("Loading system images and resources..."); Stream dashboardsFiles = Stream.concat(listDir(Paths.get(getDataDir(), JSON_DIR, DEMO_DIR, DASHBOARDS_DIR)), @@ -512,11 +497,6 @@ public class InstallScripts { } } - public void updateResourcesUsage() { - resourcesUpdater.updateDashboardsResources(); - resourcesUpdater.updateWidgetsResources(); - } - private void loadSystemResources(Path dir, ResourceType resourceType, ResourceSubType resourceSubType) { listDir(dir).forEach(resourceFile -> { String resourceKey = resourceFile.getFileName().toString(); diff --git a/application/src/main/java/org/thingsboard/server/service/install/ProjectInfo.java b/application/src/main/java/org/thingsboard/server/service/install/ProjectInfo.java new file mode 100644 index 0000000000..eddd4c8ccf --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/install/ProjectInfo.java @@ -0,0 +1,36 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.install; + +import lombok.RequiredArgsConstructor; +import org.springframework.boot.info.BuildProperties; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +public class ProjectInfo { + + private final BuildProperties buildProperties; + + public String getProjectVersion() { + return buildProperties.getVersion().replaceAll("[^\\d.]", ""); + } + + public String getProductType() { + return "CE"; + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java index e8622fae37..cea938bce9 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java @@ -34,7 +34,6 @@ import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.sql.JpaExecutorService; import org.thingsboard.server.service.component.ComponentDiscoveryService; import org.thingsboard.server.service.component.RuleNodeClassInfo; -import org.thingsboard.server.service.install.InstallScripts; import org.thingsboard.server.utils.TbNodeUpgradeUtils; import java.util.ArrayList; @@ -58,9 +57,6 @@ public class DefaultDataUpdateService implements DataUpdateService { @Autowired JpaExecutorService jpaExecutorService; - @Autowired - private InstallScripts installScripts; - @Override public void updateData() throws Exception { log.info("Updating data ..."); diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index d4444d5d7f..cab2f18dc7 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -118,10 +118,10 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer EntityId entityId = request.getEntityId(); checkInternalEntity(entityId); boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null; - if (sysTenant || request.isOnlyLatest() || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) { + if (sysTenant || !request.getStrategy().saveTimeseries() || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) { KvUtils.validate(request.getEntries(), valueNoXssValidation); ListenableFuture future = saveTimeseriesInternal(request); - if (!request.isOnlyLatest()) { + if (request.getStrategy().saveTimeseries()) { FutureCallback callback = getApiUsageCallback(tenantId, request.getCustomerId(), sysTenant, request.getCallback()); Futures.addCallback(future, callback, tsCallBackExecutor); } @@ -134,19 +134,24 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer public ListenableFuture saveTimeseriesInternal(TimeseriesSaveRequest request) { TenantId tenantId = request.getTenantId(); EntityId entityId = request.getEntityId(); + TimeseriesSaveRequest.Strategy strategy = request.getStrategy(); ListenableFuture saveFuture; - if (request.isOnlyLatest()) { - saveFuture = Futures.transform(tsService.saveLatest(tenantId, entityId, request.getEntries()), result -> 0, MoreExecutors.directExecutor()); - } else if (request.isSaveLatest()) { + if (strategy.saveTimeseries() && strategy.saveLatest()) { saveFuture = tsService.save(tenantId, entityId, request.getEntries(), request.getTtl()); - } else { + } else if (strategy.saveLatest()) { + saveFuture = Futures.transform(tsService.saveLatest(tenantId, entityId, request.getEntries()), result -> 0, MoreExecutors.directExecutor()); + } else if (strategy.saveTimeseries()) { saveFuture = tsService.saveWithoutLatest(tenantId, entityId, request.getEntries(), request.getTtl()); + } else { + saveFuture = Futures.immediateFuture(0); } addMainCallback(saveFuture, request.getCallback()); - addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries())); - if (request.isSaveLatest() && !request.isOnlyLatest()) { - addEntityViewCallback(tenantId, entityId, request.getEntries()); + if (strategy.sendWsUpdate()) { + addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries())); + } + if (strategy.saveLatest()) { + copyLatestToEntityViews(tenantId, entityId, request.getEntries()); } return saveFuture; } @@ -201,7 +206,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } } - private void addEntityViewCallback(TenantId tenantId, EntityId entityId, List ts) { + private void copyLatestToEntityViews(TenantId tenantId, EntityId entityId, List ts) { if (EntityType.DEVICE.equals(entityId.getEntityType()) || EntityType.ASSET.equals(entityId.getEntityType())) { Futures.addCallback(this.tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId), new FutureCallback<>() { @@ -232,7 +237,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer .tenantId(tenantId) .entityId(entityView.getId()) .entries(entityViewLatest) - .onlyLatest(true) + .strategy(TimeseriesSaveRequest.Strategy.LATEST_AND_WS) .callback(new FutureCallback<>() { @Override public void onSuccess(@Nullable Void tmp) {} diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index f29cc55197..368367bbab 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1271,7 +1271,7 @@ transport: # URL of gateways dashboard repository repository_url: "${TB_GATEWAY_DASHBOARD_SYNC_REPOSITORY_URL:https://github.com/thingsboard/gateway-management-extensions-dist.git}" # Branch of gateways dashboard repository to work with - branch: "${TB_GATEWAY_DASHBOARD_SYNC_BRANCH:main}" + branch: "${TB_GATEWAY_DASHBOARD_SYNC_BRANCH:}" # Fetch frequency in hours for gateways dashboard repository fetch_frequency: "${TB_GATEWAY_DASHBOARD_SYNC_FETCH_FREQUENCY:24}" diff --git a/application/src/test/java/org/thingsboard/server/service/entitiy/entityview/DefaultTbEntityViewServiceTest.java b/application/src/test/java/org/thingsboard/server/service/entitiy/entityview/DefaultTbEntityViewServiceTest.java new file mode 100644 index 0000000000..aa6bfde935 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/entitiy/entityview/DefaultTbEntityViewServiceTest.java @@ -0,0 +1,107 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.entitiy.entityview; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; +import org.thingsboard.server.common.data.EntityView; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityViewId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.DoubleDataEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.objects.AttributesEntityView; +import org.thingsboard.server.common.data.objects.TelemetryEntityView; +import org.thingsboard.server.dao.attributes.AttributesService; +import org.thingsboard.server.dao.entityview.EntityViewService; +import org.thingsboard.server.dao.timeseries.TimeseriesService; +import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; + +import java.util.List; +import java.util.UUID; + +import static com.google.common.util.concurrent.Futures.immediateFuture; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; + +@ExtendWith(MockitoExtension.class) +class DefaultTbEntityViewServiceTest { + + final TenantId tenantId = TenantId.fromUUID(UUID.fromString("f09c8180-686c-11ef-9471-a71d33080e9c")); + final EntityId entityId = DeviceId.fromString("782aaab0-c7a8-11ef-a668-79582e785d5f"); + + @Mock + EntityViewService entityViewService; + @Mock + AttributesService attributesService; + @Mock + TelemetrySubscriptionService tsSubService; + @Mock + TimeseriesService tsService; + + DefaultTbEntityViewService defaultTbEntityViewService; + + @BeforeEach + void setup() { + defaultTbEntityViewService = new DefaultTbEntityViewService(entityViewService, attributesService, tsSubService, tsService); + } + + @Test + void shouldNotSaveTimeseriesWhenCopyingLatestToEntityView() throws Exception { + // GIVEN + var entityView = new EntityView(new EntityViewId(UUID.randomUUID())); + entityView.setTenantId(tenantId); + entityView.setEntityId(entityId); + entityView.setKeys(new TelemetryEntityView(List.of("temperature"), new AttributesEntityView())); + + List latest = List.of(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3))); + + given(tsService.findAll(eq(tenantId), eq(entityId), anyList())).willReturn(immediateFuture(latest)); + + // WHEN + defaultTbEntityViewService.updateEntityViewAttributes(tenantId, entityView, null, null); + + // THEN + var captor = ArgumentCaptor.forClass(TimeseriesSaveRequest.class); + then(tsSubService).should().saveTimeseries(captor.capture()); + + var expectedCopyLatestRequest = TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .entityId(entityView.getId()) + .entries(latest) + .ttl(0L) + .strategy(TimeseriesSaveRequest.Strategy.LATEST_AND_WS) + .build(); + + var actualCopyLatestRequest = captor.getValue(); + + assertThat(actualCopyLatestRequest) + .usingRecursiveComparison() + .ignoringFields("callback") + .isEqualTo(expectedCopyLatestRequest); + } + +} diff --git a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java new file mode 100644 index 0000000000..10fdd85504 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java @@ -0,0 +1,373 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.telemetry; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; +import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.data.ApiUsageRecordKey; +import org.thingsboard.server.common.data.ApiUsageState; +import org.thingsboard.server.common.data.ApiUsageStateValue; +import org.thingsboard.server.common.data.EntityView; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityViewId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.DoubleDataEntry; +import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.objects.AttributesEntityView; +import org.thingsboard.server.common.data.objects.TelemetryEntityView; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.common.stats.TbApiUsageReportClient; +import org.thingsboard.server.dao.attributes.AttributesService; +import org.thingsboard.server.dao.timeseries.TimeseriesService; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.QueueKey; +import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; +import org.thingsboard.server.service.apiusage.TbApiUsageStateService; +import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; +import org.thingsboard.server.service.subscription.SubscriptionManagerService; + +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.stream.LongStream; +import java.util.stream.Stream; + +import static com.google.common.util.concurrent.Futures.immediateFuture; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.lenient; + +@ExtendWith(MockitoExtension.class) +class DefaultTelemetrySubscriptionServiceTest { + + final TenantId tenantId = TenantId.fromUUID(UUID.fromString("a00ec470-c6b4-11ef-8c88-63b5533fb5bc")); + final CustomerId customerId = new CustomerId(UUID.fromString("7bdc9750-c775-11ef-8e03-ff69ed8da327")); + final EntityId entityId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088"); + + final long sampleTtl = 10_000L; + + final List sampleTelemetry = List.of( + new BasicTsKvEntry(100L, new DoubleDataEntry("temperature", 65.2)), + new BasicTsKvEntry(100L, new DoubleDataEntry("humidity", 33.1)) + ); + + ApiUsageState apiUsageState; + + final TopicPartitionInfo tpi = TopicPartitionInfo.builder() + .tenantId(tenantId) + .myPartition(true) + .build(); + + final FutureCallback emptyCallback = new FutureCallback<>() { + @Override + public void onSuccess(Void result) {} + + @Override + public void onFailure(@NonNull Throwable t) {} + }; + + ExecutorService wsCallBackExecutor; + ExecutorService tsCallBackExecutor; + + @Mock + TbClusterService clusterService; + @Mock + PartitionService partitionService; + @Mock + SubscriptionManagerService subscriptionManagerService; + @Mock + AttributesService attrService; + @Mock + TimeseriesService tsService; + @Mock + TbEntityViewService tbEntityViewService; + @Mock + TbApiUsageReportClient apiUsageClient; + @Mock + TbApiUsageStateService apiUsageStateService; + + DefaultTelemetrySubscriptionService telemetryService; + + @BeforeEach + void setup() { + telemetryService = new DefaultTelemetrySubscriptionService(attrService, tsService, tbEntityViewService, apiUsageClient, apiUsageStateService); + ReflectionTestUtils.setField(telemetryService, "clusterService", clusterService); + ReflectionTestUtils.setField(telemetryService, "partitionService", partitionService); + ReflectionTestUtils.setField(telemetryService, "subscriptionManagerService", Optional.of(subscriptionManagerService)); + + wsCallBackExecutor = MoreExecutors.newDirectExecutorService(); + ReflectionTestUtils.setField(telemetryService, "wsCallBackExecutor", wsCallBackExecutor); + + tsCallBackExecutor = MoreExecutors.newDirectExecutorService(); + ReflectionTestUtils.setField(telemetryService, "tsCallBackExecutor", tsCallBackExecutor); + + apiUsageState = new ApiUsageState(); + apiUsageState.setDbStorageState(ApiUsageStateValue.ENABLED); + lenient().when(apiUsageStateService.getApiUsageState(tenantId)).thenReturn(apiUsageState); + + lenient().when(partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId)).thenReturn(tpi); + + lenient().when(tsService.save(tenantId, entityId, sampleTelemetry, sampleTtl)).thenReturn(immediateFuture(sampleTelemetry.size())); + lenient().when(tsService.saveWithoutLatest(tenantId, entityId, sampleTelemetry, sampleTtl)).thenReturn(immediateFuture(sampleTelemetry.size())); + lenient().when(tsService.saveLatest(tenantId, entityId, sampleTelemetry)).thenReturn(immediateFuture(listOfNNumbers(sampleTelemetry.size()))); + + // mock no entity views + lenient().when(tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId)).thenReturn(immediateFuture(Collections.emptyList())); + + // send partition change event so currentPartitions set is populated + telemetryService.onTbApplicationEvent(new PartitionChangeEvent(this, ServiceType.TB_CORE, Map.of(new QueueKey(ServiceType.TB_CORE), Set.of(tpi)))); + } + + @AfterEach + void cleanup() { + wsCallBackExecutor.shutdownNow(); + tsCallBackExecutor.shutdownNow(); + } + + @Test + void shouldReportStorageDataPointsApiUsageWhenTimeSeriesIsSaved() { + // GIVEN + var request = TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .customerId(customerId) + .entityId(entityId) + .entries(sampleTelemetry) + .ttl(sampleTtl) + .strategy(new TimeseriesSaveRequest.Strategy(true, false, false)) + .callback(emptyCallback) + .build(); + + // WHEN + telemetryService.saveTimeseries(request); + + // THEN + then(apiUsageClient).should().report(tenantId, customerId, ApiUsageRecordKey.STORAGE_DP_COUNT, sampleTelemetry.size()); + } + + @Test + void shouldNotReportStorageDataPointsApiUsageWhenTimeSeriesIsNotSaved() { + // GIVEN + var request = TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .customerId(customerId) + .entityId(entityId) + .entries(sampleTelemetry) + .ttl(sampleTtl) + .strategy(TimeseriesSaveRequest.Strategy.LATEST_AND_WS) + .callback(emptyCallback) + .build(); + + // WHEN + telemetryService.saveTimeseries(request); + + // THEN + then(apiUsageClient).shouldHaveNoInteractions(); + } + + @Test + void shouldThrowStorageDisabledWhenTimeSeriesIsSavedAndStorageIsDisabled() { + // GIVEN + apiUsageState.setDbStorageState(ApiUsageStateValue.DISABLED); + + SettableFuture future = SettableFuture.create(); + var request = TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .customerId(customerId) + .entityId(entityId) + .entries(sampleTelemetry) + .ttl(sampleTtl) + .strategy(TimeseriesSaveRequest.Strategy.SAVE_ALL) + .future(future) + .build(); + + // WHEN + telemetryService.saveTimeseries(request); + + // THEN + assertThat(future).failsWithin(Duration.ofSeconds(5)) + .withThrowableOfType(ExecutionException.class) + .withCauseInstanceOf(RuntimeException.class) + .withMessageContaining("DB storage writes are disabled due to API limits!"); + } + + @Test + void shouldNotThrowStorageDisabledWhenTimeSeriesIsNotSavedAndStorageIsDisabled() { + // GIVEN + apiUsageState.setDbStorageState(ApiUsageStateValue.DISABLED); + + SettableFuture future = SettableFuture.create(); + var request = TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .customerId(customerId) + .entityId(entityId) + .entries(sampleTelemetry) + .ttl(sampleTtl) + .strategy(TimeseriesSaveRequest.Strategy.LATEST_AND_WS) + .future(future) + .build(); + + // WHEN + telemetryService.saveTimeseries(request); + + // THEN + assertThat(future).succeedsWithin(Duration.ofSeconds(5)); + } + + @Test + void shouldCopyLatestToEntityViewWhenLatestIsSavedOnMainEntity() { + // GIVEN + var entityView = new EntityView(new EntityViewId(UUID.randomUUID())); + entityView.setTenantId(tenantId); + entityView.setCustomerId(customerId); + entityView.setEntityId(entityId); + entityView.setKeys(new TelemetryEntityView(sampleTelemetry.stream().map(KvEntry::getKey).toList(), new AttributesEntityView())); + + // mock that there is one entity view + given(tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId)).willReturn(immediateFuture(List.of(entityView))); + // mock that save latest call for entity view is successful + given(tsService.saveLatest(tenantId, entityView.getId(), sampleTelemetry)).willReturn(immediateFuture(listOfNNumbers(sampleTelemetry.size()))); + // mock TPI for entity view + given(partitionService.resolve(ServiceType.TB_CORE, tenantId, entityView.getId())).willReturn(tpi); + + var request = TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .customerId(customerId) + .entityId(entityId) + .entries(sampleTelemetry) + .ttl(sampleTtl) + .strategy(new TimeseriesSaveRequest.Strategy(false, true, false)) + .callback(emptyCallback) + .build(); + + // WHEN + telemetryService.saveTimeseries(request); + + // THEN + // should save latest to both the main entity and it's entity view + then(tsService).should().saveLatest(tenantId, entityId, sampleTelemetry); + then(tsService).should().saveLatest(tenantId, entityView.getId(), sampleTelemetry); + then(tsService).shouldHaveNoMoreInteractions(); + + // should send WS update only for entity view (WS update for the main entity is disabled in the save request) + then(subscriptionManagerService).should().onTimeSeriesUpdate(tenantId, entityView.getId(), sampleTelemetry, TbCallback.EMPTY); + then(subscriptionManagerService).shouldHaveNoMoreInteractions(); + } + + @Test + void shouldNotCopyLatestToEntityViewWhenLatestIsNotSavedOnMainEntity() { + // GIVEN + var request = TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .customerId(customerId) + .entityId(entityId) + .entries(sampleTelemetry) + .ttl(sampleTtl) + .strategy(new TimeseriesSaveRequest.Strategy(true, false, false)) + .callback(emptyCallback) + .build(); + + // WHEN + telemetryService.saveTimeseries(request); + + // THEN + // should save only time series for the main entity + then(tsService).should().saveWithoutLatest(tenantId, entityId, sampleTelemetry, sampleTtl); + then(tsService).shouldHaveNoMoreInteractions(); + + // should not send any WS updates + then(subscriptionManagerService).shouldHaveNoInteractions(); + } + + @ParameterizedTest + @MethodSource("booleanCombinations") + void shouldCallCorrectApiBasedOnBooleanFlagsInTheSaveRequest(boolean saveTimeseries, boolean saveLatest, boolean sendWsUpdate) { + // GIVEN + var request = TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .customerId(customerId) + .entityId(entityId) + .entries(sampleTelemetry) + .ttl(sampleTtl) + .strategy(new TimeseriesSaveRequest.Strategy(saveTimeseries, saveLatest, sendWsUpdate)) + .callback(emptyCallback) + .build(); + + // WHEN + telemetryService.saveTimeseries(request); + + // THEN + if (saveTimeseries && saveLatest) { + then(tsService).should().save(tenantId, entityId, sampleTelemetry, sampleTtl); + } else if (saveLatest) { + then(tsService).should().saveLatest(tenantId, entityId, sampleTelemetry); + } else if (saveTimeseries) { + then(tsService).should().saveWithoutLatest(tenantId, entityId, sampleTelemetry, sampleTtl); + } + then(tsService).shouldHaveNoMoreInteractions(); + + if (sendWsUpdate) { + then(subscriptionManagerService).should().onTimeSeriesUpdate(tenantId, entityId, sampleTelemetry, TbCallback.EMPTY); + } else { + then(subscriptionManagerService).shouldHaveNoInteractions(); + } + } + + private static Stream booleanCombinations() { + return Stream.of( + Arguments.of(true, true, true), + Arguments.of(true, true, false), + Arguments.of(true, false, true), + Arguments.of(true, false, false), + Arguments.of(false, true, true), + Arguments.of(false, true, false), + Arguments.of(false, false, true), + Arguments.of(false, false, false) + ); + } + + // used to emulate sequence numbers returned by save latest API + private static List listOfNNumbers(int N) { + return LongStream.range(0, N).boxed().toList(); + } + +} diff --git a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/GitRepository.java b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/GitRepository.java index 4841f05471..365fe23079 100644 --- a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/GitRepository.java +++ b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/GitRepository.java @@ -450,7 +450,7 @@ public class GitRepository { } ObjectId result = git.getRepository().resolve(rev); if (result == null) { - throw new IllegalArgumentException("Failed to parse git revision string: \"" + rev + "\""); + throw new IllegalArgumentException("Failed to resolve '" + rev + "'"); } return result; } diff --git a/monitoring/src/main/resources/root_rule_chain.json b/monitoring/src/main/resources/root_rule_chain.json index ff44ebfe79..eda3e44e1b 100644 --- a/monitoring/src/main/resources/root_rule_chain.json +++ b/monitoring/src/main/resources/root_rule_chain.json @@ -21,9 +21,13 @@ "type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", "name": "Save Timeseries", "singletonMode": false, - "configurationVersion": 0, + "configurationVersion": 1, "configuration": { - "defaultTTL": 0 + "defaultTTL": 0, + "useServerTs": false, + "persistenceSettings": { + "type": "ON_EVERY_MESSAGE" + } }, "externalId": null }, @@ -273,9 +277,13 @@ "type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", "name": "Save Timeseries", "singletonMode": false, - "configurationVersion": 0, + "configurationVersion": 1, "configuration": { - "defaultTTL": 0 + "defaultTTL": 0, + "useServerTs": false, + "persistenceSettings": { + "type": "ON_EVERY_MESSAGE" + } }, "externalId": null }, @@ -307,11 +315,13 @@ "type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", "name": "Save Timeseries with TTL", "singletonMode": false, - "configurationVersion": 0, + "configurationVersion": 1, "configuration": { "defaultTTL": 180, - "skipLatestPersistence": null, - "useServerTs": null + "useServerTs": false, + "persistenceSettings": { + "type": "ON_EVERY_MESSAGE" + } }, "externalId": null } @@ -415,4 +425,4 @@ ], "ruleChainConnections": null } -} \ No newline at end of file +} diff --git a/msa/black-box-tests/src/test/resources/MqttRuleNodeTestMetadata.json b/msa/black-box-tests/src/test/resources/MqttRuleNodeTestMetadata.json index c2bb52514e..c495e590cd 100644 --- a/msa/black-box-tests/src/test/resources/MqttRuleNodeTestMetadata.json +++ b/msa/black-box-tests/src/test/resources/MqttRuleNodeTestMetadata.json @@ -36,11 +36,13 @@ "type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", "name": "save timeseries", "singletonMode": false, - "configurationVersion": 0, + "configurationVersion": 1, "configuration": { "defaultTTL": 0, - "skipLatestPersistence": false, - "useServerTs": false + "useServerTs": false, + "persistenceSettings": { + "type": "ON_EVERY_MESSAGE" + } }, "externalId": null }, diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java index 2b5881212d..fb667fbfb2 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java @@ -38,10 +38,18 @@ public class TimeseriesSaveRequest { private final EntityId entityId; private final List entries; private final long ttl; - private final boolean saveLatest; - private final boolean onlyLatest; + private final Strategy strategy; private final FutureCallback callback; + public record Strategy(boolean saveTimeseries, boolean saveLatest, boolean sendWsUpdate) { + + public static final Strategy SAVE_ALL = new Strategy(true, true, true); + public static final Strategy WS_ONLY = new Strategy(false, false, true); + public static final Strategy LATEST_AND_WS = new Strategy(false, true, true); + public static final Strategy SKIP_ALL = new Strategy(false, false, false); + + } + public static Builder builder() { return new Builder(); } @@ -53,9 +61,8 @@ public class TimeseriesSaveRequest { private EntityId entityId; private List entries; private long ttl; + private Strategy strategy = Strategy.SAVE_ALL; private FutureCallback callback; - private boolean saveLatest = true; - private boolean onlyLatest; Builder() {} @@ -92,14 +99,8 @@ public class TimeseriesSaveRequest { return this; } - public Builder saveLatest(boolean saveLatest) { - this.saveLatest = saveLatest; - return this; - } - - public Builder onlyLatest(boolean onlyLatest) { - this.onlyLatest = onlyLatest; - this.saveLatest = true; + public Builder strategy(Strategy strategy) { + this.strategy = strategy; return this; } @@ -123,7 +124,7 @@ public class TimeseriesSaveRequest { } public TimeseriesSaveRequest build() { - return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, saveLatest, onlyLatest, callback); + return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, strategy, callback); } } diff --git a/rule-engine/rule-engine-api/src/test/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequestTest.java b/rule-engine/rule-engine-api/src/test/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequestTest.java new file mode 100644 index 0000000000..321892991e --- /dev/null +++ b/rule-engine/rule-engine-api/src/test/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequestTest.java @@ -0,0 +1,51 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.api; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class TimeseriesSaveRequestTest { + + @Test + void testDefaultSaveStrategyIsSaveAll() { + var request = TimeseriesSaveRequest.builder().build(); + + assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.SAVE_ALL); + } + + @Test + void testSaveAllStrategy() { + assertThat(TimeseriesSaveRequest.Strategy.SAVE_ALL).isEqualTo(new TimeseriesSaveRequest.Strategy(true, true, true)); + } + + @Test + void testWsOnlyStrategy() { + assertThat(TimeseriesSaveRequest.Strategy.WS_ONLY).isEqualTo(new TimeseriesSaveRequest.Strategy(false, false, true)); + } + + @Test + void testLatestAndWsStrategy() { + assertThat(TimeseriesSaveRequest.Strategy.LATEST_AND_WS).isEqualTo(new TimeseriesSaveRequest.Strategy(false, true, true)); + } + + @Test + void testSkipAllStrategy() { + assertThat(TimeseriesSaveRequest.Strategy.SKIP_ALL).isEqualTo(new TimeseriesSaveRequest.Strategy(false, false, false)); + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java index b0bdb1de69..b98b1d205e 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java @@ -15,8 +15,11 @@ */ package org.thingsboard.rule.engine.telemetry; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.gson.JsonParser; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; @@ -24,6 +27,7 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.telemetry.strategy.PersistenceStrategy; import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.TenantProfile; @@ -32,13 +36,20 @@ import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; +import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.TbMsg; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.TimeUnit; +import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings; +import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Advanced; +import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Deduplicate; +import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.OnEveryMessage; +import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.WebSocketsOnly; import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_REQUEST; @Slf4j @@ -46,20 +57,52 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_RE type = ComponentType.ACTION, name = "save time series", configClazz = TbMsgTimeseriesNodeConfiguration.class, - nodeDescription = "Saves time series data", - nodeDetails = "Saves time series telemetry data based on configurable TTL parameter. Expects messages with 'POST_TELEMETRY_REQUEST' message type. " + - "Timestamp in milliseconds will be taken from metadata.ts, otherwise 'now' message timestamp will be applied. " + - "Allows stopping updating values for incoming keys in the latest ts_kv table if 'skipLatestPersistence' is set to true.\n " + - "
" + - "Enable 'useServerTs' param to use the timestamp of the message processing instead of the timestamp from the message. " + - "Useful for all sorts of sequential processing if you merge messages from multiple sources (devices, assets, etc).\n" + - "
" + - "In the case of sequential processing, the platform guarantees that the messages are processed in the order of their submission to the queue. " + - "However, the timestamp of the messages originated by multiple devices/servers may be unsynchronized long before they are pushed to the queue. " + - "The DB layer has certain optimizations to ignore the updates of the \"attributes\" and \"latest values\" tables if the new record has a timestamp that is older than the previous record. " + - "So, to make sure that all the messages will be processed correctly, one should enable this parameter for sequential message processing scenarios.", + nodeDescription = """ + Saves time series data with a configurable TTL and according to configured persistence strategies. + """, + nodeDetails = """ + Node performs three actions: +
    +
  • Time series: save time series data to a ts_kv table in a DB.
  • +
  • Latest values: save time series data to a ts_kv_latest table in a DB.
  • +
  • WebSockets: notify WebSockets subscriptions about time series data updates.
  • +
+ + For each action, three persistence strategies are available: +
    +
  • On every message: perform the action for every message.
  • +
  • Deduplicate: perform the action only for the first message from a particular originator within a configurable interval.
  • +
  • Skip: never perform the action.
  • +
+ + Persistence strategies are configured using persistence settings, which support two modes: +
    +
  • Basic +
      +
    • On every message: applies the "On every message" strategy to all actions.
    • +
    • Deduplicate: applies the "Deduplicate" strategy (with a specified interval) to all actions.
    • +
    • WebSockets only: applies the "Skip" strategy to Time series and Latest values, and the "On every message" strategy to WebSockets.
    • +
    +
  • +
  • Advanced: configure each action’s strategy independently.
  • +
+ + By default, the timestamp is taken from metadata.ts. You can enable + Use server timestamp to always use the current server time instead. This is particularly + useful in sequential processing scenarios where messages may arrive with out-of-order timestamps from + multiple sources. Note that the DB layer may ignore older records for attributes and latest values, + so enabling Use server timestamp can ensure correct ordering. +

+ The TTL is taken first from metadata.TTL. If absent, the node configuration’s default + TTL is used. If neither is set, the tenant profile default applies. +

+ This node expects messages of type POST_TELEMETRY_REQUEST. +

+ Output connections: Success, Failure. + """, configDirective = "tbActionNodeTimeseriesConfig", - icon = "file_upload" + icon = "file_upload", + version = 1 ) public class TbMsgTimeseriesNode implements TbNode { @@ -67,15 +110,18 @@ public class TbMsgTimeseriesNode implements TbNode { private TbContext ctx; private long tenantProfileDefaultStorageTtl; + private PersistenceSettings persistenceSettings; + @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { this.config = TbNodeUtils.convert(configuration, TbMsgTimeseriesNodeConfiguration.class); this.ctx = ctx; ctx.addTenantProfileListener(this::onTenantProfileUpdate); onTenantProfileUpdate(ctx.getTenantProfile()); + persistenceSettings = config.getPersistenceSettings(); } - void onTenantProfileUpdate(TenantProfile tenantProfile) { + private void onTenantProfileUpdate(TenantProfile tenantProfile) { DefaultTenantProfileConfiguration configuration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration(); tenantProfileDefaultStorageTtl = TimeUnit.DAYS.toSeconds(configuration.getDefaultStorageTtlDays()); } @@ -87,6 +133,15 @@ public class TbMsgTimeseriesNode implements TbNode { return; } long ts = computeTs(msg, config.isUseServerTs()); + + TimeseriesSaveRequest.Strategy strategy = determineSaveStrategy(ts, msg.getOriginator().getId()); + + // short-circuit + if (!strategy.saveTimeseries() && !strategy.saveLatest() && !strategy.sendWsUpdate()) { + ctx.tellSuccess(msg); + return; + } + String src = msg.getData(); Map> tsKvMap = JsonConverter.convertToTelemetry(JsonParser.parseString(src), ts); if (tsKvMap.isEmpty()) { @@ -110,7 +165,7 @@ public class TbMsgTimeseriesNode implements TbNode { .entityId(msg.getOriginator()) .entries(tsKvEntryList) .ttl(ttl) - .saveLatest(!config.isSkipLatestPersistence()) + .strategy(strategy) .callback(new TelemetryNodeCallback(ctx, msg)) .build()); } @@ -119,9 +174,56 @@ public class TbMsgTimeseriesNode implements TbNode { return ignoreMetadataTs ? System.currentTimeMillis() : msg.getMetaDataTs(); } + private TimeseriesSaveRequest.Strategy determineSaveStrategy(long ts, UUID originatorUuid) { + if (persistenceSettings instanceof OnEveryMessage) { + return TimeseriesSaveRequest.Strategy.SAVE_ALL; + } + if (persistenceSettings instanceof WebSocketsOnly) { + return TimeseriesSaveRequest.Strategy.WS_ONLY; + } + if (persistenceSettings instanceof Deduplicate deduplicate) { + boolean isFirstMsgInInterval = deduplicate.getPersistenceStrategy().shouldPersist(ts, originatorUuid); + return isFirstMsgInInterval ? TimeseriesSaveRequest.Strategy.SAVE_ALL : TimeseriesSaveRequest.Strategy.SKIP_ALL; + } + if (persistenceSettings instanceof Advanced advanced) { + return new TimeseriesSaveRequest.Strategy( + advanced.timeseries().shouldPersist(ts, originatorUuid), + advanced.latest().shouldPersist(ts, originatorUuid), + advanced.webSockets().shouldPersist(ts, originatorUuid) + ); + } + // should not happen + throw new IllegalArgumentException("Unknown persistence settings type: " + persistenceSettings.getClass().getSimpleName()); + } + @Override public void destroy() { ctx.removeListeners(); } + @Override + public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { + boolean hasChanges = false; + switch (fromVersion) { + case 0: + hasChanges = true; + JsonNode skipLatestPersistence = oldConfiguration.get("skipLatestPersistence"); + if (skipLatestPersistence != null && "true".equals(skipLatestPersistence.asText())) { + var skipLatestPersistenceSettings = new Advanced( + PersistenceStrategy.onEveryMessage(), + PersistenceStrategy.skip(), + PersistenceStrategy.onEveryMessage() + ); + ((ObjectNode) oldConfiguration).set("persistenceSettings", JacksonUtil.valueToTree(skipLatestPersistenceSettings)); + } else { + ((ObjectNode) oldConfiguration).set("persistenceSettings", JacksonUtil.valueToTree(new OnEveryMessage())); + } + ((ObjectNode) oldConfiguration).remove("skipLatestPersistence"); + break; + default: + break; + } + return new TbPair<>(hasChanges, oldConfiguration); + } + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java index 1c33778a6b..a1386fee49 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java @@ -15,22 +15,84 @@ */ package org.thingsboard.rule.engine.telemetry; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import jakarta.validation.constraints.NotNull; import lombok.Data; +import lombok.Getter; import org.thingsboard.rule.engine.api.NodeConfiguration; +import org.thingsboard.rule.engine.telemetry.strategy.PersistenceStrategy; + +import java.util.Objects; + +import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Advanced; +import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Deduplicate; +import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.OnEveryMessage; +import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.WebSocketsOnly; @Data public class TbMsgTimeseriesNodeConfiguration implements NodeConfiguration { private long defaultTTL; - private boolean skipLatestPersistence; private boolean useServerTs; + @NotNull + private PersistenceSettings persistenceSettings; @Override public TbMsgTimeseriesNodeConfiguration defaultConfiguration() { TbMsgTimeseriesNodeConfiguration configuration = new TbMsgTimeseriesNodeConfiguration(); configuration.setDefaultTTL(0L); - configuration.setSkipLatestPersistence(false); configuration.setUseServerTs(false); + configuration.setPersistenceSettings(new OnEveryMessage()); return configuration; } + + @JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, + include = JsonTypeInfo.As.PROPERTY, + property = "type" + ) + @JsonSubTypes({ + @JsonSubTypes.Type(value = OnEveryMessage.class, name = "ON_EVERY_MESSAGE"), + @JsonSubTypes.Type(value = WebSocketsOnly.class, name = "WEBSOCKETS_ONLY"), + @JsonSubTypes.Type(value = Deduplicate.class, name = "DEDUPLICATE"), + @JsonSubTypes.Type(value = Advanced.class, name = "ADVANCED") + }) + sealed interface PersistenceSettings permits OnEveryMessage, Deduplicate, WebSocketsOnly, Advanced { + + record OnEveryMessage() implements PersistenceSettings {} + + record WebSocketsOnly() implements PersistenceSettings {} + + @Getter + final class Deduplicate implements PersistenceSettings { + + private final int deduplicationIntervalSecs; + + @JsonIgnore + private final PersistenceStrategy persistenceStrategy; + + @JsonCreator + Deduplicate(@JsonProperty("deduplicationIntervalSecs") int deduplicationIntervalSecs) { + this.deduplicationIntervalSecs = deduplicationIntervalSecs; + persistenceStrategy = PersistenceStrategy.deduplicate(deduplicationIntervalSecs); + } + + } + + record Advanced(PersistenceStrategy timeseries, PersistenceStrategy latest, PersistenceStrategy webSockets) implements PersistenceSettings { + + public Advanced { + Objects.requireNonNull(timeseries); + Objects.requireNonNull(latest); + Objects.requireNonNull(webSockets); + } + + } + + } + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/DeduplicatePersistenceStrategy.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/DeduplicatePersistenceStrategy.java new file mode 100644 index 0000000000..868e0d8ca4 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/DeduplicatePersistenceStrategy.java @@ -0,0 +1,89 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.telemetry.strategy; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.google.common.collect.Sets; +import com.google.common.primitives.Longs; + +import java.time.Duration; +import java.util.Set; +import java.util.UUID; + +final class DeduplicatePersistenceStrategy implements PersistenceStrategy { + + private static final int MIN_DEDUPLICATION_INTERVAL_SECS = 1; + private static final int MAX_DEDUPLICATION_INTERVAL_SECS = (int) Duration.ofDays(1L).toSeconds(); + + private static final long MIN_INTERVAL_EXPIRY_MILLIS = Duration.ofMinutes(10L).toMillis(); + private static final int INTERVAL_EXPIRY_FACTOR = 10; + private static final long MAX_INTERVAL_EXPIRY_MILLIS = Duration.ofDays(2L).toMillis(); + + private static final int MAX_TOTAL_INTERVALS_DURATION_SECS = (int) Duration.ofDays(2L).toSeconds(); + private static final int MAX_NUMBER_OF_INTERVALS = 100; + + private final long deduplicationIntervalMillis; + private final LoadingCache> deduplicationCache; + + @JsonCreator + public DeduplicatePersistenceStrategy(@JsonProperty("deduplicationIntervalSecs") int deduplicationIntervalSecs) { + if (deduplicationIntervalSecs < MIN_DEDUPLICATION_INTERVAL_SECS || deduplicationIntervalSecs > MAX_DEDUPLICATION_INTERVAL_SECS) { + throw new IllegalArgumentException("Deduplication interval must be at least " + MIN_DEDUPLICATION_INTERVAL_SECS + " second(s) " + + "and at most " + MAX_DEDUPLICATION_INTERVAL_SECS + " second(s), was " + deduplicationIntervalSecs + " second(s)"); + } + deduplicationIntervalMillis = Duration.ofSeconds(deduplicationIntervalSecs).toMillis(); + deduplicationCache = Caffeine.newBuilder() + .softValues() + .expireAfterAccess(calculateExpireAfterAccess(deduplicationIntervalSecs)) + .maximumSize(calculateMaxNumberOfDeduplicationIntervals(deduplicationIntervalSecs)) + .build(__ -> Sets.newConcurrentHashSet()); + } + + /** + * Calculates the expire-after-access duration. By default, we keep each deduplication interval + * alive for 10 “iterations” (interval duration × 10). However, we never let this drop below + * 10 minutes to ensure adequate retention for small intervals, nor exceed 48 hours to prevent + * storing stale data in memory. + */ + private static Duration calculateExpireAfterAccess(int deduplicationIntervalSecs) { + long desiredExpiryMillis = Duration.ofSeconds(deduplicationIntervalSecs).toMillis() * INTERVAL_EXPIRY_FACTOR; + return Duration.ofMillis(Longs.constrainToRange(desiredExpiryMillis, MIN_INTERVAL_EXPIRY_MILLIS, MAX_INTERVAL_EXPIRY_MILLIS)); + } + + /** + * Calculates the maximum number of deduplication intervals we will store in the cache. + * We limit retention to two days to avoid stale data and cap it at 100 intervals to manage memory usage. + */ + private static long calculateMaxNumberOfDeduplicationIntervals(int deduplicationIntervalSecs) { + int numberOfDeduplicationIntervals = MAX_TOTAL_INTERVALS_DURATION_SECS / deduplicationIntervalSecs; + return Math.min(numberOfDeduplicationIntervals, MAX_NUMBER_OF_INTERVALS); + } + + @JsonProperty("deduplicationIntervalSecs") + public long getDeduplicationIntervalSecs() { + return Duration.ofMillis(deduplicationIntervalMillis).toSeconds(); + } + + @Override + public boolean shouldPersist(long ts, UUID originatorUuid) { + long intervalNumber = ts / deduplicationIntervalMillis; + return deduplicationCache.get(intervalNumber).add(originatorUuid); + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/OnEveryMessagePersistenceStrategy.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/OnEveryMessagePersistenceStrategy.java new file mode 100644 index 0000000000..4fcb74dc33 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/OnEveryMessagePersistenceStrategy.java @@ -0,0 +1,38 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.telemetry.strategy; + +import com.fasterxml.jackson.annotation.JsonCreator; + +import java.util.UUID; + +final class OnEveryMessagePersistenceStrategy implements PersistenceStrategy { + + private static final OnEveryMessagePersistenceStrategy INSTANCE = new OnEveryMessagePersistenceStrategy(); + + private OnEveryMessagePersistenceStrategy() {} + + @JsonCreator + public static OnEveryMessagePersistenceStrategy getInstance() { + return INSTANCE; + } + + @Override + public boolean shouldPersist(long ts, UUID originatorUuid) { + return true; + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/PersistenceStrategy.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/PersistenceStrategy.java new file mode 100644 index 0000000000..453092117b --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/PersistenceStrategy.java @@ -0,0 +1,49 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.telemetry.strategy; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +import java.util.UUID; + +@JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, + include = JsonTypeInfo.As.PROPERTY, + property = "type" +) +@JsonSubTypes({ + @JsonSubTypes.Type(value = OnEveryMessagePersistenceStrategy.class, name = "ON_EVERY_MESSAGE"), + @JsonSubTypes.Type(value = DeduplicatePersistenceStrategy.class, name = "DEDUPLICATE"), + @JsonSubTypes.Type(value = SkipPersistenceStrategy.class, name = "SKIP") +}) +public sealed interface PersistenceStrategy permits OnEveryMessagePersistenceStrategy, DeduplicatePersistenceStrategy, SkipPersistenceStrategy { + + static PersistenceStrategy onEveryMessage() { + return OnEveryMessagePersistenceStrategy.getInstance(); + } + + static PersistenceStrategy deduplicate(int deduplicationIntervalSecs) { + return new DeduplicatePersistenceStrategy(deduplicationIntervalSecs); + } + + static PersistenceStrategy skip() { + return SkipPersistenceStrategy.getInstance(); + } + + boolean shouldPersist(long ts, UUID originatorUuid); + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/SkipPersistenceStrategy.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/SkipPersistenceStrategy.java new file mode 100644 index 0000000000..c3d96e8ca7 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/SkipPersistenceStrategy.java @@ -0,0 +1,38 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.telemetry.strategy; + +import com.fasterxml.jackson.annotation.JsonCreator; + +import java.util.UUID; + +final class SkipPersistenceStrategy implements PersistenceStrategy { + + private static final SkipPersistenceStrategy INSTANCE = new SkipPersistenceStrategy(); + + private SkipPersistenceStrategy() {} + + @JsonCreator + public static SkipPersistenceStrategy getInstance() { + return INSTANCE; + } + + @Override + public boolean shouldPersist(long ts, UUID originatorUuid) { + return false; + } + +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java index 360fbce5b0..63432afc37 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java @@ -533,7 +533,7 @@ public class TbMathNodeTest { verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture()); verify(telemetryService, times(1)).saveTimeseries(assertArg(request -> { assertThat(request.getEntries()).size().isOne(); - assertThat(request.isSaveLatest()).isTrue(); + assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.SAVE_ALL); })); TbMsg resultMsg = msgCaptor.getValue(); @@ -569,7 +569,7 @@ public class TbMathNodeTest { verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture()); verify(telemetryService, times(1)).saveTimeseries(assertArg(request -> { assertThat(request.getEntries()).size().isOne(); - assertThat(request.isSaveLatest()).isTrue(); + assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.SAVE_ALL); })); TbMsg resultMsg = msgCaptor.getValue(); diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java index 2cba4b8fb3..02c19ed5fc 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java @@ -25,20 +25,23 @@ import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; -import org.mockito.ThrowingConsumer; import org.mockito.junit.jupiter.MockitoExtension; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; +import org.thingsboard.rule.engine.telemetry.strategy.PersistenceStrategy; import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.msg.TbMsgType; @@ -46,6 +49,8 @@ import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileCon import org.thingsboard.server.common.data.tenant.profile.TenantProfileData; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.exception.DataValidationException; +import org.thingsboard.server.dao.service.ConstraintValidator; import java.util.ArrayList; import java.util.List; @@ -55,24 +60,30 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.assertArg; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) -public class TbMsgTimeseriesNodeTest { +public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest { private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("c8f34868-603a-4433-876a-7d356e5cf377")); private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("e5095e9a-04f4-44c9-b443-1cf1b97d3384")); - private final TenantProfileId TENANT_PROFILE_ID = new TenantProfileId(UUID.fromString("ab78dd78-83d0-43fa-869f-d42ec9ed1744")); + + private TenantProfile tenantProfile; private TbMsgTimeseriesNode node; private TbMsgTimeseriesNodeConfiguration config; - private long tenantProfileDefaultStorageTtl; @Mock private TbContext ctxMock; @@ -81,29 +92,68 @@ public class TbMsgTimeseriesNodeTest { @BeforeEach public void setUp() throws TbNodeException { - node = new TbMsgTimeseriesNode(); + tenantProfile = new TenantProfile(new TenantProfileId(UUID.fromString("ab78dd78-83d0-43fa-869f-d42ec9ed1744"))); + var tenantProfileConfiguration = new DefaultTenantProfileConfiguration(); + tenantProfileConfiguration.setDefaultStorageTtlDays(5); + var tenantProfileData = new TenantProfileData(); + tenantProfileData.setConfiguration(tenantProfileConfiguration); + tenantProfile.setProfileData(tenantProfileData); + lenient().when(ctxMock.getTenantProfile()).thenReturn(tenantProfile); + + lenient().when(ctxMock.getTenantId()).thenReturn(TENANT_ID); + lenient().when(ctxMock.getTelemetryService()).thenReturn(telemetryServiceMock); + + node = spy(new TbMsgTimeseriesNode()); config = new TbMsgTimeseriesNodeConfiguration().defaultConfiguration(); } @Test public void verifyDefaultConfig() { assertThat(config.getDefaultTTL()).isEqualTo(0L); - assertThat(config.isSkipLatestPersistence()).isFalse(); + assertThat(config.getPersistenceSettings()).isInstanceOf(TbMsgTimeseriesNodeConfiguration.PersistenceSettings.OnEveryMessage.class); assertThat(config.isUseServerTs()).isFalse(); } + @Test + public void whenInit_thenShouldAddTenantProfileListener() throws Exception { + // GIVEN-WHEN + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + // THEN + then(ctxMock).should().addTenantProfileListener(any()); + } + + @Test + public void givenPersistenceSettingsAreNull_whenValidatingConstraints_thenThrowsException() { + // GIVEN + config.setPersistenceSettings(null); + + // WHEN-THEN + assertThatThrownBy(() -> ConstraintValidator.validateFields(config)) + .isInstanceOf(DataValidationException.class) + .hasMessage("Validation error: persistenceSettings must not be null"); + } + @ParameterizedTest @EnumSource(TbMsgType.class) public void givenMsgTypeAndEmptyMsgData_whenOnMsg_thenVerifyFailureMsg(TbMsgType msgType) throws TbNodeException { - init(); + // GIVEN + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + TbMsg msg = TbMsg.newMsg() .type(msgType) .originator(DEVICE_ID) .copyMetaData(TbMsgMetaData.EMPTY) .data(TbMsg.EMPTY_JSON_ARRAY) .build(); + + // WHEN node.onMsg(ctxMock, msg); + // THEN + then(ctxMock).should().addTenantProfileListener(any()); + then(ctxMock).should().getTenantProfile(); + ArgumentCaptor throwableCaptor = ArgumentCaptor.forClass(Throwable.class); verify(ctxMock).tellFailure(eq(msg), throwableCaptor.capture()); @@ -117,9 +167,11 @@ public class TbMsgTimeseriesNodeTest { } @Test - public void givenTtlFromConfigIsZeroAndUseServiceTsIsTrue_whenOnMsg_thenSaveTimeseriesUsingTenantProfileDefaultTtl() throws TbNodeException { + public void givenTtlFromConfigIsZeroAndUseServerTsIsTrue_whenOnMsg_thenSaveTimeseriesUsingTenantProfileDefaultTtl() throws TbNodeException { + // GIVEN config.setUseServerTs(true); - init(); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); String data = """ { @@ -134,24 +186,29 @@ public class TbMsgTimeseriesNodeTest { .data(data) .build(); - when(ctxMock.getTelemetryService()).thenReturn(telemetryServiceMock); - when(ctxMock.getTenantId()).thenReturn(TENANT_ID); doAnswer(invocation -> { TimeseriesSaveRequest request = invocation.getArgument(0); request.getCallback().onSuccess(null); return null; }).when(telemetryServiceMock).saveTimeseries(any(TimeseriesSaveRequest.class)); + // WHEN node.onMsg(ctxMock, msg); + // THEN + then(ctxMock).should().getTenantId(); + then(ctxMock).should().getTelemetryService(); + then(ctxMock).should().addTenantProfileListener(any()); + then(ctxMock).should().getTenantProfile(); + List expectedList = getTsKvEntriesListWithTs(data, System.currentTimeMillis()); verify(telemetryServiceMock).saveTimeseries(assertArg(request -> { assertThat(request.getTenantId()).isEqualTo(TENANT_ID); assertThat(request.getCustomerId()).isNull(); assertThat(request.getEntityId()).isEqualTo(DEVICE_ID); assertThat(request.getEntries()).usingRecursiveFieldByFieldElementComparatorIgnoringFields("ts").containsExactlyElementsOf(expectedList); - assertThat(request.getTtl()).isEqualTo(tenantProfileDefaultStorageTtl); - assertThat(request.isSaveLatest()).isTrue(); + assertThat(request.getTtl()).isEqualTo(extractTtlAsSeconds(tenantProfile)); + assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.SAVE_ALL); assertThat(request.getCallback()).isInstanceOf(TelemetryNodeCallback.class); })); verify(ctxMock).tellSuccess(msg); @@ -159,11 +216,17 @@ public class TbMsgTimeseriesNodeTest { } @Test - public void givenSkipLatestPersistenceIsTrueAndTtlFromConfig_whenOnMsg_thenSaveTimeseriesUsingTtlFromConfig() throws TbNodeException { - long ttlFromConfig = 5L; - config.setDefaultTTL(ttlFromConfig); - config.setSkipLatestPersistence(true); - init(); + public void givenSkipLatestPersistenceSettingsAndTtlFromConfig_whenOnMsg_thenSaveTimeseriesUsingTtlFromConfig() throws TbNodeException { + // GIVEN + config.setDefaultTTL(10L); + + var timeseriesStrategy = PersistenceStrategy.onEveryMessage(); + var latestStrategy = PersistenceStrategy.skip(); + var webSockets = PersistenceStrategy.onEveryMessage(); + var persistenceSettings = new TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Advanced(timeseriesStrategy, latestStrategy, webSockets); + config.setPersistenceSettings(persistenceSettings); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); String data = """ { @@ -180,24 +243,29 @@ public class TbMsgTimeseriesNodeTest { .data(data) .build(); - when(ctxMock.getTelemetryService()).thenReturn(telemetryServiceMock); - when(ctxMock.getTenantId()).thenReturn(TENANT_ID); doAnswer(invocation -> { TimeseriesSaveRequest request = invocation.getArgument(0); request.getCallback().onSuccess(null); return null; }).when(telemetryServiceMock).saveTimeseries(any(TimeseriesSaveRequest.class)); + // WHEN node.onMsg(ctxMock, msg); + // THEN + then(ctxMock).should().getTenantId(); + then(ctxMock).should().getTelemetryService(); + then(ctxMock).should().addTenantProfileListener(any()); + then(ctxMock).should().getTenantProfile(); + List expectedList = getTsKvEntriesListWithTs(data, ts); verify(telemetryServiceMock).saveTimeseries(assertArg(request -> { assertThat(request.getTenantId()).isEqualTo(TENANT_ID); assertThat(request.getCustomerId()).isNull(); assertThat(request.getEntityId()).isEqualTo(DEVICE_ID); assertThat(request.getEntries()).containsExactlyElementsOf(expectedList); - assertThat(request.getTtl()).isEqualTo(ttlFromConfig); - assertThat(request.isSaveLatest()).isFalse(); + assertThat(request.getTtl()).isEqualTo(config.getDefaultTTL()); + assertThat(request.getStrategy()).isEqualTo(new TimeseriesSaveRequest.Strategy(true, false, true)); assertThat(request.getCallback()).isInstanceOf(TelemetryNodeCallback.class); })); verify(ctxMock).tellSuccess(msg); @@ -207,11 +275,10 @@ public class TbMsgTimeseriesNodeTest { @ParameterizedTest @MethodSource public void givenTtlFromConfigAndTtlFromMd_whenOnMsg_thenVerifyTtl(String ttlFromMd, long ttlFromConfig, long expectedTtl) throws TbNodeException { + // GIVEN config.setDefaultTTL(ttlFromConfig); - init(); - when(ctxMock.getTelemetryService()).thenReturn(telemetryServiceMock); - when(ctxMock.getTenantId()).thenReturn(TENANT_ID); + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); String data = """ { @@ -227,14 +294,17 @@ public class TbMsgTimeseriesNodeTest { .copyMetaData(metadata) .data(data) .build(); + + // WHEN node.onMsg(ctxMock, msg); + // THEN verify(telemetryServiceMock).saveTimeseries(assertArg(request -> { assertThat(request.getTenantId()).isEqualTo(TENANT_ID); assertThat(request.getCustomerId()).isNull(); assertThat(request.getEntityId()).isEqualTo(DEVICE_ID); assertThat(request.getTtl()).isEqualTo(expectedTtl); - assertThat(request.isSaveLatest()).isTrue(); + assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.SAVE_ALL); assertThat(request.getCallback()).isInstanceOf(TelemetryNodeCallback.class); })); } @@ -251,26 +321,6 @@ public class TbMsgTimeseriesNodeTest { ); } - private void init() throws TbNodeException { - var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); - var tenantProfile = getTenantProfile(); - when(ctxMock.getTenantProfile()).thenReturn(tenantProfile); - tenantProfile.getProfileConfiguration().ifPresent(profileConfiguration -> - tenantProfileDefaultStorageTtl = TimeUnit.DAYS.toSeconds(profileConfiguration.getDefaultStorageTtlDays())); - node.init(ctxMock, configuration); - verify(ctxMock).addTenantProfileListener(any()); - } - - private TenantProfile getTenantProfile() { - var tenantProfile = new TenantProfile(TENANT_PROFILE_ID); - var tenantProfileData = new TenantProfileData(); - var tenantProfileConfiguration = new DefaultTenantProfileConfiguration(); - tenantProfileConfiguration.setDefaultStorageTtlDays(5); - tenantProfileData.setConfiguration(tenantProfileConfiguration); - tenantProfile.setProfileData(tenantProfileData); - return tenantProfile; - } - private static List getTsKvEntriesListWithTs(String data, long ts) { Map> tsKvMap = JsonConverter.convertToTelemetry(JsonParser.parseString(data), ts); List expectedList = new ArrayList<>(); @@ -282,4 +332,309 @@ public class TbMsgTimeseriesNodeTest { return expectedList; } + @Test + public void givenOnEveryMessagePersistenceSettingsAndSameMessageTwoTimes_whenOnMsg_thenPersistSameMessageTwoTimes() throws TbNodeException { + // GIVEN + config.setPersistenceSettings(new TbMsgTimeseriesNodeConfiguration.PersistenceSettings.OnEveryMessage()); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + var msg = TbMsg.newMsg() + .type(TbMsgType.POST_TELEMETRY_REQUEST) + .originator(DEVICE_ID) + .data(JacksonUtil.newObjectNode().put("temperature", 22.3).toString()) + .metaData(new TbMsgMetaData(Map.of("ts", "123"))) + .build(); + + // WHEN-THEN + var expectedSaveRequest = TimeseriesSaveRequest.builder() + .tenantId(TENANT_ID) + .customerId(msg.getCustomerId()) + .entityId(msg.getOriginator()) + .entry(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3))) + .ttl(extractTtlAsSeconds(tenantProfile)) + .strategy(TimeseriesSaveRequest.Strategy.SAVE_ALL) + .build(); + + node.onMsg(ctxMock, msg); + then(telemetryServiceMock).should(times(1)).saveTimeseries(assertArg( + actualSaveRequest -> assertThat(actualSaveRequest).usingRecursiveComparison().ignoringFields("callback").isEqualTo(expectedSaveRequest) + )); + + node.onMsg(ctxMock, msg); + then(telemetryServiceMock).should(times(2)).saveTimeseries(assertArg( + actualSaveRequest -> assertThat(actualSaveRequest).usingRecursiveComparison().ignoringFields("callback").isEqualTo(expectedSaveRequest) + )); + } + + @Test + public void givenDeduplicatePersistenceSettingsAndSameMessageTwoTimes_whenOnMsg_thenPersistThisMessageOnlyFirstTime() throws TbNodeException { + // GIVEN + config.setPersistenceSettings(new TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Deduplicate(10)); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + var msg = TbMsg.newMsg() + .type(TbMsgType.POST_TELEMETRY_REQUEST) + .originator(DEVICE_ID) + .data(JacksonUtil.newObjectNode().put("temperature", 22.3).toString()) + .metaData(new TbMsgMetaData(Map.of("ts", "123"))) + .build(); + + // WHEN-THEN + var expectedSaveRequest = TimeseriesSaveRequest.builder() + .tenantId(TENANT_ID) + .customerId(msg.getCustomerId()) + .entityId(msg.getOriginator()) + .entry(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3))) + .ttl(extractTtlAsSeconds(tenantProfile)) + .strategy(TimeseriesSaveRequest.Strategy.SAVE_ALL) + .build(); + + node.onMsg(ctxMock, msg); + then(telemetryServiceMock).should().saveTimeseries(assertArg( + actualSaveRequest -> assertThat(actualSaveRequest).usingRecursiveComparison().ignoringFields("callback").isEqualTo(expectedSaveRequest) + )); + + clearInvocations(telemetryServiceMock, ctxMock); + + node.onMsg(ctxMock, msg); + then(telemetryServiceMock).should(never()).saveTimeseries(any()); + } + + @Test + public void givenWebsocketsOnlyPersistenceSettingsAndSameMessageTwoTimes_whenOnMsg_thenSendsOnlyWsUpdateTwoTimes() throws TbNodeException { + // GIVEN + config.setPersistenceSettings(new TbMsgTimeseriesNodeConfiguration.PersistenceSettings.WebSocketsOnly()); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + var msg = TbMsg.newMsg() + .type(TbMsgType.POST_TELEMETRY_REQUEST) + .originator(DEVICE_ID) + .data(JacksonUtil.newObjectNode().put("temperature", 22.3).toString()) + .metaData(new TbMsgMetaData(Map.of("ts", "123"))) + .build(); + + // WHEN-THEN + var expectedSaveRequest = TimeseriesSaveRequest.builder() + .tenantId(TENANT_ID) + .customerId(msg.getCustomerId()) + .entityId(msg.getOriginator()) + .entry(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3))) + .ttl(extractTtlAsSeconds(tenantProfile)) + .strategy(TimeseriesSaveRequest.Strategy.WS_ONLY) + .build(); + + node.onMsg(ctxMock, msg); + then(telemetryServiceMock).should(times(1)).saveTimeseries(assertArg( + actualSaveRequest -> assertThat(actualSaveRequest).usingRecursiveComparison().ignoringFields("callback").isEqualTo(expectedSaveRequest) + )); + + node.onMsg(ctxMock, msg); + then(telemetryServiceMock).should(times(2)).saveTimeseries(assertArg( + actualSaveRequest -> assertThat(actualSaveRequest).usingRecursiveComparison().ignoringFields("callback").isEqualTo(expectedSaveRequest) + )); + } + + @Test + public void givenAdvancedPersistenceSettingsWithOnEveryMessageStrategiesForAllActionsAndSameMessageTwoTimes_whenOnMsg_thenPersistSameMessageTwoTimes() throws TbNodeException { + // GIVEN + config.setPersistenceSettings(new TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Advanced( + PersistenceStrategy.onEveryMessage(), + PersistenceStrategy.onEveryMessage(), + PersistenceStrategy.onEveryMessage() + )); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + var msg = TbMsg.newMsg() + .type(TbMsgType.POST_TELEMETRY_REQUEST) + .originator(DEVICE_ID) + .data(JacksonUtil.newObjectNode().put("temperature", 22.3).toString()) + .metaData(new TbMsgMetaData(Map.of("ts", "123"))) + .build(); + + // WHEN-THEN + var expectedSaveRequest = TimeseriesSaveRequest.builder() + .tenantId(TENANT_ID) + .customerId(msg.getCustomerId()) + .entityId(msg.getOriginator()) + .entry(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3))) + .ttl(extractTtlAsSeconds(tenantProfile)) + .strategy(TimeseriesSaveRequest.Strategy.SAVE_ALL) + .build(); + + node.onMsg(ctxMock, msg); + then(telemetryServiceMock).should(times(1)).saveTimeseries(assertArg( + actualSaveRequest -> assertThat(actualSaveRequest).usingRecursiveComparison().ignoringFields("callback").isEqualTo(expectedSaveRequest) + )); + + node.onMsg(ctxMock, msg); + then(telemetryServiceMock).should(times(2)).saveTimeseries(assertArg( + actualSaveRequest -> assertThat(actualSaveRequest).usingRecursiveComparison().ignoringFields("callback").isEqualTo(expectedSaveRequest) + )); + } + + @Test + public void givenAdvancedPersistenceSettingsWithDifferentDeduplicateStrategyForEachAction_whenOnMsg_thenEvaluatesStrategiesForEachActionsIndependently() throws TbNodeException { + // GIVEN + config.setPersistenceSettings(new TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Advanced( + PersistenceStrategy.deduplicate(1), + PersistenceStrategy.deduplicate(2), + PersistenceStrategy.deduplicate(3) + )); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + long ts1 = 500L; + long ts2 = 1500L; + long ts3 = 2500L; + + // WHEN-THEN + node.onMsg(ctxMock, TbMsg.newMsg() + .type(TbMsgType.POST_TELEMETRY_REQUEST) + .originator(DEVICE_ID) + .data(JacksonUtil.newObjectNode().put("temperature", 22.3).toString()) + .metaData(new TbMsgMetaData(Map.of("ts", Long.toString(ts1)))) + .build()); + then(telemetryServiceMock).should().saveTimeseries(assertArg( + actualSaveRequest -> assertThat(actualSaveRequest.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.SAVE_ALL) + )); + + clearInvocations(telemetryServiceMock); + + node.onMsg(ctxMock, TbMsg.newMsg() + .type(TbMsgType.POST_TELEMETRY_REQUEST) + .originator(DEVICE_ID) + .data(JacksonUtil.newObjectNode().put("temperature", 22.3).toString()) + .metaData(new TbMsgMetaData(Map.of("ts", Long.toString(ts2)))) + .build()); + then(telemetryServiceMock).should().saveTimeseries(assertArg( + actualSaveRequest -> assertThat(actualSaveRequest.getStrategy()).isEqualTo(new TimeseriesSaveRequest.Strategy(true, false, false)) + )); + + clearInvocations(telemetryServiceMock); + + node.onMsg(ctxMock, TbMsg.newMsg() + .type(TbMsgType.POST_TELEMETRY_REQUEST) + .originator(DEVICE_ID) + .data(JacksonUtil.newObjectNode().put("temperature", 22.3).toString()) + .metaData(new TbMsgMetaData(Map.of("ts", Long.toString(ts3)))) + .build()); + then(telemetryServiceMock).should().saveTimeseries(assertArg( + actualSaveRequest -> assertThat(actualSaveRequest.getStrategy()).isEqualTo(new TimeseriesSaveRequest.Strategy(true, true, false)) + )); + } + + @Test + public void givenAdvancedPersistenceSettingsWithSkipStrategiesForAllActionsAndSameMessageTwoTimes_whenOnMsg_thenSkipsSameMessageTwoTimes() throws TbNodeException { + // GIVEN + config.setPersistenceSettings(new TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Advanced( + PersistenceStrategy.skip(), + PersistenceStrategy.skip(), + PersistenceStrategy.skip() + )); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + var msg = TbMsg.newMsg() + .type(TbMsgType.POST_TELEMETRY_REQUEST) + .originator(DEVICE_ID) + .data(JacksonUtil.newObjectNode().put("temperature", 22.3).toString()) + .metaData(new TbMsgMetaData(Map.of("ts", "123"))) + .build(); + + // WHEN-THEN + node.onMsg(ctxMock, msg); + then(telemetryServiceMock).should(never()).saveTimeseries(any()); + then(ctxMock).should(times(1)).tellSuccess(msg); + + node.onMsg(ctxMock, msg); + then(telemetryServiceMock).should(never()).saveTimeseries(any()); + then(ctxMock).should(times(2)).tellSuccess(msg); + } + + private static long extractTtlAsSeconds(TenantProfile tenantProfile) { + return TimeUnit.DAYS.toSeconds(tenantProfile.getDefaultProfileConfiguration().getDefaultStorageTtlDays()); + } + + @Override + protected TbNode getTestNode() { + return node; + } + + private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { + return Stream.of( + Arguments.of(0, """ + { + "defaultTTL": 0, + "useServerTs": false, + "skipLatestPersistence": false + }""", + true, + """ + { + "defaultTTL": 0, + "useServerTs": false, + "persistenceSettings": { + "type": "ON_EVERY_MESSAGE" + } + }"""), + Arguments.of(0, """ + { + "defaultTTL": 0, + "useServerTs": false + }""", + true, + """ + { + "defaultTTL": 0, + "useServerTs": false, + "persistenceSettings": { + "type": "ON_EVERY_MESSAGE" + } + }"""), + Arguments.of(0, """ + { + "defaultTTL": 0, + "useServerTs": false, + "skipLatestPersistence": null + }""", + true, + """ + { + "defaultTTL": 0, + "useServerTs": false, + "persistenceSettings": { + "type": "ON_EVERY_MESSAGE" + } + }"""), + Arguments.of(0, """ + { + "defaultTTL": 0, + "useServerTs": false, + "skipLatestPersistence": true + }""", + true, + """ + { + "defaultTTL": 0, + "useServerTs": false, + "persistenceSettings": { + "type": "ADVANCED", + "timeseries": { + "type": "ON_EVERY_MESSAGE" + }, + "latest": { + "type": "SKIP" + }, + "webSockets": { + "type": "ON_EVERY_MESSAGE" + } + } + }""") + ); + } + } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/strategy/DeduplicatePersistenceStrategyTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/strategy/DeduplicatePersistenceStrategyTest.java new file mode 100644 index 0000000000..1ae2b367a5 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/strategy/DeduplicatePersistenceStrategyTest.java @@ -0,0 +1,260 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.telemetry.strategy; + +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.Policy; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.test.util.ReflectionTestUtils; + +import java.time.Duration; +import java.util.Set; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class DeduplicatePersistenceStrategyTest { + + final int deduplicationIntervalSecs = 10; + + DeduplicatePersistenceStrategy strategy; + + @BeforeEach + void setup() { + strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs); + } + + @Test + void shouldThrowWhenDeduplicationIntervalIsLessThanOneSecond() { + assertThatThrownBy(() -> new DeduplicatePersistenceStrategy(0)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Deduplication interval must be at least 1 second(s) and at most 86400 second(s), was 0 second(s)"); + } + + @Test + void shouldThrowWhenDeduplicationIntervalIsMoreThan24Hours() { + assertThatThrownBy(() -> new DeduplicatePersistenceStrategy(86401)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Deduplication interval must be at least 1 second(s) and at most 86400 second(s), was 86401 second(s)"); + } + + @Test + void shouldUseAtLeastTenMinutesForExpireAfterAccess() { + // GIVEN + int deduplicationIntervalSecs = 1; // min deduplication interval duration + + // WHEN + strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs); + + // THEN + var deduplicationCache = (LoadingCache>) ReflectionTestUtils.getField(strategy, "deduplicationCache"); + + assertThat(deduplicationCache.policy().expireAfterAccess()) + .isPresent() + .map(Policy.FixedExpiration::getExpiresAfter) + .hasValue(Duration.ofMinutes(10L)); + } + + @Test + void shouldCalculateExpireAfterAccessAsIntervalDurationMultipliedByTen() { + // GIVEN + int deduplicationIntervalSecs = (int) Duration.ofHours(1L).toSeconds(); // max deduplication interval duration + + // WHEN + strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs); + + // THEN + var deduplicationCache = (LoadingCache>) ReflectionTestUtils.getField(strategy, "deduplicationCache"); + + assertThat(deduplicationCache.policy().expireAfterAccess()) + .isPresent() + .map(Policy.FixedExpiration::getExpiresAfter) + .hasValue(Duration.ofHours(10L)); + } + + @Test + void shouldUseAtMostTwoDaysForExpireAfterAccess() { + // GIVEN + int deduplicationIntervalSecs = (int) Duration.ofDays(1L).toSeconds(); // max deduplication interval duration + + // WHEN + strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs); + + // THEN + var deduplicationCache = (LoadingCache>) ReflectionTestUtils.getField(strategy, "deduplicationCache"); + + assertThat(deduplicationCache.policy().expireAfterAccess()) + .isPresent() + .map(Policy.FixedExpiration::getExpiresAfter) + .hasValue(Duration.ofDays(2L)); + } + + @Test + void shouldNotAllowMoreThan100DeduplicationIntervals() { + // GIVEN + int deduplicationIntervalSecs = 1; // min deduplication interval duration + + // WHEN + strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs); + + // THEN + var deduplicationCache = (LoadingCache>) ReflectionTestUtils.getField(strategy, "deduplicationCache"); + + assertThat(deduplicationCache.policy().eviction()) + .isPresent() + .map(Policy.Eviction::getMaximum) + .hasValue(100L); + } + + @Test + void shouldCalculateMaxIntervalsAsTwoDaysDividedByIntervalDuration() { + // GIVEN + int deduplicationIntervalSecs = (int) Duration.ofHours(1L).toSeconds(); + + // WHEN + strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs); + + // THEN + var deduplicationCache = (LoadingCache>) ReflectionTestUtils.getField(strategy, "deduplicationCache"); + + assertThat(deduplicationCache.policy().eviction()) + .isPresent() + .map(Policy.Eviction::getMaximum) + .hasValue(48L); + } + + @Test + void shouldKeepAtLeastTwoDeduplicationIntervals() { + // GIVEN + int deduplicationIntervalSecs = (int) Duration.ofDays(1L).toSeconds(); // max deduplication interval duration + + // WHEN + strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs); + + // THEN + var deduplicationCache = (LoadingCache>) ReflectionTestUtils.getField(strategy, "deduplicationCache"); + + assertThat(deduplicationCache.policy().eviction()) + .isPresent() + .map(Policy.Eviction::getMaximum) + .hasValue(2L); + } + + @Test + void shouldReturnTrueForFirstCallInInterval() { + long ts = 1_000_000L; + UUID originator = UUID.randomUUID(); + + assertThat(strategy.shouldPersist(ts, originator)).isTrue(); + } + + @Test + void shouldReturnFalseForSubsequentCallsInInterval() { + long baseTs = 1_000_000L; + UUID originator = UUID.randomUUID(); + + // Initial call should return true + assertThat(strategy.shouldPersist(baseTs, originator)).isTrue(); + + // Subsequent call within the same interval should return false for the same originator + long withinSameIntervalTs = baseTs + 1000L; + assertThat(strategy.shouldPersist(withinSameIntervalTs, originator)).isFalse(); + } + + @Test + void shouldHandleMultipleOriginatorsIndependently() { + long baseTs = 1_000_000L; + UUID originator1 = UUID.randomUUID(); + UUID originator2 = UUID.randomUUID(); + + // First call for different originators in the same interval should return true independently + assertThat(strategy.shouldPersist(baseTs, originator1)).isTrue(); + assertThat(strategy.shouldPersist(baseTs, originator2)).isTrue(); + + // Subsequent calls for the same originators within the same interval should return false + assertThat(strategy.shouldPersist(baseTs + 500L, originator1)).isFalse(); + assertThat(strategy.shouldPersist(baseTs + 500L, originator2)).isFalse(); + } + + @Test + void shouldHandleEdgeCaseTimestamps() { + long minTs = Long.MIN_VALUE; + long maxTs = Long.MAX_VALUE; + UUID originator = UUID.randomUUID(); + + assertThat(strategy.shouldPersist(minTs, originator)).isTrue(); + assertThat(strategy.shouldPersist(minTs + 1L, originator)).isFalse(); + + assertThat(strategy.shouldPersist(maxTs, originator)).isTrue(); + assertThat(strategy.shouldPersist(maxTs - 1L, originator)).isFalse(); + } + + @Test + void shouldResetDeduplicationAtIntervalBoundaries() { + UUID originator = UUID.randomUUID(); + + // check 1st interval + long firstIntervalStart = 0L; + long firstIntervalEnd = firstIntervalStart + Duration.ofSeconds(deduplicationIntervalSecs).toMillis() - 1L; + long firstIntervalMiddle = calculateMiddle(firstIntervalStart, firstIntervalEnd); + + assertThat(strategy.shouldPersist(firstIntervalStart, originator)).isTrue(); + assertThat(strategy.shouldPersist(firstIntervalStart + 1, originator)).isFalse(); + assertThat(strategy.shouldPersist(firstIntervalMiddle, originator)).isFalse(); + assertThat(strategy.shouldPersist(firstIntervalEnd - 1, originator)).isFalse(); + assertThat(strategy.shouldPersist(firstIntervalEnd, originator)).isFalse(); + + // check 2nd interval + long secondIntervalStart = firstIntervalEnd + 1L; + long secondIntervalEnd = secondIntervalStart + Duration.ofSeconds(deduplicationIntervalSecs).toMillis() - 1L; + long secondIntervalMiddle = calculateMiddle(secondIntervalStart, secondIntervalEnd); + + assertThat(strategy.shouldPersist(secondIntervalStart, originator)).isTrue(); + assertThat(strategy.shouldPersist(secondIntervalStart + 1, originator)).isFalse(); + assertThat(strategy.shouldPersist(secondIntervalMiddle, originator)).isFalse(); + assertThat(strategy.shouldPersist(secondIntervalEnd - 1, originator)).isFalse(); + assertThat(strategy.shouldPersist(secondIntervalEnd, originator)).isFalse(); + } + + @Test + void shouldHandleMultipleOriginatorsOverMultipleIntervals() { + UUID originator1 = UUID.randomUUID(); + UUID originator2 = UUID.randomUUID(); + long baseTs = 0L; + + // First interval for both originators + assertThat(strategy.shouldPersist(baseTs, originator1)).isTrue(); + assertThat(strategy.shouldPersist(baseTs, originator2)).isTrue(); + + // Move to the next interval + long nextIntervalTs = baseTs + Duration.ofSeconds(10).toMillis(); + + // Each originator should be allowed again in the new interval + assertThat(strategy.shouldPersist(nextIntervalTs, originator1)).isTrue(); + assertThat(strategy.shouldPersist(nextIntervalTs, originator2)).isTrue(); + + // Subsequent calls in the same new interval should return false + assertThat(strategy.shouldPersist(nextIntervalTs + 500L, originator1)).isFalse(); + assertThat(strategy.shouldPersist(nextIntervalTs + 500L, originator2)).isFalse(); + } + + private static long calculateMiddle(long start, long end) { + return start + (end - start) / 2; + } + +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/strategy/OnEveryMessagePersistenceStrategyTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/strategy/OnEveryMessagePersistenceStrategyTest.java new file mode 100644 index 0000000000..125da3a495 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/strategy/OnEveryMessagePersistenceStrategyTest.java @@ -0,0 +1,48 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.telemetry.strategy; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.UUID; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +class OnEveryMessagePersistenceStrategyTest { + + @ParameterizedTest + @MethodSource("edgeCaseProvider") + void shouldAlwaysReturnTrueForAnyInput(long timestamp, UUID originator) { + var onEveryMessage = OnEveryMessagePersistenceStrategy.getInstance(); + assertThat(onEveryMessage.shouldPersist(timestamp, originator)).isTrue(); + } + + private static Stream edgeCaseProvider() { + return Stream.of( + Arguments.of(Long.MIN_VALUE, new UUID(0L, 0L)), + Arguments.of(Long.MAX_VALUE, new UUID(Long.MAX_VALUE, Long.MAX_VALUE)), + Arguments.of(0L, new UUID(0L, 0L)), + Arguments.of(-1L, new UUID(-1L, -1L)), + Arguments.of(1L, new UUID(1L, 1L)), + Arguments.of(42L, UUID.randomUUID()), + Arguments.of(1000L, null) + ); + } + +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/strategy/PersistenceStrategyTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/strategy/PersistenceStrategyTest.java new file mode 100644 index 0000000000..71a8eabed5 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/strategy/PersistenceStrategyTest.java @@ -0,0 +1,55 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.telemetry.strategy; + +import org.junit.jupiter.api.Test; +import org.springframework.test.util.ReflectionTestUtils; + +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThat; + +class PersistenceStrategyTest { + + @Test + void testOnEveryMessageReturnsCorrectInstance() { + PersistenceStrategy strategy = PersistenceStrategy.onEveryMessage(); + assertThat(strategy) + .isNotNull() + .isInstanceOf(OnEveryMessagePersistenceStrategy.class); + } + + @Test + void testDeduplicateReturnsCorrectInstance() { + int validDeduplicationIntervalSecs = 5; + PersistenceStrategy strategy = PersistenceStrategy.deduplicate(validDeduplicationIntervalSecs); + assertThat(strategy) + .isNotNull() + .isInstanceOf(DeduplicatePersistenceStrategy.class); + + long actualDeduplicationIntervalMillis = (long) ReflectionTestUtils.getField(strategy, "deduplicationIntervalMillis"); + assertThat(actualDeduplicationIntervalMillis).isEqualTo(Duration.ofSeconds(validDeduplicationIntervalSecs).toMillis()); + } + + @Test + void testSkipReturnsCorrectInstance() { + PersistenceStrategy strategy = PersistenceStrategy.skip(); + assertThat(strategy) + .isNotNull() + .isInstanceOf(SkipPersistenceStrategy.class); + } + +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/strategy/SkipPersistenceStrategyTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/strategy/SkipPersistenceStrategyTest.java new file mode 100644 index 0000000000..1a63ce7460 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/strategy/SkipPersistenceStrategyTest.java @@ -0,0 +1,48 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.telemetry.strategy; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.UUID; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +class SkipPersistenceStrategyTest { + + @ParameterizedTest + @MethodSource("edgeCaseProvider") + void shouldAlwaysReturnFalseForAnyInput(long timestamp, UUID originator) { + var skipStrategy = SkipPersistenceStrategy.getInstance(); + assertThat(skipStrategy.shouldPersist(timestamp, originator)).isFalse(); + } + + private static Stream edgeCaseProvider() { + return Stream.of( + Arguments.of(Long.MIN_VALUE, new UUID(0L, 0L)), + Arguments.of(Long.MAX_VALUE, new UUID(Long.MAX_VALUE, Long.MAX_VALUE)), + Arguments.of(0L, new UUID(0L, 0L)), + Arguments.of(-1L, new UUID(-1L, -1L)), + Arguments.of(1L, new UUID(1L, 1L)), + Arguments.of(42L, UUID.randomUUID()), + Arguments.of(1000L, null) + ); + } + +} diff --git a/ui-ngx/src/app/modules/home/components/rule-node/action/action-rule-node-config.module.ts b/ui-ngx/src/app/modules/home/components/rule-node/action/action-rule-node-config.module.ts index 5e7b53c02b..e71055b5da 100644 --- a/ui-ngx/src/app/modules/home/components/rule-node/action/action-rule-node-config.module.ts +++ b/ui-ngx/src/app/modules/home/components/rule-node/action/action-rule-node-config.module.ts @@ -42,6 +42,12 @@ import { DeleteAttributesConfigComponent } from './delete-attributes-config.comp import { MathFunctionConfigComponent } from './math-function-config.component'; import { DeviceStateConfigComponent } from './device-state-config.component'; import { SendRestApiCallReplyConfigComponent } from './send-rest-api-call-reply-config.component'; +import { + AdvancedPersistenceSettingComponent +} from '@home/components/rule-node/action/advanced-persistence-setting.component'; +import { + AdvancedPersistenceSettingRowComponent +} from '@home/components/rule-node/action/advanced-persistence-setting-row.component'; @NgModule({ declarations: [ @@ -67,7 +73,9 @@ import { SendRestApiCallReplyConfigComponent } from './send-rest-api-call-reply- PushToEdgeConfigComponent, PushToCloudConfigComponent, MathFunctionConfigComponent, - DeviceStateConfigComponent + DeviceStateConfigComponent, + AdvancedPersistenceSettingComponent, + AdvancedPersistenceSettingRowComponent, ], imports: [ CommonModule, diff --git a/ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting-row.component.html b/ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting-row.component.html new file mode 100644 index 0000000000..2bb51ad504 --- /dev/null +++ b/ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting-row.component.html @@ -0,0 +1,40 @@ + +
+
{{ title }}
+ + rule-node-config.save-time-series.strategy + + @for (strategy of persistenceStrategies; track strategy) { + {{ PersistenceTypeTranslationMap.get(strategy) | translate }} + } + + + @if(persistenceSettingRowForm.get('type').value === PersistenceType.DEDUPLICATE) { + + + } +
diff --git a/ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting-row.component.ts b/ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting-row.component.ts new file mode 100644 index 0000000000..64ea045f8f --- /dev/null +++ b/ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting-row.component.ts @@ -0,0 +1,114 @@ +/// +/// Copyright © 2016-2024 The Thingsboard Authors +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// + +import { Component, forwardRef, Input } from '@angular/core'; +import { + ControlValueAccessor, + FormBuilder, + NG_VALIDATORS, + NG_VALUE_ACCESSOR, + ValidationErrors, + Validator +} from '@angular/forms'; +import { + AdvancedPersistenceConfig, + defaultAdvancedPersistenceConfig, + maxDeduplicateTimeSecs, + PersistenceType, + PersistenceTypeTranslationMap +} from '@home/components/rule-node/action/timeseries-config.models'; +import { isDefinedAndNotNull } from '@core/utils'; +import { takeUntilDestroyed } from '@angular/core/rxjs-interop'; + +@Component({ + selector: 'tb-advanced-persistence-setting-row', + templateUrl: './advanced-persistence-setting-row.component.html', + providers: [{ + provide: NG_VALUE_ACCESSOR, + useExisting: forwardRef(() => AdvancedPersistenceSettingRowComponent), + multi: true + },{ + provide: NG_VALIDATORS, + useExisting: forwardRef(() => AdvancedPersistenceSettingRowComponent), + multi: true + }] +}) +export class AdvancedPersistenceSettingRowComponent implements ControlValueAccessor, Validator { + + @Input() + title: string; + + persistenceSettingRowForm = this.fb.group({ + type: [defaultAdvancedPersistenceConfig.type], + deduplicationIntervalSecs: [{value: 60, disabled: true}] + }); + + PersistenceType = PersistenceType; + persistenceStrategies = [PersistenceType.ON_EVERY_MESSAGE, PersistenceType.DEDUPLICATE, PersistenceType.SKIP]; + PersistenceTypeTranslationMap = PersistenceTypeTranslationMap; + + maxDeduplicateTime = maxDeduplicateTimeSecs; + + private propagateChange: (value: any) => void = () => {}; + + constructor(private fb: FormBuilder) { + this.persistenceSettingRowForm.get('type').valueChanges.pipe( + takeUntilDestroyed() + ).subscribe(() => this.updatedValidation()); + + this.persistenceSettingRowForm.valueChanges.pipe( + takeUntilDestroyed() + ).subscribe((value) => this.propagateChange(value)); + } + + registerOnChange(fn: any) { + this.propagateChange = fn; + } + + registerOnTouched(_fn: any) { + } + + setDisabledState(isDisabled: boolean) { + if (isDisabled) { + this.persistenceSettingRowForm.disable({emitEvent: false}); + } else { + this.persistenceSettingRowForm.enable({emitEvent: false}); + this.updatedValidation(); + } + } + + validate(): ValidationErrors | null { + return this.persistenceSettingRowForm.valid ? null : { + persistenceSettingRow: false + }; + } + + writeValue(value: AdvancedPersistenceConfig) { + if (isDefinedAndNotNull(value)) { + this.persistenceSettingRowForm.patchValue(value, {emitEvent: false}); + } else { + this.persistenceSettingRowForm.patchValue(defaultAdvancedPersistenceConfig); + } + } + + private updatedValidation() { + if (this.persistenceSettingRowForm.get('type').value === PersistenceType.DEDUPLICATE) { + this.persistenceSettingRowForm.get('deduplicationIntervalSecs').enable({emitEvent: false}); + } else { + this.persistenceSettingRowForm.get('deduplicationIntervalSecs').disable({emitEvent: false}) + } + } +} diff --git a/ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting.component.html b/ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting.component.html new file mode 100644 index 0000000000..eb0f05bec2 --- /dev/null +++ b/ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting.component.html @@ -0,0 +1,31 @@ + +
+ + + +
diff --git a/ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting.component.ts b/ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting.component.ts new file mode 100644 index 0000000000..5c9930fbb7 --- /dev/null +++ b/ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting.component.ts @@ -0,0 +1,83 @@ +/// +/// Copyright © 2016-2024 The Thingsboard Authors +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// + +import { + ControlValueAccessor, + FormBuilder, + NG_VALIDATORS, + NG_VALUE_ACCESSOR, + ValidationErrors, + Validator +} from '@angular/forms'; +import { Component, forwardRef } from '@angular/core'; +import { takeUntilDestroyed } from '@angular/core/rxjs-interop'; +import { AdvancedPersistenceStrategy } from '@home/components/rule-node/action/timeseries-config.models'; + +@Component({ + selector: 'tb-advanced-persistence-settings', + templateUrl: './advanced-persistence-setting.component.html', + providers: [{ + provide: NG_VALUE_ACCESSOR, + useExisting: forwardRef(() => AdvancedPersistenceSettingComponent), + multi: true + },{ + provide: NG_VALIDATORS, + useExisting: forwardRef(() => AdvancedPersistenceSettingComponent), + multi: true + }] +}) +export class AdvancedPersistenceSettingComponent implements ControlValueAccessor, Validator { + + persistenceForm = this.fb.group({ + timeseries: [null], + latest: [null], + webSockets: [null] + }); + + private propagateChange: (value: any) => void = () => {}; + + constructor(private fb: FormBuilder) { + this.persistenceForm.valueChanges.pipe( + takeUntilDestroyed() + ).subscribe(value => this.propagateChange(value)); + } + + registerOnChange(fn: any) { + this.propagateChange = fn; + } + + registerOnTouched(_fn: any) { + } + + setDisabledState(isDisabled: boolean) { + if (isDisabled) { + this.persistenceForm.disable({emitEvent: false}); + } else { + this.persistenceForm.enable({emitEvent: false}); + } + } + + validate(): ValidationErrors | null { + return this.persistenceForm.valid ? null : { + persistenceForm: false + }; + } + + writeValue(value: AdvancedPersistenceStrategy) { + this.persistenceForm.patchValue(value, {emitEvent: false}); + } + +} diff --git a/ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.component.html b/ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.component.html index 31e827d331..e803d7443d 100644 --- a/ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.component.html +++ b/ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.component.html @@ -16,35 +16,75 @@ -->
- - rule-node-config.default-ttl - - - help - - - {{ 'rule-node-config.default-ttl-required' | translate }} - - - {{ 'rule-node-config.min-default-ttl-message' | translate }} - - -
-
- - {{ 'rule-node-config.use-server-ts' | translate }} - -
-
- - {{ 'rule-node-config.skip-latest-persistence' | translate }} - +
+
+
+ rule-node-config.save-time-series.persistence-settings +
+ + {{ 'rule-node-config.basic-mode' | translate}} + {{ 'rule-node-config.advanced-mode' | translate }} +
+ @if(!timeseriesConfigForm.get('persistenceSettings.isAdvanced').value) { + + rule-node-config.save-time-series.strategy + + @for (strategy of persistenceStrategies; track strategy) { + {{ PersistenceTypeTranslationMap.get(strategy) | translate }} + } + + + + @if(timeseriesConfigForm.get('persistenceSettings.type').value === PersistenceType.DEDUPLICATE) { + + + } + } + @else { + + }
+
+ + + rule-node-config.advanced-settings + + +
+ + {{ 'rule-node-config.use-server-ts' | translate }} + +
+ + + help + + +
+
+
diff --git a/ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.component.ts b/ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.component.ts index d480ec4d39..4a061d3193 100644 --- a/ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.component.ts +++ b/ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.component.ts @@ -15,8 +15,18 @@ /// import { Component } from '@angular/core'; -import { UntypedFormBuilder, UntypedFormGroup, Validators } from '@angular/forms'; -import { RuleNodeConfiguration, RuleNodeConfigurationComponent } from '@shared/models/rule-node.models'; +import { FormBuilder, FormGroup, Validators } from '@angular/forms'; +import { RuleNodeConfigurationComponent } from '@shared/models/rule-node.models'; +import { + defaultAdvancedPersistenceStrategy, + maxDeduplicateTimeSecs, + PersistenceSettings, + PersistenceSettingsForm, + PersistenceType, + PersistenceTypeTranslationMap, + TimeseriesNodeConfiguration, + TimeseriesNodeConfigurationForm +} from '@home/components/rule-node/action/timeseries-config.models'; @Component({ selector: 'tb-action-node-timeseries-config', @@ -25,21 +35,98 @@ import { RuleNodeConfiguration, RuleNodeConfigurationComponent } from '@shared/m }) export class TimeseriesConfigComponent extends RuleNodeConfigurationComponent { - timeseriesConfigForm: UntypedFormGroup; + timeseriesConfigForm: FormGroup; - constructor(private fb: UntypedFormBuilder) { + PersistenceType = PersistenceType; + persistenceStrategies = [PersistenceType.ON_EVERY_MESSAGE, PersistenceType.DEDUPLICATE, PersistenceType.WEBSOCKETS_ONLY]; + PersistenceTypeTranslationMap = PersistenceTypeTranslationMap; + + maxDeduplicateTime = maxDeduplicateTimeSecs + + constructor(private fb: FormBuilder) { super(); } - protected configForm(): UntypedFormGroup { + protected configForm(): FormGroup { return this.timeseriesConfigForm; } - protected onConfigurationSet(configuration: RuleNodeConfiguration) { + protected validatorTriggers(): string[] { + return ['persistenceSettings.isAdvanced', 'persistenceSettings.type']; + } + + protected prepareInputConfig(config: TimeseriesNodeConfiguration): TimeseriesNodeConfigurationForm { + let persistenceSettings: PersistenceSettingsForm; + if (config?.persistenceSettings) { + const isAdvanced = config?.persistenceSettings?.type === PersistenceType.ADVANCED; + persistenceSettings = { + type: isAdvanced ? PersistenceType.ON_EVERY_MESSAGE : config.persistenceSettings.type, + isAdvanced: isAdvanced, + deduplicationIntervalSecs: config.persistenceSettings?.deduplicationIntervalSecs ?? 60, + advanced: isAdvanced ? config.persistenceSettings : defaultAdvancedPersistenceStrategy + } + } else { + persistenceSettings = { + type: PersistenceType.ON_EVERY_MESSAGE, + isAdvanced: false, + deduplicationIntervalSecs: 60, + advanced: defaultAdvancedPersistenceStrategy + }; + } + return { + ...config, + persistenceSettings: persistenceSettings + } + } + + protected prepareOutputConfig(config: TimeseriesNodeConfigurationForm): TimeseriesNodeConfiguration { + let persistenceSettings: PersistenceSettings; + if (config.persistenceSettings.isAdvanced) { + persistenceSettings = { + ...config.persistenceSettings.advanced, + type: PersistenceType.ADVANCED + }; + } else { + persistenceSettings = { + type: config.persistenceSettings.type, + deduplicationIntervalSecs: config.persistenceSettings?.deduplicationIntervalSecs + }; + } + return { + ...config, + persistenceSettings + }; + } + + protected onConfigurationSet(config: TimeseriesNodeConfigurationForm) { this.timeseriesConfigForm = this.fb.group({ - defaultTTL: [configuration ? configuration.defaultTTL : null, [Validators.required, Validators.min(0)]], - skipLatestPersistence: [configuration ? configuration.skipLatestPersistence : false, []], - useServerTs: [configuration ? configuration.useServerTs : false, []] + persistenceSettings: this.fb.group({ + isAdvanced: [config?.persistenceSettings?.isAdvanced ?? false], + type: [config?.persistenceSettings?.type ?? PersistenceType.ON_EVERY_MESSAGE], + deduplicationIntervalSecs: [ + {value: config?.persistenceSettings?.deduplicationIntervalSecs ?? 60, disabled: true}, + [Validators.required, Validators.max(maxDeduplicateTimeSecs)] + ], + advanced: [{value: null, disabled: true}] + }), + defaultTTL: [config?.defaultTTL ?? null, [Validators.required, Validators.min(0)]], + useServerTs: [config?.useServerTs ?? false] }); } + + protected updateValidators(emitEvent: boolean, _trigger?: string) { + const persistenceForm = this.timeseriesConfigForm.get('persistenceSettings') as FormGroup; + const isAdvanced: boolean = persistenceForm.get('isAdvanced').value; + const type: PersistenceType = persistenceForm.get('type').value; + if (!isAdvanced && type === PersistenceType.DEDUPLICATE) { + persistenceForm.get('deduplicationIntervalSecs').enable({emitEvent}); + } else { + persistenceForm.get('deduplicationIntervalSecs').disable({emitEvent}); + } + if (isAdvanced) { + persistenceForm.get('advanced').enable({emitEvent}); + } else { + persistenceForm.get('advanced').disable({emitEvent}); + } + } } diff --git a/ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.models.ts b/ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.models.ts new file mode 100644 index 0000000000..f70e8548b1 --- /dev/null +++ b/ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.models.ts @@ -0,0 +1,78 @@ +/// +/// Copyright © 2016-2024 The Thingsboard Authors +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// + +import { DAY, SECOND } from '@shared/models/time/time.models'; + +export const maxDeduplicateTimeSecs = DAY / SECOND; + +export interface TimeseriesNodeConfiguration { + persistenceSettings: PersistenceSettings; + defaultTTL: number; + useServerTs: boolean; +} + +export interface TimeseriesNodeConfigurationForm extends Omit { + persistenceSettings: PersistenceSettingsForm +} + +export type PersistenceSettings = BasicPersistenceSettings & Partial & Partial; + +export type PersistenceSettingsForm = Omit & { + isAdvanced: boolean; + advanced?: Partial; + type: PersistenceType; +}; + +export enum PersistenceType { + ON_EVERY_MESSAGE = 'ON_EVERY_MESSAGE', + DEDUPLICATE = 'DEDUPLICATE', + WEBSOCKETS_ONLY = 'WEBSOCKETS_ONLY', + ADVANCED = 'ADVANCED', + SKIP = 'SKIP' +} + +export const PersistenceTypeTranslationMap = new Map([ + [PersistenceType.ON_EVERY_MESSAGE, 'rule-node-config.save-time-series.strategy-type.every-message'], + [PersistenceType.DEDUPLICATE, 'rule-node-config.save-time-series.strategy-type.deduplicate'], + [PersistenceType.WEBSOCKETS_ONLY, 'rule-node-config.save-time-series.strategy-type.web-sockets-only'], + [PersistenceType.SKIP, 'rule-node-config.save-time-series.strategy-type.skip'], +]) + +export interface BasicPersistenceSettings { + type: PersistenceType; +} + +export interface DeduplicatePersistenceStrategy extends BasicPersistenceSettings{ + deduplicationIntervalSecs: number; +} + +export interface AdvancedPersistenceStrategy extends BasicPersistenceSettings{ + timeseries: AdvancedPersistenceConfig; + latest: AdvancedPersistenceConfig; + webSockets: AdvancedPersistenceConfig; +} + +export type AdvancedPersistenceConfig = WithOptional; + +export const defaultAdvancedPersistenceConfig: AdvancedPersistenceConfig = { + type: PersistenceType.ON_EVERY_MESSAGE +} + +export const defaultAdvancedPersistenceStrategy: Omit = { + timeseries: defaultAdvancedPersistenceConfig, + latest: defaultAdvancedPersistenceConfig, + webSockets: defaultAdvancedPersistenceConfig, +} diff --git a/ui-ngx/src/app/modules/home/components/rule-node/common/common-rule-node-config.module.ts b/ui-ngx/src/app/modules/home/components/rule-node/common/common-rule-node-config.module.ts index c72b9901c9..b6370be318 100644 --- a/ui-ngx/src/app/modules/home/components/rule-node/common/common-rule-node-config.module.ts +++ b/ui-ngx/src/app/modules/home/components/rule-node/common/common-rule-node-config.module.ts @@ -33,6 +33,7 @@ import { RelationsQueryConfigOldComponent } from './relations-query-config-old.c import { SelectAttributesComponent } from './select-attributes.component'; import { AlarmStatusSelectComponent } from './alarm-status-select.component'; import { ExampleHintComponent } from './example-hint.component'; +import { TimeUnitInputComponent } from './time-unit-input.component'; @NgModule({ declarations: [ @@ -50,7 +51,8 @@ import { ExampleHintComponent } from './example-hint.component'; RelationsQueryConfigOldComponent, SelectAttributesComponent, AlarmStatusSelectComponent, - ExampleHintComponent + ExampleHintComponent, + TimeUnitInputComponent ], imports: [ CommonModule, @@ -72,7 +74,8 @@ import { ExampleHintComponent } from './example-hint.component'; RelationsQueryConfigOldComponent, SelectAttributesComponent, AlarmStatusSelectComponent, - ExampleHintComponent + ExampleHintComponent, + TimeUnitInputComponent ] }) diff --git a/ui-ngx/src/app/modules/home/components/rule-node/common/time-unit-input.component.html b/ui-ngx/src/app/modules/home/components/rule-node/common/time-unit-input.component.html new file mode 100644 index 0000000000..8fe112f62c --- /dev/null +++ b/ui-ngx/src/app/modules/home/components/rule-node/common/time-unit-input.component.html @@ -0,0 +1,44 @@ + +
+ + {{ labelText }} + +
+ +
+ + + {{ requiredText }} + + + {{ minErrorText }} + + + {{ maxErrorText }} + +
+ + rule-node-config.units + + @for (timeUnit of timeUnits; track timeUnit) { + {{ timeUnitTranslations.get(timeUnit) | translate }} + } + + +
diff --git a/ui-ngx/src/app/modules/home/components/rule-node/common/time-unit-input.component.ts b/ui-ngx/src/app/modules/home/components/rule-node/common/time-unit-input.component.ts new file mode 100644 index 0000000000..11358ae6eb --- /dev/null +++ b/ui-ngx/src/app/modules/home/components/rule-node/common/time-unit-input.component.ts @@ -0,0 +1,199 @@ +/// +/// Copyright © 2016-2024 The Thingsboard Authors +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// + +import { Component, DestroyRef, forwardRef, Input, OnInit } from '@angular/core'; +import { + AbstractControl, + ControlValueAccessor, + FormBuilder, + NG_VALIDATORS, + NG_VALUE_ACCESSOR, + ValidationErrors, + Validator, + Validators +} from '@angular/forms'; +import { TimeUnit, timeUnitTranslations } from '../rule-node-config.models'; +import { isDefinedAndNotNull, isNumeric } from '@core/utils'; +import { takeUntilDestroyed } from '@angular/core/rxjs-interop'; +import { coerceBoolean, coerceNumber } from '@shared/decorators/coercion'; +import { DAY, HOUR, MINUTE, SECOND } from '@shared/models/time/time.models'; +import { SubscriptSizing } from '@angular/material/form-field'; + +interface TimeUnitInputModel { + time: number; + timeUnit: TimeUnit +} + +@Component({ + selector: 'tb-time-unit-input', + templateUrl: './time-unit-input.component.html', + providers: [{ + provide: NG_VALUE_ACCESSOR, + useExisting: forwardRef(() => TimeUnitInputComponent), + multi: true + },{ + provide: NG_VALIDATORS, + useExisting: forwardRef(() => TimeUnitInputComponent), + multi: true + }] +}) +export class TimeUnitInputComponent implements ControlValueAccessor, Validator, OnInit { + + @Input() + labelText: string; + + @Input() + @coerceBoolean() + required: boolean; + + @Input() + requiredText: string; + + @Input() + @coerceNumber() + minTime = 0; + + @Input() + minErrorText: string; + + @Input() + @coerceNumber() + maxTime: number; + + @Input() + maxErrorText: string; + + @Input() + subscriptSizing: SubscriptSizing = 'fixed'; + + timeUnits = Object.values(TimeUnit).filter(item => item !== TimeUnit.MILLISECONDS) as TimeUnit[]; + + timeUnitTranslations = timeUnitTranslations; + + timeInputForm = this.fb.group({ + time: [0], + timeUnit: [TimeUnit.SECONDS] + }); + + private timeIntervalsInSec = new Map([ + [TimeUnit.DAYS, DAY/SECOND], + [TimeUnit.HOURS, HOUR/SECOND], + [TimeUnit.MINUTES, MINUTE/SECOND], + [TimeUnit.SECONDS, SECOND/SECOND], + ]); + + private modelValue: number; + + private propagateChange: (value: any) => void = () => {}; + + constructor(private fb: FormBuilder, + private destroyRef: DestroyRef) { + } + + ngOnInit() { + if(this.required || this.maxTime) { + const timeControl = this.timeInputForm.get('time'); + const validators = [Validators.pattern(/^\d*$/)]; + if (this.required) { + validators.push(Validators.required); + } + if (this.maxTime) { + validators.push((control: AbstractControl) => + Validators.max(Math.floor(this.maxTime / this.timeIntervalsInSec.get(this.timeInputForm.get('timeUnit').value)))(control) + ); + } + if (isDefinedAndNotNull(this.minTime)) { + validators.push(Validators.min(this.minTime)); + } + + timeControl.setValidators(validators); + timeControl.updateValueAndValidity({ emitEvent: false }); + } + + this.timeInputForm.get('timeUnit').valueChanges.pipe( + takeUntilDestroyed(this.destroyRef) + ).subscribe(() => { + this.timeInputForm.get('time').updateValueAndValidity({onlySelf: true}); + this.timeInputForm.get('time').markAsTouched({onlySelf: true}); + }); + + this.timeInputForm.valueChanges.pipe( + takeUntilDestroyed(this.destroyRef) + ).subscribe(value => { + this.updatedModel(value); + }); + } + + registerOnChange(fn: any) { + this.propagateChange = fn; + } + + registerOnTouched(_fn: any) { + } + + setDisabledState(isDisabled: boolean) { + if (isDisabled) { + this.timeInputForm.disable({emitEvent: false}); + } else { + this.timeInputForm.enable({emitEvent: false}); + if(this.timeInputForm.invalid) { + setTimeout(() => this.updatedModel(this.timeInputForm.value, true)) + } + } + } + + writeValue(sec: number) { + if (sec !== this.modelValue) { + if (isDefinedAndNotNull(sec) && isNumeric(sec) && Number(sec) !== 0) { + this.timeInputForm.patchValue(this.parseTime(sec), {emitEvent: false}); + this.modelValue = sec; + } else { + this.timeInputForm.patchValue({ + time: 0, + timeUnit: TimeUnit.SECONDS + }, {emitEvent: false}); + this.modelValue = 0; + } + } + } + + validate(): ValidationErrors | null { + return this.timeInputForm.valid ? null : { + timeInput: false + }; + } + + private updatedModel(value: Partial, forceUpdated = false) { + const time = value.time * this.timeIntervalsInSec.get(value.timeUnit); + if (this.modelValue !== time || forceUpdated) { + this.modelValue = time; + this.propagateChange(time); + } + } + + private parseTime(value: number): TimeUnitInputModel { + for (const [timeUnit, timeValue] of this.timeIntervalsInSec) { + const calc = value / timeValue; + if (Number.isInteger(calc)) { + return { + time: calc, + timeUnit: timeUnit + } + } + } + } + +} diff --git a/ui-ngx/src/assets/locale/locale.constant-en_US.json b/ui-ngx/src/assets/locale/locale.constant-en_US.json index 1a08cd5ba8..57df3865e9 100644 --- a/ui-ngx/src/assets/locale/locale.constant-en_US.json +++ b/ui-ngx/src/assets/locale/locale.constant-en_US.json @@ -4604,7 +4604,7 @@ "originator-entity": "Entity by name pattern", "clone-message": "Clone message", "transform": "Transform", - "default-ttl": "Default TTL in seconds", + "default-ttl": "Default TTL", "default-ttl-required": "Default TTL is required.", "default-ttl-hint": "Rule node will fetch Time-to-Live (TTL) value from the message metadata. If no value is present, it defaults to the TTL specified in the configuration. If the value is set to 0, the TTL from the tenant profile configuration will be applied.", "default-ttl-zero-hint": "TTL will not be applied if its value is set to 0.", @@ -4969,10 +4969,8 @@ "general-pattern-hint": "Use ${metadataKey} for value from metadata, $[messageKey] for value from message body.", "alarm-severity-pattern-hint": "Use ${metadataKey} for value from metadata, $[messageKey] for value from message body. Alarm severity should be system (CRITICAL, MAJOR etc.)", "output-node-name-hint": "The rule node name corresponds to the relation type of the output message, and it is used to forward messages to other rule nodes in the caller rule chain.", - "skip-latest-persistence": "Skip latest persistence", - "skip-latest-persistence-hint": "Rule node will not update values for incoming keys for the latest time series data. Useful for highly loaded use-cases to decrease the pressure on the DB.", - "use-server-ts": "Use server ts", - "use-server-ts-hint": "Rule node will use the timestamp of message processing instead of the timestamp from the message. Useful for all sorts of sequential processing if you merge messages from multiple sources (devices, assets, etc).", + "use-server-ts": "Use server timestamp", + "use-server-ts-hint": "Use the server’s current timestamp for time series data that lacks an explicit timestamp. This helps maintain proper ordering when processing messages from multiple sources or when messages arrive out of sequence.", "kv-map-pattern-hint": "All input fields support templatization. Use $[messageKey] to extract value from the message and ${metadataKey} to extract value from the metadata.", "kv-map-single-pattern-hint": "Input field support templatization. Use $[messageKey] to extract value from the message and ${metadataKey} to extract value from the metadata.", "shared-scope": "Shared scope", @@ -5130,8 +5128,28 @@ "request-timeout-required": "Request timeout is required", "request-timeout-min": "Min request timeout is 0", "request-timeout-hint": "The amount of time to wait in seconds for the request to complete before giving up and timing out. A value of 0 means infinity, and is not recommended.", + "units": "Units", "tell-failure-aws-lambda": "Tell Failure if AWS Lambda function execution raises exception", "tell-failure-aws-lambda-hint": "Rule node forces failure of message processing if AWS Lambda function execution raises exception.", + "basic-mode": "Basic", + "advanced-mode": "Advanced", + "save-time-series": { + "persistence-settings": "Persistence settings", + "persistence-settings-hint": "Define how and when time series data is saved. In Basic mode, apply a single persistence strategy to all actions or enable only WebSockets updates. Advanced mode allows you to configure individual persistence strategies for each action.", + "strategy": "Strategy", + "deduplication-interval": "Deduplication interval", + "deduplication-interval-required": "Deduplication interval is required", + "deduplication-interval-min-max-range": "Deduplication interval should be at least 1 second and at most 1 day", + "strategy-type": { + "every-message": "On every message", + "skip": "Skip", + "deduplicate": "Deduplicate", + "web-sockets-only": "WebSockets only" + }, + "time-series": "Time series", + "latest": "Latest", + "web-sockets": "WebSockets" + }, "key-val": { "key": "Key", "value": "Value",