Browse Source

Merge pull request #9828 from YevhenBondarenko/feature/queue-in-rule-node

moved queueName from rule-node config to rule-node
pull/9966/merge
Andrew Shvayka 2 years ago
committed by GitHub
parent
commit
fb66d2fdfd
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      application/src/main/data/upgrade/3.6.2/schema_update.sql
  2. 9
      application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
  3. 1
      application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
  4. 1
      application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/RuleChainImportService.java
  5. 6
      application/src/main/java/org/thingsboard/server/utils/TbNodeUpgradeUtils.java
  6. 1
      common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
  7. 7
      common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentDescriptor.java
  8. 6
      common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java
  9. 2
      dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
  10. 5
      dao/src/main/java/org/thingsboard/server/dao/model/sql/ComponentDescriptorEntity.java
  11. 5
      dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeEntity.java
  12. 3
      dao/src/main/java/org/thingsboard/server/dao/sql/component/AbstractComponentDescriptorInsertRepository.java
  13. 4
      dao/src/main/java/org/thingsboard/server/dao/sql/component/SqlComponentDescriptorInsertRepository.java
  14. 4
      dao/src/main/resources/sql/schema-entities.sql
  15. 2
      msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java
  16. 9
      msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestProperties.java
  17. 41
      msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java
  18. 204
      msa/black-box-tests/src/test/java/org/thingsboard/server/msa/rule/node/MqttNodeTest.java
  19. 79
      msa/black-box-tests/src/test/resources/MqttRuleNodeTestMetadata.json
  20. 25
      msa/black-box-tests/src/test/resources/docker-compose.mosquitto.yml
  21. 18
      msa/black-box-tests/src/test/resources/mosquitto/mosquitto.conf
  22. 2
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleNode.java
  23. 2
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
  24. 1
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java
  25. 30
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java
  26. 1
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeConfiguration.java
  27. 27
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java
  28. 1
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNodeConfiguration.java
  29. 31
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java
  30. 30
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeConfiguration.java
  31. 52
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/AbstractRuleNodeUpgradeTest.java
  32. 53
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java
  33. 55
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeTest.java
  34. 25
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeTest.java
  35. 67
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java

7
application/src/main/data/upgrade/3.6.2/schema_update.sql

@ -21,3 +21,10 @@ DROP INDEX IF EXISTS idx_rule_node_type_configuration_version;
CREATE INDEX IF NOT EXISTS idx_rule_node_type_id_configuration_version ON rule_node(type, id, configuration_version);
-- RULE NODE INDEXES UPDATE END
-- RULE NODE QUEUE UPDATE START
ALTER TABLE rule_node ADD COLUMN IF NOT EXISTS queue_name varchar(255);
ALTER TABLE component_descriptor ADD COLUMN IF NOT EXISTS has_queue_name boolean DEFAULT false;
-- RULE NODE QUEUE UPDATE END

9
application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java

