Browse Source

update the logic of de-duplicate node after review

pull/7893/head
ShvaykaD 3 years ago
parent
commit
60982bedec
  1. 42
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/DeDuplicateData.java
  2. 39
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/DeDuplicatePack.java
  3. 2
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/DeDuplicateStrategy.java
  4. 246
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/TbMsgDeDuplicateNode.java
  5. 4
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/TbMsgDeDuplicateNodeConfiguration.java
  6. 39
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/TbMsgDeDuplicateState.java
  7. 176
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDeDuplicateNode.java
  8. 243
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbMsgDeDuplicateNodeTest.java

42
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/DeDuplicateData.java

@ -0,0 +1,42 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.deduplicate;
import lombok.Data;
import org.thingsboard.server.common.data.id.EntityId;
import java.util.ArrayList;
import java.util.List;
@Data
public class DeDuplicateData {
private EntityId entityId;
private List<DeDuplicatePack> deDuplicateStates;
private int statesCount;
public DeDuplicateData(EntityId entityId) {
this.entityId = entityId;
this.deDuplicateStates = new ArrayList<>();
this.statesCount = 0;
}
public void addDeDuplicatePack(List<TbMsgDeDuplicateState> states, int indexOfFirstStateInPack, int indexOfLastStateInPack) {
deDuplicateStates.add(new DeDuplicatePack(indexOfFirstStateInPack, indexOfLastStateInPack, states));
statesCount += states.size();
}
}

39
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/DeDuplicatePack.java

@ -0,0 +1,39 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.deduplicate;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.util.List;
@Data
@AllArgsConstructor
public class DeDuplicatePack {
private int indexOfFirstStateInPack;
private int indexOfLastStateInPack;
private List<TbMsgDeDuplicateState> states;
public TbMsgDeDuplicateState getFirst() {
return states.get(indexOfFirstStateInPack);
}
public TbMsgDeDuplicateState getLast() {
return states.get(indexOfLastStateInPack);
}
}

