Browse Source

Merge branch 'master' into improvement/datetime-period-validation

pull/12630/head
Ekaterina Chantsova 1 year ago
committed by GitHub
parent
commit
fc4edc0209
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 9
      application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json
  2. 9
      application/src/main/data/json/tenant/device_profile/rule_chain_template.json
  3. 7
      application/src/main/data/json/tenant/rule_chains/root_rule_chain.json
  4. 228
      application/src/main/data/upgrade/basic/schema_update.sql
  5. 25
      application/src/main/java/org/thingsboard/server/config/SwaggerConfiguration.java
  6. 19
      application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java
  7. 8
      application/src/main/java/org/thingsboard/server/service/entitiy/dashboard/DashboardSyncService.java
  8. 2
      application/src/main/java/org/thingsboard/server/service/entitiy/entityview/DefaultTbEntityViewService.java
  9. 42
      application/src/main/java/org/thingsboard/server/service/install/DefaultDatabaseSchemaSettingsService.java
  10. 20
      application/src/main/java/org/thingsboard/server/service/install/InstallScripts.java
  11. 36
      application/src/main/java/org/thingsboard/server/service/install/ProjectInfo.java
  12. 4
      application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java
  13. 27
      application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
  14. 2
      application/src/main/resources/thingsboard.yml
  15. 107
      application/src/test/java/org/thingsboard/server/service/entitiy/entityview/DefaultTbEntityViewServiceTest.java
  16. 373
      application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java
  17. 2
      common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/GitRepository.java
  18. 26
      monitoring/src/main/resources/root_rule_chain.json
  19. 8
      msa/black-box-tests/src/test/resources/MqttRuleNodeTestMetadata.json
  20. 27
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java
  21. 51
      rule-engine/rule-engine-api/src/test/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequestTest.java
  22. 132
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
  23. 66
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java
  24. 89
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/DeduplicatePersistenceStrategy.java
  25. 38
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/OnEveryMessagePersistenceStrategy.java
  26. 49
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/PersistenceStrategy.java
  27. 38
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/SkipPersistenceStrategy.java
  28. 4
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java
  29. 449
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java
  30. 260
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/strategy/DeduplicatePersistenceStrategyTest.java
  31. 48
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/strategy/OnEveryMessagePersistenceStrategyTest.java
  32. 55
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/strategy/PersistenceStrategyTest.java
  33. 48
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/strategy/SkipPersistenceStrategyTest.java
  34. 10
      ui-ngx/src/app/modules/home/components/rule-node/action/action-rule-node-config.module.ts
  35. 40
      ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting-row.component.html
  36. 114
      ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting-row.component.ts
  37. 31
      ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting.component.html
  38. 83
      ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting.component.ts
  39. 98
      ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.component.html
  40. 105
      ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.component.ts
  41. 78
      ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.models.ts
  42. 7
      ui-ngx/src/app/modules/home/components/rule-node/common/common-rule-node-config.module.ts
  43. 44
      ui-ngx/src/app/modules/home/components/rule-node/common/time-unit-input.component.html
  44. 199
      ui-ngx/src/app/modules/home/components/rule-node/common/time-unit-input.component.ts
  45. 28
      ui-ngx/src/assets/locale/locale.constant-en_US.json

9
application/src/main/data/json/edge/rule_chains/edge_root_rule_chain.json

@ -33,8 +33,13 @@
},
"type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode",
"name": "Save Timeseries",
"configurationVersion": 1,
"configuration": {
"defaultTTL": 0
"defaultTTL": 0,
"useServerTs": false,
"persistenceSettings": {
"type": "ON_EVERY_MESSAGE"
}
},
"externalId": null
},
@ -185,4 +190,4 @@
],
"ruleChainConnections": null
}
}
}

9
application/src/main/data/json/tenant/device_profile/rule_chain_template.json

@ -19,8 +19,13 @@
},
"type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode",
"name": "Save Timeseries",
"configurationVersion": 1,
"configuration": {
"defaultTTL": 0
"defaultTTL": 0,
"useServerTs": false,
"persistenceSettings": {
"type": "ON_EVERY_MESSAGE"
}
}
},
{
@ -134,4 +139,4 @@
],
"ruleChainConnections": null
}
}
}

7
application/src/main/data/json/tenant/rule_chains/root_rule_chain.json

