From 3fea0f88f36d5e2d14376281d6b877d32d39a06e Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 12 Dec 2023 14:03:48 +0100 Subject: [PATCH 1/8] moved queueName from rule-node config to rule-node --- .../main/data/upgrade/3.6.2/schema_update.sql | 19 +++++++ .../actors/ruleChain/DefaultTbContext.java | 4 +- .../install/ThingsboardInstallService.java | 5 ++ .../AnnotationComponentDiscoveryService.java | 1 + .../install/SqlDatabaseUpgradeService.java | 3 ++ .../update/DefaultDataUpdateService.java | 49 +++++++++++++++++++ .../impl/RuleChainImportService.java | 21 +++++++- .../data/plugin/ComponentDescriptor.java | 7 +++ .../server/common/data/rule/RuleNode.java | 6 ++- .../server/dao/model/ModelConstants.java | 2 + .../model/sql/ComponentDescriptorEntity.java | 5 ++ .../server/dao/model/sql/RuleNodeEntity.java | 5 ++ ...ctComponentDescriptorInsertRepository.java | 3 +- ...qlComponentDescriptorInsertRepository.java | 4 +- .../main/resources/sql/schema-entities.sql | 4 +- .../thingsboard/rule/engine/api/RuleNode.java | 2 + .../rule/engine/debug/TbMsgGeneratorNode.java | 9 ++-- .../TbMsgGeneratorNodeConfiguration.java | 1 - .../deduplication/TbMsgDeduplicationNode.java | 5 +- .../TbMsgDeduplicationNodeConfiguration.java | 1 - .../rule/engine/flow/TbCheckpointNode.java | 9 ++-- .../flow/TbCheckpointNodeConfiguration.java | 30 ------------ .../transform/TbMsgDeduplicationNodeTest.java | 17 +++++-- 23 files changed, 158 insertions(+), 54 deletions(-) create mode 100644 application/src/main/data/upgrade/3.6.2/schema_update.sql delete mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeConfiguration.java diff --git a/application/src/main/data/upgrade/3.6.2/schema_update.sql b/application/src/main/data/upgrade/3.6.2/schema_update.sql new file mode 100644 index 0000000000..903137be79 --- /dev/null +++ b/application/src/main/data/upgrade/3.6.2/schema_update.sql @@ -0,0 +1,19 @@ +-- +-- Copyright © 2016-2023 The Thingsboard Authors +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +ALTER TABLE rule_node ADD COLUMN IF NOT EXISTS queue_name varchar(255); + +ALTER TABLE component_descriptor ADD COLUMN IF NOT EXISTS has_queue_name boolean DEFAULT false; diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 28963b383a..bf61cf7a59 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -185,7 +185,7 @@ class DefaultTbContext implements TbContext { @Override public void enqueue(TbMsg tbMsg, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, nodeCtx.getSelf().getQueueName(), getTenantId(), tbMsg.getOriginator()); enqueue(tpi, tbMsg, onFailure, onSuccess); } @@ -310,7 +310,7 @@ class DefaultTbContext implements TbContext { @Override public boolean isLocalEntity(EntityId entityId) { - return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), entityId).isMyPartition(); + return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, nodeCtx.getSelf().getQueueName(), getTenantId(), entityId).isMyPartition(); } private void scheduleMsgWithDelay(TbActorMsg msg, long delayInMs, TbActorRef target) { diff --git a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java index 3fc68b9cc3..1be7fdecc4 100644 --- a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java +++ b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java @@ -277,6 +277,11 @@ public class ThingsboardInstallService { } else { log.info("Skipping images migration. Run the upgrade with fromVersion as '3.6.2-images' to migrate"); } + break; + case "3.6.2": + log.info("Upgrading ThingsBoard from version 3.6.2 to 3.6.3 ..."); + databaseEntitiesUpgradeService.upgradeDatabase("3.6.2"); + dataUpdateService.updateData("3.6.2"); //TODO DON'T FORGET to update switch statement in the CacheCleanupService if you need to clear the cache break; default: diff --git a/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java index d5f150ce24..571d37f05b 100644 --- a/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java +++ b/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java @@ -189,6 +189,7 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe scannedComponent.setName(ruleNodeAnnotation.name()); scannedComponent.setScope(ruleNodeAnnotation.scope()); scannedComponent.setClusteringMode(ruleNodeAnnotation.clusteringMode()); + scannedComponent.setHasQueueName(ruleNodeAnnotation.hasQueueName()); NodeDefinition nodeDefinition = prepareNodeDefinition(clazz, ruleNodeAnnotation); ObjectNode configurationDescriptor = JacksonUtil.newObjectNode(); JsonNode node = JacksonUtil.valueToTree(nodeDefinition); diff --git a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java index c2444390f6..4a96d0d2c3 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java @@ -772,6 +772,9 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService } }); break; + case "3.6.2": + updateSchema("3.6.2", 3006002, "3.6.3", 3006003, null); + break; default: throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion); } diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java index 7d08fdca00..c57d38b66d 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java @@ -227,11 +227,25 @@ public class DefaultDataUpdateService implements DataUpdateService { log.info("Updating data from version 3.6.0 to 3.6.1 ..."); migrateDeviceConnectivity(); break; + case "3.6.2": + updateRuleNodesQueueName(); + break; default: throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion); } } + private void updateRuleNodesQueueName() { + String[] ruleNodeTypes = { + "org.thingsboard.rule.engine.debug.TbMsgGeneratorNode", + "org.thingsboard.rule.engine.flow.TbCheckpointNode", + "org.thingsboard.rule.engine.deduplication.TbMsgDeduplicationNode" + }; + for (String ruleNodeType : ruleNodeTypes) { + ruleNodeQueueNameUpdater.updateEntities(ruleNodeType); + } + } + private void migrateEdgeEvents(String logPrefix) { boolean skipEdgeEventsMigration = getEnv("TB_SKIP_EDGE_EVENTS_MIGRATION", false); if (!skipEdgeEventsMigration) { @@ -792,4 +806,39 @@ public class DefaultDataUpdateService implements DataUpdateService { } } + private final PaginatedUpdater ruleNodeQueueNameUpdater = + new PaginatedUpdater<>() { + + @Override + protected String getName() { + return "RuleNode queue name updater"; + } + + @Override + protected boolean forceReportTotal() { + return true; + } + + @Override + protected PageData findEntities(String type, PageLink pageLink) { + return ruleChainService.findAllRuleNodesByType(type, pageLink); + } + + @Override + protected void updateEntity(RuleNode ruleNode) { + try { + ObjectNode configuration = (ObjectNode) ruleNode.getConfiguration(); + JsonNode queueName = configuration.remove("queueName"); + if (queueName != null) { + if (!queueName.isNull()) { + ruleNode.setQueueName(queueName.asText()); + } + ruleChainService.saveRuleNode(null, ruleNode); + } + } catch (Exception e) { + log.error("Unable to update RuleNode", e); + } + } + }; + } diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/RuleChainImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/RuleChainImportService.java index d2673595ff..f3a01913a2 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/RuleChainImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/RuleChainImportService.java @@ -15,13 +15,14 @@ */ package org.thingsboard.server.service.sync.ie.importing.impl; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.audit.ActionType; -import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; @@ -84,12 +85,14 @@ public class RuleChainImportService extends BaseEntityImportService { node.setRuleChainId(null); node.setExternalId(node.getId()); node.setId(null); + setQueueName(node); }); } @@ -105,6 +108,22 @@ public class RuleChainImportService extends BaseEntityImportService clazz = Class.forName(ruleNode.getType()); + org.thingsboard.rule.engine.api.RuleNode ruleNodeAnnotation = clazz.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class); + if (ruleNodeAnnotation.hasQueueName()) { + ObjectNode configuration = (ObjectNode) ruleNode.getConfiguration(); + JsonNode queueName = configuration.remove("queueName"); + if (queueName != null && !queueName.isNull()) { + ruleNode.setQueueName(queueName.asText()); + } + } + } catch (ClassNotFoundException e) { + log.warn("[{}] RuleNode class not found [{}]", ruleNode.getName(), ruleNode.getType()); + } + } + @Override protected RuleChain saveOrUpdate(EntitiesImportCtx ctx, RuleChain ruleChain, RuleChainExportData exportData, IdProvider idProvider) { ruleChain = ruleChainService.saveRuleChain(ruleChain); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentDescriptor.java b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentDescriptor.java index aab0a8f858..0e9b31ec09 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentDescriptor.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentDescriptor.java @@ -55,6 +55,8 @@ public class ComponentDescriptor extends BaseData { @Length(fieldName = "actions") @ApiModelProperty(position = 10, value = "Rule Node Actions. Deprecated. Always null.", accessMode = ApiModelProperty.AccessMode.READ_ONLY) @Getter @Setter private String actions; + @ApiModelProperty(position = 11, value = "Indicates that the RuleNode is support queue name.", accessMode = ApiModelProperty.AccessMode.READ_ONLY, example = "true") + @Getter @Setter private boolean hasQueueName; public ComponentDescriptor() { super(); @@ -74,6 +76,7 @@ public class ComponentDescriptor extends BaseData { this.configurationDescriptor = plugin.getConfigurationDescriptor(); this.configurationVersion = plugin.getConfigurationVersion(); this.actions = plugin.getActions(); + this.hasQueueName = plugin.isHasQueueName(); } @ApiModelProperty(position = 1, value = "JSON object with the descriptor Id. " + @@ -104,6 +107,8 @@ public class ComponentDescriptor extends BaseData { if (!Objects.equals(actions, that.actions)) return false; if (!Objects.equals(configurationDescriptor, that.configurationDescriptor)) return false; if (configurationVersion != that.configurationVersion) return false; + if (clusteringMode != that.clusteringMode) return false; + if (hasQueueName != that.isHasQueueName()) return false; return Objects.equals(clazz, that.clazz); } @@ -115,6 +120,8 @@ public class ComponentDescriptor extends BaseData { result = 31 * result + (name != null ? name.hashCode() : 0); result = 31 * result + (clazz != null ? clazz.hashCode() : 0); result = 31 * result + (actions != null ? actions.hashCode() : 0); + result = 31 * result + (clusteringMode != null ? clusteringMode.hashCode() : 0); + result = 31 * result + (hasQueueName ? 1 : 0); return result; } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java index 3fc1ceb628..3123ab1577 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java @@ -50,9 +50,11 @@ public class RuleNode extends BaseDataWithAdditionalInfo implements private boolean debugMode; @ApiModelProperty(position = 7, value = "Enable/disable singleton mode. ", example = "false") private boolean singletonMode; - @ApiModelProperty(position = 8, value = "Version of rule node configuration. ", example = "0") + @ApiModelProperty(position = 8, value = "Queue name. ", example = "Main") + private String queueName; + @ApiModelProperty(position = 9, value = "Version of rule node configuration. ", example = "0") private int configurationVersion; - @ApiModelProperty(position = 9, value = "JSON with the rule node configuration. Structure depends on the rule node implementation.", dataType = "com.fasterxml.jackson.databind.JsonNode") + @ApiModelProperty(position = 10, value = "JSON with the rule node configuration. Structure depends on the rule node implementation.", dataType = "com.fasterxml.jackson.databind.JsonNode") private transient JsonNode configuration; @JsonIgnore private byte[] configurationBytes; diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java index 21cea59ee3..2a9672de76 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java @@ -354,6 +354,7 @@ public class ModelConstants { public static final String COMPONENT_DESCRIPTOR_CONFIGURATION_DESCRIPTOR_PROPERTY = "configuration_descriptor"; public static final String COMPONENT_DESCRIPTOR_CONFIGURATION_VERSION_PROPERTY = "configuration_version"; public static final String COMPONENT_DESCRIPTOR_ACTIONS_PROPERTY = "actions"; + public static final String COMPONENT_DESCRIPTOR_HAS_QUEUE_NAME_PROPERTY = "has_queue_name"; /** * Event constants. @@ -389,6 +390,7 @@ public class ModelConstants { public static final String DEBUG_MODE = "debug_mode"; public static final String SINGLETON_MODE = "singleton_mode"; + public static final String QUEUE_NAME = "queue_name"; /** * Rule chain constants. diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/ComponentDescriptorEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/ComponentDescriptorEntity.java index a476d69c97..c4fa05ef03 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/ComponentDescriptorEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/ComponentDescriptorEntity.java @@ -70,6 +70,9 @@ public class ComponentDescriptorEntity extends BaseSqlEntity { @Column(name = ModelConstants.SINGLETON_MODE) private boolean singletonMode; + @Column(name = ModelConstants.QUEUE_NAME) + private String queueName; + @Column(name = ModelConstants.EXTERNAL_ID_PROPERTY) private UUID externalId; @@ -84,6 +87,7 @@ public class RuleNodeEntity extends BaseSqlEntity { this.name = ruleNode.getName(); this.debugMode = ruleNode.isDebugMode(); this.singletonMode = ruleNode.isSingletonMode(); + this.queueName = ruleNode.getQueueName(); this.configurationVersion = ruleNode.getConfigurationVersion(); this.configuration = ruleNode.getConfiguration(); this.additionalInfo = ruleNode.getAdditionalInfo(); @@ -103,6 +107,7 @@ public class RuleNodeEntity extends BaseSqlEntity { ruleNode.setName(name); ruleNode.setDebugMode(debugMode); ruleNode.setSingletonMode(singletonMode); + ruleNode.setQueueName(queueName); ruleNode.setConfigurationVersion(configurationVersion); ruleNode.setConfiguration(configuration); ruleNode.setAdditionalInfo(additionalInfo); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/component/AbstractComponentDescriptorInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/component/AbstractComponentDescriptorInsertRepository.java index ad8a07be08..0c2587dd5b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/component/AbstractComponentDescriptorInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/component/AbstractComponentDescriptorInsertRepository.java @@ -77,7 +77,8 @@ public abstract class AbstractComponentDescriptorInsertRepository implements Com .setParameter("name", entity.getName()) .setParameter("scope", entity.getScope().name()) .setParameter("type", entity.getType().name()) - .setParameter("clustering_mode", entity.getClusteringMode().name()); + .setParameter("clustering_mode", entity.getClusteringMode().name()) + .setParameter("has_queue_name", entity.isHasQueueName()); } private ComponentDescriptorEntity processSaveOrUpdate(ComponentDescriptorEntity entity, String query) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/component/SqlComponentDescriptorInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/component/SqlComponentDescriptorInsertRepository.java index c462f263f1..5a3038ab0e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/component/SqlComponentDescriptorInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/component/SqlComponentDescriptorInsertRepository.java @@ -44,10 +44,10 @@ public class SqlComponentDescriptorInsertRepository extends AbstractComponentDes } private static String getInsertOrUpdateStatement(String conflictKeyStatement, String updateKeyStatement) { - return "INSERT INTO component_descriptor (id, created_time, actions, clazz, configuration_descriptor, configuration_version, name, scope, type, clustering_mode) VALUES (:id, :created_time, :actions, :clazz, :configuration_descriptor, :configuration_version, :name, :scope, :type, :clustering_mode) ON CONFLICT " + conflictKeyStatement + " DO UPDATE SET " + updateKeyStatement + " returning *"; + return "INSERT INTO component_descriptor (id, created_time, actions, clazz, configuration_descriptor, configuration_version, name, scope, type, clustering_mode, has_queue_name) VALUES (:id, :created_time, :actions, :clazz, :configuration_descriptor, :configuration_version, :name, :scope, :type, :clustering_mode, :has_queue_name) ON CONFLICT " + conflictKeyStatement + " DO UPDATE SET " + updateKeyStatement + " returning *"; } private static String getUpdateStatement(String id) { - return "actions = :actions, " + id + ",created_time = :created_time, configuration_descriptor = :configuration_descriptor, configuration_version = :configuration_version, name = :name, scope = :scope, type = :type, clustering_mode = :clustering_mode"; + return "actions = :actions, " + id + ",created_time = :created_time, configuration_descriptor = :configuration_descriptor, configuration_version = :configuration_version, name = :name, scope = :scope, type = :type, clustering_mode = :clustering_mode, has_queue_name = :has_queue_name"; } } diff --git a/dao/src/main/resources/sql/schema-entities.sql b/dao/src/main/resources/sql/schema-entities.sql index 35d1d49cda..402e0d23b5 100644 --- a/dao/src/main/resources/sql/schema-entities.sql +++ b/dao/src/main/resources/sql/schema-entities.sql @@ -126,7 +126,8 @@ CREATE TABLE IF NOT EXISTS component_descriptor ( name varchar(255), scope varchar(255), type varchar(255), - clustering_mode varchar(255) + clustering_mode varchar(255), + has_queue_name boolean DEFAULT false ); CREATE TABLE IF NOT EXISTS customer ( @@ -187,6 +188,7 @@ CREATE TABLE IF NOT EXISTS rule_node ( name varchar(255), debug_mode boolean, singleton_mode boolean, + queue_name varchar(255), external_id uuid ); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleNode.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleNode.java index 7037f2f3fe..65fe9457ca 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleNode.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleNode.java @@ -42,6 +42,8 @@ public @interface RuleNode { ComponentClusteringMode clusteringMode() default ComponentClusteringMode.ENABLED; + boolean hasQueueName() default false; + boolean inEnabled() default true; boolean outEnabled() default true; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java index b31c98bc0f..0b60f38981 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java @@ -50,6 +50,7 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback; type = ComponentType.ACTION, name = "generator", configClazz = TbMsgGeneratorNodeConfiguration.class, + hasQueueName = true, nodeDescription = "Periodically generates messages", nodeDetails = "Generates messages with configurable period. Javascript function used for message generation.", inEnabled = false, @@ -69,6 +70,7 @@ public class TbMsgGeneratorNode implements TbNode { private UUID nextTickId; private TbMsg prevMsg; private final AtomicBoolean initialized = new AtomicBoolean(false); + private String queueName; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { @@ -76,6 +78,7 @@ public class TbMsgGeneratorNode implements TbNode { this.config = TbNodeUtils.convert(configuration, TbMsgGeneratorNodeConfiguration.class); this.delay = TimeUnit.SECONDS.toMillis(config.getPeriodInSeconds()); this.currentMsgCount = 0; + this.queueName = ctx.getSelf().getQueueName(); if (!StringUtils.isEmpty(config.getOriginatorId())) { originatorId = EntityIdFactory.getByTypeAndUuid(config.getOriginatorType(), config.getOriginatorId()); ctx.checkTenantEntity(originatorId); @@ -137,7 +140,7 @@ public class TbMsgGeneratorNode implements TbNode { } lastScheduledTs = lastScheduledTs + delay; long curDelay = Math.max(0L, (lastScheduledTs - curTs)); - TbMsg tickMsg = ctx.newMsg(config.getQueueName(), TbMsgType.GENERATOR_NODE_SELF_MSG, ctx.getSelfId(), + TbMsg tickMsg = ctx.newMsg(queueName, TbMsgType.GENERATOR_NODE_SELF_MSG, ctx.getSelfId(), getCustomerIdFromMsg(msg), TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING); nextTickId = tickMsg.getId(); ctx.tellSelf(tickMsg, curDelay); @@ -146,14 +149,14 @@ public class TbMsgGeneratorNode implements TbNode { private ListenableFuture generate(TbContext ctx, TbMsg msg) { log.trace("generate, config {}", config); if (prevMsg == null) { - prevMsg = ctx.newMsg(config.getQueueName(), TbMsg.EMPTY_STRING, originatorId, msg.getCustomerId(), TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + prevMsg = ctx.newMsg(queueName, TbMsg.EMPTY_STRING, originatorId, msg.getCustomerId(), TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); } if (initialized.get()) { ctx.logJsEvalRequest(); return Futures.transformAsync(scriptEngine.executeGenerateAsync(prevMsg), generated -> { log.trace("generate process response, generated {}, config {}", generated, config); ctx.logJsEvalResponse(); - prevMsg = ctx.newMsg(config.getQueueName(), generated.getType(), originatorId, msg.getCustomerId(), generated.getMetaData(), generated.getData()); + prevMsg = ctx.newMsg(queueName, generated.getType(), originatorId, msg.getCustomerId(), generated.getMetaData(), generated.getData()); return Futures.immediateFuture(prevMsg); }, MoreExecutors.directExecutor()); //usually it runs on js-executor-remote-callback thread pool } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeConfiguration.java index 5f12603832..027f4aa708 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeConfiguration.java @@ -36,7 +36,6 @@ public class TbMsgGeneratorNodeConfiguration implements NodeConfiguration deduplicationMap; private long deduplicationInterval; + private String queueName; public TbMsgDeduplicationNode() { this.deduplicationMap = new HashMap<>(); @@ -75,6 +77,7 @@ public class TbMsgDeduplicationNode implements TbNode { public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { this.config = TbNodeUtils.convert(configuration, TbMsgDeduplicationNodeConfiguration.class); this.deduplicationInterval = TimeUnit.SECONDS.toMillis(config.getInterval()); + this.queueName = ctx.getSelf().getQueueName(); } @Override @@ -132,7 +135,7 @@ public class TbMsgDeduplicationNode implements TbNode { } } deduplicationResults.add(TbMsg.newMsg( - config.getQueueName(), + queueName, config.getOutMsgType(), deduplicationId, getMetadata(), diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNodeConfiguration.java index 9560247033..08e81d8a6e 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNodeConfiguration.java @@ -26,7 +26,6 @@ public class TbMsgDeduplicationNodeConfiguration implements NodeConfiguration { - - private String queueName; - - @Override - public TbCheckpointNodeConfiguration defaultConfiguration() { - return new TbCheckpointNodeConfiguration(); - } -} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java index be6aea6fa5..4e3ae2a015 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java @@ -40,6 +40,7 @@ import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.msg.TbNodeConnectionType; +import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -241,7 +242,7 @@ public class TbMsgDeduplicationNodeTest { config.setInterval(deduplicationInterval); config.setStrategy(DeduplicationStrategy.ALL); config.setOutMsgType(TbMsgType.POST_ATTRIBUTES_REQUEST.name()); - config.setQueueName(DataConstants.HP_QUEUE_NAME); + setQueueName(DataConstants.HP_QUEUE_NAME); nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); node.init(ctx, nodeConfiguration); @@ -268,7 +269,7 @@ public class TbMsgDeduplicationNodeTest { Assertions.assertEquals(getMergedData(inputMsgs), outMessage.getData()); Assertions.assertEquals(deviceId, outMessage.getOriginator()); Assertions.assertEquals(config.getOutMsgType(), outMessage.getType()); - Assertions.assertEquals(config.getQueueName(), outMessage.getQueueName()); + Assertions.assertEquals(DataConstants.HP_QUEUE_NAME, outMessage.getQueueName()); } @Test @@ -281,7 +282,7 @@ public class TbMsgDeduplicationNodeTest { config.setInterval(deduplicationInterval); config.setStrategy(DeduplicationStrategy.ALL); config.setOutMsgType(TbMsgType.POST_ATTRIBUTES_REQUEST.name()); - config.setQueueName(DataConstants.HP_QUEUE_NAME); + setQueueName(DataConstants.HP_QUEUE_NAME); nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); node.init(ctx, nodeConfiguration); @@ -316,13 +317,13 @@ public class TbMsgDeduplicationNodeTest { Assertions.assertEquals(getMergedData(firstMsgPack), firstMsg.getData()); Assertions.assertEquals(deviceId, firstMsg.getOriginator()); Assertions.assertEquals(config.getOutMsgType(), firstMsg.getType()); - Assertions.assertEquals(config.getQueueName(), firstMsg.getQueueName()); + Assertions.assertEquals(DataConstants.HP_QUEUE_NAME, firstMsg.getQueueName()); TbMsg secondMsg = resultMsgs.get(1); Assertions.assertEquals(getMergedData(secondMsgPack), secondMsg.getData()); Assertions.assertEquals(deviceId, secondMsg.getOriginator()); Assertions.assertEquals(config.getOutMsgType(), secondMsg.getType()); - Assertions.assertEquals(config.getQueueName(), secondMsg.getQueueName()); + Assertions.assertEquals(DataConstants.HP_QUEUE_NAME, secondMsg.getQueueName()); } @Test @@ -429,4 +430,10 @@ public class TbMsgDeduplicationNodeTest { return JacksonUtil.toString(mergedData); } + private void setQueueName(String queueName) { + RuleNode ruleNode = new RuleNode(); + ruleNode.setQueueName(queueName); + when(ctx.getSelf()).thenReturn(ruleNode); + } + } From b65391e6ac5fc1cb3ec861feb371db1d3041ca02 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 12 Dec 2023 16:47:19 +0100 Subject: [PATCH 2/8] fixed TbMsgDeduplicationNodeTest --- .../rule/engine/transform/TbMsgDeduplicationNodeTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java index 4e3ae2a015..ac3f2acfd5 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java @@ -94,6 +94,7 @@ public class TbMsgDeduplicationNodeTest { when(ctx.getSelfId()).thenReturn(ruleNodeId); when(ctx.getTenantId()).thenReturn(tenantId); + when(ctx.getSelf()).thenReturn(new RuleNode()); doAnswer((Answer) invocationOnMock -> { TbMsgType type = (TbMsgType) (invocationOnMock.getArguments())[1]; From e8dc2ddc2d388936a7d7d1b4351b793c74b5eeb7 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 12 Dec 2023 20:47:25 +0100 Subject: [PATCH 3/8] used TbNodeUpgradeUtils instead of rule-node upgrade --- .../install/ThingsboardInstallService.java | 1 - .../update/DefaultDataUpdateService.java | 49 ------------------- .../impl/RuleChainImportService.java | 20 -------- .../server/utils/TbNodeUpgradeUtils.java | 6 +++ .../rule/engine/api/util/TbNodeUtils.java | 1 + .../rule/engine/debug/TbMsgGeneratorNode.java | 21 ++++++++ .../deduplication/TbMsgDeduplicationNode.java | 20 ++++++++ .../rule/engine/flow/TbCheckpointNode.java | 22 +++++++++ 8 files changed, 70 insertions(+), 70 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java index 1be7fdecc4..054c7a6751 100644 --- a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java +++ b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java @@ -281,7 +281,6 @@ public class ThingsboardInstallService { case "3.6.2": log.info("Upgrading ThingsBoard from version 3.6.2 to 3.6.3 ..."); databaseEntitiesUpgradeService.upgradeDatabase("3.6.2"); - dataUpdateService.updateData("3.6.2"); //TODO DON'T FORGET to update switch statement in the CacheCleanupService if you need to clear the cache break; default: diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java index c57d38b66d..7d08fdca00 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java @@ -227,25 +227,11 @@ public class DefaultDataUpdateService implements DataUpdateService { log.info("Updating data from version 3.6.0 to 3.6.1 ..."); migrateDeviceConnectivity(); break; - case "3.6.2": - updateRuleNodesQueueName(); - break; default: throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion); } } - private void updateRuleNodesQueueName() { - String[] ruleNodeTypes = { - "org.thingsboard.rule.engine.debug.TbMsgGeneratorNode", - "org.thingsboard.rule.engine.flow.TbCheckpointNode", - "org.thingsboard.rule.engine.deduplication.TbMsgDeduplicationNode" - }; - for (String ruleNodeType : ruleNodeTypes) { - ruleNodeQueueNameUpdater.updateEntities(ruleNodeType); - } - } - private void migrateEdgeEvents(String logPrefix) { boolean skipEdgeEventsMigration = getEnv("TB_SKIP_EDGE_EVENTS_MIGRATION", false); if (!skipEdgeEventsMigration) { @@ -806,39 +792,4 @@ public class DefaultDataUpdateService implements DataUpdateService { } } - private final PaginatedUpdater ruleNodeQueueNameUpdater = - new PaginatedUpdater<>() { - - @Override - protected String getName() { - return "RuleNode queue name updater"; - } - - @Override - protected boolean forceReportTotal() { - return true; - } - - @Override - protected PageData findEntities(String type, PageLink pageLink) { - return ruleChainService.findAllRuleNodesByType(type, pageLink); - } - - @Override - protected void updateEntity(RuleNode ruleNode) { - try { - ObjectNode configuration = (ObjectNode) ruleNode.getConfiguration(); - JsonNode queueName = configuration.remove("queueName"); - if (queueName != null) { - if (!queueName.isNull()) { - ruleNode.setQueueName(queueName.asText()); - } - ruleChainService.saveRuleNode(null, ruleNode); - } - } catch (Exception e) { - log.error("Unable to update RuleNode", e); - } - } - }; - } diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/RuleChainImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/RuleChainImportService.java index f3a01913a2..bb930f9634 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/RuleChainImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/RuleChainImportService.java @@ -15,8 +15,6 @@ */ package org.thingsboard.server.service.sync.ie.importing.impl; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -85,14 +83,12 @@ public class RuleChainImportService extends BaseEntityImportService { node.setRuleChainId(null); node.setExternalId(node.getId()); node.setId(null); - setQueueName(node); }); } @@ -108,22 +104,6 @@ public class RuleChainImportService extends BaseEntityImportService clazz = Class.forName(ruleNode.getType()); - org.thingsboard.rule.engine.api.RuleNode ruleNodeAnnotation = clazz.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class); - if (ruleNodeAnnotation.hasQueueName()) { - ObjectNode configuration = (ObjectNode) ruleNode.getConfiguration(); - JsonNode queueName = configuration.remove("queueName"); - if (queueName != null && !queueName.isNull()) { - ruleNode.setQueueName(queueName.asText()); - } - } - } catch (ClassNotFoundException e) { - log.warn("[{}] RuleNode class not found [{}]", ruleNode.getName(), ruleNode.getType()); - } - } - @Override protected RuleChain saveOrUpdate(EntitiesImportCtx ctx, RuleChain ruleChain, RuleChainExportData exportData, IdProvider idProvider) { ruleChain = ruleChainService.saveRuleChain(ruleChain); diff --git a/application/src/main/java/org/thingsboard/server/utils/TbNodeUpgradeUtils.java b/application/src/main/java/org/thingsboard/server/utils/TbNodeUpgradeUtils.java index 530679683d..23c75b430b 100644 --- a/application/src/main/java/org/thingsboard/server/utils/TbNodeUpgradeUtils.java +++ b/application/src/main/java/org/thingsboard/server/utils/TbNodeUpgradeUtils.java @@ -25,6 +25,8 @@ import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.service.component.RuleNodeClassInfo; +import static org.thingsboard.rule.engine.api.util.TbNodeUtils.QUEUE_NAME; + @Slf4j public class TbNodeUpgradeUtils { @@ -44,9 +46,13 @@ public class TbNodeUpgradeUtils { } else { var tbVersionedNode = getTbVersionedNode(nodeInfo); try { + JsonNode queueName = oldConfiguration.get(QUEUE_NAME); TbPair upgradeResult = tbVersionedNode.upgrade(configurationVersion, oldConfiguration); if (upgradeResult.getFirst()) { node.setConfiguration(upgradeResult.getSecond()); + if (nodeInfo.getAnnotation().hasQueueName() && queueName != null && queueName.isTextual()) { + node.setQueueName(queueName.asText()); + } } } catch (Exception e) { try { diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java index 396d810b00..cce5649336 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java @@ -36,6 +36,7 @@ import java.util.stream.Collectors; */ public class TbNodeUtils { + public static final String QUEUE_NAME = "queueName"; private static final Pattern DATA_PATTERN = Pattern.compile("(\\$\\[)(.*?)(])"); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java index 0b60f38981..80170a28f1 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java @@ -15,6 +15,8 @@ */ package org.thingsboard.rule.engine.debug; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; @@ -35,6 +37,7 @@ import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.script.ScriptLanguage; +import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; @@ -44,12 +47,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static org.thingsboard.common.util.DonAsynchron.withCallback; +import static org.thingsboard.rule.engine.api.util.TbNodeUtils.QUEUE_NAME; @Slf4j @RuleNode( type = ComponentType.ACTION, name = "generator", configClazz = TbMsgGeneratorNodeConfiguration.class, + version = 1, hasQueueName = true, nodeDescription = "Periodically generates messages", nodeDetails = "Generates messages with configurable period. Javascript function used for message generation.", @@ -177,4 +182,20 @@ public class TbMsgGeneratorNode implements TbNode { scriptEngine = null; } } + + @Override + public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { + boolean hasChanges = false; + switch (fromVersion) { + case 0: + if (oldConfiguration.has(QUEUE_NAME)) { + hasChanges = true; + ((ObjectNode) oldConfiguration).remove(QUEUE_NAME); + } + break; + default: + break; + } + return new TbPair<>(hasChanges, oldConfiguration); + } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java index a7a8b44b21..8f818583e8 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java @@ -15,6 +15,7 @@ */ package org.thingsboard.rule.engine.deduplication; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.extern.slf4j.Slf4j; @@ -43,10 +44,13 @@ import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import static org.thingsboard.rule.engine.api.util.TbNodeUtils.QUEUE_NAME; + @RuleNode( type = ComponentType.TRANSFORMATION, name = "deduplication", configClazz = TbMsgDeduplicationNodeConfiguration.class, + version = 1, hasQueueName = true, nodeDescription = "Deduplicate messages within the same originator entity for a configurable period " + "based on a specified deduplication strategy.", @@ -94,6 +98,22 @@ public class TbMsgDeduplicationNode implements TbNode { deduplicationMap.clear(); } + @Override + public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { + boolean hasChanges = false; + switch (fromVersion) { + case 0: + if (oldConfiguration.has(QUEUE_NAME)) { + hasChanges = true; + ((ObjectNode) oldConfiguration).remove(QUEUE_NAME); + } + break; + default: + break; + } + return new TbPair<>(hasChanges, oldConfiguration); + } + private void processOnRegularMsg(TbContext ctx, TbMsg msg) { EntityId id = msg.getOriginator(); DeduplicationData deduplicationMsgs = deduplicationMap.computeIfAbsent(id, k -> new DeduplicationData()); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java index eb658ada68..8fe2de58b2 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java @@ -15,6 +15,8 @@ */ package org.thingsboard.rule.engine.flow; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.api.EmptyNodeConfiguration; import org.thingsboard.rule.engine.api.RuleNode; @@ -25,13 +27,17 @@ import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.TbMsg; +import static org.thingsboard.rule.engine.api.util.TbNodeUtils.QUEUE_NAME; + @Slf4j @RuleNode( type = ComponentType.FLOW, name = "checkpoint", configClazz = EmptyNodeConfiguration.class, + version = 1, hasQueueName = true, nodeDescription = "transfers the message to another queue", nodeDetails = "After successful transfer incoming message is automatically acknowledged. Queue name is configurable.", @@ -52,4 +58,20 @@ public class TbCheckpointNode implements TbNode { ctx.enqueueForTellNext(msg, queueName, TbNodeConnectionType.SUCCESS, () -> ctx.ack(msg), error -> ctx.tellFailure(msg, error)); } + @Override + public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { + boolean hasChanges = false; + switch (fromVersion) { + case 0: + if (oldConfiguration.has(QUEUE_NAME)) { + hasChanges = true; + ((ObjectNode) oldConfiguration).remove(QUEUE_NAME); + } + break; + default: + break; + } + return new TbPair<>(hasChanges, oldConfiguration); + } + } From 1e2c7f95791f4340e1156608634b19f7089f94a9 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 13 Dec 2023 10:35:13 +0100 Subject: [PATCH 4/8] refactoring --- .../server/actors/ruleChain/DefaultTbContext.java | 9 +++++++-- .../org/thingsboard/rule/engine/api/TbContext.java | 2 ++ .../rule/engine/debug/TbMsgGeneratorNode.java | 2 +- .../engine/deduplication/TbMsgDeduplicationNode.java | 2 +- .../rule/engine/flow/TbCheckpointNode.java | 2 +- .../engine/transform/TbMsgDeduplicationNodeTest.java | 11 ++--------- 6 files changed, 14 insertions(+), 14 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index bf61cf7a59..0d1a65d899 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -185,7 +185,7 @@ class DefaultTbContext implements TbContext { @Override public void enqueue(TbMsg tbMsg, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, nodeCtx.getSelf().getQueueName(), getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator()); enqueue(tpi, tbMsg, onFailure, onSuccess); } @@ -310,7 +310,7 @@ class DefaultTbContext implements TbContext { @Override public boolean isLocalEntity(EntityId entityId) { - return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, nodeCtx.getSelf().getQueueName(), getTenantId(), entityId).isMyPartition(); + return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), entityId).isMyPartition(); } private void scheduleMsgWithDelay(TbActorMsg msg, long delayInMs, TbActorRef target) { @@ -509,6 +509,11 @@ class DefaultTbContext implements TbContext { return ruleChainName; } + @Override + public String getQueueName() { + return getSelf().getQueueName(); + } + @Override public TenantId getTenantId() { return nodeCtx.getTenantId(); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 40ed2c378a..241e20115f 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -255,6 +255,8 @@ public interface TbContext { String getRuleChainName(); + String getQueueName(); + TenantId getTenantId(); AttributesService getAttributesService(); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java index 80170a28f1..cec7aca0a3 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java @@ -83,7 +83,7 @@ public class TbMsgGeneratorNode implements TbNode { this.config = TbNodeUtils.convert(configuration, TbMsgGeneratorNodeConfiguration.class); this.delay = TimeUnit.SECONDS.toMillis(config.getPeriodInSeconds()); this.currentMsgCount = 0; - this.queueName = ctx.getSelf().getQueueName(); + this.queueName = ctx.getQueueName(); if (!StringUtils.isEmpty(config.getOriginatorId())) { originatorId = EntityIdFactory.getByTypeAndUuid(config.getOriginatorType(), config.getOriginatorId()); ctx.checkTenantEntity(originatorId); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java index 8f818583e8..17b72cd27a 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java @@ -81,7 +81,7 @@ public class TbMsgDeduplicationNode implements TbNode { public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { this.config = TbNodeUtils.convert(configuration, TbMsgDeduplicationNodeConfiguration.class); this.deduplicationInterval = TimeUnit.SECONDS.toMillis(config.getInterval()); - this.queueName = ctx.getSelf().getQueueName(); + this.queueName = ctx.getQueueName(); } @Override diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java index 8fe2de58b2..9616d8a167 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java @@ -50,7 +50,7 @@ public class TbCheckpointNode implements TbNode { @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { - this.queueName = ctx.getSelf().getQueueName(); + this.queueName = ctx.getQueueName(); } @Override diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java index ac3f2acfd5..8e8340c713 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java @@ -94,7 +94,6 @@ public class TbMsgDeduplicationNodeTest { when(ctx.getSelfId()).thenReturn(ruleNodeId); when(ctx.getTenantId()).thenReturn(tenantId); - when(ctx.getSelf()).thenReturn(new RuleNode()); doAnswer((Answer) invocationOnMock -> { TbMsgType type = (TbMsgType) (invocationOnMock.getArguments())[1]; @@ -240,10 +239,10 @@ public class TbMsgDeduplicationNodeTest { awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); invokeTellSelf(wantedNumberOfTellSelfInvocation); + when(ctx.getQueueName()).thenReturn(DataConstants.HP_QUEUE_NAME); config.setInterval(deduplicationInterval); config.setStrategy(DeduplicationStrategy.ALL); config.setOutMsgType(TbMsgType.POST_ATTRIBUTES_REQUEST.name()); - setQueueName(DataConstants.HP_QUEUE_NAME); nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); node.init(ctx, nodeConfiguration); @@ -280,10 +279,10 @@ public class TbMsgDeduplicationNodeTest { awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); invokeTellSelf(wantedNumberOfTellSelfInvocation, true, 3); + when(ctx.getQueueName()).thenReturn(DataConstants.HP_QUEUE_NAME); config.setInterval(deduplicationInterval); config.setStrategy(DeduplicationStrategy.ALL); config.setOutMsgType(TbMsgType.POST_ATTRIBUTES_REQUEST.name()); - setQueueName(DataConstants.HP_QUEUE_NAME); nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); node.init(ctx, nodeConfiguration); @@ -431,10 +430,4 @@ public class TbMsgDeduplicationNodeTest { return JacksonUtil.toString(mergedData); } - private void setQueueName(String queueName) { - RuleNode ruleNode = new RuleNode(); - ruleNode.setQueueName(queueName); - when(ctx.getSelf()).thenReturn(ruleNode); - } - } From 65a8b0a34c4972793f1c41f32761853d640fd871 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 13 Dec 2023 23:13:05 +0100 Subject: [PATCH 5/8] added test for singleton mqtt node --- .../server/msa/ContainerTestSuite.java | 2 + .../server/msa/TestProperties.java | 9 + .../server/msa/TestRestClient.java | 41 ++++ .../server/msa/rule/node/MqttNodeTest.java | 204 ++++++++++++++++++ .../resources/MqttRuleNodeTestMetadata.json | 79 +++++++ .../resources/docker-compose.mosquitto.yml | 25 +++ .../test/resources/mosquitto/mosquitto.conf | 18 ++ 7 files changed, 378 insertions(+) create mode 100644 msa/black-box-tests/src/test/java/org/thingsboard/server/msa/rule/node/MqttNodeTest.java create mode 100644 msa/black-box-tests/src/test/resources/MqttRuleNodeTestMetadata.json create mode 100644 msa/black-box-tests/src/test/resources/docker-compose.mosquitto.yml create mode 100644 msa/black-box-tests/src/test/resources/mosquitto/mosquitto.conf diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java index dd3948c495..142b42e5b2 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java @@ -107,6 +107,7 @@ public class ContainerTestSuite { List composeFiles = new ArrayList<>(Arrays.asList( new File(targetDir + "docker-compose.yml"), new File(targetDir + "docker-compose.volumes.yml"), + new File(targetDir + "docker-compose.mosquitto.yml"), new File(targetDir + (IS_HYBRID_MODE ? "docker-compose.hybrid.yml" : "docker-compose.postgres.yml")), new File(targetDir + (IS_HYBRID_MODE ? "docker-compose.hybrid-test-extras.yml" : "docker-compose.postgres-test-extras.yml")), new File(targetDir + "docker-compose.postgres.volumes.yml"), @@ -162,6 +163,7 @@ public class ContainerTestSuite { .withEnv(queueEnv) .withEnv("LOAD_BALANCER_NAME", "") .withExposedService("haproxy", 80, Wait.forHttp("/swagger-ui.html").withStartupTimeout(CONTAINER_STARTUP_TIMEOUT)) + .withExposedService("broker", 1883) .waitingFor("tb-core1", Wait.forLogMessage(TB_CORE_LOG_REGEXP, 1).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT)) .waitingFor("tb-core2", Wait.forLogMessage(TB_CORE_LOG_REGEXP, 1).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT)) .waitingFor("tb-http-transport1", Wait.forLogMessage(TRANSPORTS_LOG_REGEXP, 1).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT)) diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestProperties.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestProperties.java index a70d1022ce..26f9da3618 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestProperties.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestProperties.java @@ -48,4 +48,13 @@ public class TestProperties { } return System.getProperty("tb.wsUrl", "ws://localhost:8080"); } + + public static String getMqttBrokerUrl() { + if (instance.isActive()) { + String host = instance.getTestContainer().getServiceHost("broker", 1883); + Integer port = instance.getTestContainer().getServicePort("broker", 1883); + return "tcp://" + host + ":" + port; + } + return System.getProperty("mqtt.broker", "tcp://localhost:1883"); + } } diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java index f04331e01e..a675ef4a20 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java @@ -31,10 +31,12 @@ import org.thingsboard.server.common.data.Dashboard; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.EntityView; +import org.thingsboard.server.common.data.EventInfo; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.asset.AssetProfile; +import org.thingsboard.server.common.data.event.EventType; import org.thingsboard.server.common.data.id.AlarmId; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.AssetProfileId; @@ -45,9 +47,11 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.rule.RuleChain; @@ -527,4 +531,41 @@ public class TestRestClient { .then() .statusCode(HTTP_OK); } + + public PageData getEvents(EntityId entityId, EventType eventType, TenantId tenantId, TimePageLink pageLink) { + Map params = new HashMap<>(); + params.put("entityType", entityId.getEntityType().name()); + params.put("entityId", entityId.getId().toString()); + params.put("eventType", eventType.name()); + params.put("tenantId", tenantId.getId().toString()); + addTimePageLinkToParam(params, pageLink); + + return given().spec(requestSpec) + .get("/api/events/{entityType}/{entityId}/{eventType}?tenantId={tenantId}&" + getTimeUrlParams(pageLink), params) + .then() + .statusCode(HTTP_OK) + .extract() + .as(new TypeRef<>() {}); + } + + private void addTimePageLinkToParam(Map params, TimePageLink pageLink) { + this.addPageLinkToParam(params, pageLink); + if (pageLink.getStartTime() != null) { + params.put("startTime", String.valueOf(pageLink.getStartTime())); + } + if (pageLink.getEndTime() != null) { + params.put("endTime", String.valueOf(pageLink.getEndTime())); + } + } + + private String getTimeUrlParams(TimePageLink pageLink) { + String urlParams = getUrlParams(pageLink); + if (pageLink.getStartTime() != null) { + urlParams += "&startTime={startTime}"; + } + if (pageLink.getEndTime() != null) { + urlParams += "&endTime={endTime}"; + } + return urlParams; + } } diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/rule/node/MqttNodeTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/rule/node/MqttNodeTest.java new file mode 100644 index 0000000000..5aa0ae48fb --- /dev/null +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/rule/node/MqttNodeTest.java @@ -0,0 +1,204 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.msa.rule.node; + +import com.fasterxml.jackson.databind.JsonNode; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; +import org.eclipse.paho.client.mqttv3.IMqttMessageListener; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.EventInfo; +import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.event.EventType; +import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.page.TimePageLink; +import org.thingsboard.server.common.data.rule.NodeConnectionInfo; +import org.thingsboard.server.common.data.rule.RuleChain; +import org.thingsboard.server.common.data.rule.RuleChainMetaData; +import org.thingsboard.server.common.data.rule.RuleNode; +import org.thingsboard.server.common.data.security.DeviceCredentials; +import org.thingsboard.server.msa.AbstractContainerTest; +import org.thingsboard.server.msa.DisableUIListeners; +import org.thingsboard.server.msa.TestProperties; +import org.thingsboard.server.msa.WsClient; +import org.thingsboard.server.msa.mapper.WsTelemetryResponse; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.fail; +import static org.thingsboard.server.msa.prototypes.DevicePrototypes.defaultDevicePrototype; + +@DisableUIListeners +@Slf4j +public class MqttNodeTest extends AbstractContainerTest { + + private static final String TOPIC = "tb/mqtt/device"; + + private Device device; + + @BeforeMethod + public void setUp() { + testRestClient.login("tenant@thingsboard.org", "tenant"); + device = testRestClient.postDevice("", defaultDevicePrototype("mqtt_")); + } + + @AfterMethod + public void tearDown() { + testRestClient.deleteDeviceIfExists(device.getId()); + } + + @Test + public void telemetryUpload() throws Exception { + RuleChainId defaultRuleChainId = getDefaultRuleChainId(); + + createRootRuleChainWithTestNode("MqttRuleNodeTestMetadata.json", "org.thingsboard.rule.engine.mqtt.TbMqttNode", 2); + + DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId()); + + WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS); + + MqttMessageListener messageListener = new MqttMessageListener(); + MqttClient responseClient = new MqttClient(TestProperties.getMqttBrokerUrl(), StringUtils.randomAlphanumeric(10), new MemoryPersistence()); + responseClient.connect(); + responseClient.subscribe(TOPIC, messageListener); + + MqttClient mqttClient = new MqttClient("tcp://localhost:1883", StringUtils.randomAlphanumeric(10), new MemoryPersistence()); + MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); + mqttConnectOptions.setUserName(deviceCredentials.getCredentialsId()); + mqttClient.connect(mqttConnectOptions); + mqttClient.publish("v1/devices/me/telemetry", new MqttMessage(createPayload().toString().getBytes())); + + WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage(); + log.info("Received telemetry: {}", actualLatestTelemetry); + wsClient.closeBlocking(); + + assertThat(actualLatestTelemetry.getData()).hasSize(4); + assertThat(actualLatestTelemetry.getLatestValues().keySet()).containsOnlyOnceElementsOf(Arrays.asList("booleanKey", "stringKey", "doubleKey", "longKey")); + + assertThat(actualLatestTelemetry.getDataValuesByKey("booleanKey").get(1)).isEqualTo(Boolean.TRUE.toString()); + assertThat(actualLatestTelemetry.getDataValuesByKey("stringKey").get(1)).isEqualTo("value1"); + assertThat(actualLatestTelemetry.getDataValuesByKey("doubleKey").get(1)).isEqualTo(Double.toString(42.0)); + assertThat(actualLatestTelemetry.getDataValuesByKey("longKey").get(1)).isEqualTo(Long.toString(73)); + + Awaitility + .await() + .alias("Get integration events") + .atMost(10, TimeUnit.SECONDS) + .until(() -> messageListener.getEvents().size() > 0); + + BlockingQueue events = messageListener.getEvents(); + JsonNode actual = JacksonUtil.toJsonNode(Objects.requireNonNull(events.poll()).message); + + assertThat(actual.get("stringKey").asText()).isEqualTo("value1"); + assertThat(actual.get("booleanKey").asBoolean()).isEqualTo(Boolean.TRUE); + assertThat(actual.get("doubleKey").asDouble()).isEqualTo(42.0); + assertThat(actual.get("longKey").asLong()).isEqualTo(73); + + testRestClient.setRootRuleChain(defaultRuleChainId); + } + + @Data + private class MqttMessageListener implements IMqttMessageListener { + private final BlockingQueue events; + + private MqttMessageListener() { + events = new ArrayBlockingQueue<>(100); + } + + @Override + public void messageArrived(String s, MqttMessage mqttMessage) { + log.info("MQTT message [{}], topic [{}]", mqttMessage.toString(), s); + events.add(new MqttEvent(s, mqttMessage.toString())); + } + + public BlockingQueue getEvents() { + return events; + } + } + + @Data + private class MqttEvent { + private final String topic; + private final String message; + } + + private RuleChainId getDefaultRuleChainId() { + PageData ruleChains = testRestClient.getRuleChains(new PageLink(40, 0)); + + Optional defaultRuleChain = ruleChains.getData() + .stream() + .filter(RuleChain::isRoot) + .findFirst(); + if (!defaultRuleChain.isPresent()) { + fail("Root rule chain wasn't found"); + } + return defaultRuleChain.get().getId(); + } + + protected RuleChainId createRootRuleChainWithTestNode(String ruleChainMetadataFile, String ruleNodeType, int eventsCount) throws Exception { + RuleChain newRuleChain = new RuleChain(); + newRuleChain.setName("testRuleChain"); + RuleChain ruleChain = testRestClient.postRuleChain(newRuleChain); + + JsonNode configuration = JacksonUtil.OBJECT_MAPPER.readTree(this.getClass().getClassLoader().getResourceAsStream(ruleChainMetadataFile)); + RuleChainMetaData ruleChainMetaData = new RuleChainMetaData(); + ruleChainMetaData.setRuleChainId(ruleChain.getId()); + ruleChainMetaData.setFirstNodeIndex(configuration.get("firstNodeIndex").asInt()); + ruleChainMetaData.setNodes(Arrays.asList(JacksonUtil.OBJECT_MAPPER.treeToValue(configuration.get("nodes"), RuleNode[].class))); + ruleChainMetaData.setConnections(Arrays.asList(JacksonUtil.OBJECT_MAPPER.treeToValue(configuration.get("connections"), NodeConnectionInfo[].class))); + + ruleChainMetaData = testRestClient.postRuleChainMetadata(ruleChainMetaData); + + testRestClient.setRootRuleChain(ruleChain.getId()); + + RuleNode node = ruleChainMetaData.getNodes().stream().filter(ruleNode -> ruleNode.getType().equals(ruleNodeType)).findFirst().get(); + + Awaitility + .await() + .alias("Get events from rule chain") + .atMost(10, TimeUnit.SECONDS) + .until(() -> { + PageData events = testRestClient.getEvents(node.getId(), EventType.LC_EVENT, ruleChain.getTenantId(), new TimePageLink(1024)); + List eventInfos = events.getData().stream().filter(eventInfo -> + "STARTED".equals(eventInfo.getBody().get("event").asText()) && + "true".equals(eventInfo.getBody().get("success").asText())) + .collect(Collectors.toList()); + + return eventInfos.size() == eventsCount; + }); + + return ruleChain.getId(); + } +} diff --git a/msa/black-box-tests/src/test/resources/MqttRuleNodeTestMetadata.json b/msa/black-box-tests/src/test/resources/MqttRuleNodeTestMetadata.json new file mode 100644 index 0000000000..e6c9f5a53f --- /dev/null +++ b/msa/black-box-tests/src/test/resources/MqttRuleNodeTestMetadata.json @@ -0,0 +1,79 @@ +{ + "firstNodeIndex": 2, + "nodes": [ + { + "additionalInfo": { + "description": "", + "layoutX": 626, + "layoutY": 152 + }, + "type": "org.thingsboard.rule.engine.mqtt.TbMqttNode", + "name": "test mqtt", + "debugMode": true, + "singletonMode": true, + "queueName": "HighPriority", + "configurationVersion": 0, + "configuration": { + "topicPattern": "tb/mqtt/device", + "host": "broker", + "port": 1883, + "connectTimeoutSec": 10, + "clientId": null, + "cleanSession": true, + "retainedMessage": false, + "ssl": false, + "credentials": { + "type": "anonymous" + } + }, + "externalId": null + }, + { + "additionalInfo": { + "description": "", + "layoutX": 949, + "layoutY": 153 + }, + "type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", + "name": "save timeseries", + "debugMode": true, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "defaultTTL": 0, + "skipLatestPersistence": false, + "useServerTs": false + }, + "externalId": null + }, + { + "additionalInfo": { + "description": "", + "layoutX": 305, + "layoutY": 151 + }, + "type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode", + "name": "swatch", + "debugMode": false, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "version": 0 + }, + "externalId": null + } + ], + "connections": [ + { + "fromIndex": 0, + "toIndex": 1, + "type": "Success" + }, + { + "fromIndex": 2, + "toIndex": 0, + "type": "Post telemetry" + } + ], + "ruleChainConnections": null +} \ No newline at end of file diff --git a/msa/black-box-tests/src/test/resources/docker-compose.mosquitto.yml b/msa/black-box-tests/src/test/resources/docker-compose.mosquitto.yml new file mode 100644 index 0000000000..10675c9a23 --- /dev/null +++ b/msa/black-box-tests/src/test/resources/docker-compose.mosquitto.yml @@ -0,0 +1,25 @@ +# +# Copyright © 2016-2023 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +version: '3.0' +services: + broker: + image: eclipse-mosquitto + volumes: + - ./mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf + ports: + - "1883" + restart: always diff --git a/msa/black-box-tests/src/test/resources/mosquitto/mosquitto.conf b/msa/black-box-tests/src/test/resources/mosquitto/mosquitto.conf new file mode 100644 index 0000000000..7a40d338fc --- /dev/null +++ b/msa/black-box-tests/src/test/resources/mosquitto/mosquitto.conf @@ -0,0 +1,18 @@ +# +# Copyright © 2016-2023 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +listener 1883 +allow_anonymous true From 4101fc3963c7269455076fb6d83af16ba0cd7016 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Thu, 14 Dec 2023 21:14:31 +0100 Subject: [PATCH 6/8] added upgrade tests, and minor fixes --- .../install/ThingsboardInstallService.java | 1 - .../server/utils/TbNodeUpgradeUtils.java | 2 +- .../server/common/data/DataConstants.java | 1 + .../data/plugin/ComponentDescriptor.java | 2 +- .../resources/MqttRuleNodeTestMetadata.json | 4 +- .../rule/engine/api/util/TbNodeUtils.java | 2 - .../rule/engine/debug/TbMsgGeneratorNode.java | 2 +- .../deduplication/TbMsgDeduplicationNode.java | 4 +- .../rule/engine/flow/TbCheckpointNode.java | 2 +- .../engine/AbstractRuleNodeUpgradeTest.java | 52 ++++++++++++++++++ .../engine/debug/TbMsgGeneratorNodeTest.java | 53 ++++++++++++++++++ .../engine/flow/TbCheckpointNodeTest.java | 55 +++++++++++++++++++ .../telemetry/TbMsgAttributesNodeTest.java | 25 ++------- .../transform/TbMsgDeduplicationNodeTest.java | 35 +++++++++++- 14 files changed, 207 insertions(+), 33 deletions(-) create mode 100644 rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/AbstractRuleNodeUpgradeTest.java create mode 100644 rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java create mode 100644 rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeTest.java diff --git a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java index 054c7a6751..4c9677494b 100644 --- a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java +++ b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java @@ -277,7 +277,6 @@ public class ThingsboardInstallService { } else { log.info("Skipping images migration. Run the upgrade with fromVersion as '3.6.2-images' to migrate"); } - break; case "3.6.2": log.info("Upgrading ThingsBoard from version 3.6.2 to 3.6.3 ..."); databaseEntitiesUpgradeService.upgradeDatabase("3.6.2"); diff --git a/application/src/main/java/org/thingsboard/server/utils/TbNodeUpgradeUtils.java b/application/src/main/java/org/thingsboard/server/utils/TbNodeUpgradeUtils.java index 23c75b430b..e0ecfdd038 100644 --- a/application/src/main/java/org/thingsboard/server/utils/TbNodeUpgradeUtils.java +++ b/application/src/main/java/org/thingsboard/server/utils/TbNodeUpgradeUtils.java @@ -25,7 +25,7 @@ import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.service.component.RuleNodeClassInfo; -import static org.thingsboard.rule.engine.api.util.TbNodeUtils.QUEUE_NAME; +import static org.thingsboard.server.common.data.DataConstants.QUEUE_NAME; @Slf4j public class TbNodeUpgradeUtils { diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index ca48ef364a..14142de3db 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -135,5 +135,6 @@ public class DataConstants { public static final String HP_QUEUE_TOPIC = "tb_rule_engine.hp"; public static final String SQ_QUEUE_NAME = "SequentialByOriginator"; public static final String SQ_QUEUE_TOPIC = "tb_rule_engine.sq"; + public static final String QUEUE_NAME = "queueName"; } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentDescriptor.java b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentDescriptor.java index 0e9b31ec09..1558080d42 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentDescriptor.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentDescriptor.java @@ -55,7 +55,7 @@ public class ComponentDescriptor extends BaseData { @Length(fieldName = "actions") @ApiModelProperty(position = 10, value = "Rule Node Actions. Deprecated. Always null.", accessMode = ApiModelProperty.AccessMode.READ_ONLY) @Getter @Setter private String actions; - @ApiModelProperty(position = 11, value = "Indicates that the RuleNode is support queue name.", accessMode = ApiModelProperty.AccessMode.READ_ONLY, example = "true") + @ApiModelProperty(position = 11, value = "Indicates that the RuleNode supports queue name configuration.", accessMode = ApiModelProperty.AccessMode.READ_ONLY, example = "true") @Getter @Setter private boolean hasQueueName; public ComponentDescriptor() { diff --git a/msa/black-box-tests/src/test/resources/MqttRuleNodeTestMetadata.json b/msa/black-box-tests/src/test/resources/MqttRuleNodeTestMetadata.json index e6c9f5a53f..7a5015add3 100644 --- a/msa/black-box-tests/src/test/resources/MqttRuleNodeTestMetadata.json +++ b/msa/black-box-tests/src/test/resources/MqttRuleNodeTestMetadata.json @@ -53,7 +53,7 @@ "layoutY": 151 }, "type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode", - "name": "swatch", + "name": "switch", "debugMode": false, "singletonMode": false, "configurationVersion": 0, @@ -76,4 +76,4 @@ } ], "ruleChainConnections": null -} \ No newline at end of file +} diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java index cce5649336..7c3be84410 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java @@ -36,8 +36,6 @@ import java.util.stream.Collectors; */ public class TbNodeUtils { - public static final String QUEUE_NAME = "queueName"; - private static final Pattern DATA_PATTERN = Pattern.compile("(\\$\\[)(.*?)(])"); public static T convert(TbNodeConfiguration configuration, Class clazz) throws TbNodeException { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java index cec7aca0a3..689e14429f 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java @@ -47,7 +47,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static org.thingsboard.common.util.DonAsynchron.withCallback; -import static org.thingsboard.rule.engine.api.util.TbNodeUtils.QUEUE_NAME; +import static org.thingsboard.server.common.data.DataConstants.QUEUE_NAME; @Slf4j @RuleNode( diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java index 17b72cd27a..dc7f2ee07f 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java @@ -44,7 +44,7 @@ import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import static org.thingsboard.rule.engine.api.util.TbNodeUtils.QUEUE_NAME; +import static org.thingsboard.server.common.data.DataConstants.QUEUE_NAME; @RuleNode( type = ComponentType.TRANSFORMATION, @@ -177,7 +177,7 @@ public class TbMsgDeduplicationNode implements TbNode { } if (resultMsg != null) { deduplicationResults.add(TbMsg.newMsg( - resultMsg.getQueueName(), + queueName, resultMsg.getType(), resultMsg.getOriginator(), resultMsg.getCustomerId(), diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java index 9616d8a167..c7e775f5a4 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java @@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.TbMsg; -import static org.thingsboard.rule.engine.api.util.TbNodeUtils.QUEUE_NAME; +import static org.thingsboard.server.common.data.DataConstants.QUEUE_NAME; @Slf4j @RuleNode( diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/AbstractRuleNodeUpgradeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/AbstractRuleNodeUpgradeTest.java new file mode 100644 index 0000000000..6e665357bc --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/AbstractRuleNodeUpgradeTest.java @@ -0,0 +1,52 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.util.TbPair; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.BDDMockito.willCallRealMethod; + +public abstract class AbstractRuleNodeUpgradeTest { + + protected abstract TbNode getTestNode(); + + @ParameterizedTest + @MethodSource + public void givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig(int givenVersion, String givenConfigStr, boolean hasChanges, String expectedConfigStr) throws TbNodeException { + // GIVEN + willCallRealMethod().given(getTestNode()).upgrade(anyInt(), any()); + JsonNode givenConfig = JacksonUtil.toJsonNode(givenConfigStr); + JsonNode expectedConfig = JacksonUtil.toJsonNode(expectedConfigStr); + + // WHEN + TbPair upgradeResult = getTestNode().upgrade(givenVersion, givenConfig); + + // THEN + assertThat(upgradeResult.getFirst()).isEqualTo(hasChanges); + ObjectNode upgradedConfig = (ObjectNode) upgradeResult.getSecond(); + assertThat(upgradedConfig).isEqualTo(expectedConfig); + } +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java new file mode 100644 index 0000000000..2e188bf002 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java @@ -0,0 +1,53 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.debug; + +import org.junit.jupiter.params.provider.Arguments; +import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; +import org.thingsboard.rule.engine.api.TbNode; + +import java.util.stream.Stream; + +import static org.mockito.Mockito.spy; + +public class TbMsgGeneratorNodeTest extends AbstractRuleNodeUpgradeTest { + + // Rule nodes upgrade + private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { + return Stream.of( + // default config for version 0 + Arguments.of(0, + "{\"msgCount\":0,\"periodInSeconds\":1,\"originatorId\":null,\"originatorType\":null, \"queueName\":null, \"scriptLang\":\"TBEL\",\"jsScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\",\"tbelScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\"}", + true, + "{\"msgCount\":0,\"periodInSeconds\":1,\"originatorId\":null,\"originatorType\":null, \"scriptLang\":\"TBEL\",\"jsScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\",\"tbelScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\"}"), + // default config for version 0 with queueName + Arguments.of(0, + "{\"msgCount\":0,\"periodInSeconds\":1,\"originatorId\":null,\"originatorType\":null, \"queueName\":\"Main\", \"scriptLang\":\"TBEL\",\"jsScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\",\"tbelScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\"}", + true, + "{\"msgCount\":0,\"periodInSeconds\":1,\"originatorId\":null,\"originatorType\":null, \"scriptLang\":\"TBEL\",\"jsScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\",\"tbelScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\"}"), + // default config for version 1 with upgrade from version 0 + Arguments.of(0, + "{\"msgCount\":0,\"periodInSeconds\":1,\"originatorId\":null,\"originatorType\":null, \"scriptLang\":\"TBEL\",\"jsScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\",\"tbelScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\"}", + false, + "{\"msgCount\":0,\"periodInSeconds\":1,\"originatorId\":null,\"originatorType\":null, \"scriptLang\":\"TBEL\",\"jsScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\",\"tbelScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\"}") + ); + } + + @Override + protected TbNode getTestNode() { + return spy(TbMsgGeneratorNode.class); + } +} \ No newline at end of file diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeTest.java new file mode 100644 index 0000000000..edb7a7c3b1 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeTest.java @@ -0,0 +1,55 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.flow; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.params.provider.Arguments; +import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; +import org.thingsboard.rule.engine.api.TbNode; + +import java.util.stream.Stream; + +import static org.mockito.Mockito.spy; + +@Slf4j +public class TbCheckpointNodeTest extends AbstractRuleNodeUpgradeTest { + + // Rule nodes upgrade + private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { + return Stream.of( + // default config for version 0 + Arguments.of(0, + "{\"queueName\":null}", + true, + "{}"), + // default config for version 0 with queueName + Arguments.of(0, + "{\"queueName\":\"Main\"}", + true, + "{}"), + // default config for version 1 with upgrade from version 0 + Arguments.of(0, + "{}", + false, + "{}") + ); + } + + @Override + protected TbNode getTestNode() { + return spy(TbCheckpointNode.class); + } +} \ No newline at end of file diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeTest.java index fe2edc941b..d59e416fcf 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeTest.java @@ -15,7 +15,6 @@ */ package org.thingsboard.rule.engine.telemetry; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeEach; @@ -25,8 +24,10 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.server.common.data.DataConstants; @@ -40,7 +41,6 @@ import org.thingsboard.server.common.data.kv.JsonDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.msg.TbMsgType; -import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -53,7 +53,6 @@ import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.willCallRealMethod; @@ -65,7 +64,7 @@ import static org.mockito.Mockito.when; import static org.thingsboard.server.common.data.DataConstants.NOTIFY_DEVICE_METADATA_KEY; @Slf4j -class TbMsgAttributesNodeTest { +class TbMsgAttributesNodeTest extends AbstractRuleNodeUpgradeTest { private TenantId tenantId; private DeviceId deviceId; @@ -223,21 +222,9 @@ class TbMsgAttributesNodeTest { ); } - @ParameterizedTest - @MethodSource - void givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig(int givenVersion, String givenConfigStr, boolean hasChanges, String expectedConfigStr) throws TbNodeException { - // GIVEN - willCallRealMethod().given(node).upgrade(anyInt(), any()); - JsonNode givenConfig = JacksonUtil.toJsonNode(givenConfigStr); - JsonNode expectedConfig = JacksonUtil.toJsonNode(expectedConfigStr); - - // WHEN - TbPair upgradeResult = node.upgrade(givenVersion, givenConfig); - - // THEN - assertThat(upgradeResult.getFirst()).isEqualTo(hasChanges); - ObjectNode upgradedConfig = (ObjectNode) upgradeResult.getSecond(); - assertThat(upgradedConfig).isEqualTo(expectedConfig); + @Override + protected TbNode getTestNode() { + return node; } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java index 8e8340c713..b5bbcdc0fe 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java @@ -22,12 +22,15 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.provider.Arguments; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.stubbing.Answer; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.deduplication.DeduplicationStrategy; @@ -40,7 +43,6 @@ import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.msg.TbNodeConnectionType; -import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -56,6 +58,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import java.util.stream.Stream; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -69,7 +72,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @Slf4j -public class TbMsgDeduplicationNodeTest { +public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest { private TbContext ctx; @@ -293,7 +296,7 @@ public class TbMsgDeduplicationNodeTest { for (TbMsg msg : firstMsgPack) { node.onMsg(ctx, msg); } - long firstPackDeduplicationPackEndTs = firstMsgPack.get(0).getMetaDataTs() + TimeUnit.SECONDS.toMillis(deduplicationInterval); + long firstPackDeduplicationPackEndTs = firstMsgPack.get(0).getMetaDataTs() + TimeUnit.SECONDS.toMillis(deduplicationInterval); List secondMsgPack = getTbMsgs(deviceId, msgCount / 2, firstPackDeduplicationPackEndTs, 500); for (TbMsg msg : secondMsgPack) { @@ -386,6 +389,27 @@ public class TbMsgDeduplicationNodeTest { Assertions.assertEquals(msgWithLatestTsInSecondPack.getType(), actualMsg.getType()); } + // Rule nodes upgrade + private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { + return Stream.of( + // default config for version 0 + Arguments.of(0, + "{\"interval\":60,\"strategy\":\"FIRST\",\"outMsgType\":null,\"maxPendingMsgs\":100,\"maxRetries\":3, \"queueName\":null}", + true, + "{\"interval\":60,\"strategy\":\"FIRST\",\"outMsgType\":null,\"maxPendingMsgs\":100,\"maxRetries\":3}"), + // default config for version 0 with queueName + Arguments.of(0, + "{\"interval\":60,\"strategy\":\"FIRST\",\"outMsgType\":null,\"maxPendingMsgs\":100,\"maxRetries\":3, \"queueName\":\"Main\"}", + true, + "{\"interval\":60,\"strategy\":\"FIRST\",\"outMsgType\":null,\"maxPendingMsgs\":100,\"maxRetries\":3}"), + // default config for version 1 with upgrade from version 0 + Arguments.of(0, + "{\"interval\":60,\"strategy\":\"FIRST\",\"outMsgType\":null,\"maxPendingMsgs\":100,\"maxRetries\":3}", + false, + "{\"interval\":60,\"strategy\":\"FIRST\",\"outMsgType\":null,\"maxPendingMsgs\":100,\"maxRetries\":3}") + ); + } + private TbMsg getMsgWithLatestTs(List firstMsgPack) { int indexOfLastMsgInArray = firstMsgPack.size() - 1; int indexToSetMaxTs = new Random().nextInt(indexOfLastMsgInArray) + 1; @@ -430,4 +454,9 @@ public class TbMsgDeduplicationNodeTest { return JacksonUtil.toString(mergedData); } + @Override + protected TbNode getTestNode() { + return node; + } + } From fe6ca79c4154cd16ff243a26a822beb1dbe07522 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Mon, 18 Dec 2023 15:31:23 +0100 Subject: [PATCH 7/8] minor improvements --- .../deduplication/TbMsgDeduplicationNode.java | 2 +- .../engine/debug/TbMsgGeneratorNodeTest.java | 2 +- .../engine/flow/TbCheckpointNodeTest.java | 2 +- .../transform/TbMsgDeduplicationNodeTest.java | 23 +++++++++++++++++-- 4 files changed, 24 insertions(+), 5 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java index dc7f2ee07f..4e83b1887b 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java @@ -177,7 +177,7 @@ public class TbMsgDeduplicationNode implements TbNode { } if (resultMsg != null) { deduplicationResults.add(TbMsg.newMsg( - queueName, + queueName != null ? queueName : resultMsg.getQueueName(), resultMsg.getType(), resultMsg.getOriginator(), resultMsg.getCustomerId(), diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java index 2e188bf002..f0ba55e03b 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java @@ -50,4 +50,4 @@ public class TbMsgGeneratorNodeTest extends AbstractRuleNodeUpgradeTest { protected TbNode getTestNode() { return spy(TbMsgGeneratorNode.class); } -} \ No newline at end of file +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeTest.java index edb7a7c3b1..c7d50528d5 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeTest.java @@ -52,4 +52,4 @@ public class TbCheckpointNodeTest extends AbstractRuleNodeUpgradeTest { protected TbNode getTestNode() { return spy(TbCheckpointNode.class); } -} \ No newline at end of file +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java index b5bbcdc0fe..89eda98090 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java @@ -22,7 +22,9 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.stubbing.Answer; @@ -143,13 +145,24 @@ public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest { node.destroy(); } - @Test - public void given_100_messages_strategy_first_then_verifyOutput() throws TbNodeException, ExecutionException, InterruptedException { + private static Stream given_100_messages_strategy_first_then_verifyOutput() { + return Stream.of( + Arguments.of((String) null), + Arguments.of(DataConstants.MAIN_QUEUE_NAME), + Arguments.of(DataConstants.HP_QUEUE_NAME) + ); + } + + @ParameterizedTest + @MethodSource + public void given_100_messages_strategy_first_then_verifyOutput(String queueName) throws TbNodeException, ExecutionException, InterruptedException { int wantedNumberOfTellSelfInvocation = 2; int msgCount = 100; awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); invokeTellSelf(wantedNumberOfTellSelfInvocation); + when(ctx.getQueueName()).thenReturn(queueName); + config.setInterval(deduplicationInterval); config.setMaxPendingMsgs(msgCount); nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); @@ -186,6 +199,12 @@ public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest { Assertions.assertEquals(firstMsg.getData(), actualMsg.getData()); Assertions.assertEquals(firstMsg.getMetaData(), actualMsg.getMetaData()); Assertions.assertEquals(firstMsg.getType(), actualMsg.getType()); + + if (queueName == null) { + Assertions.assertEquals(firstMsg.getQueueName(), actualMsg.getQueueName()); + } else { + Assertions.assertEquals(ctx.getQueueName(), actualMsg.getQueueName()); + } } @Test From 560baa7a0ea111e0c2210c44ac69c4dfc0efccbb Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 9 Jan 2024 11:34:44 +0100 Subject: [PATCH 8/8] license:format --- .../java/org/thingsboard/server/msa/rule/node/MqttNodeTest.java | 2 +- .../src/test/resources/docker-compose.mosquitto.yml | 2 +- msa/black-box-tests/src/test/resources/mosquitto/mosquitto.conf | 2 +- .../thingsboard/rule/engine/AbstractRuleNodeUpgradeTest.java | 2 +- .../thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java | 2 +- .../org/thingsboard/rule/engine/flow/TbCheckpointNodeTest.java | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/rule/node/MqttNodeTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/rule/node/MqttNodeTest.java index 5aa0ae48fb..07a0691924 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/rule/node/MqttNodeTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/rule/node/MqttNodeTest.java @@ -1,5 +1,5 @@ /** - * Copyright © 2016-2023 The Thingsboard Authors + * Copyright © 2016-2024 The Thingsboard Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/msa/black-box-tests/src/test/resources/docker-compose.mosquitto.yml b/msa/black-box-tests/src/test/resources/docker-compose.mosquitto.yml index 10675c9a23..1936f57d49 100644 --- a/msa/black-box-tests/src/test/resources/docker-compose.mosquitto.yml +++ b/msa/black-box-tests/src/test/resources/docker-compose.mosquitto.yml @@ -1,5 +1,5 @@ # -# Copyright © 2016-2023 The Thingsboard Authors +# Copyright © 2016-2024 The Thingsboard Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/msa/black-box-tests/src/test/resources/mosquitto/mosquitto.conf b/msa/black-box-tests/src/test/resources/mosquitto/mosquitto.conf index 7a40d338fc..7136ad64b0 100644 --- a/msa/black-box-tests/src/test/resources/mosquitto/mosquitto.conf +++ b/msa/black-box-tests/src/test/resources/mosquitto/mosquitto.conf @@ -1,5 +1,5 @@ # -# Copyright © 2016-2023 The Thingsboard Authors +# Copyright © 2016-2024 The Thingsboard Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/AbstractRuleNodeUpgradeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/AbstractRuleNodeUpgradeTest.java index 6e665357bc..f2509a9d68 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/AbstractRuleNodeUpgradeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/AbstractRuleNodeUpgradeTest.java @@ -1,5 +1,5 @@ /** - * Copyright © 2016-2023 The Thingsboard Authors + * Copyright © 2016-2024 The Thingsboard Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java index f0ba55e03b..13f2b2ad42 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java @@ -1,5 +1,5 @@ /** - * Copyright © 2016-2023 The Thingsboard Authors + * Copyright © 2016-2024 The Thingsboard Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeTest.java index c7d50528d5..545f6112d8 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeTest.java @@ -1,5 +1,5 @@ /** - * Copyright © 2016-2023 The Thingsboard Authors + * Copyright © 2016-2024 The Thingsboard Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.