2
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/DeDuplicateStrategy.java → rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/DeDuplicateStrategy.java

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.delay;
package org.thingsboard.rule.engine.deduplicate;
public enum DeDuplicateStrategy {

246
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/TbMsgDeDuplicateNode.java

@ -0,0 +1,246 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.deduplicate;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TbRelationTypes;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@RuleNode(
type = ComponentType.ACTION,
name = "de-duplicate",
configClazz = TbMsgDeDuplicateNodeConfiguration.class,
nodeDescription = "De-duplicate messages for a configurable period based on a specified de-duplication strategy.",
nodeDetails = "Rule node allows you to select one of the following strategy to de-duplicate messages: <br></br>" +
"<b>FIRST</b> - return first message that arrived during de-duplication period.<br></br>" +
"<b>LAST</b> - return last message that arrived during de-duplication period.<br></br>" +
"<b>ALL</b> - return all messages as a single JSON array message. Where each element represents object with <b>msg</b> and <b>metadata</b> inner properties.<br></br>" +
"By default rule node <b>De-duplicate messages by message originator</b>, however, there is an option to de-duplicate messages independently from the incoming message's originator. " +
"In case of the <b>De-duplicate strategy</b> set to <b>ALL</b> and <b>De-duplicate messages by message originator</b> checkbox set to <b>false</b>, you must configure the <b>Queue Name</b> and <b>Out Message Type</b> for the output array message. " +
"Also in this case the output message originator will be set to the current tenant id.<br></br>",
icon = "pause",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbActionNodeMsgDeDuplicateConfig"
)
@Slf4j
public class TbMsgDeDuplicateNode implements TbNode {
private static final String TB_MSG_DEDUPLICATION_TIMEOUT_MSG = "TbMsgDeDuplicateNodeMsg";
private TbMsgDeDuplicateNodeConfiguration config;
private Map<EntityId, List<TbMsgDeDuplicateState>> deDuplicateStateMap;
private long deDuplicationInterval;
private long lastScheduledTs;
private boolean deDuplicateByOriginator;
private UUID nextTickId;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbMsgDeDuplicateNodeConfiguration.class);
this.deDuplicationInterval = TimeUnit.SECONDS.toMillis(config.getDelay());
this.deDuplicateByOriginator = config.isDeDuplicateByOriginator();
this.deDuplicateStateMap = new HashMap<>();
scheduleTickMsg(ctx);
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
if (TB_MSG_DEDUPLICATION_TIMEOUT_MSG.equals(msg.getType())) {
if (msg.getId().equals(nextTickId)) {
List<DeDuplicateData> deDuplicateDataList = getDeDuplicateDataList();
if (!deDuplicateDataList.isEmpty()) {
if (deDuplicateByOriginator) {
deDuplicateDataList.forEach(deDuplicateData -> {
EntityId entityId = deDuplicateData.getEntityId();
List<DeDuplicatePack> deDuplicateStates = deDuplicateData.getDeDuplicateStates();
deDuplicateStates.forEach(pack ->
ctx.enqueueForTellNext(getOutputMsg(pack), TbRelationTypes.SUCCESS,
() -> {
deDuplicateStateMap.computeIfPresent(entityId, (key, states) -> {
pack.getStates().forEach(states::remove);
return states;
});
},
throwable -> {
log.trace("Failed to enqueue de-duplication output message due to: ", throwable);
deDuplicateStateMap.computeIfPresent(entityId, (key, states) -> {
List<TbMsgDeDuplicateState> statesToRemove = new ArrayList<>();
states.forEach(state -> {
if (pack.getStates().contains(state)) {
int retries = state.getRetries();
if (config.getMaxRetries() <= retries) {
state.incrementRetries();
} else {
statesToRemove.add(state);
}
}
});
states.removeAll(statesToRemove);
return states;
});
}));
});
} else {
// todo implement
}
}
scheduleTickMsg(ctx);
} else {
log.trace("[{}][{}] Received outdated tick msg: {}", ctx.getTenantId(), ctx.getSelfId(), msg.getId());
}
} else {
List<TbMsgDeDuplicateState> deDuplicateStateList = deDuplicateStateMap.computeIfAbsent(msg.getOriginator(), k -> new ArrayList<>());
if (deDuplicateStateList.size() < config.getMaxPendingMsgs()) {
log.trace("[{}] Adding msg: [{}][{}] to the pending msgs map ...", msg.getOriginator(), msg.getId(), msg.getMetaDataTs());
deDuplicateStateList.add(new TbMsgDeDuplicateState(msg));
ctx.ack(msg);
} else {
log.trace("[{}] Max limit of pending messages reached for entity!", msg.getOriginator());
ctx.tellFailure(msg, new RuntimeException("Max limit of pending messages reached for entity: " + msg.getOriginator()));
}
}
}
private List<DeDuplicateData> getDeDuplicateDataList() {
if (deDuplicateStateMap.isEmpty()) {
return Collections.emptyList();
} else {
List<DeDuplicateData> result = new ArrayList<>();
long deDuplicationTimeoutMs = System.currentTimeMillis();
deDuplicateStateMap.forEach((entityId, tbMsgDeDuplicateStates) -> {
if (!tbMsgDeDuplicateStates.isEmpty()) {
DeDuplicateData deDuplicateData = new DeDuplicateData(entityId);
TbMsgDeDuplicateState firstDeDuplicateState = getFirstDeDuplicateState(tbMsgDeDuplicateStates, null);
TbMsgDeDuplicateState lastStateInPack = firstDeDuplicateState;
long deDuplicationStartTs = firstDeDuplicateState.getTs();
long deDuplicationEndTs = deDuplicationStartTs + deDuplicationInterval;
boolean hasNextDeduplicationPack = deDuplicationEndTs < deDuplicationTimeoutMs;
while (hasNextDeduplicationPack) {
long finalDeDuplicationStartTs = deDuplicationStartTs;
List<TbMsgDeDuplicateState> statesPack = new ArrayList<>();
for (TbMsgDeDuplicateState state : tbMsgDeDuplicateStates) {
if (state.getTs() >= finalDeDuplicationStartTs && state.getTs() < deDuplicationEndTs) {
statesPack.add(state);
if (state.getTs() > lastStateInPack.getTs()) {
lastStateInPack = state;
}
}
}
deDuplicateData.addDeDuplicatePack(statesPack, statesPack.indexOf(firstDeDuplicateState), statesPack.indexOf(lastStateInPack));
if (deDuplicateData.getStatesCount() == tbMsgDeDuplicateStates.size()) {
hasNextDeduplicationPack = false;
} else {
firstDeDuplicateState = getFirstDeDuplicateState(tbMsgDeDuplicateStates, deDuplicationEndTs);
if (firstDeDuplicateState == null) {
hasNextDeduplicationPack = false;
} else {
deDuplicationStartTs = firstDeDuplicateState.getTs();
deDuplicationEndTs = deDuplicationStartTs + deDuplicationInterval;
hasNextDeduplicationPack = deDuplicationEndTs < deDuplicationTimeoutMs;
}
}
}
result.add(deDuplicateData);
}
});
return result;
}
}
private TbMsgDeDuplicateState getFirstDeDuplicateState(List<TbMsgDeDuplicateState> tbMsgDeDuplicateStates, Long previousPackEndTs) {
if (previousPackEndTs != null) {
tbMsgDeDuplicateStates = tbMsgDeDuplicateStates.stream().filter(state -> state.getTs() >= previousPackEndTs).collect(Collectors.toList());
}
TbMsgDeDuplicateState first = null;
for (TbMsgDeDuplicateState state : tbMsgDeDuplicateStates) {
if (first == null || state.getTs() < first.getTs()) {
first = state;
}
}
return first;
}
private TbMsg getOutputMsg(DeDuplicatePack pack) {
switch (config.getStrategy()) {
case FIRST:
return pack.getFirst().getTbMsg();
case LAST:
return pack.getLast().getTbMsg();
default:
EntityId originator = pack.getStates().get(0).getTbMsg().getOriginator();
String queueName = pack.getStates().get(0).getTbMsg().getQueueName();
String outMsgType = pack.getStates().get(0).getTbMsg().getType();
String data = getMergedData(pack.getStates().stream()
.map(TbMsgDeDuplicateState::getTbMsg)
.collect(Collectors.toList()));
return TbMsg.newMsg(queueName, outMsgType, originator, getMetadata(), data);
}
}
private void scheduleTickMsg(TbContext ctx) {
long curTs = System.currentTimeMillis();
if (lastScheduledTs == 0L) {
lastScheduledTs = curTs;
}
lastScheduledTs = lastScheduledTs + deDuplicationInterval;
long curDelay = Math.max(0L, (lastScheduledTs - curTs));
TbMsg tickMsg = ctx.newMsg(null, TB_MSG_DEDUPLICATION_TIMEOUT_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
nextTickId = tickMsg.getId();
ctx.tellSelf(tickMsg, curDelay);
}
private String getMergedData(List<TbMsg> msgs) {
ArrayNode mergedData = JacksonUtil.OBJECT_MAPPER.createArrayNode();
msgs.forEach(msg -> {
ObjectNode msgNode = JacksonUtil.newObjectNode();
msgNode.set("msg", JacksonUtil.toJsonNode(msg.getData()));
msgNode.set("metadata", JacksonUtil.valueToTree(msg.getMetaData().getData()));
mergedData.add(msgNode);
});
return JacksonUtil.toString(mergedData);
}
private TbMsgMetaData getMetadata() {
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("ts", String.valueOf(System.currentTimeMillis()));
return metaData;
}
}