@ -18,8 +18,13 @@
},
"type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode",
"name": "Save Timeseries",
"configurationVersion": 1,
"configuration": {
"defaultTTL": 0
"defaultTTL": 0,
"useServerTs": false,
"persistenceSettings": {
"type": "ON_EVERY_MESSAGE"
}
}
},
{

228
application/src/main/data/upgrade/basic/schema_update.sql

@ -14,198 +14,50 @@
-- limitations under the License.
--
ALTER TABLE user_credentials ADD COLUMN IF NOT EXISTS last_login_ts BIGINT;
UPDATE user_credentials c SET last_login_ts = (SELECT (additional_info::json ->> 'lastLoginTs')::bigint FROM tb_user u WHERE u.id = c.user_id)
WHERE last_login_ts IS NULL;
-- UPDATE SAVE TIME SERIES NODES START
ALTER TABLE user_credentials ADD COLUMN IF NOT EXISTS failed_login_attempts INT;
UPDATE user_credentials c SET failed_login_attempts = (SELECT (additional_info::json ->> 'failedLoginAttempts')::int FROM tb_user u WHERE u.id = c.user_id)
WHERE failed_login_attempts IS NULL;
UPDATE tb_user SET additional_info = (additional_info::jsonb - 'lastLoginTs' - 'failedLoginAttempts' - 'userCredentialsEnabled')::text
WHERE additional_info IS NOT NULL AND additional_info != 'null' AND jsonb_typeof(additional_info::jsonb) = 'object';
-- UPDATE RULE NODE DEBUG MODE TO DEBUG STRATEGY START
ALTER TABLE rule_node ADD COLUMN IF NOT EXISTS debug_settings varchar(1024) DEFAULT null;
DO
$$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'rule_node' AND column_name = 'debug_mode')
THEN
UPDATE rule_node SET debug_settings = '{"failuresEnabled": true, "allEnabledUntil": ' || cast((extract(epoch from now()) + 900) * 1000 as bigint) || '}' WHERE debug_mode = true; -- 15 minutes according to thingsboard.yml default settings.
ALTER TABLE rule_node DROP COLUMN debug_mode;
END IF;
END
$$;
-- UPDATE RULE NODE DEBUG MODE TO DEBUG STRATEGY END
-- CREATE MOBILE APP BUNDLES FROM EXISTING APPS
CREATE TABLE IF NOT EXISTS mobile_app_bundle (
id uuid NOT NULL CONSTRAINT mobile_app_bundle_pkey PRIMARY KEY,
created_time bigint NOT NULL,
tenant_id uuid,
title varchar(255),
description varchar(1024),
android_app_id uuid UNIQUE,
ios_app_id uuid UNIQUE,
layout_config varchar(16384),
oauth2_enabled boolean,
CONSTRAINT fk_android_app_id FOREIGN KEY (android_app_id) REFERENCES mobile_app(id) ON DELETE SET NULL,
CONSTRAINT fk_ios_app_id FOREIGN KEY (ios_app_id) REFERENCES mobile_app(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS mobile_app_bundle_tenant_id ON mobile_app_bundle(tenant_id);
ALTER TABLE mobile_app ADD COLUMN IF NOT EXISTS platform_type varchar(32),
ADD COLUMN IF NOT EXISTS status varchar(32),
ADD COLUMN IF NOT EXISTS version_info varchar(100000),
ADD COLUMN IF NOT EXISTS store_info varchar(16384),
DROP CONSTRAINT IF EXISTS mobile_app_pkg_name_key,
DROP CONSTRAINT IF EXISTS mobile_app_unq_key;
-- rename mobile_app_oauth2_client to mobile_app_bundle_oauth2_client
DO
$$
BEGIN
-- in case of running the upgrade script a second time
IF EXISTS(SELECT 1 FROM information_schema.tables WHERE table_name = 'mobile_app_oauth2_client') THEN
ALTER TABLE mobile_app_oauth2_client RENAME TO mobile_app_bundle_oauth2_client;
ALTER TABLE mobile_app_bundle_oauth2_client DROP CONSTRAINT IF EXISTS fk_domain;
ALTER TABLE mobile_app_bundle_oauth2_client RENAME COLUMN mobile_app_id TO mobile_app_bundle_id;
END IF;
END;
$$;
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
-- duplicate each mobile app and create mobile app bundle for the pair of android and ios app
DO
$$
DECLARE
generatedBundleId uuid;
iosAppId uuid;
mobileAppRecord RECORD;
DO $$
BEGIN
-- in case of running the upgrade script a second time
IF EXISTS(SELECT 1 FROM information_schema.columns WHERE table_name = 'mobile_app' and column_name = 'oauth2_enabled') THEN
UPDATE mobile_app SET platform_type = 'ANDROID' WHERE platform_type IS NULL;
UPDATE mobile_app SET status = 'DRAFT' WHERE mobile_app.status IS NULL;
FOR mobileAppRecord IN SELECT * FROM mobile_app
LOOP
-- duplicate app for iOS platform type
iosAppId := uuid_generate_v4();
INSERT INTO mobile_app(id, created_time, tenant_id, pkg_name, app_secret, platform_type, status)
VALUES (iosAppId, mobileAppRecord.created_time, mobileAppRecord.tenant_id, mobileAppRecord.pkg_name, mobileAppRecord.app_secret, 'IOS', mobileAppRecord.status)
ON CONFLICT DO NOTHING;
-- create bundle for android and iOS app
generatedBundleId := uuid_generate_v4();
INSERT INTO mobile_app_bundle(id, created_time, tenant_id, title, android_app_id, ios_app_id, oauth2_enabled)
VALUES (generatedBundleId, mobileAppRecord.created_time, mobileAppRecord.tenant_id,
mobileAppRecord.pkg_name || ' (autogenerated)', mobileAppRecord.id, iosAppId, mobileAppRecord.oauth2_enabled)
ON CONFLICT DO NOTHING;
UPDATE mobile_app_bundle_oauth2_client SET mobile_app_bundle_id = generatedBundleId WHERE mobile_app_bundle_id = mobileAppRecord.id;
END LOOP;
END IF;
IF NOT EXISTS(SELECT 1 FROM pg_constraint WHERE conname = 'fk_mobile_app_bundle_oauth2_client_bundle_id') THEN
ALTER TABLE mobile_app_bundle_oauth2_client ADD CONSTRAINT fk_mobile_app_bundle_oauth2_client_bundle_id
FOREIGN KEY (mobile_app_bundle_id) REFERENCES mobile_app_bundle(id) ON DELETE CASCADE;
END IF;
ALTER TABLE mobile_app DROP COLUMN IF EXISTS oauth2_enabled;
IF NOT EXISTS(SELECT 1 FROM pg_constraint WHERE conname = 'mobile_app_pkg_name_platform_unq_key') THEN
ALTER TABLE mobile_app ADD CONSTRAINT mobile_app_pkg_name_platform_unq_key UNIQUE (pkg_name, platform_type);
END IF;
END;
$$;
-- Check if the rule_node table exists
IF EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_name = 'rule_node'
) THEN
UPDATE rule_node
SET configuration = (
(configuration::jsonb - 'skipLatestPersistence')
|| jsonb_build_object(
'persistenceSettings', jsonb_build_object(
'type', 'ADVANCED',
'timeseries', jsonb_build_object('type', 'ON_EVERY_MESSAGE'),
'latest', jsonb_build_object('type', 'SKIP'),
'webSockets', jsonb_build_object('type', 'ON_EVERY_MESSAGE')
)
)
)::text,
configuration_version = 1
WHERE type = 'org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode'
AND configuration_version = 0
AND configuration::jsonb ->> 'skipLatestPersistence' = 'true';
UPDATE rule_node
SET configuration = (
(configuration::jsonb - 'skipLatestPersistence')
|| jsonb_build_object(
'persistenceSettings', jsonb_build_object(
'type', 'ON_EVERY_MESSAGE'
)
)
)::text,
configuration_version = 1
WHERE type = 'org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode'
AND configuration_version = 0
AND (configuration::jsonb ->> 'skipLatestPersistence' != 'true' OR configuration::jsonb ->> 'skipLatestPersistence' IS NULL);
ALTER TABLE IF EXISTS mobile_app_settings RENAME TO qr_code_settings;
ALTER TABLE qr_code_settings ADD COLUMN IF NOT EXISTS mobile_app_bundle_id uuid,
ADD COLUMN IF NOT EXISTS android_enabled boolean,
ADD COLUMN IF NOT EXISTS ios_enabled boolean;
-- migrate mobile apps from qr code settings to mobile_app, create mobile app bundle for the pair of apps
DO
$$
DECLARE
androidPkgName varchar;
iosPkgName varchar;
androidAppId uuid;
iosAppId uuid;
generatedBundleId uuid;
qrCodeRecord RECORD;
BEGIN
-- in case of running the upgrade script a second time
IF EXISTS(SELECT 1 FROM information_schema.columns WHERE table_name = 'qr_code_settings' AND column_name = 'android_config') THEN
FOR qrCodeRecord IN SELECT * FROM qr_code_settings
LOOP
generatedBundleId := NULL;
-- migrate android config
IF (qrCodeRecord.android_config::jsonb ->> 'appPackage' IS NOT NULL) THEN
androidPkgName := qrCodeRecord.android_config::jsonb ->> 'appPackage';
SELECT id into androidAppId FROM mobile_app WHERE pkg_name = androidPkgName AND platform_type = 'ANDROID';
IF androidAppId IS NULL THEN
androidAppId := uuid_generate_v4();
INSERT INTO mobile_app(id, created_time, tenant_id, pkg_name, platform_type, status, store_info)
VALUES (androidAppId, (extract(epoch from now()) * 1000), qrCodeRecord.tenant_id,
androidPkgName, 'ANDROID', 'DRAFT', qrCodeRecord.android_config::jsonb - 'appPackage' - 'enabled');
generatedBundleId := uuid_generate_v4();
INSERT INTO mobile_app_bundle(id, created_time, tenant_id, title, android_app_id)
VALUES (generatedBundleId, (extract(epoch from now()) * 1000), qrCodeRecord.tenant_id, androidPkgName || ' (autogenerated)', androidAppId);
UPDATE qr_code_settings SET mobile_app_bundle_id = generatedBundleId;
ELSE
UPDATE mobile_app SET store_info = qrCodeRecord.android_config::jsonb - 'appPackage' - 'enabled' WHERE id = androidAppId;
UPDATE qr_code_settings SET mobile_app_bundle_id = (SELECT id FROM mobile_app_bundle WHERE mobile_app_bundle.android_app_id = androidAppId);
END IF;
END IF;
UPDATE qr_code_settings SET android_enabled = (qrCodeRecord.android_config::jsonb ->> 'enabled')::boolean WHERE id = qrCodeRecord.id;
-- migrate ios config
IF (qrCodeRecord.ios_config::jsonb ->> 'appId' IS NOT NULL) THEN
iosPkgName := substring(qrCodeRecord.ios_config::jsonb ->> 'appId', strpos(qrCodeRecord.ios_config::jsonb ->> 'appId', '.') + 1);
SELECT id INTO iosAppId FROM mobile_app WHERE pkg_name = iosPkgName AND platform_type = 'IOS';
IF iosAppId IS NULL THEN
iosAppId := uuid_generate_v4();
INSERT INTO mobile_app(id, created_time, tenant_id, pkg_name, platform_type, status, store_info)
VALUES (iosAppId, (extract(epoch from now()) * 1000), qrCodeRecord.tenant_id,
iosPkgName, 'IOS', 'DRAFT', qrCodeRecord.ios_config::jsonb - 'enabled');
IF generatedBundleId IS NULL THEN
generatedBundleId := uuid_generate_v4();
INSERT INTO mobile_app_bundle(id, created_time, tenant_id, title, ios_app_id)
VALUES (generatedBundleId, (extract(epoch from now()) * 1000), qrCodeRecord.tenant_id, iosPkgName || ' (autogenerated)', iosAppId);
UPDATE qr_code_settings SET mobile_app_bundle_id = generatedBundleId;
ELSE
UPDATE mobile_app_bundle SET ios_app_id = iosAppId WHERE id = generatedBundleId;
END IF;
ELSE
UPDATE qr_code_settings SET mobile_app_bundle_id = (SELECT id FROM mobile_app_bundle WHERE mobile_app_bundle.ios_app_id = iosAppId);
UPDATE mobile_app SET store_info = qrCodeRecord.ios_config::jsonb - 'enabled' WHERE id = iosAppId;
END IF;
END IF;
UPDATE qr_code_settings SET ios_enabled = (qrCodeRecord.ios_config::jsonb -> 'enabled')::boolean WHERE id = qrCodeRecord.id;
END LOOP;
ALTER TABLE qr_code_settings RENAME CONSTRAINT mobile_app_settings_tenant_id_unq_key TO qr_code_settings_tenant_id_unq_key;
ALTER TABLE qr_code_settings RENAME CONSTRAINT mobile_app_settings_pkey TO qr_code_settings_pkey;
END IF;
ALTER TABLE qr_code_settings DROP COLUMN IF EXISTS android_config, DROP COLUMN IF EXISTS ios_config;
END;
$$;
-- update constraint name
DO
$$
BEGIN
ALTER TABLE domain DROP CONSTRAINT IF EXISTS domain_unq_key;
IF NOT EXISTS(SELECT 1 FROM pg_constraint WHERE conname = 'domain_name_key') THEN
ALTER TABLE domain ADD CONSTRAINT domain_name_key UNIQUE (name);
END IF;
END;
$$;
-- UPDATE RESOURCE JS_MODULE SUB TYPE START
UPDATE resource SET resource_sub_type = 'EXTENSION' WHERE resource_type = 'JS_MODULE' AND resource_sub_type IS NULL;
-- UPDATE RESOURCE JS_MODULE SUB TYPE END
-- UPDATE SAVE TIME SERIES NODES END

25
application/src/main/java/org/thingsboard/server/config/SwaggerConfiguration.java

@ -84,6 +84,7 @@ import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
public class SwaggerConfiguration {
public static final String LOGIN_ENDPOINT = "/api/auth/login";
public static final String REFRESH_TOKEN_ENDPOINT = "/api/auth/token";
private static final ApiResponses loginResponses = loginResponses();
private static final ApiResponses defaultErrorResponses = defaultErrorResponses(false);
@ -150,6 +151,7 @@ public class SwaggerConfiguration {
.info(info);
addDefaultSchemas(openApi);
addLoginOperation(openApi);
addRefreshTokenOperation(openApi);
return openApi;
}
@ -210,6 +212,29 @@ public class SwaggerConfiguration {
openAPI.path(LOGIN_ENDPOINT, pathItem);
}
private void addRefreshTokenOperation(OpenAPI openAPI) {
var operation = new Operation();
operation.summary("Refresh user JWT token data");
operation.description("""
Method to refresh JWT token. Provide a valid refresh token to get a new JWT token.
The response contains a new token that can be used for authorization.
`X-Authorization: Bearer $JWT_TOKEN_VALUE`""");
var requestBody = new RequestBody().description("Refresh token request")
.content(new Content().addMediaType(APPLICATION_JSON_VALUE,
new MediaType().schema(new Schema<JsonNode>().addProperty("refreshToken", new Schema<>().type("string")))));
operation.requestBody(requestBody);
operation.responses(loginResponses);
operation.addTagsItem("login-endpoint");
var pathItem = new PathItem().post(operation);
openAPI.path(REFRESH_TOKEN_ENDPOINT, pathItem);
}
@Bean
public GroupedOpenApi groupedApi(SpringDocParameterNameDiscoverer localSpringDocParameterNameDiscoverer) {
return GroupedOpenApi.builder()

19
application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java

@ -35,8 +35,6 @@ import org.thingsboard.server.service.install.migrate.TsLatestMigrateService;
import org.thingsboard.server.service.install.update.CacheCleanupService;
import org.thingsboard.server.service.install.update.DataUpdateService;
import static org.thingsboard.server.service.install.update.DefaultDataUpdateService.getEnv;
@Service
@Profile("install")
@Slf4j
@ -99,8 +97,6 @@ public class ThingsboardInstallService {
if ("cassandra-latest-to-postgres".equals(upgradeFromVersion)) {
log.info("Migrating ThingsBoard latest timeseries data from cassandra to SQL database ...");
latestMigrateService.migrate();
} else if (upgradeFromVersion.equals("3.9.0-resources")) {
installScripts.updateResourcesUsage();
} else {
// TODO DON'T FORGET to update SUPPORTED_VERSIONS_FROM in DefaultDatabaseSchemaSettingsService
databaseSchemaVersionService.validateSchemaSettings();
@ -118,25 +114,16 @@ public class ThingsboardInstallService {
entityDatabaseSchemaService.createOrUpdateDeviceInfoView(persistToTelemetry);
// Creates missing indexes.
entityDatabaseSchemaService.createDatabaseIndexes();
// Runs upgrade scripts that are not possible in plain SQL.
// TODO: cleanup update code after each release
if (!getEnv("SKIP_RESOURCES_USAGE_MIGRATION", false)) {
installScripts.setUpdateResourcesUsage(true);
} else {
log.info("Skipping resources usage migration. Run the upgrade with fromVersion as '3.9.0-resources' to migrate");
}
if (installScripts.isUpdateResourcesUsage()) {
installScripts.updateResourcesUsage();
}
// Runs upgrade scripts that are not possible in plain SQL.
dataUpdateService.updateData();
log.info("Updating system data...");
dataUpdateService.upgradeRuleNodes();
systemDataLoaderService.loadSystemWidgets();
installScripts.loadSystemLwm2mResources();
installScripts.loadSystemImagesAndResources();
if (installScripts.isUpdateImages()) {
installScripts.updateImages();
}
databaseSchemaVersionService.updateSchemaVersion();
}
log.info("Upgrade finished successfully!");

8
application/src/main/java/org/thingsboard/server/service/entitiy/dashboard/DashboardSyncService.java

@ -17,6 +17,7 @@ package org.thingsboard.server.service.entitiy.dashboard;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
@ -30,6 +31,7 @@ import org.thingsboard.server.dao.widget.WidgetsBundleService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.install.ProjectInfo;
import org.thingsboard.server.service.sync.GitSyncService;
import org.thingsboard.server.service.sync.vc.GitRepository.FileType;
import org.thingsboard.server.service.sync.vc.GitRepository.RepoFile;
@ -51,10 +53,11 @@ public class DashboardSyncService {
private final ImageService imageService;
private final WidgetsBundleService widgetsBundleService;
private final PartitionService partitionService;
private final ProjectInfo projectInfo;
@Value("${transport.gateway.dashboard.sync.repository_url:}")
private String repoUrl;
@Value("${transport.gateway.dashboard.sync.branch:main}")
@Value("${transport.gateway.dashboard.sync.branch:}")
private String branch;
@Value("${transport.gateway.dashboard.sync.fetch_frequency:24}")
private int fetchFrequencyHours;
@ -64,6 +67,9 @@ public class DashboardSyncService {
@AfterStartUp(order = AfterStartUp.REGULAR_SERVICE)
public void init() throws Exception {
if (StringUtils.isBlank(branch)) {
branch = "release/" + projectInfo.getProjectVersion();
}
gitSyncService.registerSync(REPO_KEY, repoUrl, branch, TimeUnit.HOURS.toMillis(fetchFrequencyHours), this::update);
}

2
application/src/main/java/org/thingsboard/server/service/entitiy/entityview/DefaultTbEntityViewService.java

@ -348,7 +348,7 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen
.tenantId(entityView.getTenantId())
.entityId(entityId)
.entries(latestValues)
.onlyLatest(true)
.strategy(TimeseriesSaveRequest.Strategy.LATEST_AND_WS)
.callback(new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void tmp) {

42
application/src/main/java/org/thingsboard/server/service/install/DefaultDatabaseSchemaSettingsService.java

@ -17,9 +17,6 @@ package org.thingsboard.server.service.install;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.info.BuildProperties;
import org.springframework.context.annotation.Profile;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
@ -33,37 +30,29 @@ import java.util.List;
@RequiredArgsConstructor
public class DefaultDatabaseSchemaSettingsService implements DatabaseSchemaSettingsService {
private static final String CURRENT_PRODUCT = "CE";
// This list should include all versions which are compatible for the upgrade.
// The compatibility cycle usually breaks when we have some scripts written in Java that may not work after new release.
private static final List<String> SUPPORTED_VERSIONS_FOR_UPGRADE = List.of("3.8.0", "3.8.1");
private static final List<String> SUPPORTED_VERSIONS_FOR_UPGRADE = List.of("3.9.0");
private final BuildProperties buildProperties;
private final ProjectInfo projectInfo;
private final JdbcTemplate jdbcTemplate;
@Value("${install.upgrade.from_version:}")
private String upgradeFromVersion;
private String packageSchemaVersion;
private String schemaVersionFromDb;
@Override
public void validateSchemaSettings() {
//TODO: remove after release (3.9.0)
createProductIfNotExists();
String dbSchemaVersion = getDbSchemaVersion();
if (DefaultDataUpdateService.getEnv("SKIP_SCHEMA_VERSION_CHECK", false)) {
log.info("Skipped DB schema version check due to SKIP_SCHEMA_VERSION_CHECK set to 'true'.");
return;
}
String product = getProductFromDb();
if (!CURRENT_PRODUCT.equals(product)) {
onSchemaSettingsError(String.format("Upgrade failed: can't upgrade ThingsBoard %s database using ThingsBoard %s.", product, CURRENT_PRODUCT));
if (!projectInfo.getProductType().equals(product)) {
onSchemaSettingsError(String.format("Upgrade failed: can't upgrade ThingsBoard %s database using ThingsBoard %s.", product, projectInfo.getProductType()));
}
String dbSchemaVersion = getDbSchemaVersion();
if (dbSchemaVersion.equals(getPackageSchemaVersion())) {
onSchemaSettingsError("Upgrade failed: database already upgraded to current version. You can set SKIP_SCHEMA_VERSION_CHECK to 'true' if force re-upgrade needed.");
}
@ -75,19 +64,11 @@ public class DefaultDatabaseSchemaSettingsService implements DatabaseSchemaSetti
}
}
@Deprecated(forRemoval = true, since = "3.9.0")
private void createProductIfNotExists() {
boolean isCommunityEdition = jdbcTemplate.queryForList(
"SELECT 1 FROM information_schema.tables WHERE table_name = 'integration'", Integer.class).isEmpty();
String product = isCommunityEdition ? "CE" : "PE";
jdbcTemplate.execute("ALTER TABLE tb_schema_settings ADD COLUMN IF NOT EXISTS product varchar(2) DEFAULT '" + product + "'");
}
@Override
public void createSchemaSettings() {
Long schemaVersion = getSchemaVersionFromDb();
if (schemaVersion == null) {
jdbcTemplate.execute("INSERT INTO tb_schema_settings (schema_version, product) VALUES (" + getPackageSchemaVersionForDb() + ", '" + CURRENT_PRODUCT + "')");
jdbcTemplate.execute("INSERT INTO tb_schema_settings (schema_version, product) VALUES (" + getPackageSchemaVersionForDb() + ", '" + projectInfo.getProductType() + "')");
}
}
@ -99,7 +80,7 @@ public class DefaultDatabaseSchemaSettingsService implements DatabaseSchemaSetti
@Override
public String getPackageSchemaVersion() {
if (packageSchemaVersion == null) {
packageSchemaVersion = buildProperties.getVersion().replaceAll("[^\\d.]", "");
packageSchemaVersion = projectInfo.getProjectVersion();
}
return packageSchemaVersion;
}
@ -107,15 +88,6 @@ public class DefaultDatabaseSchemaSettingsService implements DatabaseSchemaSetti
@Override
public String getDbSchemaVersion() {
if (schemaVersionFromDb == null) {
if (StringUtils.isNotBlank(upgradeFromVersion)) {
/*
* TODO - Remove after the release of 3.9.0:
* This a temporary workaround due to the issue that schema version in the
* tb_schema_settings was set as 3.6.4 during the install of 3.8.1.
* */
schemaVersionFromDb = upgradeFromVersion;
return schemaVersionFromDb;
}
Long version = getSchemaVersionFromDb();
if (version == null) {
onSchemaSettingsError("Upgrade failed: the database schema version is missing.");

20
application/src/main/java/org/thingsboard/server/service/install/InstallScripts.java

@ -17,8 +17,6 @@ package org.thingsboard.server.service.install;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
@ -120,11 +118,6 @@ public class InstallScripts {
@Autowired
private ResourcesUpdater resourcesUpdater;
@Getter @Setter
private boolean updateImages = false;
@Getter @Setter
private boolean updateResourcesUsage = false;
@Autowired
private ImageService imageService;
@ -395,14 +388,6 @@ public class InstallScripts {
}
}
public void updateImages() {
resourcesUpdater.updateWidgetsBundlesImages();
resourcesUpdater.updateWidgetTypesImages();
resourcesUpdater.updateDashboardsImages();
resourcesUpdater.updateDeviceProfilesImages();
resourcesUpdater.updateAssetProfilesImages();
}
public void loadSystemImagesAndResources() {
log.info("Loading system images and resources...");
Stream<Path> dashboardsFiles = Stream.concat(listDir(Paths.get(getDataDir(), JSON_DIR, DEMO_DIR, DASHBOARDS_DIR)),
@ -512,11 +497,6 @@ public class InstallScripts {
}
}
public void updateResourcesUsage() {
resourcesUpdater.updateDashboardsResources();
resourcesUpdater.updateWidgetsResources();
}
private void loadSystemResources(Path dir, ResourceType resourceType, ResourceSubType resourceSubType) {
listDir(dir).forEach(resourceFile -> {
String resourceKey = resourceFile.getFileName().toString();

36
application/src/main/java/org/thingsboard/server/service/install/ProjectInfo.java

@ -0,0 +1,36 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.install;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.info.BuildProperties;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class ProjectInfo {
private final BuildProperties buildProperties;
public String getProjectVersion() {
return buildProperties.getVersion().replaceAll("[^\\d.]", "");
}
public String getProductType() {
return "CE";
}
}

4
application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java

@ -34,7 +34,6 @@ import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.sql.JpaExecutorService;
import org.thingsboard.server.service.component.ComponentDiscoveryService;
import org.thingsboard.server.service.component.RuleNodeClassInfo;
import org.thingsboard.server.service.install.InstallScripts;
import org.thingsboard.server.utils.TbNodeUpgradeUtils;
import java.util.ArrayList;
@ -58,9 +57,6 @@ public class DefaultDataUpdateService implements DataUpdateService {
@Autowired
JpaExecutorService jpaExecutorService;
@Autowired
private InstallScripts installScripts;
@Override
public void updateData() throws Exception {
log.info("Updating data ...");

27
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java

@ -118,10 +118,10 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
EntityId entityId = request.getEntityId();
checkInternalEntity(entityId);
boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null;
if (sysTenant || request.isOnlyLatest() || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) {
if (sysTenant || !request.getStrategy().saveTimeseries() || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) {
KvUtils.validate(request.getEntries(), valueNoXssValidation);
ListenableFuture<Integer> future = saveTimeseriesInternal(request);
if (!request.isOnlyLatest()) {
if (request.getStrategy().saveTimeseries()) {
FutureCallback<Integer> callback = getApiUsageCallback(tenantId, request.getCustomerId(), sysTenant, request.getCallback());
Futures.addCallback(future, callback, tsCallBackExecutor);
}
@ -134,19 +134,24 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
public ListenableFuture<Integer> saveTimeseriesInternal(TimeseriesSaveRequest request) {
TenantId tenantId = request.getTenantId();
EntityId entityId = request.getEntityId();
TimeseriesSaveRequest.Strategy strategy = request.getStrategy();
ListenableFuture<Integer> saveFuture;
if (request.isOnlyLatest()) {
saveFuture = Futures.transform(tsService.saveLatest(tenantId, entityId, request.getEntries()), result -> 0, MoreExecutors.directExecutor());
} else if (request.isSaveLatest()) {
if (strategy.saveTimeseries() && strategy.saveLatest()) {
saveFuture = tsService.save(tenantId, entityId, request.getEntries(), request.getTtl());
} else {
} else if (strategy.saveLatest()) {
saveFuture = Futures.transform(tsService.saveLatest(tenantId, entityId, request.getEntries()), result -> 0, MoreExecutors.directExecutor());
} else if (strategy.saveTimeseries()) {
saveFuture = tsService.saveWithoutLatest(tenantId, entityId, request.getEntries(), request.getTtl());
} else {
saveFuture = Futures.immediateFuture(0);
}
addMainCallback(saveFuture, request.getCallback());
addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries()));
if (request.isSaveLatest() && !request.isOnlyLatest()) {
addEntityViewCallback(tenantId, entityId, request.getEntries());
if (strategy.sendWsUpdate()) {
addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries()));
}
if (strategy.saveLatest()) {
copyLatestToEntityViews(tenantId, entityId, request.getEntries());
}
return saveFuture;
}
@ -201,7 +206,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
}
}
private void addEntityViewCallback(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts) {
private void copyLatestToEntityViews(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts) {
if (EntityType.DEVICE.equals(entityId.getEntityType()) || EntityType.ASSET.equals(entityId.getEntityType())) {
Futures.addCallback(this.tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId),
new FutureCallback<>() {
@ -232,7 +237,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
.tenantId(tenantId)
.entityId(entityView.getId())
.entries(entityViewLatest)
.onlyLatest(true)
.strategy(TimeseriesSaveRequest.Strategy.LATEST_AND_WS)
.callback(new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Void tmp) {}

2
application/src/main/resources/thingsboard.yml

@ -1271,7 +1271,7 @@ transport:
# URL of gateways dashboard repository
repository_url: "${TB_GATEWAY_DASHBOARD_SYNC_REPOSITORY_URL:https://github.com/thingsboard/gateway-management-extensions-dist.git}"
# Branch of gateways dashboard repository to work with
branch: "${TB_GATEWAY_DASHBOARD_SYNC_BRANCH:main}"
branch: "${TB_GATEWAY_DASHBOARD_SYNC_BRANCH:}"
# Fetch frequency in hours for gateways dashboard repository
fetch_frequency: "${TB_GATEWAY_DASHBOARD_SYNC_FETCH_FREQUENCY:24}"

107
application/src/test/java/org/thingsboard/server/service/entitiy/entityview/DefaultTbEntityViewServiceTest.java

@ -0,0 +1,107 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.entitiy.entityview;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.objects.AttributesEntityView;
import org.thingsboard.server.common.data.objects.TelemetryEntityView;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import java.util.List;
import java.util.UUID;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
@ExtendWith(MockitoExtension.class)
class DefaultTbEntityViewServiceTest {
final TenantId tenantId = TenantId.fromUUID(UUID.fromString("f09c8180-686c-11ef-9471-a71d33080e9c"));
final EntityId entityId = DeviceId.fromString("782aaab0-c7a8-11ef-a668-79582e785d5f");
@Mock
EntityViewService entityViewService;
@Mock
AttributesService attributesService;
@Mock
TelemetrySubscriptionService tsSubService;
@Mock
TimeseriesService tsService;
DefaultTbEntityViewService defaultTbEntityViewService;
@BeforeEach
void setup() {
defaultTbEntityViewService = new DefaultTbEntityViewService(entityViewService, attributesService, tsSubService, tsService);
}
@Test
void shouldNotSaveTimeseriesWhenCopyingLatestToEntityView() throws Exception {
// GIVEN
var entityView = new EntityView(new EntityViewId(UUID.randomUUID()));
entityView.setTenantId(tenantId);
entityView.setEntityId(entityId);
entityView.setKeys(new TelemetryEntityView(List.of("temperature"), new AttributesEntityView()));
List<TsKvEntry> latest = List.of(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3)));
given(tsService.findAll(eq(tenantId), eq(entityId), anyList())).willReturn(immediateFuture(latest));
// WHEN
defaultTbEntityViewService.updateEntityViewAttributes(tenantId, entityView, null, null);
// THEN
var captor = ArgumentCaptor.forClass(TimeseriesSaveRequest.class);
then(tsSubService).should().saveTimeseries(captor.capture());
var expectedCopyLatestRequest = TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.entityId(entityView.getId())
.entries(latest)
.ttl(0L)
.strategy(TimeseriesSaveRequest.Strategy.LATEST_AND_WS)
.build();
var actualCopyLatestRequest = captor.getValue();
assertThat(actualCopyLatestRequest)
.usingRecursiveComparison()
.ignoringFields("callback")
.isEqualTo(expectedCopyLatestRequest);
}
}

373
application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java

@ -0,0 +1,373 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.telemetry;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.ApiUsageStateValue;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.objects.AttributesEntityView;
import org.thingsboard.server.common.data.objects.TelemetryEntityView;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.stats.TbApiUsageReportClient;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService;
import org.thingsboard.server.service.subscription.SubscriptionManagerService;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.lenient;
@ExtendWith(MockitoExtension.class)
class DefaultTelemetrySubscriptionServiceTest {
final TenantId tenantId = TenantId.fromUUID(UUID.fromString("a00ec470-c6b4-11ef-8c88-63b5533fb5bc"));
final CustomerId customerId = new CustomerId(UUID.fromString("7bdc9750-c775-11ef-8e03-ff69ed8da327"));
final EntityId entityId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088");
final long sampleTtl = 10_000L;
final List<TsKvEntry> sampleTelemetry = List.of(
new BasicTsKvEntry(100L, new DoubleDataEntry("temperature", 65.2)),
new BasicTsKvEntry(100L, new DoubleDataEntry("humidity", 33.1))
);
ApiUsageState apiUsageState;
final TopicPartitionInfo tpi = TopicPartitionInfo.builder()
.tenantId(tenantId)
.myPartition(true)
.build();
final FutureCallback<Void> emptyCallback = new FutureCallback<>() {
@Override
public void onSuccess(Void result) {}
@Override
public void onFailure(@NonNull Throwable t) {}
};
ExecutorService wsCallBackExecutor;
ExecutorService tsCallBackExecutor;
@Mock
TbClusterService clusterService;
@Mock
PartitionService partitionService;
@Mock
SubscriptionManagerService subscriptionManagerService;
@Mock
AttributesService attrService;
@Mock
TimeseriesService tsService;
@Mock
TbEntityViewService tbEntityViewService;
@Mock
TbApiUsageReportClient apiUsageClient;
@Mock
TbApiUsageStateService apiUsageStateService;
DefaultTelemetrySubscriptionService telemetryService;
@BeforeEach
void setup() {
telemetryService = new DefaultTelemetrySubscriptionService(attrService, tsService, tbEntityViewService, apiUsageClient, apiUsageStateService);
ReflectionTestUtils.setField(telemetryService, "clusterService", clusterService);
ReflectionTestUtils.setField(telemetryService, "partitionService", partitionService);
ReflectionTestUtils.setField(telemetryService, "subscriptionManagerService", Optional.of(subscriptionManagerService));
wsCallBackExecutor = MoreExecutors.newDirectExecutorService();
ReflectionTestUtils.setField(telemetryService, "wsCallBackExecutor", wsCallBackExecutor);
tsCallBackExecutor = MoreExecutors.newDirectExecutorService();
ReflectionTestUtils.setField(telemetryService, "tsCallBackExecutor", tsCallBackExecutor);
apiUsageState = new ApiUsageState();
apiUsageState.setDbStorageState(ApiUsageStateValue.ENABLED);
lenient().when(apiUsageStateService.getApiUsageState(tenantId)).thenReturn(apiUsageState);
lenient().when(partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId)).thenReturn(tpi);
lenient().when(tsService.save(tenantId, entityId, sampleTelemetry, sampleTtl)).thenReturn(immediateFuture(sampleTelemetry.size()));
lenient().when(tsService.saveWithoutLatest(tenantId, entityId, sampleTelemetry, sampleTtl)).thenReturn(immediateFuture(sampleTelemetry.size()));
lenient().when(tsService.saveLatest(tenantId, entityId, sampleTelemetry)).thenReturn(immediateFuture(listOfNNumbers(sampleTelemetry.size())));
// mock no entity views
lenient().when(tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId)).thenReturn(immediateFuture(Collections.emptyList()));
// send partition change event so currentPartitions set is populated
telemetryService.onTbApplicationEvent(new PartitionChangeEvent(this, ServiceType.TB_CORE, Map.of(new QueueKey(ServiceType.TB_CORE), Set.of(tpi))));
}
@AfterEach
void cleanup() {
wsCallBackExecutor.shutdownNow();
tsCallBackExecutor.shutdownNow();
}
@Test
void shouldReportStorageDataPointsApiUsageWhenTimeSeriesIsSaved() {
// GIVEN
var request = TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.customerId(customerId)
.entityId(entityId)
.entries(sampleTelemetry)
.ttl(sampleTtl)
.strategy(new TimeseriesSaveRequest.Strategy(true, false, false))
.callback(emptyCallback)
.build();
// WHEN
telemetryService.saveTimeseries(request);
// THEN
then(apiUsageClient).should().report(tenantId, customerId, ApiUsageRecordKey.STORAGE_DP_COUNT, sampleTelemetry.size());
}
@Test
void shouldNotReportStorageDataPointsApiUsageWhenTimeSeriesIsNotSaved() {
// GIVEN
var request = TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.customerId(customerId)
.entityId(entityId)
.entries(sampleTelemetry)
.ttl(sampleTtl)
.strategy(TimeseriesSaveRequest.Strategy.LATEST_AND_WS)
.callback(emptyCallback)
.build();
// WHEN
telemetryService.saveTimeseries(request);
// THEN
then(apiUsageClient).shouldHaveNoInteractions();
}
@Test
void shouldThrowStorageDisabledWhenTimeSeriesIsSavedAndStorageIsDisabled() {
// GIVEN
apiUsageState.setDbStorageState(ApiUsageStateValue.DISABLED);
SettableFuture<Void> future = SettableFuture.create();
var request = TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.customerId(customerId)
.entityId(entityId)
.entries(sampleTelemetry)
.ttl(sampleTtl)
.strategy(TimeseriesSaveRequest.Strategy.SAVE_ALL)
.future(future)
.build();
// WHEN
telemetryService.saveTimeseries(request);
// THEN
assertThat(future).failsWithin(Duration.ofSeconds(5))
.withThrowableOfType(ExecutionException.class)
.withCauseInstanceOf(RuntimeException.class)
.withMessageContaining("DB storage writes are disabled due to API limits!");
}
@Test
void shouldNotThrowStorageDisabledWhenTimeSeriesIsNotSavedAndStorageIsDisabled() {
// GIVEN
apiUsageState.setDbStorageState(ApiUsageStateValue.DISABLED);
SettableFuture<Void> future = SettableFuture.create();
var request = TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.customerId(customerId)
.entityId(entityId)
.entries(sampleTelemetry)
.ttl(sampleTtl)
.strategy(TimeseriesSaveRequest.Strategy.LATEST_AND_WS)
.future(future)
.build();
// WHEN
telemetryService.saveTimeseries(request);
// THEN
assertThat(future).succeedsWithin(Duration.ofSeconds(5));
}
@Test
void shouldCopyLatestToEntityViewWhenLatestIsSavedOnMainEntity() {
// GIVEN
var entityView = new EntityView(new EntityViewId(UUID.randomUUID()));
entityView.setTenantId(tenantId);
entityView.setCustomerId(customerId);
entityView.setEntityId(entityId);
entityView.setKeys(new TelemetryEntityView(sampleTelemetry.stream().map(KvEntry::getKey).toList(), new AttributesEntityView()));
// mock that there is one entity view
given(tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId)).willReturn(immediateFuture(List.of(entityView)));
// mock that save latest call for entity view is successful
given(tsService.saveLatest(tenantId, entityView.getId(), sampleTelemetry)).willReturn(immediateFuture(listOfNNumbers(sampleTelemetry.size())));
// mock TPI for entity view
given(partitionService.resolve(ServiceType.TB_CORE, tenantId, entityView.getId())).willReturn(tpi);
var request = TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.customerId(customerId)
.entityId(entityId)
.entries(sampleTelemetry)
.ttl(sampleTtl)
.strategy(new TimeseriesSaveRequest.Strategy(false, true, false))
.callback(emptyCallback)
.build();
// WHEN
telemetryService.saveTimeseries(request);
// THEN
// should save latest to both the main entity and it's entity view
then(tsService).should().saveLatest(tenantId, entityId, sampleTelemetry);
then(tsService).should().saveLatest(tenantId, entityView.getId(), sampleTelemetry);
then(tsService).shouldHaveNoMoreInteractions();
// should send WS update only for entity view (WS update for the main entity is disabled in the save request)
then(subscriptionManagerService).should().onTimeSeriesUpdate(tenantId, entityView.getId(), sampleTelemetry, TbCallback.EMPTY);
then(subscriptionManagerService).shouldHaveNoMoreInteractions();
}
@Test
void shouldNotCopyLatestToEntityViewWhenLatestIsNotSavedOnMainEntity() {
// GIVEN
var request = TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.customerId(customerId)
.entityId(entityId)
.entries(sampleTelemetry)
.ttl(sampleTtl)
.strategy(new TimeseriesSaveRequest.Strategy(true, false, false))
.callback(emptyCallback)
.build();
// WHEN
telemetryService.saveTimeseries(request);
// THEN
// should save only time series for the main entity
then(tsService).should().saveWithoutLatest(tenantId, entityId, sampleTelemetry, sampleTtl);
then(tsService).shouldHaveNoMoreInteractions();
// should not send any WS updates
then(subscriptionManagerService).shouldHaveNoInteractions();
}
@ParameterizedTest
@MethodSource("booleanCombinations")
void shouldCallCorrectApiBasedOnBooleanFlagsInTheSaveRequest(boolean saveTimeseries, boolean saveLatest, boolean sendWsUpdate) {
// GIVEN
var request = TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.customerId(customerId)
.entityId(entityId)
.entries(sampleTelemetry)
.ttl(sampleTtl)
.strategy(new TimeseriesSaveRequest.Strategy(saveTimeseries, saveLatest, sendWsUpdate))
.callback(emptyCallback)
.build();
// WHEN
telemetryService.saveTimeseries(request);
// THEN
if (saveTimeseries && saveLatest) {
then(tsService).should().save(tenantId, entityId, sampleTelemetry, sampleTtl);
} else if (saveLatest) {
then(tsService).should().saveLatest(tenantId, entityId, sampleTelemetry);
} else if (saveTimeseries) {
then(tsService).should().saveWithoutLatest(tenantId, entityId, sampleTelemetry, sampleTtl);
}
then(tsService).shouldHaveNoMoreInteractions();
if (sendWsUpdate) {
then(subscriptionManagerService).should().onTimeSeriesUpdate(tenantId, entityId, sampleTelemetry, TbCallback.EMPTY);
} else {
then(subscriptionManagerService).shouldHaveNoInteractions();
}
}
private static Stream<Arguments> booleanCombinations() {
return Stream.of(
Arguments.of(true, true, true),
Arguments.of(true, true, false),
Arguments.of(true, false, true),
Arguments.of(true, false, false),
Arguments.of(false, true, true),
Arguments.of(false, true, false),
Arguments.of(false, false, true),
Arguments.of(false, false, false)
);
}
// used to emulate sequence numbers returned by save latest API
private static List<Long> listOfNNumbers(int N) {
return LongStream.range(0, N).boxed().toList();
}
}

2
common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/GitRepository.java

@ -450,7 +450,7 @@ public class GitRepository {
}
ObjectId result = git.getRepository().resolve(rev);
if (result == null) {
throw new IllegalArgumentException("Failed to parse git revision string: \"" + rev + "\"");
throw new IllegalArgumentException("Failed to resolve '" + rev + "'");
}
return result;
}

26
monitoring/src/main/resources/root_rule_chain.json

@ -21,9 +21,13 @@
"type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode",
"name": "Save Timeseries",
"singletonMode": false,
"configurationVersion": 0,
"configurationVersion": 1,
"configuration": {
"defaultTTL": 0
"defaultTTL": 0,
"useServerTs": false,
"persistenceSettings": {
"type": "ON_EVERY_MESSAGE"
}
},
"externalId": null
},
@ -273,9 +277,13 @@
"type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode",
"name": "Save Timeseries",
"singletonMode": false,
"configurationVersion": 0,
"configurationVersion": 1,
"configuration": {
"defaultTTL": 0
"defaultTTL": 0,
"useServerTs": false,
"persistenceSettings": {
"type": "ON_EVERY_MESSAGE"
}
},
"externalId": null
},
@ -307,11 +315,13 @@
"type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode",
"name": "Save Timeseries with TTL",
"singletonMode": false,
"configurationVersion": 0,
"configurationVersion": 1,
"configuration": {
"defaultTTL": 180,
"skipLatestPersistence": null,
"useServerTs": null
"useServerTs": false,
"persistenceSettings": {
"type": "ON_EVERY_MESSAGE"
}
},
"externalId": null
}
@ -415,4 +425,4 @@
],
"ruleChainConnections": null
}
}
}

8
msa/black-box-tests/src/test/resources/MqttRuleNodeTestMetadata.json

@ -36,11 +36,13 @@
"type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode",
"name": "save timeseries",
"singletonMode": false,
"configurationVersion": 0,
"configurationVersion": 1,
"configuration": {
"defaultTTL": 0,
"skipLatestPersistence": false,
"useServerTs": false
"useServerTs": false,
"persistenceSettings": {
"type": "ON_EVERY_MESSAGE"
}
},
"externalId": null
},

27
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java

@ -38,10 +38,18 @@ public class TimeseriesSaveRequest {
private final EntityId entityId;
private final List<TsKvEntry> entries;
private final long ttl;
private final boolean saveLatest;
private final boolean onlyLatest;
private final Strategy strategy;
private final FutureCallback<Void> callback;
public record Strategy(boolean saveTimeseries, boolean saveLatest, boolean sendWsUpdate) {
public static final Strategy SAVE_ALL = new Strategy(true, true, true);
public static final Strategy WS_ONLY = new Strategy(false, false, true);
public static final Strategy LATEST_AND_WS = new Strategy(false, true, true);
public static final Strategy SKIP_ALL = new Strategy(false, false, false);
}
public static Builder builder() {
return new Builder();
}
@ -53,9 +61,8 @@ public class TimeseriesSaveRequest {
private EntityId entityId;
private List<TsKvEntry> entries;
private long ttl;
private Strategy strategy = Strategy.SAVE_ALL;
private FutureCallback<Void> callback;
private boolean saveLatest = true;
private boolean onlyLatest;
Builder() {}
@ -92,14 +99,8 @@ public class TimeseriesSaveRequest {
return this;
}
public Builder saveLatest(boolean saveLatest) {
this.saveLatest = saveLatest;
return this;
}
public Builder onlyLatest(boolean onlyLatest) {
this.onlyLatest = onlyLatest;
this.saveLatest = true;
public Builder strategy(Strategy strategy) {
this.strategy = strategy;
return this;
}
@ -123,7 +124,7 @@ public class TimeseriesSaveRequest {
}
public TimeseriesSaveRequest build() {
return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, saveLatest, onlyLatest, callback);
return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, strategy, callback);
}
}

51
rule-engine/rule-engine-api/src/test/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequestTest.java

@ -0,0 +1,51 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.api;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
class TimeseriesSaveRequestTest {
@Test
void testDefaultSaveStrategyIsSaveAll() {
var request = TimeseriesSaveRequest.builder().build();
assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.SAVE_ALL);
}
@Test
void testSaveAllStrategy() {
assertThat(TimeseriesSaveRequest.Strategy.SAVE_ALL).isEqualTo(new TimeseriesSaveRequest.Strategy(true, true, true));
}
@Test
void testWsOnlyStrategy() {
assertThat(TimeseriesSaveRequest.Strategy.WS_ONLY).isEqualTo(new TimeseriesSaveRequest.Strategy(false, false, true));
}
@Test
void testLatestAndWsStrategy() {
assertThat(TimeseriesSaveRequest.Strategy.LATEST_AND_WS).isEqualTo(new TimeseriesSaveRequest.Strategy(false, true, true));
}
@Test
void testSkipAllStrategy() {
assertThat(TimeseriesSaveRequest.Strategy.SKIP_ALL).isEqualTo(new TimeseriesSaveRequest.Strategy(false, false, false));
}
}

132
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java

@ -15,8 +15,11 @@
*/
package org.thingsboard.rule.engine.telemetry;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.gson.JsonParser;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
@ -24,6 +27,7 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.telemetry.strategy.PersistenceStrategy;
import org.thingsboard.server.common.adaptor.JsonConverter;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.TenantProfile;
@ -32,13 +36,20 @@ import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.TbMsg;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings;
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Advanced;
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Deduplicate;
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.OnEveryMessage;
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.WebSocketsOnly;
import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_REQUEST;
@Slf4j
@ -46,20 +57,52 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_RE
type = ComponentType.ACTION,
name = "save time series",
configClazz = TbMsgTimeseriesNodeConfiguration.class,
nodeDescription = "Saves time series data",
nodeDetails = "Saves time series telemetry data based on configurable TTL parameter. Expects messages with 'POST_TELEMETRY_REQUEST' message type. " +
"Timestamp in milliseconds will be taken from metadata.ts, otherwise 'now' message timestamp will be applied. " +
"Allows stopping updating values for incoming keys in the latest ts_kv table if 'skipLatestPersistence' is set to true.\n " +
"<br/>" +
"Enable 'useServerTs' param to use the timestamp of the message processing instead of the timestamp from the message. " +
"Useful for all sorts of sequential processing if you merge messages from multiple sources (devices, assets, etc).\n" +
"<br/>" +
"In the case of sequential processing, the platform guarantees that the messages are processed in the order of their submission to the queue. " +
"However, the timestamp of the messages originated by multiple devices/servers may be unsynchronized long before they are pushed to the queue. " +
"The DB layer has certain optimizations to ignore the updates of the \"attributes\" and \"latest values\" tables if the new record has a timestamp that is older than the previous record. " +
"So, to make sure that all the messages will be processed correctly, one should enable this parameter for sequential message processing scenarios.",
nodeDescription = """
Saves time series data with a configurable TTL and according to configured persistence strategies.
""",
nodeDetails = """
Node performs three <strong>actions:</strong>
<ul>
<li><strong>Time series:</strong> save time series data to a <code>ts_kv</code> table in a DB.</li>
<li><strong>Latest values:</strong> save time series data to a <code>ts_kv_latest</code> table in a DB.</li>
<li><strong>WebSockets:</strong> notify WebSockets subscriptions about time series data updates.</li>
</ul>
For each <em>action</em>, three <strong>persistence strategies</strong> are available:
<ul>
<li><strong>On every message:</strong> perform the action for every message.</li>
<li><strong>Deduplicate:</strong> perform the action only for the first message from a particular originator within a configurable interval.</li>
<li><strong>Skip:</strong> never perform the action.</li>
</ul>
<strong>Persistence strategies</strong> are configured using <em>persistence settings</em>, which support two modes:
<ul>
<li><strong>Basic</strong>
<ul>
<li><strong>On every message:</strong> applies the "On every message" strategy to all actions.</li>
<li><strong>Deduplicate:</strong> applies the "Deduplicate" strategy (with a specified interval) to all actions.</li>
<li><strong>WebSockets only:</strong> applies the "Skip" strategy to Time series and Latest values, and the "On every message" strategy to WebSockets.</li>
</ul>
</li>
<li><strong>Advanced:</strong> configure each actions strategy independently.</li>
</ul>
By default, the timestamp is taken from <code>metadata.ts</code>. You can enable
<em>Use server timestamp</em> to always use the current server time instead. This is particularly
useful in sequential processing scenarios where messages may arrive with out-of-order timestamps from
multiple sources. Note that the DB layer may ignore older records for attributes and latest values,
so enabling <em>Use server timestamp</em> can ensure correct ordering.
<br><br>
The TTL is taken first from <code>metadata.TTL</code>. If absent, the node configurations default
TTL is used. If neither is set, the tenant profile default applies.
<br><br>
This node expects messages of type <code>POST_TELEMETRY_REQUEST</code>.
<br><br>
Output connections: <code>Success</code>, <code>Failure</code>.
""",
configDirective = "tbActionNodeTimeseriesConfig",
icon = "file_upload"
icon = "file_upload",
version = 1
)
public class TbMsgTimeseriesNode implements TbNode {
@ -67,15 +110,18 @@ public class TbMsgTimeseriesNode implements TbNode {
private TbContext ctx;
private long tenantProfileDefaultStorageTtl;
private PersistenceSettings persistenceSettings;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbMsgTimeseriesNodeConfiguration.class);
this.ctx = ctx;
ctx.addTenantProfileListener(this::onTenantProfileUpdate);
onTenantProfileUpdate(ctx.getTenantProfile());
persistenceSettings = config.getPersistenceSettings();
}
void onTenantProfileUpdate(TenantProfile tenantProfile) {
private void onTenantProfileUpdate(TenantProfile tenantProfile) {
DefaultTenantProfileConfiguration configuration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration();
tenantProfileDefaultStorageTtl = TimeUnit.DAYS.toSeconds(configuration.getDefaultStorageTtlDays());
}
@ -87,6 +133,15 @@ public class TbMsgTimeseriesNode implements TbNode {
return;
}
long ts = computeTs(msg, config.isUseServerTs());
TimeseriesSaveRequest.Strategy strategy = determineSaveStrategy(ts, msg.getOriginator().getId());
// short-circuit
if (!strategy.saveTimeseries() && !strategy.saveLatest() && !strategy.sendWsUpdate()) {
ctx.tellSuccess(msg);
return;
}
String src = msg.getData();
Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToTelemetry(JsonParser.parseString(src), ts);
if (tsKvMap.isEmpty()) {
@ -110,7 +165,7 @@ public class TbMsgTimeseriesNode implements TbNode {
.entityId(msg.getOriginator())
.entries(tsKvEntryList)
.ttl(ttl)
.saveLatest(!config.isSkipLatestPersistence())
.strategy(strategy)
.callback(new TelemetryNodeCallback(ctx, msg))
.build());
}
@ -119,9 +174,56 @@ public class TbMsgTimeseriesNode implements TbNode {
return ignoreMetadataTs ? System.currentTimeMillis() : msg.getMetaDataTs();
}
private TimeseriesSaveRequest.Strategy determineSaveStrategy(long ts, UUID originatorUuid) {
if (persistenceSettings instanceof OnEveryMessage) {
return TimeseriesSaveRequest.Strategy.SAVE_ALL;
}
if (persistenceSettings instanceof WebSocketsOnly) {
return TimeseriesSaveRequest.Strategy.WS_ONLY;
}
if (persistenceSettings instanceof Deduplicate deduplicate) {
boolean isFirstMsgInInterval = deduplicate.getPersistenceStrategy().shouldPersist(ts, originatorUuid);
return isFirstMsgInInterval ? TimeseriesSaveRequest.Strategy.SAVE_ALL : TimeseriesSaveRequest.Strategy.SKIP_ALL;
}
if (persistenceSettings instanceof Advanced advanced) {
return new TimeseriesSaveRequest.Strategy(
advanced.timeseries().shouldPersist(ts, originatorUuid),
advanced.latest().shouldPersist(ts, originatorUuid),
advanced.webSockets().shouldPersist(ts, originatorUuid)
);
}
// should not happen
throw new IllegalArgumentException("Unknown persistence settings type: " + persistenceSettings.getClass().getSimpleName());
}
@Override
public void destroy() {
ctx.removeListeners();
}
@Override
public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
boolean hasChanges = false;
switch (fromVersion) {
case 0:
hasChanges = true;
JsonNode skipLatestPersistence = oldConfiguration.get("skipLatestPersistence");
if (skipLatestPersistence != null && "true".equals(skipLatestPersistence.asText())) {
var skipLatestPersistenceSettings = new Advanced(
PersistenceStrategy.onEveryMessage(),
PersistenceStrategy.skip(),
PersistenceStrategy.onEveryMessage()
);
((ObjectNode) oldConfiguration).set("persistenceSettings", JacksonUtil.valueToTree(skipLatestPersistenceSettings));
} else {
((ObjectNode) oldConfiguration).set("persistenceSettings", JacksonUtil.valueToTree(new OnEveryMessage()));
}
((ObjectNode) oldConfiguration).remove("skipLatestPersistence");
break;
default:
break;
}
return new TbPair<>(hasChanges, oldConfiguration);
}
}

66
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java

@ -15,22 +15,84 @@
*/
package org.thingsboard.rule.engine.telemetry;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
import lombok.Getter;
import org.thingsboard.rule.engine.api.NodeConfiguration;
import org.thingsboard.rule.engine.telemetry.strategy.PersistenceStrategy;
import java.util.Objects;
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Advanced;
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Deduplicate;
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.OnEveryMessage;
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.WebSocketsOnly;
@Data
public class TbMsgTimeseriesNodeConfiguration implements NodeConfiguration<TbMsgTimeseriesNodeConfiguration> {
private long defaultTTL;
private boolean skipLatestPersistence;
private boolean useServerTs;
@NotNull
private PersistenceSettings persistenceSettings;
@Override
public TbMsgTimeseriesNodeConfiguration defaultConfiguration() {
TbMsgTimeseriesNodeConfiguration configuration = new TbMsgTimeseriesNodeConfiguration();
configuration.setDefaultTTL(0L);
configuration.setSkipLatestPersistence(false);
configuration.setUseServerTs(false);
configuration.setPersistenceSettings(new OnEveryMessage());
return configuration;
}
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "type"
)
@JsonSubTypes({
@JsonSubTypes.Type(value = OnEveryMessage.class, name = "ON_EVERY_MESSAGE"),
@JsonSubTypes.Type(value = WebSocketsOnly.class, name = "WEBSOCKETS_ONLY"),
@JsonSubTypes.Type(value = Deduplicate.class, name = "DEDUPLICATE"),
@JsonSubTypes.Type(value = Advanced.class, name = "ADVANCED")
})
sealed interface PersistenceSettings permits OnEveryMessage, Deduplicate, WebSocketsOnly, Advanced {
record OnEveryMessage() implements PersistenceSettings {}
record WebSocketsOnly() implements PersistenceSettings {}
@Getter
final class Deduplicate implements PersistenceSettings {
private final int deduplicationIntervalSecs;
@JsonIgnore
private final PersistenceStrategy persistenceStrategy;
@JsonCreator
Deduplicate(@JsonProperty("deduplicationIntervalSecs") int deduplicationIntervalSecs) {
this.deduplicationIntervalSecs = deduplicationIntervalSecs;
persistenceStrategy = PersistenceStrategy.deduplicate(deduplicationIntervalSecs);
}
}
record Advanced(PersistenceStrategy timeseries, PersistenceStrategy latest, PersistenceStrategy webSockets) implements PersistenceSettings {
public Advanced {
Objects.requireNonNull(timeseries);
Objects.requireNonNull(latest);
Objects.requireNonNull(webSockets);
}
}
}
}

89
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/DeduplicatePersistenceStrategy.java

@ -0,0 +1,89 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.telemetry.strategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
import java.time.Duration;
import java.util.Set;
import java.util.UUID;
final class DeduplicatePersistenceStrategy implements PersistenceStrategy {
private static final int MIN_DEDUPLICATION_INTERVAL_SECS = 1;
private static final int MAX_DEDUPLICATION_INTERVAL_SECS = (int) Duration.ofDays(1L).toSeconds();
private static final long MIN_INTERVAL_EXPIRY_MILLIS = Duration.ofMinutes(10L).toMillis();
private static final int INTERVAL_EXPIRY_FACTOR = 10;
private static final long MAX_INTERVAL_EXPIRY_MILLIS = Duration.ofDays(2L).toMillis();
private static final int MAX_TOTAL_INTERVALS_DURATION_SECS = (int) Duration.ofDays(2L).toSeconds();
private static final int MAX_NUMBER_OF_INTERVALS = 100;
private final long deduplicationIntervalMillis;
private final LoadingCache<Long, Set<UUID>> deduplicationCache;
@JsonCreator
public DeduplicatePersistenceStrategy(@JsonProperty("deduplicationIntervalSecs") int deduplicationIntervalSecs) {
if (deduplicationIntervalSecs < MIN_DEDUPLICATION_INTERVAL_SECS || deduplicationIntervalSecs > MAX_DEDUPLICATION_INTERVAL_SECS) {
throw new IllegalArgumentException("Deduplication interval must be at least " + MIN_DEDUPLICATION_INTERVAL_SECS + " second(s) " +
"and at most " + MAX_DEDUPLICATION_INTERVAL_SECS + " second(s), was " + deduplicationIntervalSecs + " second(s)");
}
deduplicationIntervalMillis = Duration.ofSeconds(deduplicationIntervalSecs).toMillis();
deduplicationCache = Caffeine.newBuilder()
.softValues()
.expireAfterAccess(calculateExpireAfterAccess(deduplicationIntervalSecs))
.maximumSize(calculateMaxNumberOfDeduplicationIntervals(deduplicationIntervalSecs))
.build(__ -> Sets.newConcurrentHashSet());
}
/**
* Calculates the expire-after-access duration. By default, we keep each deduplication interval
* alive for 10 iterations (interval duration × 10). However, we never let this drop below
* 10 minutes to ensure adequate retention for small intervals, nor exceed 48 hours to prevent
* storing stale data in memory.
*/
private static Duration calculateExpireAfterAccess(int deduplicationIntervalSecs) {
long desiredExpiryMillis = Duration.ofSeconds(deduplicationIntervalSecs).toMillis() * INTERVAL_EXPIRY_FACTOR;
return Duration.ofMillis(Longs.constrainToRange(desiredExpiryMillis, MIN_INTERVAL_EXPIRY_MILLIS, MAX_INTERVAL_EXPIRY_MILLIS));
}
/**
* Calculates the maximum number of deduplication intervals we will store in the cache.
* We limit retention to two days to avoid stale data and cap it at 100 intervals to manage memory usage.
*/
private static long calculateMaxNumberOfDeduplicationIntervals(int deduplicationIntervalSecs) {
int numberOfDeduplicationIntervals = MAX_TOTAL_INTERVALS_DURATION_SECS / deduplicationIntervalSecs;
return Math.min(numberOfDeduplicationIntervals, MAX_NUMBER_OF_INTERVALS);
}
@JsonProperty("deduplicationIntervalSecs")
public long getDeduplicationIntervalSecs() {
return Duration.ofMillis(deduplicationIntervalMillis).toSeconds();
}
@Override
public boolean shouldPersist(long ts, UUID originatorUuid) {
long intervalNumber = ts / deduplicationIntervalMillis;
return deduplicationCache.get(intervalNumber).add(originatorUuid);
}
}

38
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/OnEveryMessagePersistenceStrategy.java

@ -0,0 +1,38 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.telemetry.strategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import java.util.UUID;
final class OnEveryMessagePersistenceStrategy implements PersistenceStrategy {
private static final OnEveryMessagePersistenceStrategy INSTANCE = new OnEveryMessagePersistenceStrategy();
private OnEveryMessagePersistenceStrategy() {}
@JsonCreator
public static OnEveryMessagePersistenceStrategy getInstance() {
return INSTANCE;
}
@Override
public boolean shouldPersist(long ts, UUID originatorUuid) {
return true;
}
}

49
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/PersistenceStrategy.java

@ -0,0 +1,49 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.telemetry.strategy;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import java.util.UUID;
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "type"
)
@JsonSubTypes({
@JsonSubTypes.Type(value = OnEveryMessagePersistenceStrategy.class, name = "ON_EVERY_MESSAGE"),
@JsonSubTypes.Type(value = DeduplicatePersistenceStrategy.class, name = "DEDUPLICATE"),
@JsonSubTypes.Type(value = SkipPersistenceStrategy.class, name = "SKIP")
})
public sealed interface PersistenceStrategy permits OnEveryMessagePersistenceStrategy, DeduplicatePersistenceStrategy, SkipPersistenceStrategy {
static PersistenceStrategy onEveryMessage() {
return OnEveryMessagePersistenceStrategy.getInstance();
}
static PersistenceStrategy deduplicate(int deduplicationIntervalSecs) {
return new DeduplicatePersistenceStrategy(deduplicationIntervalSecs);
}
static PersistenceStrategy skip() {
return SkipPersistenceStrategy.getInstance();
}
boolean shouldPersist(long ts, UUID originatorUuid);
}

38
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/SkipPersistenceStrategy.java

@ -0,0 +1,38 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.telemetry.strategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import java.util.UUID;
final class SkipPersistenceStrategy implements PersistenceStrategy {
private static final SkipPersistenceStrategy INSTANCE = new SkipPersistenceStrategy();
private SkipPersistenceStrategy() {}
@JsonCreator
public static SkipPersistenceStrategy getInstance() {
return INSTANCE;
}
@Override
public boolean shouldPersist(long ts, UUID originatorUuid) {
return false;
}
}

4
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java

@ -533,7 +533,7 @@ public class TbMathNodeTest {
verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture());
verify(telemetryService, times(1)).saveTimeseries(assertArg(request -> {
assertThat(request.getEntries()).size().isOne();
assertThat(request.isSaveLatest()).isTrue();
assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.SAVE_ALL);
}));
TbMsg resultMsg = msgCaptor.getValue();
@ -569,7 +569,7 @@ public class TbMathNodeTest {
verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture());
verify(telemetryService, times(1)).saveTimeseries(assertArg(request -> {
assertThat(request.getEntries()).size().isOne();
assertThat(request.isSaveLatest()).isTrue();
assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.SAVE_ALL);
}));
TbMsg resultMsg = msgCaptor.getValue();

