diff --git a/application/src/main/data/upgrade/3.6.2/schema_update.sql b/application/src/main/data/upgrade/3.6.2/schema_update.sql index a18b5e1069..6ae5e45134 100644 --- a/application/src/main/data/upgrade/3.6.2/schema_update.sql +++ b/application/src/main/data/upgrade/3.6.2/schema_update.sql @@ -21,3 +21,10 @@ DROP INDEX IF EXISTS idx_rule_node_type_configuration_version; CREATE INDEX IF NOT EXISTS idx_rule_node_type_id_configuration_version ON rule_node(type, id, configuration_version); -- RULE NODE INDEXES UPDATE END + +-- RULE NODE QUEUE UPDATE START + +ALTER TABLE rule_node ADD COLUMN IF NOT EXISTS queue_name varchar(255); +ALTER TABLE component_descriptor ADD COLUMN IF NOT EXISTS has_queue_name boolean DEFAULT false; + +-- RULE NODE QUEUE UPDATE END diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index badac3509e..fafa9d60ea 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -186,7 +186,7 @@ class DefaultTbContext implements TbContext { @Override public void enqueue(TbMsg tbMsg, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator()); enqueue(tpi, tbMsg, onFailure, onSuccess); } @@ -311,7 +311,7 @@ class DefaultTbContext implements TbContext { @Override public boolean isLocalEntity(EntityId entityId) { - return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), entityId).isMyPartition(); + return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), entityId).isMyPartition(); } private void scheduleMsgWithDelay(TbActorMsg msg, long delayInMs, TbActorRef target) { @@ -510,6 +510,11 @@ class DefaultTbContext implements TbContext { return ruleChainName; } + @Override + public String getQueueName() { + return getSelf().getQueueName(); + } + @Override public TenantId getTenantId() { return nodeCtx.getTenantId(); diff --git a/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java index a598a3b981..84da68385e 100644 --- a/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java +++ b/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java @@ -189,6 +189,7 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe scannedComponent.setName(ruleNodeAnnotation.name()); scannedComponent.setScope(ruleNodeAnnotation.scope()); scannedComponent.setClusteringMode(ruleNodeAnnotation.clusteringMode()); + scannedComponent.setHasQueueName(ruleNodeAnnotation.hasQueueName()); NodeDefinition nodeDefinition = prepareNodeDefinition(clazz, ruleNodeAnnotation); ObjectNode configurationDescriptor = JacksonUtil.newObjectNode(); JsonNode node = JacksonUtil.valueToTree(nodeDefinition); diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/RuleChainImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/RuleChainImportService.java index bc156c3d01..bdeb93c329 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/RuleChainImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/RuleChainImportService.java @@ -21,7 +21,6 @@ import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.audit.ActionType; -import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; diff --git a/application/src/main/java/org/thingsboard/server/utils/TbNodeUpgradeUtils.java b/application/src/main/java/org/thingsboard/server/utils/TbNodeUpgradeUtils.java index 157bbbe816..c1e7473434 100644 --- a/application/src/main/java/org/thingsboard/server/utils/TbNodeUpgradeUtils.java +++ b/application/src/main/java/org/thingsboard/server/utils/TbNodeUpgradeUtils.java @@ -25,6 +25,8 @@ import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.service.component.RuleNodeClassInfo; +import static org.thingsboard.server.common.data.DataConstants.QUEUE_NAME; + @Slf4j public class TbNodeUpgradeUtils { @@ -44,9 +46,13 @@ public class TbNodeUpgradeUtils { } else { var tbVersionedNode = getTbVersionedNode(nodeInfo); try { + JsonNode queueName = oldConfiguration.get(QUEUE_NAME); TbPair upgradeResult = tbVersionedNode.upgrade(configurationVersion, oldConfiguration); if (upgradeResult.getFirst()) { node.setConfiguration(upgradeResult.getSecond()); + if (nodeInfo.getAnnotation().hasQueueName() && queueName != null && queueName.isTextual()) { + node.setQueueName(queueName.asText()); + } } } catch (Exception e) { try { diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index 54fd39906e..53b5d2ec63 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -136,5 +136,6 @@ public class DataConstants { public static final String HP_QUEUE_TOPIC = "tb_rule_engine.hp"; public static final String SQ_QUEUE_NAME = "SequentialByOriginator"; public static final String SQ_QUEUE_TOPIC = "tb_rule_engine.sq"; + public static final String QUEUE_NAME = "queueName"; } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentDescriptor.java b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentDescriptor.java index 639aba31e2..ee117a6a86 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentDescriptor.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentDescriptor.java @@ -55,6 +55,8 @@ public class ComponentDescriptor extends BaseData { @Length(fieldName = "actions") @ApiModelProperty(position = 10, value = "Rule Node Actions. Deprecated. Always null.", accessMode = ApiModelProperty.AccessMode.READ_ONLY) @Getter @Setter private String actions; + @ApiModelProperty(position = 11, value = "Indicates that the RuleNode supports queue name configuration.", accessMode = ApiModelProperty.AccessMode.READ_ONLY, example = "true") + @Getter @Setter private boolean hasQueueName; public ComponentDescriptor() { super(); @@ -74,6 +76,7 @@ public class ComponentDescriptor extends BaseData { this.configurationDescriptor = plugin.getConfigurationDescriptor(); this.configurationVersion = plugin.getConfigurationVersion(); this.actions = plugin.getActions(); + this.hasQueueName = plugin.isHasQueueName(); } @ApiModelProperty(position = 1, value = "JSON object with the descriptor Id. " + @@ -104,6 +107,8 @@ public class ComponentDescriptor extends BaseData { if (!Objects.equals(actions, that.actions)) return false; if (!Objects.equals(configurationDescriptor, that.configurationDescriptor)) return false; if (configurationVersion != that.configurationVersion) return false; + if (clusteringMode != that.clusteringMode) return false; + if (hasQueueName != that.isHasQueueName()) return false; return Objects.equals(clazz, that.clazz); } @@ -115,6 +120,8 @@ public class ComponentDescriptor extends BaseData { result = 31 * result + (name != null ? name.hashCode() : 0); result = 31 * result + (clazz != null ? clazz.hashCode() : 0); result = 31 * result + (actions != null ? actions.hashCode() : 0); + result = 31 * result + (clusteringMode != null ? clusteringMode.hashCode() : 0); + result = 31 * result + (hasQueueName ? 1 : 0); return result; } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java index 97349951ec..4d1e696d1a 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java @@ -50,9 +50,11 @@ public class RuleNode extends BaseDataWithAdditionalInfo implements private boolean debugMode; @ApiModelProperty(position = 7, value = "Enable/disable singleton mode. ", example = "false") private boolean singletonMode; - @ApiModelProperty(position = 8, value = "Version of rule node configuration. ", example = "0") + @ApiModelProperty(position = 8, value = "Queue name. ", example = "Main") + private String queueName; + @ApiModelProperty(position = 9, value = "Version of rule node configuration. ", example = "0") private int configurationVersion; - @ApiModelProperty(position = 9, value = "JSON with the rule node configuration. Structure depends on the rule node implementation.", dataType = "com.fasterxml.jackson.databind.JsonNode") + @ApiModelProperty(position = 10, value = "JSON with the rule node configuration. Structure depends on the rule node implementation.", dataType = "com.fasterxml.jackson.databind.JsonNode") private transient JsonNode configuration; @JsonIgnore private byte[] configurationBytes; diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java index 2bd943862c..2fe56f5d1f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java @@ -354,6 +354,7 @@ public class ModelConstants { public static final String COMPONENT_DESCRIPTOR_CONFIGURATION_DESCRIPTOR_PROPERTY = "configuration_descriptor"; public static final String COMPONENT_DESCRIPTOR_CONFIGURATION_VERSION_PROPERTY = "configuration_version"; public static final String COMPONENT_DESCRIPTOR_ACTIONS_PROPERTY = "actions"; + public static final String COMPONENT_DESCRIPTOR_HAS_QUEUE_NAME_PROPERTY = "has_queue_name"; /** * Event constants. @@ -389,6 +390,7 @@ public class ModelConstants { public static final String DEBUG_MODE = "debug_mode"; public static final String SINGLETON_MODE = "singleton_mode"; + public static final String QUEUE_NAME = "queue_name"; /** * Rule chain constants. diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/ComponentDescriptorEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/ComponentDescriptorEntity.java index 84ee924041..8938fc5ad4 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/ComponentDescriptorEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/ComponentDescriptorEntity.java @@ -70,6 +70,9 @@ public class ComponentDescriptorEntity extends BaseSqlEntity { @Column(name = ModelConstants.SINGLETON_MODE) private boolean singletonMode; + @Column(name = ModelConstants.QUEUE_NAME) + private String queueName; + @Column(name = ModelConstants.EXTERNAL_ID_PROPERTY) private UUID externalId; @@ -84,6 +87,7 @@ public class RuleNodeEntity extends BaseSqlEntity { this.name = ruleNode.getName(); this.debugMode = ruleNode.isDebugMode(); this.singletonMode = ruleNode.isSingletonMode(); + this.queueName = ruleNode.getQueueName(); this.configurationVersion = ruleNode.getConfigurationVersion(); this.configuration = ruleNode.getConfiguration(); this.additionalInfo = ruleNode.getAdditionalInfo(); @@ -103,6 +107,7 @@ public class RuleNodeEntity extends BaseSqlEntity { ruleNode.setName(name); ruleNode.setDebugMode(debugMode); ruleNode.setSingletonMode(singletonMode); + ruleNode.setQueueName(queueName); ruleNode.setConfigurationVersion(configurationVersion); ruleNode.setConfiguration(configuration); ruleNode.setAdditionalInfo(additionalInfo); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/component/AbstractComponentDescriptorInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/component/AbstractComponentDescriptorInsertRepository.java index 16e99f98b2..dc2759acd1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/component/AbstractComponentDescriptorInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/component/AbstractComponentDescriptorInsertRepository.java @@ -77,7 +77,8 @@ public abstract class AbstractComponentDescriptorInsertRepository implements Com .setParameter("name", entity.getName()) .setParameter("scope", entity.getScope().name()) .setParameter("type", entity.getType().name()) - .setParameter("clustering_mode", entity.getClusteringMode().name()); + .setParameter("clustering_mode", entity.getClusteringMode().name()) + .setParameter("has_queue_name", entity.isHasQueueName()); } private ComponentDescriptorEntity processSaveOrUpdate(ComponentDescriptorEntity entity, String query) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/component/SqlComponentDescriptorInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/component/SqlComponentDescriptorInsertRepository.java index 2d4d1089ea..0352cdcda3 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/component/SqlComponentDescriptorInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/component/SqlComponentDescriptorInsertRepository.java @@ -44,10 +44,10 @@ public class SqlComponentDescriptorInsertRepository extends AbstractComponentDes } private static String getInsertOrUpdateStatement(String conflictKeyStatement, String updateKeyStatement) { - return "INSERT INTO component_descriptor (id, created_time, actions, clazz, configuration_descriptor, configuration_version, name, scope, type, clustering_mode) VALUES (:id, :created_time, :actions, :clazz, :configuration_descriptor, :configuration_version, :name, :scope, :type, :clustering_mode) ON CONFLICT " + conflictKeyStatement + " DO UPDATE SET " + updateKeyStatement + " returning *"; + return "INSERT INTO component_descriptor (id, created_time, actions, clazz, configuration_descriptor, configuration_version, name, scope, type, clustering_mode, has_queue_name) VALUES (:id, :created_time, :actions, :clazz, :configuration_descriptor, :configuration_version, :name, :scope, :type, :clustering_mode, :has_queue_name) ON CONFLICT " + conflictKeyStatement + " DO UPDATE SET " + updateKeyStatement + " returning *"; } private static String getUpdateStatement(String id) { - return "actions = :actions, " + id + ",created_time = :created_time, configuration_descriptor = :configuration_descriptor, configuration_version = :configuration_version, name = :name, scope = :scope, type = :type, clustering_mode = :clustering_mode"; + return "actions = :actions, " + id + ",created_time = :created_time, configuration_descriptor = :configuration_descriptor, configuration_version = :configuration_version, name = :name, scope = :scope, type = :type, clustering_mode = :clustering_mode, has_queue_name = :has_queue_name"; } } diff --git a/dao/src/main/resources/sql/schema-entities.sql b/dao/src/main/resources/sql/schema-entities.sql index 9dc6c6a6b1..2352ea2eb3 100644 --- a/dao/src/main/resources/sql/schema-entities.sql +++ b/dao/src/main/resources/sql/schema-entities.sql @@ -126,7 +126,8 @@ CREATE TABLE IF NOT EXISTS component_descriptor ( name varchar(255), scope varchar(255), type varchar(255), - clustering_mode varchar(255) + clustering_mode varchar(255), + has_queue_name boolean DEFAULT false ); CREATE TABLE IF NOT EXISTS customer ( @@ -187,6 +188,7 @@ CREATE TABLE IF NOT EXISTS rule_node ( name varchar(255), debug_mode boolean, singleton_mode boolean, + queue_name varchar(255), external_id uuid ); diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java index b9f8c3fdb3..15c3ccbbba 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java @@ -107,6 +107,7 @@ public class ContainerTestSuite { List composeFiles = new ArrayList<>(Arrays.asList( new File(targetDir + "docker-compose.yml"), new File(targetDir + "docker-compose.volumes.yml"), + new File(targetDir + "docker-compose.mosquitto.yml"), new File(targetDir + (IS_HYBRID_MODE ? "docker-compose.hybrid.yml" : "docker-compose.postgres.yml")), new File(targetDir + (IS_HYBRID_MODE ? "docker-compose.hybrid-test-extras.yml" : "docker-compose.postgres-test-extras.yml")), new File(targetDir + "docker-compose.postgres.volumes.yml"), @@ -162,6 +163,7 @@ public class ContainerTestSuite { .withEnv(queueEnv) .withEnv("LOAD_BALANCER_NAME", "") .withExposedService("haproxy", 80, Wait.forHttp("/swagger-ui.html").withStartupTimeout(CONTAINER_STARTUP_TIMEOUT)) + .withExposedService("broker", 1883) .waitingFor("tb-core1", Wait.forLogMessage(TB_CORE_LOG_REGEXP, 1).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT)) .waitingFor("tb-core2", Wait.forLogMessage(TB_CORE_LOG_REGEXP, 1).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT)) .waitingFor("tb-http-transport1", Wait.forLogMessage(TRANSPORTS_LOG_REGEXP, 1).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT)) diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestProperties.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestProperties.java index 25e3acd16c..d33c740603 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestProperties.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestProperties.java @@ -48,4 +48,13 @@ public class TestProperties { } return System.getProperty("tb.wsUrl", "ws://localhost:8080"); } + + public static String getMqttBrokerUrl() { + if (instance.isActive()) { + String host = instance.getTestContainer().getServiceHost("broker", 1883); + Integer port = instance.getTestContainer().getServicePort("broker", 1883); + return "tcp://" + host + ":" + port; + } + return System.getProperty("mqtt.broker", "tcp://localhost:1883"); + } } diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java index 25390e6306..305e8706f2 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java @@ -31,10 +31,12 @@ import org.thingsboard.server.common.data.Dashboard; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.EntityView; +import org.thingsboard.server.common.data.EventInfo; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.asset.AssetProfile; +import org.thingsboard.server.common.data.event.EventType; import org.thingsboard.server.common.data.id.AlarmId; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.AssetProfileId; @@ -45,9 +47,11 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.rule.RuleChain; @@ -527,4 +531,41 @@ public class TestRestClient { .then() .statusCode(HTTP_OK); } + + public PageData getEvents(EntityId entityId, EventType eventType, TenantId tenantId, TimePageLink pageLink) { + Map params = new HashMap<>(); + params.put("entityType", entityId.getEntityType().name()); + params.put("entityId", entityId.getId().toString()); + params.put("eventType", eventType.name()); + params.put("tenantId", tenantId.getId().toString()); + addTimePageLinkToParam(params, pageLink); + + return given().spec(requestSpec) + .get("/api/events/{entityType}/{entityId}/{eventType}?tenantId={tenantId}&" + getTimeUrlParams(pageLink), params) + .then() + .statusCode(HTTP_OK) + .extract() + .as(new TypeRef<>() {}); + } + + private void addTimePageLinkToParam(Map params, TimePageLink pageLink) { + this.addPageLinkToParam(params, pageLink); + if (pageLink.getStartTime() != null) { + params.put("startTime", String.valueOf(pageLink.getStartTime())); + } + if (pageLink.getEndTime() != null) { + params.put("endTime", String.valueOf(pageLink.getEndTime())); + } + } + + private String getTimeUrlParams(TimePageLink pageLink) { + String urlParams = getUrlParams(pageLink); + if (pageLink.getStartTime() != null) { + urlParams += "&startTime={startTime}"; + } + if (pageLink.getEndTime() != null) { + urlParams += "&endTime={endTime}"; + } + return urlParams; + } } diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/rule/node/MqttNodeTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/rule/node/MqttNodeTest.java new file mode 100644 index 0000000000..07a0691924 --- /dev/null +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/rule/node/MqttNodeTest.java @@ -0,0 +1,204 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.msa.rule.node; + +import com.fasterxml.jackson.databind.JsonNode; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; +import org.eclipse.paho.client.mqttv3.IMqttMessageListener; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.EventInfo; +import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.event.EventType; +import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.page.TimePageLink; +import org.thingsboard.server.common.data.rule.NodeConnectionInfo; +import org.thingsboard.server.common.data.rule.RuleChain; +import org.thingsboard.server.common.data.rule.RuleChainMetaData; +import org.thingsboard.server.common.data.rule.RuleNode; +import org.thingsboard.server.common.data.security.DeviceCredentials; +import org.thingsboard.server.msa.AbstractContainerTest; +import org.thingsboard.server.msa.DisableUIListeners; +import org.thingsboard.server.msa.TestProperties; +import org.thingsboard.server.msa.WsClient; +import org.thingsboard.server.msa.mapper.WsTelemetryResponse; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.fail; +import static org.thingsboard.server.msa.prototypes.DevicePrototypes.defaultDevicePrototype; + +@DisableUIListeners +@Slf4j +public class MqttNodeTest extends AbstractContainerTest { + + private static final String TOPIC = "tb/mqtt/device"; + + private Device device; + + @BeforeMethod + public void setUp() { + testRestClient.login("tenant@thingsboard.org", "tenant"); + device = testRestClient.postDevice("", defaultDevicePrototype("mqtt_")); + } + + @AfterMethod + public void tearDown() { + testRestClient.deleteDeviceIfExists(device.getId()); + } + + @Test + public void telemetryUpload() throws Exception { + RuleChainId defaultRuleChainId = getDefaultRuleChainId(); + + createRootRuleChainWithTestNode("MqttRuleNodeTestMetadata.json", "org.thingsboard.rule.engine.mqtt.TbMqttNode", 2); + + DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId()); + + WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS); + + MqttMessageListener messageListener = new MqttMessageListener(); + MqttClient responseClient = new MqttClient(TestProperties.getMqttBrokerUrl(), StringUtils.randomAlphanumeric(10), new MemoryPersistence()); + responseClient.connect(); + responseClient.subscribe(TOPIC, messageListener); + + MqttClient mqttClient = new MqttClient("tcp://localhost:1883", StringUtils.randomAlphanumeric(10), new MemoryPersistence()); + MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); + mqttConnectOptions.setUserName(deviceCredentials.getCredentialsId()); + mqttClient.connect(mqttConnectOptions); + mqttClient.publish("v1/devices/me/telemetry", new MqttMessage(createPayload().toString().getBytes())); + + WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage(); + log.info("Received telemetry: {}", actualLatestTelemetry); + wsClient.closeBlocking(); + + assertThat(actualLatestTelemetry.getData()).hasSize(4); + assertThat(actualLatestTelemetry.getLatestValues().keySet()).containsOnlyOnceElementsOf(Arrays.asList("booleanKey", "stringKey", "doubleKey", "longKey")); + + assertThat(actualLatestTelemetry.getDataValuesByKey("booleanKey").get(1)).isEqualTo(Boolean.TRUE.toString()); + assertThat(actualLatestTelemetry.getDataValuesByKey("stringKey").get(1)).isEqualTo("value1"); + assertThat(actualLatestTelemetry.getDataValuesByKey("doubleKey").get(1)).isEqualTo(Double.toString(42.0)); + assertThat(actualLatestTelemetry.getDataValuesByKey("longKey").get(1)).isEqualTo(Long.toString(73)); + + Awaitility + .await() + .alias("Get integration events") + .atMost(10, TimeUnit.SECONDS) + .until(() -> messageListener.getEvents().size() > 0); + + BlockingQueue events = messageListener.getEvents(); + JsonNode actual = JacksonUtil.toJsonNode(Objects.requireNonNull(events.poll()).message); + + assertThat(actual.get("stringKey").asText()).isEqualTo("value1"); + assertThat(actual.get("booleanKey").asBoolean()).isEqualTo(Boolean.TRUE); + assertThat(actual.get("doubleKey").asDouble()).isEqualTo(42.0); + assertThat(actual.get("longKey").asLong()).isEqualTo(73); + + testRestClient.setRootRuleChain(defaultRuleChainId); + } + + @Data + private class MqttMessageListener implements IMqttMessageListener { + private final BlockingQueue events; + + private MqttMessageListener() { + events = new ArrayBlockingQueue<>(100); + } + + @Override + public void messageArrived(String s, MqttMessage mqttMessage) { + log.info("MQTT message [{}], topic [{}]", mqttMessage.toString(), s); + events.add(new MqttEvent(s, mqttMessage.toString())); + } + + public BlockingQueue getEvents() { + return events; + } + } + + @Data + private class MqttEvent { + private final String topic; + private final String message; + } + + private RuleChainId getDefaultRuleChainId() { + PageData ruleChains = testRestClient.getRuleChains(new PageLink(40, 0)); + + Optional defaultRuleChain = ruleChains.getData() + .stream() + .filter(RuleChain::isRoot) + .findFirst(); + if (!defaultRuleChain.isPresent()) { + fail("Root rule chain wasn't found"); + } + return defaultRuleChain.get().getId(); + } + + protected RuleChainId createRootRuleChainWithTestNode(String ruleChainMetadataFile, String ruleNodeType, int eventsCount) throws Exception { + RuleChain newRuleChain = new RuleChain(); + newRuleChain.setName("testRuleChain"); + RuleChain ruleChain = testRestClient.postRuleChain(newRuleChain); + + JsonNode configuration = JacksonUtil.OBJECT_MAPPER.readTree(this.getClass().getClassLoader().getResourceAsStream(ruleChainMetadataFile)); + RuleChainMetaData ruleChainMetaData = new RuleChainMetaData(); + ruleChainMetaData.setRuleChainId(ruleChain.getId()); + ruleChainMetaData.setFirstNodeIndex(configuration.get("firstNodeIndex").asInt()); + ruleChainMetaData.setNodes(Arrays.asList(JacksonUtil.OBJECT_MAPPER.treeToValue(configuration.get("nodes"), RuleNode[].class))); + ruleChainMetaData.setConnections(Arrays.asList(JacksonUtil.OBJECT_MAPPER.treeToValue(configuration.get("connections"), NodeConnectionInfo[].class))); + + ruleChainMetaData = testRestClient.postRuleChainMetadata(ruleChainMetaData); + + testRestClient.setRootRuleChain(ruleChain.getId()); + + RuleNode node = ruleChainMetaData.getNodes().stream().filter(ruleNode -> ruleNode.getType().equals(ruleNodeType)).findFirst().get(); + + Awaitility + .await() + .alias("Get events from rule chain") + .atMost(10, TimeUnit.SECONDS) + .until(() -> { + PageData events = testRestClient.getEvents(node.getId(), EventType.LC_EVENT, ruleChain.getTenantId(), new TimePageLink(1024)); + List eventInfos = events.getData().stream().filter(eventInfo -> + "STARTED".equals(eventInfo.getBody().get("event").asText()) && + "true".equals(eventInfo.getBody().get("success").asText())) + .collect(Collectors.toList()); + + return eventInfos.size() == eventsCount; + }); + + return ruleChain.getId(); + } +} diff --git a/msa/black-box-tests/src/test/resources/MqttRuleNodeTestMetadata.json b/msa/black-box-tests/src/test/resources/MqttRuleNodeTestMetadata.json new file mode 100644 index 0000000000..7a5015add3 --- /dev/null +++ b/msa/black-box-tests/src/test/resources/MqttRuleNodeTestMetadata.json @@ -0,0 +1,79 @@ +{ + "firstNodeIndex": 2, + "nodes": [ + { + "additionalInfo": { + "description": "", + "layoutX": 626, + "layoutY": 152 + }, + "type": "org.thingsboard.rule.engine.mqtt.TbMqttNode", + "name": "test mqtt", + "debugMode": true, + "singletonMode": true, + "queueName": "HighPriority", + "configurationVersion": 0, + "configuration": { + "topicPattern": "tb/mqtt/device", + "host": "broker", + "port": 1883, + "connectTimeoutSec": 10, + "clientId": null, + "cleanSession": true, + "retainedMessage": false, + "ssl": false, + "credentials": { + "type": "anonymous" + } + }, + "externalId": null + }, + { + "additionalInfo": { + "description": "", + "layoutX": 949, + "layoutY": 153 + }, + "type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", + "name": "save timeseries", + "debugMode": true, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "defaultTTL": 0, + "skipLatestPersistence": false, + "useServerTs": false + }, + "externalId": null + }, + { + "additionalInfo": { + "description": "", + "layoutX": 305, + "layoutY": 151 + }, + "type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode", + "name": "switch", + "debugMode": false, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "version": 0 + }, + "externalId": null + } + ], + "connections": [ + { + "fromIndex": 0, + "toIndex": 1, + "type": "Success" + }, + { + "fromIndex": 2, + "toIndex": 0, + "type": "Post telemetry" + } + ], + "ruleChainConnections": null +} diff --git a/msa/black-box-tests/src/test/resources/docker-compose.mosquitto.yml b/msa/black-box-tests/src/test/resources/docker-compose.mosquitto.yml new file mode 100644 index 0000000000..1936f57d49 --- /dev/null +++ b/msa/black-box-tests/src/test/resources/docker-compose.mosquitto.yml @@ -0,0 +1,25 @@ +# +# Copyright © 2016-2024 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +version: '3.0' +services: + broker: + image: eclipse-mosquitto + volumes: + - ./mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf + ports: + - "1883" + restart: always diff --git a/msa/black-box-tests/src/test/resources/mosquitto/mosquitto.conf b/msa/black-box-tests/src/test/resources/mosquitto/mosquitto.conf new file mode 100644 index 0000000000..7136ad64b0 --- /dev/null +++ b/msa/black-box-tests/src/test/resources/mosquitto/mosquitto.conf @@ -0,0 +1,18 @@ +# +# Copyright © 2016-2024 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +listener 1883 +allow_anonymous true diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleNode.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleNode.java index fafd793312..2568b06560 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleNode.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleNode.java @@ -42,6 +42,8 @@ public @interface RuleNode { ComponentClusteringMode clusteringMode() default ComponentClusteringMode.ENABLED; + boolean hasQueueName() default false; + boolean inEnabled() default true; boolean outEnabled() default true; diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 7db63ae7fc..a1d7d9a97d 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -256,6 +256,8 @@ public interface TbContext { String getRuleChainName(); + String getQueueName(); + TenantId getTenantId(); AttributesService getAttributesService(); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java index 945d80525c..e600ab8ef6 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java @@ -36,7 +36,6 @@ import java.util.stream.Collectors; */ public class TbNodeUtils { - private static final Pattern DATA_PATTERN = Pattern.compile("(\\$\\[)(.*?)(])"); public static T convert(TbNodeConfiguration configuration, Class clazz) throws TbNodeException { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java index f0af634f6a..f33cc5f69d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java @@ -15,6 +15,8 @@ */ package org.thingsboard.rule.engine.debug; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; @@ -35,6 +37,7 @@ import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.script.ScriptLanguage; +import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; @@ -44,12 +47,15 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static org.thingsboard.common.util.DonAsynchron.withCallback; +import static org.thingsboard.server.common.data.DataConstants.QUEUE_NAME; @Slf4j @RuleNode( type = ComponentType.ACTION, name = "generator", configClazz = TbMsgGeneratorNodeConfiguration.class, + version = 1, + hasQueueName = true, nodeDescription = "Periodically generates messages", nodeDetails = "Generates messages with configurable period. Javascript function used for message generation.", inEnabled = false, @@ -69,6 +75,7 @@ public class TbMsgGeneratorNode implements TbNode { private UUID nextTickId; private TbMsg prevMsg; private final AtomicBoolean initialized = new AtomicBoolean(false); + private String queueName; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { @@ -76,6 +83,7 @@ public class TbMsgGeneratorNode implements TbNode { this.config = TbNodeUtils.convert(configuration, TbMsgGeneratorNodeConfiguration.class); this.delay = TimeUnit.SECONDS.toMillis(config.getPeriodInSeconds()); this.currentMsgCount = 0; + this.queueName = ctx.getQueueName(); if (!StringUtils.isEmpty(config.getOriginatorId())) { originatorId = EntityIdFactory.getByTypeAndUuid(config.getOriginatorType(), config.getOriginatorId()); ctx.checkTenantEntity(originatorId); @@ -137,7 +145,7 @@ public class TbMsgGeneratorNode implements TbNode { } lastScheduledTs = lastScheduledTs + delay; long curDelay = Math.max(0L, (lastScheduledTs - curTs)); - TbMsg tickMsg = ctx.newMsg(config.getQueueName(), TbMsgType.GENERATOR_NODE_SELF_MSG, ctx.getSelfId(), + TbMsg tickMsg = ctx.newMsg(queueName, TbMsgType.GENERATOR_NODE_SELF_MSG, ctx.getSelfId(), getCustomerIdFromMsg(msg), TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING); nextTickId = tickMsg.getId(); ctx.tellSelf(tickMsg, curDelay); @@ -146,14 +154,14 @@ public class TbMsgGeneratorNode implements TbNode { private ListenableFuture generate(TbContext ctx, TbMsg msg) { log.trace("generate, config {}", config); if (prevMsg == null) { - prevMsg = ctx.newMsg(config.getQueueName(), TbMsg.EMPTY_STRING, originatorId, msg.getCustomerId(), TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + prevMsg = ctx.newMsg(queueName, TbMsg.EMPTY_STRING, originatorId, msg.getCustomerId(), TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); } if (initialized.get()) { ctx.logJsEvalRequest(); return Futures.transformAsync(scriptEngine.executeGenerateAsync(prevMsg), generated -> { log.trace("generate process response, generated {}, config {}", generated, config); ctx.logJsEvalResponse(); - prevMsg = ctx.newMsg(config.getQueueName(), generated.getType(), originatorId, msg.getCustomerId(), generated.getMetaData(), generated.getData()); + prevMsg = ctx.newMsg(queueName, generated.getType(), originatorId, msg.getCustomerId(), generated.getMetaData(), generated.getData()); return Futures.immediateFuture(prevMsg); }, MoreExecutors.directExecutor()); //usually it runs on js-executor-remote-callback thread pool } @@ -174,4 +182,20 @@ public class TbMsgGeneratorNode implements TbNode { scriptEngine = null; } } + + @Override + public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { + boolean hasChanges = false; + switch (fromVersion) { + case 0: + if (oldConfiguration.has(QUEUE_NAME)) { + hasChanges = true; + ((ObjectNode) oldConfiguration).remove(QUEUE_NAME); + } + break; + default: + break; + } + return new TbPair<>(hasChanges, oldConfiguration); + } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeConfiguration.java index de4dae0dc8..017b6e4381 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeConfiguration.java @@ -36,7 +36,6 @@ public class TbMsgGeneratorNodeConfiguration implements NodeConfiguration deduplicationMap; private long deduplicationInterval; + private String queueName; public TbMsgDeduplicationNode() { this.deduplicationMap = new HashMap<>(); @@ -75,6 +81,7 @@ public class TbMsgDeduplicationNode implements TbNode { public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { this.config = TbNodeUtils.convert(configuration, TbMsgDeduplicationNodeConfiguration.class); this.deduplicationInterval = TimeUnit.SECONDS.toMillis(config.getInterval()); + this.queueName = ctx.getQueueName(); } @Override @@ -91,6 +98,22 @@ public class TbMsgDeduplicationNode implements TbNode { deduplicationMap.clear(); } + @Override + public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { + boolean hasChanges = false; + switch (fromVersion) { + case 0: + if (oldConfiguration.has(QUEUE_NAME)) { + hasChanges = true; + ((ObjectNode) oldConfiguration).remove(QUEUE_NAME); + } + break; + default: + break; + } + return new TbPair<>(hasChanges, oldConfiguration); + } + private void processOnRegularMsg(TbContext ctx, TbMsg msg) { EntityId id = msg.getOriginator(); DeduplicationData deduplicationMsgs = deduplicationMap.computeIfAbsent(id, k -> new DeduplicationData()); @@ -132,7 +155,7 @@ public class TbMsgDeduplicationNode implements TbNode { } } deduplicationResults.add(TbMsg.newMsg( - config.getQueueName(), + queueName, config.getOutMsgType(), deduplicationId, getMetadata(), @@ -154,7 +177,7 @@ public class TbMsgDeduplicationNode implements TbNode { } if (resultMsg != null) { deduplicationResults.add(TbMsg.newMsg( - resultMsg.getQueueName(), + queueName != null ? queueName : resultMsg.getQueueName(), resultMsg.getType(), resultMsg.getOriginator(), resultMsg.getCustomerId(), diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNodeConfiguration.java index 72733ac161..1885c6d370 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNodeConfiguration.java @@ -26,7 +26,6 @@ public class TbMsgDeduplicationNodeConfiguration implements NodeConfiguration ctx.ack(msg), error -> ctx.tellFailure(msg, error)); } + @Override + public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { + boolean hasChanges = false; + switch (fromVersion) { + case 0: + if (oldConfiguration.has(QUEUE_NAME)) { + hasChanges = true; + ((ObjectNode) oldConfiguration).remove(QUEUE_NAME); + } + break; + default: + break; + } + return new TbPair<>(hasChanges, oldConfiguration); + } + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeConfiguration.java deleted file mode 100644 index 681cc67cb0..0000000000 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeConfiguration.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Copyright © 2016-2024 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.rule.engine.flow; - -import lombok.Data; -import org.thingsboard.rule.engine.api.NodeConfiguration; - -@Data -public class TbCheckpointNodeConfiguration implements NodeConfiguration { - - private String queueName; - - @Override - public TbCheckpointNodeConfiguration defaultConfiguration() { - return new TbCheckpointNodeConfiguration(); - } -} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/AbstractRuleNodeUpgradeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/AbstractRuleNodeUpgradeTest.java new file mode 100644 index 0000000000..f2509a9d68 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/AbstractRuleNodeUpgradeTest.java @@ -0,0 +1,52 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.util.TbPair; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.BDDMockito.willCallRealMethod; + +public abstract class AbstractRuleNodeUpgradeTest { + + protected abstract TbNode getTestNode(); + + @ParameterizedTest + @MethodSource + public void givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig(int givenVersion, String givenConfigStr, boolean hasChanges, String expectedConfigStr) throws TbNodeException { + // GIVEN + willCallRealMethod().given(getTestNode()).upgrade(anyInt(), any()); + JsonNode givenConfig = JacksonUtil.toJsonNode(givenConfigStr); + JsonNode expectedConfig = JacksonUtil.toJsonNode(expectedConfigStr); + + // WHEN + TbPair upgradeResult = getTestNode().upgrade(givenVersion, givenConfig); + + // THEN + assertThat(upgradeResult.getFirst()).isEqualTo(hasChanges); + ObjectNode upgradedConfig = (ObjectNode) upgradeResult.getSecond(); + assertThat(upgradedConfig).isEqualTo(expectedConfig); + } +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java new file mode 100644 index 0000000000..13f2b2ad42 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java @@ -0,0 +1,53 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.debug; + +import org.junit.jupiter.params.provider.Arguments; +import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; +import org.thingsboard.rule.engine.api.TbNode; + +import java.util.stream.Stream; + +import static org.mockito.Mockito.spy; + +public class TbMsgGeneratorNodeTest extends AbstractRuleNodeUpgradeTest { + + // Rule nodes upgrade + private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { + return Stream.of( + // default config for version 0 + Arguments.of(0, + "{\"msgCount\":0,\"periodInSeconds\":1,\"originatorId\":null,\"originatorType\":null, \"queueName\":null, \"scriptLang\":\"TBEL\",\"jsScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\",\"tbelScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\"}", + true, + "{\"msgCount\":0,\"periodInSeconds\":1,\"originatorId\":null,\"originatorType\":null, \"scriptLang\":\"TBEL\",\"jsScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\",\"tbelScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\"}"), + // default config for version 0 with queueName + Arguments.of(0, + "{\"msgCount\":0,\"periodInSeconds\":1,\"originatorId\":null,\"originatorType\":null, \"queueName\":\"Main\", \"scriptLang\":\"TBEL\",\"jsScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\",\"tbelScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\"}", + true, + "{\"msgCount\":0,\"periodInSeconds\":1,\"originatorId\":null,\"originatorType\":null, \"scriptLang\":\"TBEL\",\"jsScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\",\"tbelScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\"}"), + // default config for version 1 with upgrade from version 0 + Arguments.of(0, + "{\"msgCount\":0,\"periodInSeconds\":1,\"originatorId\":null,\"originatorType\":null, \"scriptLang\":\"TBEL\",\"jsScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\",\"tbelScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\"}", + false, + "{\"msgCount\":0,\"periodInSeconds\":1,\"originatorId\":null,\"originatorType\":null, \"scriptLang\":\"TBEL\",\"jsScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\",\"tbelScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\"}") + ); + } + + @Override + protected TbNode getTestNode() { + return spy(TbMsgGeneratorNode.class); + } +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeTest.java new file mode 100644 index 0000000000..545f6112d8 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeTest.java @@ -0,0 +1,55 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.flow; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.params.provider.Arguments; +import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; +import org.thingsboard.rule.engine.api.TbNode; + +import java.util.stream.Stream; + +import static org.mockito.Mockito.spy; + +@Slf4j +public class TbCheckpointNodeTest extends AbstractRuleNodeUpgradeTest { + + // Rule nodes upgrade + private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { + return Stream.of( + // default config for version 0 + Arguments.of(0, + "{\"queueName\":null}", + true, + "{}"), + // default config for version 0 with queueName + Arguments.of(0, + "{\"queueName\":\"Main\"}", + true, + "{}"), + // default config for version 1 with upgrade from version 0 + Arguments.of(0, + "{}", + false, + "{}") + ); + } + + @Override + protected TbNode getTestNode() { + return spy(TbCheckpointNode.class); + } +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeTest.java index 44300c1353..23e4b37c92 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeTest.java @@ -15,7 +15,6 @@ */ package org.thingsboard.rule.engine.telemetry; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeEach; @@ -25,8 +24,10 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.server.common.data.DataConstants; @@ -40,7 +41,6 @@ import org.thingsboard.server.common.data.kv.JsonDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.msg.TbMsgType; -import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -53,7 +53,6 @@ import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.willCallRealMethod; @@ -65,7 +64,7 @@ import static org.mockito.Mockito.when; import static org.thingsboard.server.common.data.DataConstants.NOTIFY_DEVICE_METADATA_KEY; @Slf4j -class TbMsgAttributesNodeTest { +class TbMsgAttributesNodeTest extends AbstractRuleNodeUpgradeTest { private TenantId tenantId; private DeviceId deviceId; @@ -223,21 +222,9 @@ class TbMsgAttributesNodeTest { ); } - @ParameterizedTest - @MethodSource - void givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig(int givenVersion, String givenConfigStr, boolean hasChanges, String expectedConfigStr) throws TbNodeException { - // GIVEN - willCallRealMethod().given(node).upgrade(anyInt(), any()); - JsonNode givenConfig = JacksonUtil.toJsonNode(givenConfigStr); - JsonNode expectedConfig = JacksonUtil.toJsonNode(expectedConfigStr); - - // WHEN - TbPair upgradeResult = node.upgrade(givenVersion, givenConfig); - - // THEN - assertThat(upgradeResult.getFirst()).isEqualTo(hasChanges); - ObjectNode upgradedConfig = (ObjectNode) upgradeResult.getSecond(); - assertThat(upgradedConfig).isEqualTo(expectedConfig); + @Override + protected TbNode getTestNode() { + return node; } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java index 22e44a9271..7832bbc1dc 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java @@ -22,12 +22,17 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.stubbing.Answer; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.deduplication.DeduplicationStrategy; @@ -55,6 +60,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import java.util.stream.Stream; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -68,7 +74,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @Slf4j -public class TbMsgDeduplicationNodeTest { +public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest { private TbContext ctx; @@ -139,13 +145,24 @@ public class TbMsgDeduplicationNodeTest { node.destroy(); } - @Test - public void given_100_messages_strategy_first_then_verifyOutput() throws TbNodeException, ExecutionException, InterruptedException { + private static Stream given_100_messages_strategy_first_then_verifyOutput() { + return Stream.of( + Arguments.of((String) null), + Arguments.of(DataConstants.MAIN_QUEUE_NAME), + Arguments.of(DataConstants.HP_QUEUE_NAME) + ); + } + + @ParameterizedTest + @MethodSource + public void given_100_messages_strategy_first_then_verifyOutput(String queueName) throws TbNodeException, ExecutionException, InterruptedException { int wantedNumberOfTellSelfInvocation = 2; int msgCount = 100; awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); invokeTellSelf(wantedNumberOfTellSelfInvocation); + when(ctx.getQueueName()).thenReturn(queueName); + config.setInterval(deduplicationInterval); config.setMaxPendingMsgs(msgCount); nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); @@ -182,6 +199,12 @@ public class TbMsgDeduplicationNodeTest { Assertions.assertEquals(firstMsg.getData(), actualMsg.getData()); Assertions.assertEquals(firstMsg.getMetaData(), actualMsg.getMetaData()); Assertions.assertEquals(firstMsg.getType(), actualMsg.getType()); + + if (queueName == null) { + Assertions.assertEquals(firstMsg.getQueueName(), actualMsg.getQueueName()); + } else { + Assertions.assertEquals(ctx.getQueueName(), actualMsg.getQueueName()); + } } @Test @@ -238,10 +261,10 @@ public class TbMsgDeduplicationNodeTest { awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); invokeTellSelf(wantedNumberOfTellSelfInvocation); + when(ctx.getQueueName()).thenReturn(DataConstants.HP_QUEUE_NAME); config.setInterval(deduplicationInterval); config.setStrategy(DeduplicationStrategy.ALL); config.setOutMsgType(TbMsgType.POST_ATTRIBUTES_REQUEST.name()); - config.setQueueName(DataConstants.HP_QUEUE_NAME); nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); node.init(ctx, nodeConfiguration); @@ -268,7 +291,7 @@ public class TbMsgDeduplicationNodeTest { Assertions.assertEquals(getMergedData(inputMsgs), outMessage.getData()); Assertions.assertEquals(deviceId, outMessage.getOriginator()); Assertions.assertEquals(config.getOutMsgType(), outMessage.getType()); - Assertions.assertEquals(config.getQueueName(), outMessage.getQueueName()); + Assertions.assertEquals(DataConstants.HP_QUEUE_NAME, outMessage.getQueueName()); } @Test @@ -278,10 +301,10 @@ public class TbMsgDeduplicationNodeTest { awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); invokeTellSelf(wantedNumberOfTellSelfInvocation, true, 3); + when(ctx.getQueueName()).thenReturn(DataConstants.HP_QUEUE_NAME); config.setInterval(deduplicationInterval); config.setStrategy(DeduplicationStrategy.ALL); config.setOutMsgType(TbMsgType.POST_ATTRIBUTES_REQUEST.name()); - config.setQueueName(DataConstants.HP_QUEUE_NAME); nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); node.init(ctx, nodeConfiguration); @@ -292,7 +315,7 @@ public class TbMsgDeduplicationNodeTest { for (TbMsg msg : firstMsgPack) { node.onMsg(ctx, msg); } - long firstPackDeduplicationPackEndTs = firstMsgPack.get(0).getMetaDataTs() + TimeUnit.SECONDS.toMillis(deduplicationInterval); + long firstPackDeduplicationPackEndTs = firstMsgPack.get(0).getMetaDataTs() + TimeUnit.SECONDS.toMillis(deduplicationInterval); List secondMsgPack = getTbMsgs(deviceId, msgCount / 2, firstPackDeduplicationPackEndTs, 500); for (TbMsg msg : secondMsgPack) { @@ -316,13 +339,13 @@ public class TbMsgDeduplicationNodeTest { Assertions.assertEquals(getMergedData(firstMsgPack), firstMsg.getData()); Assertions.assertEquals(deviceId, firstMsg.getOriginator()); Assertions.assertEquals(config.getOutMsgType(), firstMsg.getType()); - Assertions.assertEquals(config.getQueueName(), firstMsg.getQueueName()); + Assertions.assertEquals(DataConstants.HP_QUEUE_NAME, firstMsg.getQueueName()); TbMsg secondMsg = resultMsgs.get(1); Assertions.assertEquals(getMergedData(secondMsgPack), secondMsg.getData()); Assertions.assertEquals(deviceId, secondMsg.getOriginator()); Assertions.assertEquals(config.getOutMsgType(), secondMsg.getType()); - Assertions.assertEquals(config.getQueueName(), secondMsg.getQueueName()); + Assertions.assertEquals(DataConstants.HP_QUEUE_NAME, secondMsg.getQueueName()); } @Test @@ -385,6 +408,27 @@ public class TbMsgDeduplicationNodeTest { Assertions.assertEquals(msgWithLatestTsInSecondPack.getType(), actualMsg.getType()); } + // Rule nodes upgrade + private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { + return Stream.of( + // default config for version 0 + Arguments.of(0, + "{\"interval\":60,\"strategy\":\"FIRST\",\"outMsgType\":null,\"maxPendingMsgs\":100,\"maxRetries\":3, \"queueName\":null}", + true, + "{\"interval\":60,\"strategy\":\"FIRST\",\"outMsgType\":null,\"maxPendingMsgs\":100,\"maxRetries\":3}"), + // default config for version 0 with queueName + Arguments.of(0, + "{\"interval\":60,\"strategy\":\"FIRST\",\"outMsgType\":null,\"maxPendingMsgs\":100,\"maxRetries\":3, \"queueName\":\"Main\"}", + true, + "{\"interval\":60,\"strategy\":\"FIRST\",\"outMsgType\":null,\"maxPendingMsgs\":100,\"maxRetries\":3}"), + // default config for version 1 with upgrade from version 0 + Arguments.of(0, + "{\"interval\":60,\"strategy\":\"FIRST\",\"outMsgType\":null,\"maxPendingMsgs\":100,\"maxRetries\":3}", + false, + "{\"interval\":60,\"strategy\":\"FIRST\",\"outMsgType\":null,\"maxPendingMsgs\":100,\"maxRetries\":3}") + ); + } + private TbMsg getMsgWithLatestTs(List firstMsgPack) { int indexOfLastMsgInArray = firstMsgPack.size() - 1; int indexToSetMaxTs = new Random().nextInt(indexOfLastMsgInArray) + 1; @@ -429,4 +473,9 @@ public class TbMsgDeduplicationNodeTest { return JacksonUtil.toString(mergedData); } + @Override + protected TbNode getTestNode() { + return node; + } + }