4
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDeDuplicateNodeConfiguration.java → rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/TbMsgDeDuplicateNodeConfiguration.java

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.delay;
package org.thingsboard.rule.engine.deduplicate;
import lombok.Data;
import org.thingsboard.rule.engine.api.NodeConfiguration;
@ -23,6 +23,7 @@ public class TbMsgDeDuplicateNodeConfiguration implements NodeConfiguration<TbMs
private int delay;
private int maxPendingMsgs;
private int maxRetries;
private boolean deDuplicateByOriginator;
private DeDuplicateStrategy strategy;
@ -36,6 +37,7 @@ public class TbMsgDeDuplicateNodeConfiguration implements NodeConfiguration<TbMs
TbMsgDeDuplicateNodeConfiguration configuration = new TbMsgDeDuplicateNodeConfiguration();
configuration.setDelay(60);
configuration.setMaxPendingMsgs(100);
configuration.setMaxRetries(3);
configuration.setDeDuplicateByOriginator(true);
configuration.setStrategy(DeDuplicateStrategy.FIRST);
return configuration;

39
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/TbMsgDeDuplicateState.java

@ -0,0 +1,39 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.deduplicate;
import lombok.Data;
import org.thingsboard.server.common.msg.TbMsg;
@Data
public class TbMsgDeDuplicateState {
private TbMsg tbMsg;
private int retries;
public TbMsgDeDuplicateState(TbMsg tbMsg) {
this.tbMsg = tbMsg;
this.retries = 0;
}
public long getTs() {
return tbMsg.getMetaDataTs();
}
public void incrementRetries() {
this.retries++;
}
}

176
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDeDuplicateNode.java