449
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java

@ -25,20 +25,23 @@ import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.ThrowingConsumer;
import org.mockito.junit.jupiter.MockitoExtension;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.rule.engine.telemetry.strategy.PersistenceStrategy;
import org.thingsboard.server.common.adaptor.JsonConverter;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.msg.TbMsgType;
@ -46,6 +49,8 @@ import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileCon
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.service.ConstraintValidator;
import java.util.ArrayList;
import java.util.List;
@ -55,24 +60,30 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.assertArg;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class TbMsgTimeseriesNodeTest {
public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("c8f34868-603a-4433-876a-7d356e5cf377"));
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("e5095e9a-04f4-44c9-b443-1cf1b97d3384"));
private final TenantProfileId TENANT_PROFILE_ID = new TenantProfileId(UUID.fromString("ab78dd78-83d0-43fa-869f-d42ec9ed1744"));
private TenantProfile tenantProfile;
private TbMsgTimeseriesNode node;
private TbMsgTimeseriesNodeConfiguration config;
private long tenantProfileDefaultStorageTtl;
@Mock
private TbContext ctxMock;
@ -81,29 +92,68 @@ public class TbMsgTimeseriesNodeTest {
@BeforeEach
public void setUp() throws TbNodeException {
node = new TbMsgTimeseriesNode();
tenantProfile = new TenantProfile(new TenantProfileId(UUID.fromString("ab78dd78-83d0-43fa-869f-d42ec9ed1744")));
var tenantProfileConfiguration = new DefaultTenantProfileConfiguration();
tenantProfileConfiguration.setDefaultStorageTtlDays(5);
var tenantProfileData = new TenantProfileData();
tenantProfileData.setConfiguration(tenantProfileConfiguration);
tenantProfile.setProfileData(tenantProfileData);
lenient().when(ctxMock.getTenantProfile()).thenReturn(tenantProfile);
lenient().when(ctxMock.getTenantId()).thenReturn(TENANT_ID);
lenient().when(ctxMock.getTelemetryService()).thenReturn(telemetryServiceMock);
node = spy(new TbMsgTimeseriesNode());
config = new TbMsgTimeseriesNodeConfiguration().defaultConfiguration();
}
@Test
public void verifyDefaultConfig() {
assertThat(config.getDefaultTTL()).isEqualTo(0L);
assertThat(config.isSkipLatestPersistence()).isFalse();
assertThat(config.getPersistenceSettings()).isInstanceOf(TbMsgTimeseriesNodeConfiguration.PersistenceSettings.OnEveryMessage.class);
assertThat(config.isUseServerTs()).isFalse();
}
@Test
public void whenInit_thenShouldAddTenantProfileListener() throws Exception {
// GIVEN-WHEN
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
// THEN
then(ctxMock).should().addTenantProfileListener(any());
}
@Test
public void givenPersistenceSettingsAreNull_whenValidatingConstraints_thenThrowsException() {
// GIVEN
config.setPersistenceSettings(null);
// WHEN-THEN
assertThatThrownBy(() -> ConstraintValidator.validateFields(config))
.isInstanceOf(DataValidationException.class)
.hasMessage("Validation error: persistenceSettings must not be null");
}
@ParameterizedTest
@EnumSource(TbMsgType.class)
public void givenMsgTypeAndEmptyMsgData_whenOnMsg_thenVerifyFailureMsg(TbMsgType msgType) throws TbNodeException {
init();
// GIVEN
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
TbMsg msg = TbMsg.newMsg()
.type(msgType)
.originator(DEVICE_ID)
.copyMetaData(TbMsgMetaData.EMPTY)
.data(TbMsg.EMPTY_JSON_ARRAY)
.build();
// WHEN
node.onMsg(ctxMock, msg);
// THEN
then(ctxMock).should().addTenantProfileListener(any());
then(ctxMock).should().getTenantProfile();
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
verify(ctxMock).tellFailure(eq(msg), throwableCaptor.capture());
@ -117,9 +167,11 @@ public class TbMsgTimeseriesNodeTest {
}
@Test
public void givenTtlFromConfigIsZeroAndUseServiceTsIsTrue_whenOnMsg_thenSaveTimeseriesUsingTenantProfileDefaultTtl() throws TbNodeException {
public void givenTtlFromConfigIsZeroAndUseServerTsIsTrue_whenOnMsg_thenSaveTimeseriesUsingTenantProfileDefaultTtl() throws TbNodeException {
// GIVEN
config.setUseServerTs(true);
init();
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
String data = """
{
@ -134,24 +186,29 @@ public class TbMsgTimeseriesNodeTest {
.data(data)
.build();
when(ctxMock.getTelemetryService()).thenReturn(telemetryServiceMock);
when(ctxMock.getTenantId()).thenReturn(TENANT_ID);
doAnswer(invocation -> {
TimeseriesSaveRequest request = invocation.getArgument(0);
request.getCallback().onSuccess(null);
return null;
}).when(telemetryServiceMock).saveTimeseries(any(TimeseriesSaveRequest.class));
// WHEN
node.onMsg(ctxMock, msg);
// THEN
then(ctxMock).should().getTenantId();
then(ctxMock).should().getTelemetryService();
then(ctxMock).should().addTenantProfileListener(any());
then(ctxMock).should().getTenantProfile();
List<TsKvEntry> expectedList = getTsKvEntriesListWithTs(data, System.currentTimeMillis());
verify(telemetryServiceMock).saveTimeseries(assertArg(request -> {
assertThat(request.getTenantId()).isEqualTo(TENANT_ID);
assertThat(request.getCustomerId()).isNull();
assertThat(request.getEntityId()).isEqualTo(DEVICE_ID);
assertThat(request.getEntries()).usingRecursiveFieldByFieldElementComparatorIgnoringFields("ts").containsExactlyElementsOf(expectedList);
assertThat(request.getTtl()).isEqualTo(tenantProfileDefaultStorageTtl);
assertThat(request.isSaveLatest()).isTrue();
assertThat(request.getTtl()).isEqualTo(extractTtlAsSeconds(tenantProfile));
assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.SAVE_ALL);
assertThat(request.getCallback()).isInstanceOf(TelemetryNodeCallback.class);
}));
verify(ctxMock).tellSuccess(msg);
@ -159,11 +216,17 @@ public class TbMsgTimeseriesNodeTest {
}
@Test
public void givenSkipLatestPersistenceIsTrueAndTtlFromConfig_whenOnMsg_thenSaveTimeseriesUsingTtlFromConfig() throws TbNodeException {
long ttlFromConfig = 5L;
config.setDefaultTTL(ttlFromConfig);
config.setSkipLatestPersistence(true);
init();
public void givenSkipLatestPersistenceSettingsAndTtlFromConfig_whenOnMsg_thenSaveTimeseriesUsingTtlFromConfig() throws TbNodeException {
// GIVEN
config.setDefaultTTL(10L);
var timeseriesStrategy = PersistenceStrategy.onEveryMessage();
var latestStrategy = PersistenceStrategy.skip();
var webSockets = PersistenceStrategy.onEveryMessage();
var persistenceSettings = new TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Advanced(timeseriesStrategy, latestStrategy, webSockets);
config.setPersistenceSettings(persistenceSettings);
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
String data = """
{
@ -180,24 +243,29 @@ public class TbMsgTimeseriesNodeTest {
.data(data)
.build();
when(ctxMock.getTelemetryService()).thenReturn(telemetryServiceMock);
when(ctxMock.getTenantId()).thenReturn(TENANT_ID);
doAnswer(invocation -> {
TimeseriesSaveRequest request = invocation.getArgument(0);
request.getCallback().onSuccess(null);
return null;
}).when(telemetryServiceMock).saveTimeseries(any(TimeseriesSaveRequest.class));
// WHEN
node.onMsg(ctxMock, msg);
// THEN
then(ctxMock).should().getTenantId();
then(ctxMock).should().getTelemetryService();
then(ctxMock).should().addTenantProfileListener(any());
then(ctxMock).should().getTenantProfile();
List<TsKvEntry> expectedList = getTsKvEntriesListWithTs(data, ts);
verify(telemetryServiceMock).saveTimeseries(assertArg(request -> {
assertThat(request.getTenantId()).isEqualTo(TENANT_ID);
assertThat(request.getCustomerId()).isNull();
assertThat(request.getEntityId()).isEqualTo(DEVICE_ID);
assertThat(request.getEntries()).containsExactlyElementsOf(expectedList);
assertThat(request.getTtl()).isEqualTo(ttlFromConfig);
assertThat(request.isSaveLatest()).isFalse();
assertThat(request.getTtl()).isEqualTo(config.getDefaultTTL());
assertThat(request.getStrategy()).isEqualTo(new TimeseriesSaveRequest.Strategy(true, false, true));
assertThat(request.getCallback()).isInstanceOf(TelemetryNodeCallback.class);
}));
verify(ctxMock).tellSuccess(msg);
@ -207,11 +275,10 @@ public class TbMsgTimeseriesNodeTest {
@ParameterizedTest
@MethodSource
public void givenTtlFromConfigAndTtlFromMd_whenOnMsg_thenVerifyTtl(String ttlFromMd, long ttlFromConfig, long expectedTtl) throws TbNodeException {
// GIVEN
config.setDefaultTTL(ttlFromConfig);
init();
when(ctxMock.getTelemetryService()).thenReturn(telemetryServiceMock);
when(ctxMock.getTenantId()).thenReturn(TENANT_ID);
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
String data = """
{
@ -227,14 +294,17 @@ public class TbMsgTimeseriesNodeTest {
.copyMetaData(metadata)
.data(data)
.build();
// WHEN
node.onMsg(ctxMock, msg);
// THEN
verify(telemetryServiceMock).saveTimeseries(assertArg(request -> {
assertThat(request.getTenantId()).isEqualTo(TENANT_ID);
assertThat(request.getCustomerId()).isNull();
assertThat(request.getEntityId()).isEqualTo(DEVICE_ID);
assertThat(request.getTtl()).isEqualTo(expectedTtl);
assertThat(request.isSaveLatest()).isTrue();
assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.SAVE_ALL);
assertThat(request.getCallback()).isInstanceOf(TelemetryNodeCallback.class);
}));
}
@ -251,26 +321,6 @@ public class TbMsgTimeseriesNodeTest {
);
}
private void init() throws TbNodeException {
var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
var tenantProfile = getTenantProfile();
when(ctxMock.getTenantProfile()).thenReturn(tenantProfile);
tenantProfile.getProfileConfiguration().ifPresent(profileConfiguration ->
tenantProfileDefaultStorageTtl = TimeUnit.DAYS.toSeconds(profileConfiguration.getDefaultStorageTtlDays()));
node.init(ctxMock, configuration);
verify(ctxMock).addTenantProfileListener(any());
}
private TenantProfile getTenantProfile() {
var tenantProfile = new TenantProfile(TENANT_PROFILE_ID);
var tenantProfileData = new TenantProfileData();
var tenantProfileConfiguration = new DefaultTenantProfileConfiguration();
tenantProfileConfiguration.setDefaultStorageTtlDays(5);
tenantProfileData.setConfiguration(tenantProfileConfiguration);
tenantProfile.setProfileData(tenantProfileData);
return tenantProfile;
}
private static List<TsKvEntry> getTsKvEntriesListWithTs(String data, long ts) {
Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToTelemetry(JsonParser.parseString(data), ts);
List<TsKvEntry> expectedList = new ArrayList<>();
@ -282,4 +332,309 @@ public class TbMsgTimeseriesNodeTest {
return expectedList;
}
@Test
public void givenOnEveryMessagePersistenceSettingsAndSameMessageTwoTimes_whenOnMsg_thenPersistSameMessageTwoTimes() throws TbNodeException {
// GIVEN
config.setPersistenceSettings(new TbMsgTimeseriesNodeConfiguration.PersistenceSettings.OnEveryMessage());
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
var msg = TbMsg.newMsg()
.type(TbMsgType.POST_TELEMETRY_REQUEST)
.originator(DEVICE_ID)
.data(JacksonUtil.newObjectNode().put("temperature", 22.3).toString())
.metaData(new TbMsgMetaData(Map.of("ts", "123")))
.build();
// WHEN-THEN
var expectedSaveRequest = TimeseriesSaveRequest.builder()
.tenantId(TENANT_ID)
.customerId(msg.getCustomerId())
.entityId(msg.getOriginator())
.entry(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3)))
.ttl(extractTtlAsSeconds(tenantProfile))
.strategy(TimeseriesSaveRequest.Strategy.SAVE_ALL)
.build();
node.onMsg(ctxMock, msg);
then(telemetryServiceMock).should(times(1)).saveTimeseries(assertArg(
actualSaveRequest -> assertThat(actualSaveRequest).usingRecursiveComparison().ignoringFields("callback").isEqualTo(expectedSaveRequest)
));
node.onMsg(ctxMock, msg);
then(telemetryServiceMock).should(times(2)).saveTimeseries(assertArg(
actualSaveRequest -> assertThat(actualSaveRequest).usingRecursiveComparison().ignoringFields("callback").isEqualTo(expectedSaveRequest)
));
}
@Test
public void givenDeduplicatePersistenceSettingsAndSameMessageTwoTimes_whenOnMsg_thenPersistThisMessageOnlyFirstTime() throws TbNodeException {
// GIVEN
config.setPersistenceSettings(new TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Deduplicate(10));
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
var msg = TbMsg.newMsg()
.type(TbMsgType.POST_TELEMETRY_REQUEST)
.originator(DEVICE_ID)
.data(JacksonUtil.newObjectNode().put("temperature", 22.3).toString())
.metaData(new TbMsgMetaData(Map.of("ts", "123")))
.build();
// WHEN-THEN
var expectedSaveRequest = TimeseriesSaveRequest.builder()
.tenantId(TENANT_ID)
.customerId(msg.getCustomerId())
.entityId(msg.getOriginator())
.entry(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3)))
.ttl(extractTtlAsSeconds(tenantProfile))
.strategy(TimeseriesSaveRequest.Strategy.SAVE_ALL)
.build();
node.onMsg(ctxMock, msg);
then(telemetryServiceMock).should().saveTimeseries(assertArg(
actualSaveRequest -> assertThat(actualSaveRequest).usingRecursiveComparison().ignoringFields("callback").isEqualTo(expectedSaveRequest)
));
clearInvocations(telemetryServiceMock, ctxMock);
node.onMsg(ctxMock, msg);
then(telemetryServiceMock).should(never()).saveTimeseries(any());
}
@Test
public void givenWebsocketsOnlyPersistenceSettingsAndSameMessageTwoTimes_whenOnMsg_thenSendsOnlyWsUpdateTwoTimes() throws TbNodeException {
// GIVEN
config.setPersistenceSettings(new TbMsgTimeseriesNodeConfiguration.PersistenceSettings.WebSocketsOnly());
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
var msg = TbMsg.newMsg()
.type(TbMsgType.POST_TELEMETRY_REQUEST)
.originator(DEVICE_ID)
.data(JacksonUtil.newObjectNode().put("temperature", 22.3).toString())
.metaData(new TbMsgMetaData(Map.of("ts", "123")))
.build();
// WHEN-THEN
var expectedSaveRequest = TimeseriesSaveRequest.builder()
.tenantId(TENANT_ID)
.customerId(msg.getCustomerId())
.entityId(msg.getOriginator())
.entry(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3)))
.ttl(extractTtlAsSeconds(tenantProfile))
.strategy(TimeseriesSaveRequest.Strategy.WS_ONLY)
.build();
node.onMsg(ctxMock, msg);
then(telemetryServiceMock).should(times(1)).saveTimeseries(assertArg(
actualSaveRequest -> assertThat(actualSaveRequest).usingRecursiveComparison().ignoringFields("callback").isEqualTo(expectedSaveRequest)
));
node.onMsg(ctxMock, msg);
then(telemetryServiceMock).should(times(2)).saveTimeseries(assertArg(
actualSaveRequest -> assertThat(actualSaveRequest).usingRecursiveComparison().ignoringFields("callback").isEqualTo(expectedSaveRequest)
));
}
@Test
public void givenAdvancedPersistenceSettingsWithOnEveryMessageStrategiesForAllActionsAndSameMessageTwoTimes_whenOnMsg_thenPersistSameMessageTwoTimes() throws TbNodeException {
// GIVEN
config.setPersistenceSettings(new TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Advanced(
PersistenceStrategy.onEveryMessage(),
PersistenceStrategy.onEveryMessage(),
PersistenceStrategy.onEveryMessage()
));
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
var msg = TbMsg.newMsg()
.type(TbMsgType.POST_TELEMETRY_REQUEST)
.originator(DEVICE_ID)
.data(JacksonUtil.newObjectNode().put("temperature", 22.3).toString())
.metaData(new TbMsgMetaData(Map.of("ts", "123")))
.build();
// WHEN-THEN
var expectedSaveRequest = TimeseriesSaveRequest.builder()
.tenantId(TENANT_ID)
.customerId(msg.getCustomerId())
.entityId(msg.getOriginator())
.entry(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3)))
.ttl(extractTtlAsSeconds(tenantProfile))
.strategy(TimeseriesSaveRequest.Strategy.SAVE_ALL)
.build();
node.onMsg(ctxMock, msg);
then(telemetryServiceMock).should(times(1)).saveTimeseries(assertArg(
actualSaveRequest -> assertThat(actualSaveRequest).usingRecursiveComparison().ignoringFields("callback").isEqualTo(expectedSaveRequest)
));
node.onMsg(ctxMock, msg);
then(telemetryServiceMock).should(times(2)).saveTimeseries(assertArg(
actualSaveRequest -> assertThat(actualSaveRequest).usingRecursiveComparison().ignoringFields("callback").isEqualTo(expectedSaveRequest)
));
}
@Test
public void givenAdvancedPersistenceSettingsWithDifferentDeduplicateStrategyForEachAction_whenOnMsg_thenEvaluatesStrategiesForEachActionsIndependently() throws TbNodeException {
// GIVEN
config.setPersistenceSettings(new TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Advanced(
PersistenceStrategy.deduplicate(1),
PersistenceStrategy.deduplicate(2),
PersistenceStrategy.deduplicate(3)
));
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
long ts1 = 500L;
long ts2 = 1500L;
long ts3 = 2500L;
// WHEN-THEN
node.onMsg(ctxMock, TbMsg.newMsg()
.type(TbMsgType.POST_TELEMETRY_REQUEST)
.originator(DEVICE_ID)
.data(JacksonUtil.newObjectNode().put("temperature", 22.3).toString())
.metaData(new TbMsgMetaData(Map.of("ts", Long.toString(ts1))))
.build());
then(telemetryServiceMock).should().saveTimeseries(assertArg(
actualSaveRequest -> assertThat(actualSaveRequest.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.SAVE_ALL)
));
clearInvocations(telemetryServiceMock);
node.onMsg(ctxMock, TbMsg.newMsg()
.type(TbMsgType.POST_TELEMETRY_REQUEST)
.originator(DEVICE_ID)
.data(JacksonUtil.newObjectNode().put("temperature", 22.3).toString())
.metaData(new TbMsgMetaData(Map.of("ts", Long.toString(ts2))))
.build());
then(telemetryServiceMock).should().saveTimeseries(assertArg(
actualSaveRequest -> assertThat(actualSaveRequest.getStrategy()).isEqualTo(new TimeseriesSaveRequest.Strategy(true, false, false))
));
clearInvocations(telemetryServiceMock);
node.onMsg(ctxMock, TbMsg.newMsg()
.type(TbMsgType.POST_TELEMETRY_REQUEST)
.originator(DEVICE_ID)
.data(JacksonUtil.newObjectNode().put("temperature", 22.3).toString())
.metaData(new TbMsgMetaData(Map.of("ts", Long.toString(ts3))))
.build());
then(telemetryServiceMock).should().saveTimeseries(assertArg(
actualSaveRequest -> assertThat(actualSaveRequest.getStrategy()).isEqualTo(new TimeseriesSaveRequest.Strategy(true, true, false))
));
}
@Test
public void givenAdvancedPersistenceSettingsWithSkipStrategiesForAllActionsAndSameMessageTwoTimes_whenOnMsg_thenSkipsSameMessageTwoTimes() throws TbNodeException {
// GIVEN
config.setPersistenceSettings(new TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Advanced(
PersistenceStrategy.skip(),
PersistenceStrategy.skip(),
PersistenceStrategy.skip()
));
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
var msg = TbMsg.newMsg()
.type(TbMsgType.POST_TELEMETRY_REQUEST)
.originator(DEVICE_ID)
.data(JacksonUtil.newObjectNode().put("temperature", 22.3).toString())
.metaData(new TbMsgMetaData(Map.of("ts", "123")))
.build();
// WHEN-THEN
node.onMsg(ctxMock, msg);
then(telemetryServiceMock).should(never()).saveTimeseries(any());
then(ctxMock).should(times(1)).tellSuccess(msg);
node.onMsg(ctxMock, msg);
then(telemetryServiceMock).should(never()).saveTimeseries(any());
then(ctxMock).should(times(2)).tellSuccess(msg);
}
private static long extractTtlAsSeconds(TenantProfile tenantProfile) {
return TimeUnit.DAYS.toSeconds(tenantProfile.getDefaultProfileConfiguration().getDefaultStorageTtlDays());
}
@Override
protected TbNode getTestNode() {
return node;
}
private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
return Stream.of(
Arguments.of(0, """
{
"defaultTTL": 0,
"useServerTs": false,
"skipLatestPersistence": false
}""",
true,
"""
{
"defaultTTL": 0,
"useServerTs": false,
"persistenceSettings": {
"type": "ON_EVERY_MESSAGE"
}
}"""),
Arguments.of(0, """
{
"defaultTTL": 0,
"useServerTs": false
}""",
true,
"""
{
"defaultTTL": 0,
"useServerTs": false,
"persistenceSettings": {
"type": "ON_EVERY_MESSAGE"
}
}"""),
Arguments.of(0, """
{
"defaultTTL": 0,
"useServerTs": false,
"skipLatestPersistence": null
}""",
true,
"""
{
"defaultTTL": 0,
"useServerTs": false,
"persistenceSettings": {
"type": "ON_EVERY_MESSAGE"
}
}"""),
Arguments.of(0, """
{
"defaultTTL": 0,
"useServerTs": false,
"skipLatestPersistence": true
}""",
true,
"""
{
"defaultTTL": 0,
"useServerTs": false,
"persistenceSettings": {
"type": "ADVANCED",
"timeseries": {
"type": "ON_EVERY_MESSAGE"
},
"latest": {
"type": "SKIP"
},
"webSockets": {
"type": "ON_EVERY_MESSAGE"
}
}
}""")
);
}
}

260
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/strategy/DeduplicatePersistenceStrategyTest.java

@ -0,0 +1,260 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.telemetry.strategy;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.Policy;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.test.util.ReflectionTestUtils;
import java.time.Duration;
import java.util.Set;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
class DeduplicatePersistenceStrategyTest {
final int deduplicationIntervalSecs = 10;
DeduplicatePersistenceStrategy strategy;
@BeforeEach
void setup() {
strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs);
}
@Test
void shouldThrowWhenDeduplicationIntervalIsLessThanOneSecond() {
assertThatThrownBy(() -> new DeduplicatePersistenceStrategy(0))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Deduplication interval must be at least 1 second(s) and at most 86400 second(s), was 0 second(s)");
}
@Test
void shouldThrowWhenDeduplicationIntervalIsMoreThan24Hours() {
assertThatThrownBy(() -> new DeduplicatePersistenceStrategy(86401))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Deduplication interval must be at least 1 second(s) and at most 86400 second(s), was 86401 second(s)");
}
@Test
void shouldUseAtLeastTenMinutesForExpireAfterAccess() {
// GIVEN
int deduplicationIntervalSecs = 1; // min deduplication interval duration
// WHEN
strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs);
// THEN
var deduplicationCache = (LoadingCache<Long, Set<UUID>>) ReflectionTestUtils.getField(strategy, "deduplicationCache");
assertThat(deduplicationCache.policy().expireAfterAccess())
.isPresent()
.map(Policy.FixedExpiration::getExpiresAfter)
.hasValue(Duration.ofMinutes(10L));
}
@Test
void shouldCalculateExpireAfterAccessAsIntervalDurationMultipliedByTen() {
// GIVEN
int deduplicationIntervalSecs = (int) Duration.ofHours(1L).toSeconds(); // max deduplication interval duration
// WHEN
strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs);
// THEN
var deduplicationCache = (LoadingCache<Long, Set<UUID>>) ReflectionTestUtils.getField(strategy, "deduplicationCache");
assertThat(deduplicationCache.policy().expireAfterAccess())
.isPresent()
.map(Policy.FixedExpiration::getExpiresAfter)
.hasValue(Duration.ofHours(10L));
}
@Test
void shouldUseAtMostTwoDaysForExpireAfterAccess() {
// GIVEN
int deduplicationIntervalSecs = (int) Duration.ofDays(1L).toSeconds(); // max deduplication interval duration
// WHEN
strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs);
// THEN
var deduplicationCache = (LoadingCache<Long, Set<UUID>>) ReflectionTestUtils.getField(strategy, "deduplicationCache");
assertThat(deduplicationCache.policy().expireAfterAccess())
.isPresent()
.map(Policy.FixedExpiration::getExpiresAfter)
.hasValue(Duration.ofDays(2L));
}
@Test
void shouldNotAllowMoreThan100DeduplicationIntervals() {
// GIVEN
int deduplicationIntervalSecs = 1; // min deduplication interval duration
// WHEN
strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs);
// THEN
var deduplicationCache = (LoadingCache<Long, Set<UUID>>) ReflectionTestUtils.getField(strategy, "deduplicationCache");
assertThat(deduplicationCache.policy().eviction())
.isPresent()
.map(Policy.Eviction::getMaximum)
.hasValue(100L);
}
@Test
void shouldCalculateMaxIntervalsAsTwoDaysDividedByIntervalDuration() {
// GIVEN
int deduplicationIntervalSecs = (int) Duration.ofHours(1L).toSeconds();
// WHEN
strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs);
// THEN
var deduplicationCache = (LoadingCache<Long, Set<UUID>>) ReflectionTestUtils.getField(strategy, "deduplicationCache");
assertThat(deduplicationCache.policy().eviction())
.isPresent()
.map(Policy.Eviction::getMaximum)
.hasValue(48L);
}
@Test
void shouldKeepAtLeastTwoDeduplicationIntervals() {
// GIVEN
int deduplicationIntervalSecs = (int) Duration.ofDays(1L).toSeconds(); // max deduplication interval duration
// WHEN
strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs);
// THEN
var deduplicationCache = (LoadingCache<Long, Set<UUID>>) ReflectionTestUtils.getField(strategy, "deduplicationCache");
assertThat(deduplicationCache.policy().eviction())
.isPresent()
.map(Policy.Eviction::getMaximum)
.hasValue(2L);
}
@Test
void shouldReturnTrueForFirstCallInInterval() {
long ts = 1_000_000L;
UUID originator = UUID.randomUUID();
assertThat(strategy.shouldPersist(ts, originator)).isTrue();
}
@Test
void shouldReturnFalseForSubsequentCallsInInterval() {
long baseTs = 1_000_000L;
UUID originator = UUID.randomUUID();
// Initial call should return true
assertThat(strategy.shouldPersist(baseTs, originator)).isTrue();
// Subsequent call within the same interval should return false for the same originator
long withinSameIntervalTs = baseTs + 1000L;
assertThat(strategy.shouldPersist(withinSameIntervalTs, originator)).isFalse();
}
@Test
void shouldHandleMultipleOriginatorsIndependently() {
long baseTs = 1_000_000L;
UUID originator1 = UUID.randomUUID();
UUID originator2 = UUID.randomUUID();
// First call for different originators in the same interval should return true independently
assertThat(strategy.shouldPersist(baseTs, originator1)).isTrue();
assertThat(strategy.shouldPersist(baseTs, originator2)).isTrue();
// Subsequent calls for the same originators within the same interval should return false
assertThat(strategy.shouldPersist(baseTs + 500L, originator1)).isFalse();
assertThat(strategy.shouldPersist(baseTs + 500L, originator2)).isFalse();
}
@Test
void shouldHandleEdgeCaseTimestamps() {
long minTs = Long.MIN_VALUE;
long maxTs = Long.MAX_VALUE;
UUID originator = UUID.randomUUID();
assertThat(strategy.shouldPersist(minTs, originator)).isTrue();
assertThat(strategy.shouldPersist(minTs + 1L, originator)).isFalse();
assertThat(strategy.shouldPersist(maxTs, originator)).isTrue();
assertThat(strategy.shouldPersist(maxTs - 1L, originator)).isFalse();
}
@Test
void shouldResetDeduplicationAtIntervalBoundaries() {
UUID originator = UUID.randomUUID();
// check 1st interval
long firstIntervalStart = 0L;
long firstIntervalEnd = firstIntervalStart + Duration.ofSeconds(deduplicationIntervalSecs).toMillis() - 1L;
long firstIntervalMiddle = calculateMiddle(firstIntervalStart, firstIntervalEnd);
assertThat(strategy.shouldPersist(firstIntervalStart, originator)).isTrue();
assertThat(strategy.shouldPersist(firstIntervalStart + 1, originator)).isFalse();
assertThat(strategy.shouldPersist(firstIntervalMiddle, originator)).isFalse();
assertThat(strategy.shouldPersist(firstIntervalEnd - 1, originator)).isFalse();
assertThat(strategy.shouldPersist(firstIntervalEnd, originator)).isFalse();
// check 2nd interval
long secondIntervalStart = firstIntervalEnd + 1L;
long secondIntervalEnd = secondIntervalStart + Duration.ofSeconds(deduplicationIntervalSecs).toMillis() - 1L;
long secondIntervalMiddle = calculateMiddle(secondIntervalStart, secondIntervalEnd);
assertThat(strategy.shouldPersist(secondIntervalStart, originator)).isTrue();
assertThat(strategy.shouldPersist(secondIntervalStart + 1, originator)).isFalse();
assertThat(strategy.shouldPersist(secondIntervalMiddle, originator)).isFalse();
assertThat(strategy.shouldPersist(secondIntervalEnd - 1, originator)).isFalse();
assertThat(strategy.shouldPersist(secondIntervalEnd, originator)).isFalse();
}
@Test
void shouldHandleMultipleOriginatorsOverMultipleIntervals() {
UUID originator1 = UUID.randomUUID();
UUID originator2 = UUID.randomUUID();
long baseTs = 0L;
// First interval for both originators
assertThat(strategy.shouldPersist(baseTs, originator1)).isTrue();
assertThat(strategy.shouldPersist(baseTs, originator2)).isTrue();
// Move to the next interval
long nextIntervalTs = baseTs + Duration.ofSeconds(10).toMillis();
// Each originator should be allowed again in the new interval
assertThat(strategy.shouldPersist(nextIntervalTs, originator1)).isTrue();
assertThat(strategy.shouldPersist(nextIntervalTs, originator2)).isTrue();
// Subsequent calls in the same new interval should return false
assertThat(strategy.shouldPersist(nextIntervalTs + 500L, originator1)).isFalse();
assertThat(strategy.shouldPersist(nextIntervalTs + 500L, originator2)).isFalse();
}
private static long calculateMiddle(long start, long end) {
return start + (end - start) / 2;
}
}

48
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/strategy/OnEveryMessagePersistenceStrategyTest.java

@ -0,0 +1,48 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.telemetry.strategy;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.UUID;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
class OnEveryMessagePersistenceStrategyTest {
@ParameterizedTest
@MethodSource("edgeCaseProvider")
void shouldAlwaysReturnTrueForAnyInput(long timestamp, UUID originator) {
var onEveryMessage = OnEveryMessagePersistenceStrategy.getInstance();
assertThat(onEveryMessage.shouldPersist(timestamp, originator)).isTrue();
}
private static Stream<Arguments> edgeCaseProvider() {
return Stream.of(
Arguments.of(Long.MIN_VALUE, new UUID(0L, 0L)),
Arguments.of(Long.MAX_VALUE, new UUID(Long.MAX_VALUE, Long.MAX_VALUE)),
Arguments.of(0L, new UUID(0L, 0L)),
Arguments.of(-1L, new UUID(-1L, -1L)),
Arguments.of(1L, new UUID(1L, 1L)),
Arguments.of(42L, UUID.randomUUID()),
Arguments.of(1000L, null)
);
}
}

55
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/strategy/PersistenceStrategyTest.java

@ -0,0 +1,55 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.telemetry.strategy;
import org.junit.jupiter.api.Test;
import org.springframework.test.util.ReflectionTestUtils;
import java.time.Duration;
import static org.assertj.core.api.Assertions.assertThat;
class PersistenceStrategyTest {
@Test
void testOnEveryMessageReturnsCorrectInstance() {
PersistenceStrategy strategy = PersistenceStrategy.onEveryMessage();
assertThat(strategy)
.isNotNull()
.isInstanceOf(OnEveryMessagePersistenceStrategy.class);
}
@Test
void testDeduplicateReturnsCorrectInstance() {
int validDeduplicationIntervalSecs = 5;
PersistenceStrategy strategy = PersistenceStrategy.deduplicate(validDeduplicationIntervalSecs);
assertThat(strategy)
.isNotNull()
.isInstanceOf(DeduplicatePersistenceStrategy.class);
long actualDeduplicationIntervalMillis = (long) ReflectionTestUtils.getField(strategy, "deduplicationIntervalMillis");
assertThat(actualDeduplicationIntervalMillis).isEqualTo(Duration.ofSeconds(validDeduplicationIntervalSecs).toMillis());
}
@Test
void testSkipReturnsCorrectInstance() {
PersistenceStrategy strategy = PersistenceStrategy.skip();
assertThat(strategy)
.isNotNull()
.isInstanceOf(SkipPersistenceStrategy.class);
}
}

48
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/strategy/SkipPersistenceStrategyTest.java

@ -0,0 +1,48 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.telemetry.strategy;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.UUID;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
class SkipPersistenceStrategyTest {
@ParameterizedTest
@MethodSource("edgeCaseProvider")
void shouldAlwaysReturnFalseForAnyInput(long timestamp, UUID originator) {
var skipStrategy = SkipPersistenceStrategy.getInstance();
assertThat(skipStrategy.shouldPersist(timestamp, originator)).isFalse();
}
private static Stream<Arguments> edgeCaseProvider() {
return Stream.of(
Arguments.of(Long.MIN_VALUE, new UUID(0L, 0L)),
Arguments.of(Long.MAX_VALUE, new UUID(Long.MAX_VALUE, Long.MAX_VALUE)),
Arguments.of(0L, new UUID(0L, 0L)),
Arguments.of(-1L, new UUID(-1L, -1L)),
Arguments.of(1L, new UUID(1L, 1L)),
Arguments.of(42L, UUID.randomUUID()),
Arguments.of(1000L, null)
);
}
}

10
ui-ngx/src/app/modules/home/components/rule-node/action/action-rule-node-config.module.ts

@ -42,6 +42,12 @@ import { DeleteAttributesConfigComponent } from './delete-attributes-config.comp
import { MathFunctionConfigComponent } from './math-function-config.component';
import { DeviceStateConfigComponent } from './device-state-config.component';
import { SendRestApiCallReplyConfigComponent } from './send-rest-api-call-reply-config.component';
import {
AdvancedPersistenceSettingComponent
} from '@home/components/rule-node/action/advanced-persistence-setting.component';
import {
AdvancedPersistenceSettingRowComponent
} from '@home/components/rule-node/action/advanced-persistence-setting-row.component';
@NgModule({
declarations: [
@ -67,7 +73,9 @@ import { SendRestApiCallReplyConfigComponent } from './send-rest-api-call-reply-
PushToEdgeConfigComponent,
PushToCloudConfigComponent,
MathFunctionConfigComponent,
DeviceStateConfigComponent
DeviceStateConfigComponent,
AdvancedPersistenceSettingComponent,
AdvancedPersistenceSettingRowComponent,
],
imports: [
CommonModule,

40
ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting-row.component.html

@ -0,0 +1,40 @@
<!--
Copyright © 2016-2024 The Thingsboard Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<section [formGroup]="persistenceSettingRowForm" class="tb-form-panel stroked no-gap no-padding-bottom">
<div class="tb-form-panel-title mb-4">{{ title }}</div>
<mat-form-field>
<mat-label translate>rule-node-config.save-time-series.strategy</mat-label>
<mat-select formControlName="type">
@for (strategy of persistenceStrategies; track strategy) {
<mat-option [value]="strategy">{{ PersistenceTypeTranslationMap.get(strategy) | translate }}</mat-option>
}
</mat-select>
</mat-form-field>
@if(persistenceSettingRowForm.get('type').value === PersistenceType.DEDUPLICATE) {
<tb-time-unit-input
required
labelText="{{ 'rule-node-config.save-time-series.deduplication-interval' | translate }}"
requiredText="{{ 'rule-node-config.save-time-series.deduplication-interval-required' | translate }}"
minErrorText="{{ 'rule-node-config.save-time-series.deduplication-interval-min-max-range' | translate }}"
maxErrorText="{{ 'rule-node-config.save-time-series.deduplication-interval-min-max-range' | translate }}"
[minTime]="1"
[maxTime]="maxDeduplicateTime"
formControlName="deduplicationIntervalSecs">
</tb-time-unit-input>
}
</section>

114
ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting-row.component.ts

@ -0,0 +1,114 @@
///
/// Copyright © 2016-2024 The Thingsboard Authors
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
import { Component, forwardRef, Input } from '@angular/core';
import {
ControlValueAccessor,
FormBuilder,
NG_VALIDATORS,
NG_VALUE_ACCESSOR,
ValidationErrors,
Validator
} from '@angular/forms';
import {
AdvancedPersistenceConfig,
defaultAdvancedPersistenceConfig,
maxDeduplicateTimeSecs,
PersistenceType,
PersistenceTypeTranslationMap
} from '@home/components/rule-node/action/timeseries-config.models';
import { isDefinedAndNotNull } from '@core/utils';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
@Component({
selector: 'tb-advanced-persistence-setting-row',
templateUrl: './advanced-persistence-setting-row.component.html',
providers: [{
provide: NG_VALUE_ACCESSOR,
useExisting: forwardRef(() => AdvancedPersistenceSettingRowComponent),
multi: true
},{
provide: NG_VALIDATORS,
useExisting: forwardRef(() => AdvancedPersistenceSettingRowComponent),
multi: true
}]
})
export class AdvancedPersistenceSettingRowComponent implements ControlValueAccessor, Validator {
@Input()
title: string;
persistenceSettingRowForm = this.fb.group({
type: [defaultAdvancedPersistenceConfig.type],
deduplicationIntervalSecs: [{value: 60, disabled: true}]
});
PersistenceType = PersistenceType;
persistenceStrategies = [PersistenceType.ON_EVERY_MESSAGE, PersistenceType.DEDUPLICATE, PersistenceType.SKIP];
PersistenceTypeTranslationMap = PersistenceTypeTranslationMap;
maxDeduplicateTime = maxDeduplicateTimeSecs;
private propagateChange: (value: any) => void = () => {};
constructor(private fb: FormBuilder) {
this.persistenceSettingRowForm.get('type').valueChanges.pipe(
takeUntilDestroyed()
).subscribe(() => this.updatedValidation());
this.persistenceSettingRowForm.valueChanges.pipe(
takeUntilDestroyed()
).subscribe((value) => this.propagateChange(value));
}
registerOnChange(fn: any) {
this.propagateChange = fn;
}
registerOnTouched(_fn: any) {
}
setDisabledState(isDisabled: boolean) {
if (isDisabled) {
this.persistenceSettingRowForm.disable({emitEvent: false});
} else {
this.persistenceSettingRowForm.enable({emitEvent: false});
this.updatedValidation();
}
}
validate(): ValidationErrors | null {
return this.persistenceSettingRowForm.valid ? null : {
persistenceSettingRow: false
};
}
writeValue(value: AdvancedPersistenceConfig) {
if (isDefinedAndNotNull(value)) {
this.persistenceSettingRowForm.patchValue(value, {emitEvent: false});
} else {
this.persistenceSettingRowForm.patchValue(defaultAdvancedPersistenceConfig);
}
}
private updatedValidation() {
if (this.persistenceSettingRowForm.get('type').value === PersistenceType.DEDUPLICATE) {
this.persistenceSettingRowForm.get('deduplicationIntervalSecs').enable({emitEvent: false});
} else {
this.persistenceSettingRowForm.get('deduplicationIntervalSecs').disable({emitEvent: false})
}
}
}

31
ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting.component.html

@ -0,0 +1,31 @@
<!--
Copyright © 2016-2024 The Thingsboard Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<section [formGroup]="persistenceForm" class="tb-form-panel no-border no-padding">
<tb-advanced-persistence-setting-row
formControlName="timeseries"
title="{{ 'rule-node-config.save-time-series.time-series' | translate }}"
></tb-advanced-persistence-setting-row>
<tb-advanced-persistence-setting-row
formControlName="latest"
title="{{ 'rule-node-config.save-time-series.latest' | translate }}"
></tb-advanced-persistence-setting-row>
<tb-advanced-persistence-setting-row
formControlName="webSockets"
title="{{ 'rule-node-config.save-time-series.web-sockets' | translate }}"
></tb-advanced-persistence-setting-row>
</section>

83
ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting.component.ts

@ -0,0 +1,83 @@
///
/// Copyright © 2016-2024 The Thingsboard Authors
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
import {
ControlValueAccessor,
FormBuilder,
NG_VALIDATORS,
NG_VALUE_ACCESSOR,
ValidationErrors,
Validator
} from '@angular/forms';
import { Component, forwardRef } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { AdvancedPersistenceStrategy } from '@home/components/rule-node/action/timeseries-config.models';
@Component({
selector: 'tb-advanced-persistence-settings',
templateUrl: './advanced-persistence-setting.component.html',
providers: [{
provide: NG_VALUE_ACCESSOR,
useExisting: forwardRef(() => AdvancedPersistenceSettingComponent),
multi: true
},{
provide: NG_VALIDATORS,
useExisting: forwardRef(() => AdvancedPersistenceSettingComponent),
multi: true
}]
})
export class AdvancedPersistenceSettingComponent implements ControlValueAccessor, Validator {
persistenceForm = this.fb.group({
timeseries: [null],
latest: [null],
webSockets: [null]
});
private propagateChange: (value: any) => void = () => {};
constructor(private fb: FormBuilder) {
this.persistenceForm.valueChanges.pipe(
takeUntilDestroyed()
).subscribe(value => this.propagateChange(value));
}
registerOnChange(fn: any) {
this.propagateChange = fn;
}
registerOnTouched(_fn: any) {
}
setDisabledState(isDisabled: boolean) {
if (isDisabled) {
this.persistenceForm.disable({emitEvent: false});
} else {
this.persistenceForm.enable({emitEvent: false});
}
}
validate(): ValidationErrors | null {
return this.persistenceForm.valid ? null : {
persistenceForm: false
};
}
writeValue(value: AdvancedPersistenceStrategy) {
this.persistenceForm.patchValue(value, {emitEvent: false});
}
}

98
ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.component.html

@ -16,35 +16,75 @@
-->
<section [formGroup]="timeseriesConfigForm" class="tb-form-panel no-border no-padding">
<mat-form-field class="mat-block flex-1">
<mat-label translate>rule-node-config.default-ttl</mat-label>
<input type="number" min="0" step="1" matInput formControlName="defaultTTL" required>
<mat-icon class="help-icon margin-8 cursor-pointer"
aria-hidden="false"
aria-label="help-icon"
matSuffix
matTooltip="{{ 'rule-node-config.default-ttl-hint' | translate }}">
help
</mat-icon>
<mat-error *ngIf="timeseriesConfigForm.get('defaultTTL').hasError('required')">
{{ 'rule-node-config.default-ttl-required' | translate }}
</mat-error>
<mat-error *ngIf="timeseriesConfigForm.get('defaultTTL').hasError('min')">
{{ 'rule-node-config.min-default-ttl-message' | translate }}
</mat-error>
</mat-form-field>
<div class="tb-form-panel stroked">
<div tb-hint-tooltip-icon="{{ 'rule-node-config.use-server-ts-hint' | translate}}"
class="tb-form-row no-border no-padding">
<mat-slide-toggle class="mat-slide" formControlName="useServerTs">
{{ 'rule-node-config.use-server-ts' | translate }}
</mat-slide-toggle>
</div>
<div tb-hint-tooltip-icon="{{ 'rule-node-config.skip-latest-persistence-hint' | translate}}"
class="tb-form-row no-border no-padding">
<mat-slide-toggle class="mat-slide" formControlName="skipLatestPersistence">
{{ 'rule-node-config.skip-latest-persistence' | translate }}
</mat-slide-toggle>
<div class="tb-form-panel stroked no-padding-bottom no-gap" formGroupName="persistenceSettings">
<div class="mb-4 flex flex-row items-center justify-between">
<div class="tb-form-panel-title" tb-hint-tooltip-icon="{{ 'rule-node-config.save-time-series.persistence-settings-hint' | translate}}" translate>
rule-node-config.save-time-series.persistence-settings
</div>
<tb-toggle-select appearance="fill" selectMediaBreakpoint="xs"
formControlName="isAdvanced">
<tb-toggle-option [value]=false>{{ 'rule-node-config.basic-mode' | translate}}</tb-toggle-option>
<tb-toggle-option [value]=true>{{ 'rule-node-config.advanced-mode' | translate }}</tb-toggle-option>
</tb-toggle-select>
</div>
@if(!timeseriesConfigForm.get('persistenceSettings.isAdvanced').value) {
<mat-form-field>
<mat-label translate>rule-node-config.save-time-series.strategy</mat-label>
<mat-select formControlName="type">
@for (strategy of persistenceStrategies; track strategy) {
<mat-option [value]="strategy">{{ PersistenceTypeTranslationMap.get(strategy) | translate }}</mat-option>
}
</mat-select>
</mat-form-field>
@if(timeseriesConfigForm.get('persistenceSettings.type').value === PersistenceType.DEDUPLICATE) {
<tb-time-unit-input
required
labelText="{{ 'rule-node-config.save-time-series.deduplication-interval' | translate }}"
requiredText="{{ 'rule-node-config.save-time-series.deduplication-interval-required' | translate }}"
minErrorText="{{ 'rule-node-config.save-time-series.deduplication-interval-min-max-range' | translate }}"
maxErrorText="{{ 'rule-node-config.save-time-series.deduplication-interval-min-max-range' | translate }}"
[maxTime]="maxDeduplicateTime"
[minTime]="1"
formControlName="deduplicationIntervalSecs">
</tb-time-unit-input>
}
}
@else {
<tb-advanced-persistence-settings
class="mb-4"
formControlName="advanced"
></tb-advanced-persistence-settings>
}
</div>
<section class="tb-form-panel stroked">
<mat-expansion-panel class="tb-settings">
<mat-expansion-panel-header>
<mat-panel-title translate>rule-node-config.advanced-settings</mat-panel-title>
</mat-expansion-panel-header>
<ng-template matExpansionPanelContent>
<div tb-hint-tooltip-icon="{{ 'rule-node-config.use-server-ts-hint' | translate}}"
class="tb-form-row no-border no-padding">
<mat-slide-toggle class="mat-slide" formControlName="useServerTs">
{{ 'rule-node-config.use-server-ts' | translate }}
</mat-slide-toggle>
</div>
<tb-time-unit-input
required
subscriptSizing="dynamic"
labelText="{{ 'rule-node-config.default-ttl' | translate }}"
requiredText="{{ 'rule-node-config.default-ttl-required' | translate }}"
minErrorText="{{ 'rule-node-config.min-default-ttl-message' | translate }}"
formControlName="defaultTTL">
<mat-icon class="mr-2 cursor-pointer"
aria-hidden="false"
aria-label="help-icon"
matSuffix
matTooltip="{{ 'rule-node-config.default-ttl-hint' | translate }}">
help
</mat-icon>
</tb-time-unit-input>
</ng-template>
</mat-expansion-panel>
</section>
</section>

105
ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.component.ts

@ -15,8 +15,18 @@
///
import { Component } from '@angular/core';
import { UntypedFormBuilder, UntypedFormGroup, Validators } from '@angular/forms';
import { RuleNodeConfiguration, RuleNodeConfigurationComponent } from '@shared/models/rule-node.models';
import { FormBuilder, FormGroup, Validators } from '@angular/forms';
import { RuleNodeConfigurationComponent } from '@shared/models/rule-node.models';
import {
defaultAdvancedPersistenceStrategy,
maxDeduplicateTimeSecs,
PersistenceSettings,
PersistenceSettingsForm,
PersistenceType,
PersistenceTypeTranslationMap,
TimeseriesNodeConfiguration,
TimeseriesNodeConfigurationForm
} from '@home/components/rule-node/action/timeseries-config.models';
@Component({
selector: 'tb-action-node-timeseries-config',
@ -25,21 +35,98 @@ import { RuleNodeConfiguration, RuleNodeConfigurationComponent } from '@shared/m
})
export class TimeseriesConfigComponent extends RuleNodeConfigurationComponent {
timeseriesConfigForm: UntypedFormGroup;
timeseriesConfigForm: FormGroup;
constructor(private fb: UntypedFormBuilder) {
PersistenceType = PersistenceType;
persistenceStrategies = [PersistenceType.ON_EVERY_MESSAGE, PersistenceType.DEDUPLICATE, PersistenceType.WEBSOCKETS_ONLY];
PersistenceTypeTranslationMap = PersistenceTypeTranslationMap;
maxDeduplicateTime = maxDeduplicateTimeSecs
constructor(private fb: FormBuilder) {
super();
}
protected configForm(): UntypedFormGroup {
protected configForm(): FormGroup {
return this.timeseriesConfigForm;
}
protected onConfigurationSet(configuration: RuleNodeConfiguration) {
protected validatorTriggers(): string[] {
return ['persistenceSettings.isAdvanced', 'persistenceSettings.type'];
}
protected prepareInputConfig(config: TimeseriesNodeConfiguration): TimeseriesNodeConfigurationForm {
let persistenceSettings: PersistenceSettingsForm;
if (config?.persistenceSettings) {
const isAdvanced = config?.persistenceSettings?.type === PersistenceType.ADVANCED;
persistenceSettings = {
type: isAdvanced ? PersistenceType.ON_EVERY_MESSAGE : config.persistenceSettings.type,
isAdvanced: isAdvanced,
deduplicationIntervalSecs: config.persistenceSettings?.deduplicationIntervalSecs ?? 60,
advanced: isAdvanced ? config.persistenceSettings : defaultAdvancedPersistenceStrategy
}
} else {
persistenceSettings = {
type: PersistenceType.ON_EVERY_MESSAGE,
isAdvanced: false,
deduplicationIntervalSecs: 60,
advanced: defaultAdvancedPersistenceStrategy
};
}
return {
...config,
persistenceSettings: persistenceSettings
}
}
protected prepareOutputConfig(config: TimeseriesNodeConfigurationForm): TimeseriesNodeConfiguration {
let persistenceSettings: PersistenceSettings;
if (config.persistenceSettings.isAdvanced) {
persistenceSettings = {
...config.persistenceSettings.advanced,
type: PersistenceType.ADVANCED
};
} else {
persistenceSettings = {
type: config.persistenceSettings.type,
deduplicationIntervalSecs: config.persistenceSettings?.deduplicationIntervalSecs
};
}
return {
...config,
persistenceSettings
};
}
protected onConfigurationSet(config: TimeseriesNodeConfigurationForm) {
this.timeseriesConfigForm = this.fb.group({
defaultTTL: [configuration ? configuration.defaultTTL : null, [Validators.required, Validators.min(0)]],
skipLatestPersistence: [configuration ? configuration.skipLatestPersistence : false, []],
useServerTs: [configuration ? configuration.useServerTs : false, []]
persistenceSettings: this.fb.group({
isAdvanced: [config?.persistenceSettings?.isAdvanced ?? false],
type: [config?.persistenceSettings?.type ?? PersistenceType.ON_EVERY_MESSAGE],
deduplicationIntervalSecs: [
{value: config?.persistenceSettings?.deduplicationIntervalSecs ?? 60, disabled: true},
[Validators.required, Validators.max(maxDeduplicateTimeSecs)]
],
advanced: [{value: null, disabled: true}]
}),
defaultTTL: [config?.defaultTTL ?? null, [Validators.required, Validators.min(0)]],
useServerTs: [config?.useServerTs ?? false]
});
}
protected updateValidators(emitEvent: boolean, _trigger?: string) {
const persistenceForm = this.timeseriesConfigForm.get('persistenceSettings') as FormGroup;
const isAdvanced: boolean = persistenceForm.get('isAdvanced').value;
const type: PersistenceType = persistenceForm.get('type').value;
if (!isAdvanced && type === PersistenceType.DEDUPLICATE) {
persistenceForm.get('deduplicationIntervalSecs').enable({emitEvent});
} else {
persistenceForm.get('deduplicationIntervalSecs').disable({emitEvent});
}
if (isAdvanced) {
persistenceForm.get('advanced').enable({emitEvent});
} else {
persistenceForm.get('advanced').disable({emitEvent});
}
}
}

78
ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.models.ts

@ -0,0 +1,78 @@
///
/// Copyright © 2016-2024 The Thingsboard Authors
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
import { DAY, SECOND } from '@shared/models/time/time.models';
export const maxDeduplicateTimeSecs = DAY / SECOND;
export interface TimeseriesNodeConfiguration {
persistenceSettings: PersistenceSettings;
defaultTTL: number;
useServerTs: boolean;
}
export interface TimeseriesNodeConfigurationForm extends Omit<TimeseriesNodeConfiguration, 'persistenceSettings'> {
persistenceSettings: PersistenceSettingsForm
}
export type PersistenceSettings = BasicPersistenceSettings & Partial<DeduplicatePersistenceStrategy> & Partial<AdvancedPersistenceStrategy>;
export type PersistenceSettingsForm = Omit<PersistenceSettings, keyof AdvancedPersistenceStrategy> & {
isAdvanced: boolean;
advanced?: Partial<AdvancedPersistenceStrategy>;
type: PersistenceType;
};
export enum PersistenceType {
ON_EVERY_MESSAGE = 'ON_EVERY_MESSAGE',
DEDUPLICATE = 'DEDUPLICATE',
WEBSOCKETS_ONLY = 'WEBSOCKETS_ONLY',
ADVANCED = 'ADVANCED',
SKIP = 'SKIP'
}
export const PersistenceTypeTranslationMap = new Map<PersistenceType, string>([
[PersistenceType.ON_EVERY_MESSAGE, 'rule-node-config.save-time-series.strategy-type.every-message'],
[PersistenceType.DEDUPLICATE, 'rule-node-config.save-time-series.strategy-type.deduplicate'],
[PersistenceType.WEBSOCKETS_ONLY, 'rule-node-config.save-time-series.strategy-type.web-sockets-only'],
[PersistenceType.SKIP, 'rule-node-config.save-time-series.strategy-type.skip'],
])
export interface BasicPersistenceSettings {
type: PersistenceType;
}
export interface DeduplicatePersistenceStrategy extends BasicPersistenceSettings{
deduplicationIntervalSecs: number;
}
export interface AdvancedPersistenceStrategy extends BasicPersistenceSettings{
timeseries: AdvancedPersistenceConfig;
latest: AdvancedPersistenceConfig;
webSockets: AdvancedPersistenceConfig;
}
export type AdvancedPersistenceConfig = WithOptional<DeduplicatePersistenceStrategy, 'deduplicationIntervalSecs'>;
export const defaultAdvancedPersistenceConfig: AdvancedPersistenceConfig = {
type: PersistenceType.ON_EVERY_MESSAGE
}
export const defaultAdvancedPersistenceStrategy: Omit<AdvancedPersistenceStrategy, 'type'> = {
timeseries: defaultAdvancedPersistenceConfig,
latest: defaultAdvancedPersistenceConfig,
webSockets: defaultAdvancedPersistenceConfig,
}

7
ui-ngx/src/app/modules/home/components/rule-node/common/common-rule-node-config.module.ts

@ -33,6 +33,7 @@ import { RelationsQueryConfigOldComponent } from './relations-query-config-old.c
import { SelectAttributesComponent } from './select-attributes.component';
import { AlarmStatusSelectComponent } from './alarm-status-select.component';
import { ExampleHintComponent } from './example-hint.component';
import { TimeUnitInputComponent } from './time-unit-input.component';
@NgModule({
declarations: [
@ -50,7 +51,8 @@ import { ExampleHintComponent } from './example-hint.component';
RelationsQueryConfigOldComponent,
SelectAttributesComponent,
AlarmStatusSelectComponent,
ExampleHintComponent
ExampleHintComponent,
TimeUnitInputComponent
],
imports: [
CommonModule,
@ -72,7 +74,8 @@ import { ExampleHintComponent } from './example-hint.component';
RelationsQueryConfigOldComponent,
SelectAttributesComponent,
AlarmStatusSelectComponent,
ExampleHintComponent
ExampleHintComponent,
TimeUnitInputComponent
]
})

44
ui-ngx/src/app/modules/home/components/rule-node/common/time-unit-input.component.html

@ -0,0 +1,44 @@
<!--
Copyright © 2016-2024 The Thingsboard Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<section [formGroup]="timeInputForm" class="flex gap-4">
<mat-form-field class="max-w-66% flex-full" subscriptSizing="dynamic">
<mat-label *ngIf="labelText">{{ labelText }}</mat-label>
<input type="number" min="0" step="1" matInput formControlName="time">
<div matSuffix>
<ng-content select="[matSuffix]"></ng-content>
</div>
<mat-hint *ngIf="subscriptSizing === 'fixed'"></mat-hint>
<mat-error *ngIf="timeInputForm.get('time').hasError('required') && requiredText">
{{ requiredText }}
</mat-error>
<mat-error *ngIf="timeInputForm.get('time').hasError('min') && minErrorText">
{{ minErrorText }}
</mat-error>
<mat-error *ngIf="timeInputForm.get('time').hasError('max') && maxErrorText">
{{ maxErrorText }}
</mat-error>
</mat-form-field>
<mat-form-field class="h-fit max-w-33% flex-full" [subscriptSizing]="subscriptSizing">
<mat-label translate>rule-node-config.units</mat-label>
<mat-select formControlName="timeUnit">
@for (timeUnit of timeUnits; track timeUnit) {
<mat-option [value]="timeUnit">{{ timeUnitTranslations.get(timeUnit) | translate }}</mat-option>
}
</mat-select>
</mat-form-field>
</section>

199
ui-ngx/src/app/modules/home/components/rule-node/common/time-unit-input.component.ts

@ -0,0 +1,199 @@
///
/// Copyright © 2016-2024 The Thingsboard Authors
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
import { Component, DestroyRef, forwardRef, Input, OnInit } from '@angular/core';
import {
AbstractControl,
ControlValueAccessor,
FormBuilder,
NG_VALIDATORS,
NG_VALUE_ACCESSOR,
ValidationErrors,
Validator,
Validators
} from '@angular/forms';
import { TimeUnit, timeUnitTranslations } from '../rule-node-config.models';
import { isDefinedAndNotNull, isNumeric } from '@core/utils';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { coerceBoolean, coerceNumber } from '@shared/decorators/coercion';
import { DAY, HOUR, MINUTE, SECOND } from '@shared/models/time/time.models';
import { SubscriptSizing } from '@angular/material/form-field';
interface TimeUnitInputModel {
time: number;
timeUnit: TimeUnit
}
@Component({
selector: 'tb-time-unit-input',
templateUrl: './time-unit-input.component.html',
providers: [{
provide: NG_VALUE_ACCESSOR,
useExisting: forwardRef(() => TimeUnitInputComponent),
multi: true
},{
provide: NG_VALIDATORS,
useExisting: forwardRef(() => TimeUnitInputComponent),
multi: true
}]
})
export class TimeUnitInputComponent implements ControlValueAccessor, Validator, OnInit {
@Input()
labelText: string;
@Input()
@coerceBoolean()
required: boolean;
@Input()
requiredText: string;
@Input()
@coerceNumber()
minTime = 0;
@Input()
minErrorText: string;
@Input()
@coerceNumber()
maxTime: number;
@Input()
maxErrorText: string;
@Input()
subscriptSizing: SubscriptSizing = 'fixed';
timeUnits = Object.values(TimeUnit).filter(item => item !== TimeUnit.MILLISECONDS) as TimeUnit[];
timeUnitTranslations = timeUnitTranslations;
timeInputForm = this.fb.group({
time: [0],
timeUnit: [TimeUnit.SECONDS]
});
private timeIntervalsInSec = new Map<TimeUnit, number>([
[TimeUnit.DAYS, DAY/SECOND],
[TimeUnit.HOURS, HOUR/SECOND],
[TimeUnit.MINUTES, MINUTE/SECOND],
[TimeUnit.SECONDS, SECOND/SECOND],
]);
private modelValue: number;
private propagateChange: (value: any) => void = () => {};
constructor(private fb: FormBuilder,
private destroyRef: DestroyRef) {
}
ngOnInit() {
if(this.required || this.maxTime) {
const timeControl = this.timeInputForm.get('time');
const validators = [Validators.pattern(/^\d*$/)];
if (this.required) {
validators.push(Validators.required);
}
if (this.maxTime) {
validators.push((control: AbstractControl) =>
Validators.max(Math.floor(this.maxTime / this.timeIntervalsInSec.get(this.timeInputForm.get('timeUnit').value)))(control)
);
}
if (isDefinedAndNotNull(this.minTime)) {
validators.push(Validators.min(this.minTime));
}
timeControl.setValidators(validators);
timeControl.updateValueAndValidity({ emitEvent: false });
}
this.timeInputForm.get('timeUnit').valueChanges.pipe(
takeUntilDestroyed(this.destroyRef)
).subscribe(() => {
this.timeInputForm.get('time').updateValueAndValidity({onlySelf: true});
this.timeInputForm.get('time').markAsTouched({onlySelf: true});
});
this.timeInputForm.valueChanges.pipe(
takeUntilDestroyed(this.destroyRef)
).subscribe(value => {
this.updatedModel(value);
});
}
registerOnChange(fn: any) {
this.propagateChange = fn;
}
registerOnTouched(_fn: any) {
}
setDisabledState(isDisabled: boolean) {
if (isDisabled) {
this.timeInputForm.disable({emitEvent: false});
} else {
this.timeInputForm.enable({emitEvent: false});
if(this.timeInputForm.invalid) {
setTimeout(() => this.updatedModel(this.timeInputForm.value, true))
}
}
}
writeValue(sec: number) {
if (sec !== this.modelValue) {
if (isDefinedAndNotNull(sec) && isNumeric(sec) && Number(sec) !== 0) {
this.timeInputForm.patchValue(this.parseTime(sec), {emitEvent: false});
this.modelValue = sec;
} else {
this.timeInputForm.patchValue({
time: 0,
timeUnit: TimeUnit.SECONDS
}, {emitEvent: false});
this.modelValue = 0;
}
}
}
validate(): ValidationErrors | null {
return this.timeInputForm.valid ? null : {
timeInput: false
};
}
private updatedModel(value: Partial<TimeUnitInputModel>, forceUpdated = false) {
const time = value.time * this.timeIntervalsInSec.get(value.timeUnit);
if (this.modelValue !== time || forceUpdated) {
this.modelValue = time;
this.propagateChange(time);
}
}
private parseTime(value: number): TimeUnitInputModel {
for (const [timeUnit, timeValue] of this.timeIntervalsInSec) {
const calc = value / timeValue;
if (Number.isInteger(calc)) {
return {
time: calc,
timeUnit: timeUnit
}
}
}
}
}

28
ui-ngx/src/assets/locale/locale.constant-en_US.json

@ -4604,7 +4604,7 @@
"originator-entity": "Entity by name pattern",
"clone-message": "Clone message",
"transform": "Transform",
"default-ttl": "Default TTL in seconds",
"default-ttl": "Default TTL",
"default-ttl-required": "Default TTL is required.",
"default-ttl-hint": "Rule node will fetch Time-to-Live (TTL) value from the message metadata. If no value is present, it defaults to the TTL specified in the configuration. If the value is set to 0, the TTL from the tenant profile configuration will be applied.",
"default-ttl-zero-hint": "TTL will not be applied if its value is set to 0.",
@ -4969,10 +4969,8 @@
"general-pattern-hint": "Use ${metadataKey} for value from metadata, $[messageKey] for value from message body.",
"alarm-severity-pattern-hint": "Use <code><span style=\"color: #000;\">$&#123;</span>metadataKey<span style=\"color: #000;\">&#125;</span></code> for value from metadata, <code><span style=\"color: #000;\">$[</span>messageKey<span style=\"color: #000;\">]</span></code> for value from message body. Alarm severity should be system (CRITICAL, MAJOR etc.)",
"output-node-name-hint": "The <b>rule node name</b> corresponds to the <b>relation type</b> of the output message, and it is used to forward messages to other rule nodes in the caller rule chain.",
"skip-latest-persistence": "Skip latest persistence",
"skip-latest-persistence-hint": "Rule node will not update values for incoming keys for the latest time series data. Useful for highly loaded use-cases to decrease the pressure on the DB.",
"use-server-ts": "Use server ts",
"use-server-ts-hint": "Rule node will use the timestamp of message processing instead of the timestamp from the message. Useful for all sorts of sequential processing if you merge messages from multiple sources (devices, assets, etc).",
"use-server-ts": "Use server timestamp",
"use-server-ts-hint": "Use the server’s current timestamp for time series data that lacks an explicit timestamp. This helps maintain proper ordering when processing messages from multiple sources or when messages arrive out of sequence.",
"kv-map-pattern-hint": "All input fields support templatization. Use $[messageKey] to extract value from the message and ${metadataKey} to extract value from the metadata.",
"kv-map-single-pattern-hint": "Input field support templatization. Use $[messageKey] to extract value from the message and ${metadataKey} to extract value from the metadata.",
"shared-scope": "Shared scope",
@ -5130,8 +5128,28 @@
"request-timeout-required": "Request timeout is required",
"request-timeout-min": "Min request timeout is 0",
"request-timeout-hint": "The amount of time to wait in seconds for the request to complete before giving up and timing out. A value of 0 means infinity, and is not recommended.",
"units": "Units",
"tell-failure-aws-lambda": "Tell Failure if AWS Lambda function execution raises exception",
"tell-failure-aws-lambda-hint": "Rule node forces failure of message processing if AWS Lambda function execution raises exception.",
"basic-mode": "Basic",
"advanced-mode": "Advanced",
"save-time-series": {
"persistence-settings": "Persistence settings",
"persistence-settings-hint": "Define how and when time series data is saved. In Basic mode, apply a single persistence strategy to all actions or enable only WebSockets updates. Advanced mode allows you to configure individual persistence strategies for each action.",
"strategy": "Strategy",
"deduplication-interval": "Deduplication interval",
"deduplication-interval-required": "Deduplication interval is required",
"deduplication-interval-min-max-range": "Deduplication interval should be at least 1 second and at most 1 day",
"strategy-type": {
"every-message": "On every message",
"skip": "Skip",
"deduplicate": "Deduplicate",
"web-sockets-only": "WebSockets only"
},
"time-series": "Time series",
"latest": "Latest",
"web-sockets": "WebSockets"
},
"key-val": {
"key": "Key",
"value": "Value",

Loading…
Cancel
Save