@ -186,7 +186,7 @@ class DefaultTbContext implements TbContext {
@Override
public void enqueue(TbMsg tbMsg, Runnable onSuccess, Consumer<Throwable> onFailure) {
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator());
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator());
enqueue(tpi, tbMsg, onFailure, onSuccess);
}
@ -311,7 +311,7 @@ class DefaultTbContext implements TbContext {
@Override
public boolean isLocalEntity(EntityId entityId) {
return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), entityId).isMyPartition();
return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), entityId).isMyPartition();
}
private void scheduleMsgWithDelay(TbActorMsg msg, long delayInMs, TbActorRef target) {
@ -510,6 +510,11 @@ class DefaultTbContext implements TbContext {
return ruleChainName;
}
@Override
public String getQueueName() {
return getSelf().getQueueName();
}
@Override
public TenantId getTenantId() {
return nodeCtx.getTenantId();

1
application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java

@ -189,6 +189,7 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
scannedComponent.setName(ruleNodeAnnotation.name());
scannedComponent.setScope(ruleNodeAnnotation.scope());
scannedComponent.setClusteringMode(ruleNodeAnnotation.clusteringMode());
scannedComponent.setHasQueueName(ruleNodeAnnotation.hasQueueName());
NodeDefinition nodeDefinition = prepareNodeDefinition(clazz, ruleNodeAnnotation);
ObjectNode configurationDescriptor = JacksonUtil.newObjectNode();
JsonNode node = JacksonUtil.valueToTree(nodeDefinition);

1
application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/RuleChainImportService.java

@ -21,7 +21,6 @@ import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;

6
application/src/main/java/org/thingsboard/server/utils/TbNodeUpgradeUtils.java

@ -25,6 +25,8 @@ import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.service.component.RuleNodeClassInfo;
import static org.thingsboard.server.common.data.DataConstants.QUEUE_NAME;
@Slf4j
public class TbNodeUpgradeUtils {
@ -44,9 +46,13 @@ public class TbNodeUpgradeUtils {
} else {
var tbVersionedNode = getTbVersionedNode(nodeInfo);
try {
JsonNode queueName = oldConfiguration.get(QUEUE_NAME);
TbPair<Boolean, JsonNode> upgradeResult = tbVersionedNode.upgrade(configurationVersion, oldConfiguration);
if (upgradeResult.getFirst()) {
node.setConfiguration(upgradeResult.getSecond());
if (nodeInfo.getAnnotation().hasQueueName() && queueName != null && queueName.isTextual()) {
node.setQueueName(queueName.asText());
}
}
} catch (Exception e) {
try {

1
common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java

@ -136,5 +136,6 @@ public class DataConstants {
public static final String HP_QUEUE_TOPIC = "tb_rule_engine.hp";
public static final String SQ_QUEUE_NAME = "SequentialByOriginator";
public static final String SQ_QUEUE_TOPIC = "tb_rule_engine.sq";
public static final String QUEUE_NAME = "queueName";
}

7
common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentDescriptor.java

@ -55,6 +55,8 @@ public class ComponentDescriptor extends BaseData<ComponentDescriptorId> {
@Length(fieldName = "actions")
@ApiModelProperty(position = 10, value = "Rule Node Actions. Deprecated. Always null.", accessMode = ApiModelProperty.AccessMode.READ_ONLY)
@Getter @Setter private String actions;
@ApiModelProperty(position = 11, value = "Indicates that the RuleNode supports queue name configuration.", accessMode = ApiModelProperty.AccessMode.READ_ONLY, example = "true")
@Getter @Setter private boolean hasQueueName;
public ComponentDescriptor() {
super();
@ -74,6 +76,7 @@ public class ComponentDescriptor extends BaseData<ComponentDescriptorId> {
this.configurationDescriptor = plugin.getConfigurationDescriptor();
this.configurationVersion = plugin.getConfigurationVersion();
this.actions = plugin.getActions();
this.hasQueueName = plugin.isHasQueueName();
}
@ApiModelProperty(position = 1, value = "JSON object with the descriptor Id. " +
@ -104,6 +107,8 @@ public class ComponentDescriptor extends BaseData<ComponentDescriptorId> {
if (!Objects.equals(actions, that.actions)) return false;
if (!Objects.equals(configurationDescriptor, that.configurationDescriptor)) return false;
if (configurationVersion != that.configurationVersion) return false;
if (clusteringMode != that.clusteringMode) return false;
if (hasQueueName != that.isHasQueueName()) return false;
return Objects.equals(clazz, that.clazz);
}
@ -115,6 +120,8 @@ public class ComponentDescriptor extends BaseData<ComponentDescriptorId> {
result = 31 * result + (name != null ? name.hashCode() : 0);
result = 31 * result + (clazz != null ? clazz.hashCode() : 0);
result = 31 * result + (actions != null ? actions.hashCode() : 0);
result = 31 * result + (clusteringMode != null ? clusteringMode.hashCode() : 0);
result = 31 * result + (hasQueueName ? 1 : 0);
return result;
}

6
common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java

@ -50,9 +50,11 @@ public class RuleNode extends BaseDataWithAdditionalInfo<RuleNodeId> implements
private boolean debugMode;
@ApiModelProperty(position = 7, value = "Enable/disable singleton mode. ", example = "false")
private boolean singletonMode;
@ApiModelProperty(position = 8, value = "Version of rule node configuration. ", example = "0")
@ApiModelProperty(position = 8, value = "Queue name. ", example = "Main")
private String queueName;
@ApiModelProperty(position = 9, value = "Version of rule node configuration. ", example = "0")
private int configurationVersion;
@ApiModelProperty(position = 9, value = "JSON with the rule node configuration. Structure depends on the rule node implementation.", dataType = "com.fasterxml.jackson.databind.JsonNode")
@ApiModelProperty(position = 10, value = "JSON with the rule node configuration. Structure depends on the rule node implementation.", dataType = "com.fasterxml.jackson.databind.JsonNode")
private transient JsonNode configuration;
@JsonIgnore
private byte[] configurationBytes;

2
dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java

@ -354,6 +354,7 @@ public class ModelConstants {
public static final String COMPONENT_DESCRIPTOR_CONFIGURATION_DESCRIPTOR_PROPERTY = "configuration_descriptor";
public static final String COMPONENT_DESCRIPTOR_CONFIGURATION_VERSION_PROPERTY = "configuration_version";
public static final String COMPONENT_DESCRIPTOR_ACTIONS_PROPERTY = "actions";
public static final String COMPONENT_DESCRIPTOR_HAS_QUEUE_NAME_PROPERTY = "has_queue_name";
/**
* Event constants.
@ -389,6 +390,7 @@ public class ModelConstants {
public static final String DEBUG_MODE = "debug_mode";
public static final String SINGLETON_MODE = "singleton_mode";
public static final String QUEUE_NAME = "queue_name";
/**
* Rule chain constants.

5
dao/src/main/java/org/thingsboard/server/dao/model/sql/ComponentDescriptorEntity.java

@ -70,6 +70,9 @@ public class ComponentDescriptorEntity extends BaseSqlEntity<ComponentDescriptor
@Column(name = ModelConstants.COMPONENT_DESCRIPTOR_ACTIONS_PROPERTY)
private String actions;
@Column(name = ModelConstants.COMPONENT_DESCRIPTOR_HAS_QUEUE_NAME_PROPERTY)
private boolean hasQueueName;
public ComponentDescriptorEntity() {
}
@ -86,6 +89,7 @@ public class ComponentDescriptorEntity extends BaseSqlEntity<ComponentDescriptor
this.clazz = component.getClazz();
this.configurationDescriptor = component.getConfigurationDescriptor();
this.configurationVersion = component.getConfigurationVersion();
this.hasQueueName = component.isHasQueueName();
}
@Override
@ -100,6 +104,7 @@ public class ComponentDescriptorEntity extends BaseSqlEntity<ComponentDescriptor
data.setActions(this.getActions());
data.setConfigurationDescriptor(configurationDescriptor);
data.setConfigurationVersion(configurationVersion);
data.setHasQueueName(hasQueueName);
return data;
}
}

5
dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeEntity.java

@ -66,6 +66,9 @@ public class RuleNodeEntity extends BaseSqlEntity<RuleNode> {
@Column(name = ModelConstants.SINGLETON_MODE)
private boolean singletonMode;
@Column(name = ModelConstants.QUEUE_NAME)
private String queueName;
@Column(name = ModelConstants.EXTERNAL_ID_PROPERTY)
private UUID externalId;
@ -84,6 +87,7 @@ public class RuleNodeEntity extends BaseSqlEntity<RuleNode> {
this.name = ruleNode.getName();
this.debugMode = ruleNode.isDebugMode();
this.singletonMode = ruleNode.isSingletonMode();
this.queueName = ruleNode.getQueueName();
this.configurationVersion = ruleNode.getConfigurationVersion();
this.configuration = ruleNode.getConfiguration();
this.additionalInfo = ruleNode.getAdditionalInfo();
@ -103,6 +107,7 @@ public class RuleNodeEntity extends BaseSqlEntity<RuleNode> {
ruleNode.setName(name);
ruleNode.setDebugMode(debugMode);
ruleNode.setSingletonMode(singletonMode);
ruleNode.setQueueName(queueName);
ruleNode.setConfigurationVersion(configurationVersion);
ruleNode.setConfiguration(configuration);
ruleNode.setAdditionalInfo(additionalInfo);

3
dao/src/main/java/org/thingsboard/server/dao/sql/component/AbstractComponentDescriptorInsertRepository.java

@ -77,7 +77,8 @@ public abstract class AbstractComponentDescriptorInsertRepository implements Com
.setParameter("name", entity.getName())
.setParameter("scope", entity.getScope().name())
.setParameter("type", entity.getType().name())
.setParameter("clustering_mode", entity.getClusteringMode().name());
.setParameter("clustering_mode", entity.getClusteringMode().name())
.setParameter("has_queue_name", entity.isHasQueueName());
}
private ComponentDescriptorEntity processSaveOrUpdate(ComponentDescriptorEntity entity, String query) {

4
dao/src/main/java/org/thingsboard/server/dao/sql/component/SqlComponentDescriptorInsertRepository.java

@ -44,10 +44,10 @@ public class SqlComponentDescriptorInsertRepository extends AbstractComponentDes
}
private static String getInsertOrUpdateStatement(String conflictKeyStatement, String updateKeyStatement) {
return "INSERT INTO component_descriptor (id, created_time, actions, clazz, configuration_descriptor, configuration_version, name, scope, type, clustering_mode) VALUES (:id, :created_time, :actions, :clazz, :configuration_descriptor, :configuration_version, :name, :scope, :type, :clustering_mode) ON CONFLICT " + conflictKeyStatement + " DO UPDATE SET " + updateKeyStatement + " returning *";
return "INSERT INTO component_descriptor (id, created_time, actions, clazz, configuration_descriptor, configuration_version, name, scope, type, clustering_mode, has_queue_name) VALUES (:id, :created_time, :actions, :clazz, :configuration_descriptor, :configuration_version, :name, :scope, :type, :clustering_mode, :has_queue_name) ON CONFLICT " + conflictKeyStatement + " DO UPDATE SET " + updateKeyStatement + " returning *";
}
private static String getUpdateStatement(String id) {
return "actions = :actions, " + id + ",created_time = :created_time, configuration_descriptor = :configuration_descriptor, configuration_version = :configuration_version, name = :name, scope = :scope, type = :type, clustering_mode = :clustering_mode";
return "actions = :actions, " + id + ",created_time = :created_time, configuration_descriptor = :configuration_descriptor, configuration_version = :configuration_version, name = :name, scope = :scope, type = :type, clustering_mode = :clustering_mode, has_queue_name = :has_queue_name";
}
}

4
dao/src/main/resources/sql/schema-entities.sql

@ -126,7 +126,8 @@ CREATE TABLE IF NOT EXISTS component_descriptor (
name varchar(255),
scope varchar(255),
type varchar(255),
clustering_mode varchar(255)
clustering_mode varchar(255),
has_queue_name boolean DEFAULT false
);
CREATE TABLE IF NOT EXISTS customer (
@ -187,6 +188,7 @@ CREATE TABLE IF NOT EXISTS rule_node (
name varchar(255),
debug_mode boolean,
singleton_mode boolean,
queue_name varchar(255),
external_id uuid
);

2
msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java

@ -107,6 +107,7 @@ public class ContainerTestSuite {
List<File> composeFiles = new ArrayList<>(Arrays.asList(
new File(targetDir + "docker-compose.yml"),
new File(targetDir + "docker-compose.volumes.yml"),
new File(targetDir + "docker-compose.mosquitto.yml"),
new File(targetDir + (IS_HYBRID_MODE ? "docker-compose.hybrid.yml" : "docker-compose.postgres.yml")),
new File(targetDir + (IS_HYBRID_MODE ? "docker-compose.hybrid-test-extras.yml" : "docker-compose.postgres-test-extras.yml")),
new File(targetDir + "docker-compose.postgres.volumes.yml"),
@ -162,6 +163,7 @@ public class ContainerTestSuite {
.withEnv(queueEnv)
.withEnv("LOAD_BALANCER_NAME", "")
.withExposedService("haproxy", 80, Wait.forHttp("/swagger-ui.html").withStartupTimeout(CONTAINER_STARTUP_TIMEOUT))
.withExposedService("broker", 1883)
.waitingFor("tb-core1", Wait.forLogMessage(TB_CORE_LOG_REGEXP, 1).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT))
.waitingFor("tb-core2", Wait.forLogMessage(TB_CORE_LOG_REGEXP, 1).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT))
.waitingFor("tb-http-transport1", Wait.forLogMessage(TRANSPORTS_LOG_REGEXP, 1).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT))

9
msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestProperties.java

@ -48,4 +48,13 @@ public class TestProperties {
}
return System.getProperty("tb.wsUrl", "ws://localhost:8080");
}
public static String getMqttBrokerUrl() {
if (instance.isActive()) {
String host = instance.getTestContainer().getServiceHost("broker", 1883);
Integer port = instance.getTestContainer().getServicePort("broker", 1883);
return "tcp://" + host + ":" + port;
}
return System.getProperty("mqtt.broker", "tcp://localhost:1883");
}
}

41
msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java

@ -31,10 +31,12 @@ import org.thingsboard.server.common.data.Dashboard;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.EventInfo;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.asset.AssetProfile;
import org.thingsboard.server.common.data.event.EventType;
import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.AssetProfileId;
@ -45,9 +47,11 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.rule.RuleChain;
@ -527,4 +531,41 @@ public class TestRestClient {
.then()
.statusCode(HTTP_OK);
}
public PageData<EventInfo> getEvents(EntityId entityId, EventType eventType, TenantId tenantId, TimePageLink pageLink) {
Map<String, String> params = new HashMap<>();
params.put("entityType", entityId.getEntityType().name());
params.put("entityId", entityId.getId().toString());
params.put("eventType", eventType.name());
params.put("tenantId", tenantId.getId().toString());
addTimePageLinkToParam(params, pageLink);
return given().spec(requestSpec)
.get("/api/events/{entityType}/{entityId}/{eventType}?tenantId={tenantId}&" + getTimeUrlParams(pageLink), params)
.then()
.statusCode(HTTP_OK)
.extract()
.as(new TypeRef<>() {});
}
private void addTimePageLinkToParam(Map<String, String> params, TimePageLink pageLink) {
this.addPageLinkToParam(params, pageLink);
if (pageLink.getStartTime() != null) {
params.put("startTime", String.valueOf(pageLink.getStartTime()));
}
if (pageLink.getEndTime() != null) {
params.put("endTime", String.valueOf(pageLink.getEndTime()));
}
}
private String getTimeUrlParams(TimePageLink pageLink) {
String urlParams = getUrlParams(pageLink);
if (pageLink.getStartTime() != null) {
urlParams += "&startTime={startTime}";
}
if (pageLink.getEndTime() != null) {
urlParams += "&endTime={endTime}";
}
return urlParams;
}
}

204
msa/black-box-tests/src/test/java/org/thingsboard/server/msa/rule/node/MqttNodeTest.java

@ -0,0 +1,204 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.msa.rule.node;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EventInfo;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.event.EventType;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.data.rule.NodeConnectionInfo;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.msa.AbstractContainerTest;
import org.thingsboard.server.msa.DisableUIListeners;
import org.thingsboard.server.msa.TestProperties;
import org.thingsboard.server.msa.WsClient;
import org.thingsboard.server.msa.mapper.WsTelemetryResponse;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.fail;
import static org.thingsboard.server.msa.prototypes.DevicePrototypes.defaultDevicePrototype;
@DisableUIListeners
@Slf4j
public class MqttNodeTest extends AbstractContainerTest {
private static final String TOPIC = "tb/mqtt/device";
private Device device;
@BeforeMethod
public void setUp() {
testRestClient.login("tenant@thingsboard.org", "tenant");
device = testRestClient.postDevice("", defaultDevicePrototype("mqtt_"));
}
@AfterMethod
public void tearDown() {
testRestClient.deleteDeviceIfExists(device.getId());
}
@Test
public void telemetryUpload() throws Exception {
RuleChainId defaultRuleChainId = getDefaultRuleChainId();
createRootRuleChainWithTestNode("MqttRuleNodeTestMetadata.json", "org.thingsboard.rule.engine.mqtt.TbMqttNode", 2);
DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId());
WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS);
MqttMessageListener messageListener = new MqttMessageListener();
MqttClient responseClient = new MqttClient(TestProperties.getMqttBrokerUrl(), StringUtils.randomAlphanumeric(10), new MemoryPersistence());
responseClient.connect();
responseClient.subscribe(TOPIC, messageListener);
MqttClient mqttClient = new MqttClient("tcp://localhost:1883", StringUtils.randomAlphanumeric(10), new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName(deviceCredentials.getCredentialsId());
mqttClient.connect(mqttConnectOptions);
mqttClient.publish("v1/devices/me/telemetry", new MqttMessage(createPayload().toString().getBytes()));
WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage();
log.info("Received telemetry: {}", actualLatestTelemetry);
wsClient.closeBlocking();
assertThat(actualLatestTelemetry.getData()).hasSize(4);
assertThat(actualLatestTelemetry.getLatestValues().keySet()).containsOnlyOnceElementsOf(Arrays.asList("booleanKey", "stringKey", "doubleKey", "longKey"));
assertThat(actualLatestTelemetry.getDataValuesByKey("booleanKey").get(1)).isEqualTo(Boolean.TRUE.toString());
assertThat(actualLatestTelemetry.getDataValuesByKey("stringKey").get(1)).isEqualTo("value1");
assertThat(actualLatestTelemetry.getDataValuesByKey("doubleKey").get(1)).isEqualTo(Double.toString(42.0));
assertThat(actualLatestTelemetry.getDataValuesByKey("longKey").get(1)).isEqualTo(Long.toString(73));
Awaitility
.await()
.alias("Get integration events")
.atMost(10, TimeUnit.SECONDS)
.until(() -> messageListener.getEvents().size() > 0);
BlockingQueue<MqttEvent> events = messageListener.getEvents();
JsonNode actual = JacksonUtil.toJsonNode(Objects.requireNonNull(events.poll()).message);
assertThat(actual.get("stringKey").asText()).isEqualTo("value1");
assertThat(actual.get("booleanKey").asBoolean()).isEqualTo(Boolean.TRUE);
assertThat(actual.get("doubleKey").asDouble()).isEqualTo(42.0);
assertThat(actual.get("longKey").asLong()).isEqualTo(73);
testRestClient.setRootRuleChain(defaultRuleChainId);
}
@Data
private class MqttMessageListener implements IMqttMessageListener {
private final BlockingQueue<MqttEvent> events;
private MqttMessageListener() {
events = new ArrayBlockingQueue<>(100);
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) {
log.info("MQTT message [{}], topic [{}]", mqttMessage.toString(), s);
events.add(new MqttEvent(s, mqttMessage.toString()));
}
public BlockingQueue<MqttEvent> getEvents() {
return events;
}
}
@Data
private class MqttEvent {
private final String topic;
private final String message;
}
private RuleChainId getDefaultRuleChainId() {
PageData<RuleChain> ruleChains = testRestClient.getRuleChains(new PageLink(40, 0));
Optional<RuleChain> defaultRuleChain = ruleChains.getData()
.stream()
.filter(RuleChain::isRoot)
.findFirst();
if (!defaultRuleChain.isPresent()) {
fail("Root rule chain wasn't found");
}
return defaultRuleChain.get().getId();
}
protected RuleChainId createRootRuleChainWithTestNode(String ruleChainMetadataFile, String ruleNodeType, int eventsCount) throws Exception {
RuleChain newRuleChain = new RuleChain();
newRuleChain.setName("testRuleChain");
RuleChain ruleChain = testRestClient.postRuleChain(newRuleChain);
JsonNode configuration = JacksonUtil.OBJECT_MAPPER.readTree(this.getClass().getClassLoader().getResourceAsStream(ruleChainMetadataFile));
RuleChainMetaData ruleChainMetaData = new RuleChainMetaData();
ruleChainMetaData.setRuleChainId(ruleChain.getId());
ruleChainMetaData.setFirstNodeIndex(configuration.get("firstNodeIndex").asInt());
ruleChainMetaData.setNodes(Arrays.asList(JacksonUtil.OBJECT_MAPPER.treeToValue(configuration.get("nodes"), RuleNode[].class)));
ruleChainMetaData.setConnections(Arrays.asList(JacksonUtil.OBJECT_MAPPER.treeToValue(configuration.get("connections"), NodeConnectionInfo[].class)));
ruleChainMetaData = testRestClient.postRuleChainMetadata(ruleChainMetaData);
testRestClient.setRootRuleChain(ruleChain.getId());
RuleNode node = ruleChainMetaData.getNodes().stream().filter(ruleNode -> ruleNode.getType().equals(ruleNodeType)).findFirst().get();
Awaitility
.await()
.alias("Get events from rule chain")
.atMost(10, TimeUnit.SECONDS)
.until(() -> {
PageData<EventInfo> events = testRestClient.getEvents(node.getId(), EventType.LC_EVENT, ruleChain.getTenantId(), new TimePageLink(1024));
List<EventInfo> eventInfos = events.getData().stream().filter(eventInfo ->
"STARTED".equals(eventInfo.getBody().get("event").asText()) &&
"true".equals(eventInfo.getBody().get("success").asText()))
.collect(Collectors.toList());
return eventInfos.size() == eventsCount;
});
return ruleChain.getId();
}
}

79
msa/black-box-tests/src/test/resources/MqttRuleNodeTestMetadata.json

@ -0,0 +1,79 @@
{
"firstNodeIndex": 2,
"nodes": [
{
"additionalInfo": {
"description": "",
"layoutX": 626,
"layoutY": 152
},
"type": "org.thingsboard.rule.engine.mqtt.TbMqttNode",
"name": "test mqtt",
"debugMode": true,
"singletonMode": true,
"queueName": "HighPriority",
"configurationVersion": 0,
"configuration": {
"topicPattern": "tb/mqtt/device",
"host": "broker",
"port": 1883,
"connectTimeoutSec": 10,
"clientId": null,
"cleanSession": true,
"retainedMessage": false,
"ssl": false,
"credentials": {
"type": "anonymous"
}
},
"externalId": null
},
{
"additionalInfo": {
"description": "",
"layoutX": 949,
"layoutY": 153
},
"type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode",
"name": "save timeseries",
"debugMode": true,
"singletonMode": false,
"configurationVersion": 0,
"configuration": {
"defaultTTL": 0,
"skipLatestPersistence": false,
"useServerTs": false
},
"externalId": null
},
{
"additionalInfo": {
"description": "",
"layoutX": 305,
"layoutY": 151
},
"type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode",
"name": "switch",
"debugMode": false,
"singletonMode": false,
"configurationVersion": 0,
"configuration": {
"version": 0
},
"externalId": null
}
],
"connections": [
{
"fromIndex": 0,
"toIndex": 1,
"type": "Success"
},
{
"fromIndex": 2,
"toIndex": 0,
"type": "Post telemetry"
}
],
"ruleChainConnections": null
}

25
msa/black-box-tests/src/test/resources/docker-compose.mosquitto.yml

@ -0,0 +1,25 @@
#
# Copyright © 2016-2024 The Thingsboard Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
version: '3.0'
services:
broker:
image: eclipse-mosquitto
volumes:
- ./mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf
ports:
- "1883"
restart: always

18
msa/black-box-tests/src/test/resources/mosquitto/mosquitto.conf

@ -0,0 +1,18 @@
#
# Copyright © 2016-2024 The Thingsboard Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
listener 1883
allow_anonymous true

2
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleNode.java

@ -42,6 +42,8 @@ public @interface RuleNode {
ComponentClusteringMode clusteringMode() default ComponentClusteringMode.ENABLED;
boolean hasQueueName() default false;
boolean inEnabled() default true;
boolean outEnabled() default true;

2
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java

@ -256,6 +256,8 @@ public interface TbContext {
String getRuleChainName();
String getQueueName();
TenantId getTenantId();
AttributesService getAttributesService();

1
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java

@ -36,7 +36,6 @@ import java.util.stream.Collectors;
*/
public class TbNodeUtils {
private static final Pattern DATA_PATTERN = Pattern.compile("(\\$\\[)(.*?)(])");
public static <T> T convert(TbNodeConfiguration configuration, Class<T> clazz) throws TbNodeException {

30
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java

@ -15,6 +15,8 @@
*/
package org.thingsboard.rule.engine.debug;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
@ -35,6 +37,7 @@ import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.script.ScriptLanguage;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
@ -44,12 +47,15 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.thingsboard.common.util.DonAsynchron.withCallback;
import static org.thingsboard.server.common.data.DataConstants.QUEUE_NAME;
@Slf4j
@RuleNode(
type = ComponentType.ACTION,
name = "generator",
configClazz = TbMsgGeneratorNodeConfiguration.class,
version = 1,
hasQueueName = true,
nodeDescription = "Periodically generates messages",
nodeDetails = "Generates messages with configurable period. Javascript function used for message generation.",
inEnabled = false,
@ -69,6 +75,7 @@ public class TbMsgGeneratorNode implements TbNode {
private UUID nextTickId;
private TbMsg prevMsg;
private final AtomicBoolean initialized = new AtomicBoolean(false);
private String queueName;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
@ -76,6 +83,7 @@ public class TbMsgGeneratorNode implements TbNode {
this.config = TbNodeUtils.convert(configuration, TbMsgGeneratorNodeConfiguration.class);
this.delay = TimeUnit.SECONDS.toMillis(config.getPeriodInSeconds());
this.currentMsgCount = 0;
this.queueName = ctx.getQueueName();
if (!StringUtils.isEmpty(config.getOriginatorId())) {
originatorId = EntityIdFactory.getByTypeAndUuid(config.getOriginatorType(), config.getOriginatorId());
ctx.checkTenantEntity(originatorId);
@ -137,7 +145,7 @@ public class TbMsgGeneratorNode implements TbNode {
}
lastScheduledTs = lastScheduledTs + delay;
long curDelay = Math.max(0L, (lastScheduledTs - curTs));
TbMsg tickMsg = ctx.newMsg(config.getQueueName(), TbMsgType.GENERATOR_NODE_SELF_MSG, ctx.getSelfId(),
TbMsg tickMsg = ctx.newMsg(queueName, TbMsgType.GENERATOR_NODE_SELF_MSG, ctx.getSelfId(),
getCustomerIdFromMsg(msg), TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING);
nextTickId = tickMsg.getId();
ctx.tellSelf(tickMsg, curDelay);
@ -146,14 +154,14 @@ public class TbMsgGeneratorNode implements TbNode {
private ListenableFuture<TbMsg> generate(TbContext ctx, TbMsg msg) {
log.trace("generate, config {}", config);
if (prevMsg == null) {
prevMsg = ctx.newMsg(config.getQueueName(), TbMsg.EMPTY_STRING, originatorId, msg.getCustomerId(), TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
prevMsg = ctx.newMsg(queueName, TbMsg.EMPTY_STRING, originatorId, msg.getCustomerId(), TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
}
if (initialized.get()) {
ctx.logJsEvalRequest();
return Futures.transformAsync(scriptEngine.executeGenerateAsync(prevMsg), generated -> {
log.trace("generate process response, generated {}, config {}", generated, config);
ctx.logJsEvalResponse();
prevMsg = ctx.newMsg(config.getQueueName(), generated.getType(), originatorId, msg.getCustomerId(), generated.getMetaData(), generated.getData());
prevMsg = ctx.newMsg(queueName, generated.getType(), originatorId, msg.getCustomerId(), generated.getMetaData(), generated.getData());
return Futures.immediateFuture(prevMsg);
}, MoreExecutors.directExecutor()); //usually it runs on js-executor-remote-callback thread pool
}
@ -174,4 +182,20 @@ public class TbMsgGeneratorNode implements TbNode {
scriptEngine = null;
}
}
@Override
public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
boolean hasChanges = false;
switch (fromVersion) {
case 0:
if (oldConfiguration.has(QUEUE_NAME)) {
hasChanges = true;
((ObjectNode) oldConfiguration).remove(QUEUE_NAME);
}
break;
default:
break;
}
return new TbPair<>(hasChanges, oldConfiguration);
}
}

1
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeConfiguration.java

@ -36,7 +36,6 @@ public class TbMsgGeneratorNodeConfiguration implements NodeConfiguration<TbMsgG
private ScriptLanguage scriptLang;
private String jsScript;
private String tbelScript;
private String queueName;
@Override
public TbMsgGeneratorNodeConfiguration defaultConfiguration() {

27
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java

@ -15,6 +15,7 @@
*/
package org.thingsboard.rule.engine.deduplication;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
@ -43,10 +44,14 @@ import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.thingsboard.server.common.data.DataConstants.QUEUE_NAME;
@RuleNode(
type = ComponentType.TRANSFORMATION,
name = "deduplication",
configClazz = TbMsgDeduplicationNodeConfiguration.class,
version = 1,
hasQueueName = true,
nodeDescription = "Deduplicate messages within the same originator entity for a configurable period " +
"based on a specified deduplication strategy.",
nodeDetails = "Deduplication strategies: <ul><li><strong>FIRST</strong> - return first message that arrived during deduplication period.</li>" +
@ -66,6 +71,7 @@ public class TbMsgDeduplicationNode implements TbNode {
private final Map<EntityId, DeduplicationData> deduplicationMap;
private long deduplicationInterval;
private String queueName;
public TbMsgDeduplicationNode() {
this.deduplicationMap = new HashMap<>();
@ -75,6 +81,7 @@ public class TbMsgDeduplicationNode implements TbNode {
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbMsgDeduplicationNodeConfiguration.class);
this.deduplicationInterval = TimeUnit.SECONDS.toMillis(config.getInterval());
this.queueName = ctx.getQueueName();
}
@Override
@ -91,6 +98,22 @@ public class TbMsgDeduplicationNode implements TbNode {
deduplicationMap.clear();
}
@Override
public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
boolean hasChanges = false;
switch (fromVersion) {
case 0:
if (oldConfiguration.has(QUEUE_NAME)) {
hasChanges = true;
((ObjectNode) oldConfiguration).remove(QUEUE_NAME);
}
break;
default:
break;
}
return new TbPair<>(hasChanges, oldConfiguration);
}
private void processOnRegularMsg(TbContext ctx, TbMsg msg) {
EntityId id = msg.getOriginator();
DeduplicationData deduplicationMsgs = deduplicationMap.computeIfAbsent(id, k -> new DeduplicationData());
@ -132,7 +155,7 @@ public class TbMsgDeduplicationNode implements TbNode {
}
}
deduplicationResults.add(TbMsg.newMsg(
config.getQueueName(),
queueName,
config.getOutMsgType(),
deduplicationId,
getMetadata(),
@ -154,7 +177,7 @@ public class TbMsgDeduplicationNode implements TbNode {
}
if (resultMsg != null) {
deduplicationResults.add(TbMsg.newMsg(
resultMsg.getQueueName(),
queueName != null ? queueName : resultMsg.getQueueName(),
resultMsg.getType(),
resultMsg.getOriginator(),
resultMsg.getCustomerId(),

1
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNodeConfiguration.java

@ -26,7 +26,6 @@ public class TbMsgDeduplicationNodeConfiguration implements NodeConfiguration<Tb
// only for DeduplicationStrategy.ALL:
private String outMsgType;
private String queueName;
// Advanced settings:
private int maxPendingMsgs;

31
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java

@ -15,7 +15,10 @@
*/
package org.thingsboard.rule.engine.flow;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
@ -24,17 +27,22 @@ import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.TbMsg;
import static org.thingsboard.server.common.data.DataConstants.QUEUE_NAME;
@Slf4j
@RuleNode(
type = ComponentType.FLOW,
name = "checkpoint",
configClazz = TbCheckpointNodeConfiguration.class,
configClazz = EmptyNodeConfiguration.class,
version = 1,
hasQueueName = true,
nodeDescription = "transfers the message to another queue",
nodeDetails = "After successful transfer incoming message is automatically acknowledged. Queue name is configurable.",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbActionNodeCheckPointConfig"
configDirective = "tbNodeEmptyConfig"
)
public class TbCheckpointNode implements TbNode {
@ -42,8 +50,7 @@ public class TbCheckpointNode implements TbNode {
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
TbCheckpointNodeConfiguration config = TbNodeUtils.convert(configuration, TbCheckpointNodeConfiguration.class);
this.queueName = config.getQueueName();
this.queueName = ctx.getQueueName();
}
@Override
@ -51,4 +58,20 @@ public class TbCheckpointNode implements TbNode {
ctx.enqueueForTellNext(msg, queueName, TbNodeConnectionType.SUCCESS, () -> ctx.ack(msg), error -> ctx.tellFailure(msg, error));
}
@Override
public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
boolean hasChanges = false;
switch (fromVersion) {
case 0:
if (oldConfiguration.has(QUEUE_NAME)) {
hasChanges = true;
((ObjectNode) oldConfiguration).remove(QUEUE_NAME);
}
break;
default:
break;
}
return new TbPair<>(hasChanges, oldConfiguration);
}
}

30
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeConfiguration.java

@ -1,30 +0,0 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.flow;
import lombok.Data;
import org.thingsboard.rule.engine.api.NodeConfiguration;
@Data
public class TbCheckpointNodeConfiguration implements NodeConfiguration<TbCheckpointNodeConfiguration> {
private String queueName;
@Override
public TbCheckpointNodeConfiguration defaultConfiguration() {
return new TbCheckpointNodeConfiguration();
}
}

52
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/AbstractRuleNodeUpgradeTest.java

@ -0,0 +1,52 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.util.TbPair;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.BDDMockito.willCallRealMethod;
public abstract class AbstractRuleNodeUpgradeTest {
protected abstract TbNode getTestNode();
@ParameterizedTest
@MethodSource
public void givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig(int givenVersion, String givenConfigStr, boolean hasChanges, String expectedConfigStr) throws TbNodeException {
// GIVEN
willCallRealMethod().given(getTestNode()).upgrade(anyInt(), any());
JsonNode givenConfig = JacksonUtil.toJsonNode(givenConfigStr);
JsonNode expectedConfig = JacksonUtil.toJsonNode(expectedConfigStr);
// WHEN
TbPair<Boolean, JsonNode> upgradeResult = getTestNode().upgrade(givenVersion, givenConfig);
// THEN
assertThat(upgradeResult.getFirst()).isEqualTo(hasChanges);
ObjectNode upgradedConfig = (ObjectNode) upgradeResult.getSecond();
assertThat(upgradedConfig).isEqualTo(expectedConfig);
}
}

53
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java

@ -0,0 +1,53 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.debug;
import org.junit.jupiter.params.provider.Arguments;
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
import org.thingsboard.rule.engine.api.TbNode;
import java.util.stream.Stream;
import static org.mockito.Mockito.spy;
public class TbMsgGeneratorNodeTest extends AbstractRuleNodeUpgradeTest {
// Rule nodes upgrade
private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
return Stream.of(
// default config for version 0
Arguments.of(0,
"{\"msgCount\":0,\"periodInSeconds\":1,\"originatorId\":null,\"originatorType\":null, \"queueName\":null, \"scriptLang\":\"TBEL\",\"jsScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\",\"tbelScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\"}",
true,
"{\"msgCount\":0,\"periodInSeconds\":1,\"originatorId\":null,\"originatorType\":null, \"scriptLang\":\"TBEL\",\"jsScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\",\"tbelScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\"}"),
// default config for version 0 with queueName
Arguments.of(0,
"{\"msgCount\":0,\"periodInSeconds\":1,\"originatorId\":null,\"originatorType\":null, \"queueName\":\"Main\", \"scriptLang\":\"TBEL\",\"jsScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\",\"tbelScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\"}",
true,
"{\"msgCount\":0,\"periodInSeconds\":1,\"originatorId\":null,\"originatorType\":null, \"scriptLang\":\"TBEL\",\"jsScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\",\"tbelScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\"}"),
// default config for version 1 with upgrade from version 0
Arguments.of(0,
"{\"msgCount\":0,\"periodInSeconds\":1,\"originatorId\":null,\"originatorType\":null, \"scriptLang\":\"TBEL\",\"jsScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\",\"tbelScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\"}",
false,
"{\"msgCount\":0,\"periodInSeconds\":1,\"originatorId\":null,\"originatorType\":null, \"scriptLang\":\"TBEL\",\"jsScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\",\"tbelScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\"}")
);
}
@Override
protected TbNode getTestNode() {
return spy(TbMsgGeneratorNode.class);
}
}

55
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeTest.java

@ -0,0 +1,55 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.flow;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.params.provider.Arguments;
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
import org.thingsboard.rule.engine.api.TbNode;
import java.util.stream.Stream;
import static org.mockito.Mockito.spy;
@Slf4j
public class TbCheckpointNodeTest extends AbstractRuleNodeUpgradeTest {
// Rule nodes upgrade
private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
return Stream.of(
// default config for version 0
Arguments.of(0,
"{\"queueName\":null}",
true,
"{}"),
// default config for version 0 with queueName
Arguments.of(0,
"{\"queueName\":\"Main\"}",
true,
"{}"),
// default config for version 1 with upgrade from version 0
Arguments.of(0,
"{}",
false,
"{}")
);
}
@Override
protected TbNode getTestNode() {
return spy(TbCheckpointNode.class);
}
}

25
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeTest.java

@ -15,7 +15,6 @@
*/
package org.thingsboard.rule.engine.telemetry;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
@ -25,8 +24,10 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.DataConstants;
@ -40,7 +41,6 @@ import org.thingsboard.server.common.data.kv.JsonDataEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
@ -53,7 +53,6 @@ import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.willCallRealMethod;
@ -65,7 +64,7 @@ import static org.mockito.Mockito.when;
import static org.thingsboard.server.common.data.DataConstants.NOTIFY_DEVICE_METADATA_KEY;
@Slf4j
class TbMsgAttributesNodeTest {
class TbMsgAttributesNodeTest extends AbstractRuleNodeUpgradeTest {
private TenantId tenantId;
private DeviceId deviceId;
@ -223,21 +222,9 @@ class TbMsgAttributesNodeTest {
);
}
@ParameterizedTest
@MethodSource
void givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig(int givenVersion, String givenConfigStr, boolean hasChanges, String expectedConfigStr) throws TbNodeException {
// GIVEN
willCallRealMethod().given(node).upgrade(anyInt(), any());
JsonNode givenConfig = JacksonUtil.toJsonNode(givenConfigStr);
JsonNode expectedConfig = JacksonUtil.toJsonNode(expectedConfigStr);
// WHEN
TbPair<Boolean, JsonNode> upgradeResult = node.upgrade(givenVersion, givenConfig);
// THEN
assertThat(upgradeResult.getFirst()).isEqualTo(hasChanges);
ObjectNode upgradedConfig = (ObjectNode) upgradeResult.getSecond();
assertThat(upgradedConfig).isEqualTo(expectedConfig);
@Override
protected TbNode getTestNode() {
return node;
}
}

67
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java

@ -22,12 +22,17 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.stubbing.Answer;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.deduplication.DeduplicationStrategy;
@ -55,6 +60,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Stream;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
@ -68,7 +74,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@Slf4j
public class TbMsgDeduplicationNodeTest {
public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest {
private TbContext ctx;
@ -139,13 +145,24 @@ public class TbMsgDeduplicationNodeTest {
node.destroy();
}
@Test
public void given_100_messages_strategy_first_then_verifyOutput() throws TbNodeException, ExecutionException, InterruptedException {
private static Stream<Arguments> given_100_messages_strategy_first_then_verifyOutput() {
return Stream.of(
Arguments.of((String) null),
Arguments.of(DataConstants.MAIN_QUEUE_NAME),
Arguments.of(DataConstants.HP_QUEUE_NAME)
);
}
@ParameterizedTest
@MethodSource
public void given_100_messages_strategy_first_then_verifyOutput(String queueName) throws TbNodeException, ExecutionException, InterruptedException {
int wantedNumberOfTellSelfInvocation = 2;
int msgCount = 100;
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation);
invokeTellSelf(wantedNumberOfTellSelfInvocation);
when(ctx.getQueueName()).thenReturn(queueName);
config.setInterval(deduplicationInterval);
config.setMaxPendingMsgs(msgCount);
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
@ -182,6 +199,12 @@ public class TbMsgDeduplicationNodeTest {
Assertions.assertEquals(firstMsg.getData(), actualMsg.getData());
Assertions.assertEquals(firstMsg.getMetaData(), actualMsg.getMetaData());
Assertions.assertEquals(firstMsg.getType(), actualMsg.getType());
if (queueName == null) {
Assertions.assertEquals(firstMsg.getQueueName(), actualMsg.getQueueName());
} else {
Assertions.assertEquals(ctx.getQueueName(), actualMsg.getQueueName());
}
}
@Test
@ -238,10 +261,10 @@ public class TbMsgDeduplicationNodeTest {
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation);
invokeTellSelf(wantedNumberOfTellSelfInvocation);
when(ctx.getQueueName()).thenReturn(DataConstants.HP_QUEUE_NAME);
config.setInterval(deduplicationInterval);
config.setStrategy(DeduplicationStrategy.ALL);
config.setOutMsgType(TbMsgType.POST_ATTRIBUTES_REQUEST.name());
config.setQueueName(DataConstants.HP_QUEUE_NAME);
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
node.init(ctx, nodeConfiguration);
@ -268,7 +291,7 @@ public class TbMsgDeduplicationNodeTest {
Assertions.assertEquals(getMergedData(inputMsgs), outMessage.getData());
Assertions.assertEquals(deviceId, outMessage.getOriginator());
Assertions.assertEquals(config.getOutMsgType(), outMessage.getType());
Assertions.assertEquals(config.getQueueName(), outMessage.getQueueName());
Assertions.assertEquals(DataConstants.HP_QUEUE_NAME, outMessage.getQueueName());
}
@Test
@ -278,10 +301,10 @@ public class TbMsgDeduplicationNodeTest {
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation);
invokeTellSelf(wantedNumberOfTellSelfInvocation, true, 3);
when(ctx.getQueueName()).thenReturn(DataConstants.HP_QUEUE_NAME);
config.setInterval(deduplicationInterval);
config.setStrategy(DeduplicationStrategy.ALL);
config.setOutMsgType(TbMsgType.POST_ATTRIBUTES_REQUEST.name());
config.setQueueName(DataConstants.HP_QUEUE_NAME);
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
node.init(ctx, nodeConfiguration);
@ -292,7 +315,7 @@ public class TbMsgDeduplicationNodeTest {
for (TbMsg msg : firstMsgPack) {
node.onMsg(ctx, msg);
}
long firstPackDeduplicationPackEndTs = firstMsgPack.get(0).getMetaDataTs() + TimeUnit.SECONDS.toMillis(deduplicationInterval);
long firstPackDeduplicationPackEndTs = firstMsgPack.get(0).getMetaDataTs() + TimeUnit.SECONDS.toMillis(deduplicationInterval);
List<TbMsg> secondMsgPack = getTbMsgs(deviceId, msgCount / 2, firstPackDeduplicationPackEndTs, 500);
for (TbMsg msg : secondMsgPack) {
@ -316,13 +339,13 @@ public class TbMsgDeduplicationNodeTest {
Assertions.assertEquals(getMergedData(firstMsgPack), firstMsg.getData());
Assertions.assertEquals(deviceId, firstMsg.getOriginator());
Assertions.assertEquals(config.getOutMsgType(), firstMsg.getType());
Assertions.assertEquals(config.getQueueName(), firstMsg.getQueueName());
Assertions.assertEquals(DataConstants.HP_QUEUE_NAME, firstMsg.getQueueName());
TbMsg secondMsg = resultMsgs.get(1);
Assertions.assertEquals(getMergedData(secondMsgPack), secondMsg.getData());
Assertions.assertEquals(deviceId, secondMsg.getOriginator());
Assertions.assertEquals(config.getOutMsgType(), secondMsg.getType());
Assertions.assertEquals(config.getQueueName(), secondMsg.getQueueName());
Assertions.assertEquals(DataConstants.HP_QUEUE_NAME, secondMsg.getQueueName());
}
@Test
@ -385,6 +408,27 @@ public class TbMsgDeduplicationNodeTest {
Assertions.assertEquals(msgWithLatestTsInSecondPack.getType(), actualMsg.getType());
}
// Rule nodes upgrade
private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
return Stream.of(
// default config for version 0
Arguments.of(0,
"{\"interval\":60,\"strategy\":\"FIRST\",\"outMsgType\":null,\"maxPendingMsgs\":100,\"maxRetries\":3, \"queueName\":null}",
true,
"{\"interval\":60,\"strategy\":\"FIRST\",\"outMsgType\":null,\"maxPendingMsgs\":100,\"maxRetries\":3}"),
// default config for version 0 with queueName
Arguments.of(0,
"{\"interval\":60,\"strategy\":\"FIRST\",\"outMsgType\":null,\"maxPendingMsgs\":100,\"maxRetries\":3, \"queueName\":\"Main\"}",
true,
"{\"interval\":60,\"strategy\":\"FIRST\",\"outMsgType\":null,\"maxPendingMsgs\":100,\"maxRetries\":3}"),
// default config for version 1 with upgrade from version 0
Arguments.of(0,
"{\"interval\":60,\"strategy\":\"FIRST\",\"outMsgType\":null,\"maxPendingMsgs\":100,\"maxRetries\":3}",
false,
"{\"interval\":60,\"strategy\":\"FIRST\",\"outMsgType\":null,\"maxPendingMsgs\":100,\"maxRetries\":3}")
);
}
private TbMsg getMsgWithLatestTs(List<TbMsg> firstMsgPack) {
int indexOfLastMsgInArray = firstMsgPack.size() - 1;
int indexToSetMaxTs = new Random().nextInt(indexOfLastMsgInArray) + 1;
@ -429,4 +473,9 @@ public class TbMsgDeduplicationNodeTest {
return JacksonUtil.toString(mergedData);
}
@Override
protected TbNode getTestNode() {
return node;
}
}

Loading…
Cancel
Save