@ -1,176 +0,0 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.delay;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TbRelationTypes;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@RuleNode(
type = ComponentType.ACTION,
name = "de-duplicate",
configClazz = TbMsgDeDuplicateNodeConfiguration.class,
nodeDescription = "De-duplicate messages for a configurable period based on a specified de-duplication strategy.",
nodeDetails = "Rule node allows you to select one of the following strategy to de-duplicate messages: <br></br>" +
"<b>FIRST</b> - return first message that arrived during de-duplication period.<br></br>" +
"<b>LAST</b> - return last message that arrived during de-duplication period.<br></br>" +
"<b>ALL</b> - return all messages as a single JSON array message. Where each element represents object with <b>msg</b> and <b>metadata</b> inner properties.<br></br>" +
"By default rule node <b>De-duplicate messages by message originator</b>, however, there is an option to de-duplicate messages independently from the incoming message's originator. " +
"In case of the <b>De-duplicate strategy</b> set to <b>ALL</b> and <b>De-duplicate messages by message originator</b> checkbox set to <b>false</b>, you must configure the <b>Queue Name</b> and <b>Out Message Type</b> for the output array message. " +
"Also in this case the output message originator will be set to the current tenant id.<br></br>",
icon = "pause",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbActionNodeMsgDeDuplicateConfig"
)
@Slf4j
public class TbMsgDeDuplicateNode implements TbNode {
private static final String TB_MSG_DEDUPLICATION_TIMEOUT_MSG = "TbMsgDeDuplicateNodeMsg";
private TbMsgDeDuplicateNodeConfiguration config;
private Map<String, List<TbMsg>> deDuplicationMap;
private long delay;
private long lastScheduledTs;
private boolean deDuplicateByOriginator;
private UUID nextTickId;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbMsgDeDuplicateNodeConfiguration.class);
this.delay = TimeUnit.SECONDS.toMillis(config.getDelay());
this.deDuplicateByOriginator = config.isDeDuplicateByOriginator();
this.deDuplicationMap = new HashMap<>();
scheduleTickMsg(ctx);
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
if (TB_MSG_DEDUPLICATION_TIMEOUT_MSG.equals(msg.getType())) {
if (msg.getId().equals(nextTickId)) {
long deDuplicationTimeoutMs = System.currentTimeMillis();
List<String> keysToRemove = deDuplicationMap
.keySet()
.stream()
.filter(key -> key.startsWith(String.valueOf(lastScheduledTs)) || Long.parseLong(key.split("_")[0]) < deDuplicationTimeoutMs)
.collect(Collectors.toList());
scheduleTickMsg(ctx);
keysToRemove.forEach(key -> {
List<TbMsg> msgList = deDuplicationMap.get(key);
TbMsg outMsg;
switch (config.getStrategy()) {
case FIRST:
outMsg = msgList.get(0);
break;
case LAST:
outMsg = msgList.get(msgList.size() - 1);
break;
default:
EntityId originator;
String queueName;
String outMsgType;
if (deDuplicateByOriginator) {
String[] keyElements = key.split("_");
originator = EntityIdFactory.getByTypeAndId(keyElements[1], keyElements[2]);
queueName = msgList.get(0).getQueueName();
outMsgType = msgList.get(0).getType();
} else {
originator = ctx.getTenantId();
queueName = config.getQueueName();
outMsgType = config.getOutMsgType();
}
outMsg = TbMsg.newMsg(queueName, outMsgType, originator, getMetadata(), getMergedData(msgList));
break;
}
ctx.enqueueForTellNext(outMsg, TbRelationTypes.SUCCESS,
() -> deDuplicationMap.remove(key),
throwable -> log.trace("Failed to enqueue de-duplication output message due to: ", throwable));
});
} else {
log.trace("[{}][{}] Received outdated tick msg: {}", ctx.getTenantId(), ctx.getSelfId(), msg.getId());
}
} else {
String key = String.format("%d_%s", lastScheduledTs, getDeDuplicationId(ctx, msg));
List<TbMsg> deDuplicateMsgList = deDuplicationMap.computeIfAbsent(key, k -> new ArrayList<>());
if (deDuplicateMsgList.size() < config.getMaxPendingMsgs()) {
deDuplicateMsgList.add(msg);
ctx.ack(msg);
} else {
ctx.tellFailure(msg, new RuntimeException("Max limit of pending messages reached for key: " + key));
}
}
}
private String getDeDuplicationId(TbContext ctx, TbMsg msg) {
return deDuplicateByOriginator ?
msg.getOriginator().getEntityType().name() + "_" + msg.getOriginator().getId() :
EntityType.RULE_NODE.name() + "_" + ctx.getSelfId().getId();
}
private void scheduleTickMsg(TbContext ctx) {
long curTs = System.currentTimeMillis();
if (lastScheduledTs == 0L) {
lastScheduledTs = curTs;
}
lastScheduledTs = lastScheduledTs + delay;
long curDelay = Math.max(0L, (lastScheduledTs - curTs));
TbMsg tickMsg = ctx.newMsg(null, TB_MSG_DEDUPLICATION_TIMEOUT_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
nextTickId = tickMsg.getId();
ctx.tellSelf(tickMsg, curDelay);
}
private String getMergedData(List<TbMsg> msgs) {
ArrayNode mergedData = JacksonUtil.OBJECT_MAPPER.createArrayNode();
msgs.forEach(msg -> {
ObjectNode msgNode = JacksonUtil.newObjectNode();
msgNode.set("msg", JacksonUtil.toJsonNode(msg.getData()));
msgNode.set("metadata", JacksonUtil.valueToTree(msg.getMetaData().getData()));
mergedData.add(msgNode);
});
return JacksonUtil.toString(mergedData);
}
private TbMsgMetaData getMetadata() {
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("ts", String.valueOf(System.currentTimeMillis()));
return metaData;
}
}

243
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbMsgDeDuplicateNodeTest.java

@ -18,6 +18,7 @@ package org.thingsboard.rule.engine.action;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
import org.junit.Ignore;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@ -31,9 +32,9 @@ import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TbRelationTypes;
import org.thingsboard.rule.engine.delay.DeDuplicateStrategy;
import org.thingsboard.rule.engine.delay.TbMsgDeDuplicateNode;
import org.thingsboard.rule.engine.delay.TbMsgDeDuplicateNodeConfiguration;
import org.thingsboard.rule.engine.deduplicate.DeDuplicateStrategy;
import org.thingsboard.rule.engine.deduplicate.TbMsgDeDuplicateNode;
import org.thingsboard.rule.engine.deduplicate.TbMsgDeDuplicateNodeConfiguration;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleNodeId;
@ -43,12 +44,18 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import static org.mockito.ArgumentMatchers.any;
@ -57,6 +64,7 @@ import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -70,7 +78,9 @@ public class TbMsgDeDuplicateNodeTest {
private TbContext ctx;
private final ExecutorService executorService = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("de-duplication-node-test"));
private final ThingsBoardThreadFactory factory = ThingsBoardThreadFactory.forName("de-duplication-node-test");
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(factory);
private final int deDuplicationInterval = 1;
private TenantId tenantId;
@ -79,14 +89,10 @@ public class TbMsgDeDuplicateNodeTest {
private TbNodeConfiguration nodeConfiguration;
private CountDownLatch awaitTellSelfLatch;
private CountDownLatch awaitAllMsgsProcessedLatch;
private int scheduleCount;
@BeforeEach
public void init() throws TbNodeException {
ctx = mock(TbContext.class);
awaitTellSelfLatch = new CountDownLatch(1);
awaitAllMsgsProcessedLatch = new CountDownLatch(1);
tenantId = TenantId.fromUUID(UUID.randomUUID());
RuleNodeId ruleNodeId = new RuleNodeId(UUID.randomUUID());
@ -101,28 +107,32 @@ public class TbMsgDeDuplicateNodeTest {
String data = (String) (invocationOnMock.getArguments())[4];
return TbMsg.newMsg(type, originator, metaData.copy(), data);
}).when(ctx).newMsg(isNull(), eq(TB_MSG_DEDUPLICATION_TIMEOUT_MSG), nullable(EntityId.class), any(TbMsgMetaData.class), any(String.class));
node = spy(new TbMsgDeDuplicateNode());
config = new TbMsgDeDuplicateNodeConfiguration().defaultConfiguration();
}
scheduleCount = 0;
private void invokeTellSelf(int maxNumberOfInvocation, boolean incrementScheduleTimeout) {
AtomicLong scheduleTimeout = new AtomicLong(deDuplicationInterval);
AtomicInteger scheduleCount = new AtomicInteger(0);
doAnswer((Answer<Void>) invocationOnMock -> {
scheduleCount++;
if (scheduleCount == 1) {
scheduleCount.getAndIncrement();
if (scheduleCount.get() <= maxNumberOfInvocation) {
TbMsg msg = (TbMsg) (invocationOnMock.getArguments())[0];
executorService.submit(() -> {
executorService.schedule(() -> {
try {
awaitAllMsgsProcessedLatch.await();
node.onMsg(ctx, msg);
awaitTellSelfLatch.countDown();
} catch (ExecutionException | InterruptedException | TbNodeException e) {
log.error("Failed to execute tellSelf method call due to: ", e);
}
});
}, scheduleTimeout.get(), TimeUnit.SECONDS);
if (incrementScheduleTimeout) {
scheduleTimeout.set(scheduleTimeout.get() * 3);
}
}
return null;
}).when(ctx).tellSelf(ArgumentMatchers.any(TbMsg.class), ArgumentMatchers.anyLong());
node = new TbMsgDeDuplicateNode();
config = new TbMsgDeDuplicateNodeConfiguration().defaultConfiguration();
}
@AfterEach
@ -132,170 +142,189 @@ public class TbMsgDeDuplicateNodeTest {
}
@Test
public void given_multipleMessages_thenVerifyOutputFirst() throws TbNodeException, ExecutionException, InterruptedException {
public void given_100_messages_then_verifyOutputFirst() throws TbNodeException, ExecutionException, InterruptedException {
int wantedNumberOfTellSelfInvocation = 2;
int msgCount = 100;
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation);
invokeTellSelf(wantedNumberOfTellSelfInvocation, false);
config.setDelay(deDuplicationInterval);
config.setMaxPendingMsgs(msgCount);
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
node.init(ctx, nodeConfiguration);
DeviceId deviceId = new DeviceId(UUID.randomUUID());
List<TbMsg> inputMsgs = createTbMsgs(deviceId);
long currentTimeMillis = System.currentTimeMillis();
List<TbMsg> inputMsgs = getTbMsgs(deviceId, msgCount, currentTimeMillis, 500);
for (TbMsg msg : inputMsgs) {
node.onMsg(ctx, msg);
}
awaitAllMsgsProcessedLatch.countDown();
awaitTellSelfLatch.await();
verify(ctx, times(inputMsgs.size())).ack(any());
ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
ArgumentCaptor<Runnable> successCaptor = ArgumentCaptor.forClass(Runnable.class);
ArgumentCaptor<Consumer<Throwable>> failureCaptor = ArgumentCaptor.forClass(Consumer.class);
verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture());
Assertions.assertEquals(inputMsgs.get(0), newMsgCaptor.getValue());
}
TbMsg msgToReject = createMsg(deviceId, inputMsgs.get(inputMsgs.size() - 1).getMetaDataTs() + 2);
node.onMsg(ctx, msgToReject);
@Test
public void given_moreMessagesThenAllowedInConfiguration_thenVerifyTellFailure() throws TbNodeException, ExecutionException, InterruptedException {
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
node.init(ctx, nodeConfiguration);
DeviceId deviceId = new DeviceId(UUID.randomUUID());
List<TbMsg> inputMsgs = createTbMsgs(deviceId, 101);
for (TbMsg msg : inputMsgs) {
node.onMsg(ctx, msg);
}
awaitAllMsgsProcessedLatch.countDown();
awaitTellSelfLatch.await();
verify(ctx, times(inputMsgs.size() -1)).ack(any());
verify(ctx, times(1)).tellFailure(eq(inputMsgs.get(inputMsgs.size() -1)), any());
ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
ArgumentCaptor<Runnable> successCaptor = ArgumentCaptor.forClass(Runnable.class);
ArgumentCaptor<Consumer<Throwable>> failureCaptor = ArgumentCaptor.forClass(Consumer.class);
verify(ctx, times(msgCount)).ack(any());
verify(ctx, times(1)).tellFailure(eq(msgToReject), any());
verify(node, times(msgCount + wantedNumberOfTellSelfInvocation + 1)).onMsg(eq(ctx), any());
verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture());
Assertions.assertEquals(inputMsgs.get(0), newMsgCaptor.getValue());
}
@Test
public void given_multipleMessages_thenVerifyOutputLast() throws TbNodeException, ExecutionException, InterruptedException {
public void given_100_messages_then_verifyOutputLast() throws TbNodeException, ExecutionException, InterruptedException {
int wantedNumberOfTellSelfInvocation = 2;
int msgCount = 100;
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation);
invokeTellSelf(wantedNumberOfTellSelfInvocation, false);
config.setStrategy(DeDuplicateStrategy.LAST);
config.setDelay(deDuplicationInterval);
config.setMaxPendingMsgs(msgCount);
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
node.init(ctx, nodeConfiguration);
DeviceId deviceId = new DeviceId(UUID.randomUUID());
List<TbMsg> inputMsgs = createTbMsgs(deviceId);
long currentTimeMillis = System.currentTimeMillis();
List<TbMsg> inputMsgs = getTbMsgs(deviceId, msgCount, currentTimeMillis, 500);
int indexOfLastMsgInArray = inputMsgs.size() - 1;
int indexToSetMaxTs = new Random().nextInt(indexOfLastMsgInArray) + 1;
TbMsg currentMaxTsMsg = inputMsgs.get(indexOfLastMsgInArray);
TbMsg newLastMsgOfArray = inputMsgs.get(indexToSetMaxTs);
inputMsgs.set(indexOfLastMsgInArray, newLastMsgOfArray);
inputMsgs.set(indexToSetMaxTs, currentMaxTsMsg);
for (TbMsg msg : inputMsgs) {
node.onMsg(ctx, msg);
}
awaitAllMsgsProcessedLatch.countDown();
TbMsg msgToReject = createMsg(deviceId, inputMsgs.get(indexOfLastMsgInArray).getMetaDataTs() + 2);
node.onMsg(ctx, msgToReject);
awaitTellSelfLatch.await();
verify(ctx, times(inputMsgs.size())).ack(any());
ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
ArgumentCaptor<Runnable> successCaptor = ArgumentCaptor.forClass(Runnable.class);
ArgumentCaptor<Consumer<Throwable>> failureCaptor = ArgumentCaptor.forClass(Consumer.class);
verify(ctx, times(msgCount)).ack(any());
verify(ctx, times(1)).tellFailure(eq(msgToReject), any());
verify(node, times(msgCount + wantedNumberOfTellSelfInvocation + 1)).onMsg(eq(ctx), any());
verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture());
Assertions.assertEquals(inputMsgs.get(inputMsgs.size() - 1), newMsgCaptor.getValue());
Assertions.assertEquals(currentMaxTsMsg, newMsgCaptor.getValue());
}
@Test
public void given_multipleMessagesFromTwoOriginators_thenVerifyOutputAllForEachOriginator() throws TbNodeException, ExecutionException, InterruptedException {
public void given_100_messages_then_verifyOutputAll() throws TbNodeException, ExecutionException, InterruptedException {
int wantedNumberOfTellSelfInvocation = 2;
int msgCount = 100;
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation);
invokeTellSelf(wantedNumberOfTellSelfInvocation, false);
config.setDelay(deDuplicationInterval);
config.setMaxPendingMsgs(msgCount);
config.setStrategy(DeDuplicateStrategy.ALL);
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
node.init(ctx, nodeConfiguration);
DeviceId firstDeviceId = new DeviceId(UUID.randomUUID());
List<TbMsg> firstDeviceInputMsgs = createTbMsgs(firstDeviceId);
DeviceId secondDeviceId = new DeviceId(UUID.randomUUID());
List<TbMsg> secondDeviceInputMsgs = createTbMsgs(secondDeviceId);
List<TbMsg> inputMsgs = new ArrayList<>();
inputMsgs.addAll(firstDeviceInputMsgs);
inputMsgs.addAll(secondDeviceInputMsgs);
DeviceId deviceId = new DeviceId(UUID.randomUUID());
long currentTimeMillis = System.currentTimeMillis();
List<TbMsg> inputMsgs = getTbMsgs(deviceId, msgCount, currentTimeMillis, 500);
for (TbMsg msg : inputMsgs) {
node.onMsg(ctx, msg);
}
awaitAllMsgsProcessedLatch.countDown();
awaitTellSelfLatch.await();
verify(ctx, times(inputMsgs.size())).ack(any());
ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
ArgumentCaptor<Runnable> successCaptor = ArgumentCaptor.forClass(Runnable.class);
ArgumentCaptor<Consumer<Throwable>> failureCaptor = ArgumentCaptor.forClass(Consumer.class);
verify(ctx, times(2)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture());
List<TbMsg> outMessages = newMsgCaptor.getAllValues();
Assertions.assertEquals(2, outMessages.size());
for (TbMsg tbMsg : outMessages) {
if (tbMsg.getOriginator().equals(firstDeviceId)) {
Assertions.assertEquals(getMergedData(firstDeviceInputMsgs), tbMsg.getData());
} else {
Assertions.assertEquals(getMergedData(secondDeviceInputMsgs), tbMsg.getData());
Assertions.assertEquals(secondDeviceId, tbMsg.getOriginator());
}
}
verify(ctx, times(msgCount)).ack(any());
verify(node, times(msgCount + wantedNumberOfTellSelfInvocation)).onMsg(eq(ctx), any());
verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture());
Assertions.assertEquals(1, newMsgCaptor.getAllValues().size());
TbMsg outMessage = newMsgCaptor.getAllValues().get(0);
Assertions.assertEquals(getMergedData(inputMsgs), outMessage.getData());
Assertions.assertEquals(deviceId, outMessage.getOriginator());
}
@Test
public void given_multipleMessagesFromTwoOriginators_thenVerifyOutputAll() throws TbNodeException, ExecutionException, InterruptedException {
public void given_100_messages_then_verifyOutput_2_packs() throws TbNodeException, ExecutionException, InterruptedException {
int wantedNumberOfTellSelfInvocation = 2;
int msgCount = 100;
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation);
invokeTellSelf(wantedNumberOfTellSelfInvocation, true);
config.setDelay(deDuplicationInterval);
config.setMaxPendingMsgs(msgCount);
config.setStrategy(DeDuplicateStrategy.ALL);
config.setDeDuplicateByOriginator(false);
config.setQueueName(HIGH_PRIORITY_QUEUE_NAME);
config.setOutMsgType(SessionMsgType.POST_ATTRIBUTES_REQUEST.name());
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
node.init(ctx, nodeConfiguration);
DeviceId firstDeviceId = new DeviceId(UUID.randomUUID());
List<TbMsg> firstDeviceInputMsgs = createTbMsgs(firstDeviceId, 50);
DeviceId secondDeviceId = new DeviceId(UUID.randomUUID());
List<TbMsg> secondDeviceInputMsgs = createTbMsgs(secondDeviceId, 50);
DeviceId deviceId = new DeviceId(UUID.randomUUID());
long currentTimeMillis = System.currentTimeMillis();
List<TbMsg> inputMsgs = new ArrayList<>();
inputMsgs.addAll(firstDeviceInputMsgs);
inputMsgs.addAll(secondDeviceInputMsgs);
List<TbMsg> firstMsgPack = getTbMsgs(deviceId, msgCount / 2, currentTimeMillis, 500);
for (TbMsg msg : firstMsgPack) {
node.onMsg(ctx, msg);
}
TbMsg firstMsgFromFirstPack = firstMsgPack.get(0);
long firstPackDeDuplicationPackEndTs = firstMsgFromFirstPack.getMetaDataTs() + TimeUnit.SECONDS.toMillis(deDuplicationInterval);
for (TbMsg msg : inputMsgs) {
List<TbMsg> secondMsgPack = getTbMsgs(deviceId, msgCount / 2, firstPackDeDuplicationPackEndTs, 500);
for (TbMsg msg : secondMsgPack) {
node.onMsg(ctx, msg);
}
awaitAllMsgsProcessedLatch.countDown();
awaitTellSelfLatch.await();
verify(ctx, times(inputMsgs.size())).ack(any());
ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
ArgumentCaptor<Runnable> successCaptor = ArgumentCaptor.forClass(Runnable.class);
ArgumentCaptor<Consumer<Throwable>> failureCaptor = ArgumentCaptor.forClass(Consumer.class);
verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture());
Assertions.assertEquals(1, newMsgCaptor.getAllValues().size());
TbMsg outMessage = newMsgCaptor.getAllValues().get(0);
Assertions.assertEquals(getMergedData(inputMsgs), outMessage.getData());
Assertions.assertEquals(tenantId, outMessage.getOriginator());
Assertions.assertEquals(config.getQueueName(), outMessage.getQueueName());
Assertions.assertEquals(config.getOutMsgType(), outMessage.getType());
}
verify(ctx, times(msgCount)).ack(any());
verify(node, times(msgCount + wantedNumberOfTellSelfInvocation)).onMsg(eq(ctx), any());
verify(ctx, times(2)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture());
private List<TbMsg> createTbMsgs(DeviceId deviceId) {
return createTbMsgs(deviceId, 100);
Assertions.assertEquals(2, newMsgCaptor.getAllValues().size());
Assertions.assertEquals(getMergedData(firstMsgPack), newMsgCaptor.getAllValues().get(0).getData());
Assertions.assertEquals(getMergedData(secondMsgPack), newMsgCaptor.getAllValues().get(1).getData());
}
private List<TbMsg> createTbMsgs(DeviceId deviceId, int msgCount) {
private List<TbMsg> getTbMsgs(DeviceId deviceId, int msgCount, long currentTimeMillis, int initTsStep) {
List<TbMsg> inputMsgs = new ArrayList<>();
var ts = currentTimeMillis + initTsStep;
for (int i = 0; i < msgCount; i++) {
ObjectNode dataNode = JacksonUtil.newObjectNode();
dataNode.put("msgId", i);
dataNode.put("deviceId", deviceId.getId().toString());
TbMsg tbMsg = TbMsg.newMsg(
MAIN_QUEUE_NAME,
SessionMsgType.POST_TELEMETRY_REQUEST.name(),
deviceId,
new TbMsgMetaData(),
JacksonUtil.toString(dataNode));
inputMsgs.add(tbMsg);
inputMsgs.add(createMsg(deviceId, ts));
ts += 2;
}
return inputMsgs;
}
private TbMsg createMsg(DeviceId deviceId, long ts) {
ObjectNode dataNode = JacksonUtil.newObjectNode();
dataNode.put("deviceId", deviceId.getId().toString());
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("ts", String.valueOf(ts));
return TbMsg.newMsg(
MAIN_QUEUE_NAME,
SessionMsgType.POST_TELEMETRY_REQUEST.name(),
deviceId,
metaData,
JacksonUtil.toString(dataNode));
}
private String getMergedData(List<TbMsg> msgs) {
ArrayNode mergedData = JacksonUtil.OBJECT_MAPPER.createArrayNode();
msgs.forEach(msg -> {

Loading…
Cancel
Save