diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/DeDuplicateData.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/DeDuplicateData.java new file mode 100644 index 0000000000..56d009b3c1 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/DeDuplicateData.java @@ -0,0 +1,42 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.deduplicate; + +import lombok.Data; +import org.thingsboard.server.common.data.id.EntityId; + +import java.util.ArrayList; +import java.util.List; + +@Data +public class DeDuplicateData { + + private EntityId entityId; + private List deDuplicateStates; + private int statesCount; + + public DeDuplicateData(EntityId entityId) { + this.entityId = entityId; + this.deDuplicateStates = new ArrayList<>(); + this.statesCount = 0; + } + + public void addDeDuplicatePack(List states, int indexOfFirstStateInPack, int indexOfLastStateInPack) { + deDuplicateStates.add(new DeDuplicatePack(indexOfFirstStateInPack, indexOfLastStateInPack, states)); + statesCount += states.size(); + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/DeDuplicatePack.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/DeDuplicatePack.java new file mode 100644 index 0000000000..7685169535 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/DeDuplicatePack.java @@ -0,0 +1,39 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.deduplicate; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.util.List; + +@Data +@AllArgsConstructor +public class DeDuplicatePack { + + private int indexOfFirstStateInPack; + private int indexOfLastStateInPack; + private List states; + + public TbMsgDeDuplicateState getFirst() { + return states.get(indexOfFirstStateInPack); + } + + public TbMsgDeDuplicateState getLast() { + return states.get(indexOfLastStateInPack); + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/DeDuplicateStrategy.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/DeDuplicateStrategy.java similarity index 93% rename from rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/DeDuplicateStrategy.java rename to rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/DeDuplicateStrategy.java index 69056de9fd..83dba2c285 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/DeDuplicateStrategy.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/DeDuplicateStrategy.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.rule.engine.delay; +package org.thingsboard.rule.engine.deduplicate; public enum DeDuplicateStrategy { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/TbMsgDeDuplicateNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/TbMsgDeDuplicateNode.java new file mode 100644 index 0000000000..2e0eb3196e --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/TbMsgDeDuplicateNode.java @@ -0,0 +1,246 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.deduplicate; + +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.api.RuleNode; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.TbRelationTypes; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@RuleNode( + type = ComponentType.ACTION, + name = "de-duplicate", + configClazz = TbMsgDeDuplicateNodeConfiguration.class, + nodeDescription = "De-duplicate messages for a configurable period based on a specified de-duplication strategy.", + nodeDetails = "Rule node allows you to select one of the following strategy to de-duplicate messages:

" + + "FIRST - return first message that arrived during de-duplication period.

" + + "LAST - return last message that arrived during de-duplication period.

" + + "ALL - return all messages as a single JSON array message. Where each element represents object with msg and metadata inner properties.

" + + "By default rule node De-duplicate messages by message originator, however, there is an option to de-duplicate messages independently from the incoming message's originator. " + + "In case of the De-duplicate strategy set to ALL and De-duplicate messages by message originator checkbox set to false, you must configure the Queue Name and Out Message Type for the output array message. " + + "Also in this case the output message originator will be set to the current tenant id.

", + icon = "pause", + uiResources = {"static/rulenode/rulenode-core-config.js"}, + configDirective = "tbActionNodeMsgDeDuplicateConfig" +) +@Slf4j +public class TbMsgDeDuplicateNode implements TbNode { + + private static final String TB_MSG_DEDUPLICATION_TIMEOUT_MSG = "TbMsgDeDuplicateNodeMsg"; + + private TbMsgDeDuplicateNodeConfiguration config; + + private Map> deDuplicateStateMap; + private long deDuplicationInterval; + private long lastScheduledTs; + private boolean deDuplicateByOriginator; + private UUID nextTickId; + + + @Override + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, TbMsgDeDuplicateNodeConfiguration.class); + this.deDuplicationInterval = TimeUnit.SECONDS.toMillis(config.getDelay()); + this.deDuplicateByOriginator = config.isDeDuplicateByOriginator(); + this.deDuplicateStateMap = new HashMap<>(); + scheduleTickMsg(ctx); + } + + @Override + public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { + if (TB_MSG_DEDUPLICATION_TIMEOUT_MSG.equals(msg.getType())) { + if (msg.getId().equals(nextTickId)) { + List deDuplicateDataList = getDeDuplicateDataList(); + if (!deDuplicateDataList.isEmpty()) { + if (deDuplicateByOriginator) { + deDuplicateDataList.forEach(deDuplicateData -> { + EntityId entityId = deDuplicateData.getEntityId(); + List deDuplicateStates = deDuplicateData.getDeDuplicateStates(); + deDuplicateStates.forEach(pack -> + ctx.enqueueForTellNext(getOutputMsg(pack), TbRelationTypes.SUCCESS, + () -> { + deDuplicateStateMap.computeIfPresent(entityId, (key, states) -> { + pack.getStates().forEach(states::remove); + return states; + }); + }, + throwable -> { + log.trace("Failed to enqueue de-duplication output message due to: ", throwable); + deDuplicateStateMap.computeIfPresent(entityId, (key, states) -> { + List statesToRemove = new ArrayList<>(); + states.forEach(state -> { + if (pack.getStates().contains(state)) { + int retries = state.getRetries(); + if (config.getMaxRetries() <= retries) { + state.incrementRetries(); + } else { + statesToRemove.add(state); + } + } + }); + states.removeAll(statesToRemove); + return states; + }); + })); + }); + } else { + // todo implement + } + } + scheduleTickMsg(ctx); + } else { + log.trace("[{}][{}] Received outdated tick msg: {}", ctx.getTenantId(), ctx.getSelfId(), msg.getId()); + } + } else { + List deDuplicateStateList = deDuplicateStateMap.computeIfAbsent(msg.getOriginator(), k -> new ArrayList<>()); + if (deDuplicateStateList.size() < config.getMaxPendingMsgs()) { + log.trace("[{}] Adding msg: [{}][{}] to the pending msgs map ...", msg.getOriginator(), msg.getId(), msg.getMetaDataTs()); + deDuplicateStateList.add(new TbMsgDeDuplicateState(msg)); + ctx.ack(msg); + } else { + log.trace("[{}] Max limit of pending messages reached for entity!", msg.getOriginator()); + ctx.tellFailure(msg, new RuntimeException("Max limit of pending messages reached for entity: " + msg.getOriginator())); + } + } + } + + private List getDeDuplicateDataList() { + if (deDuplicateStateMap.isEmpty()) { + return Collections.emptyList(); + } else { + List result = new ArrayList<>(); + long deDuplicationTimeoutMs = System.currentTimeMillis(); + deDuplicateStateMap.forEach((entityId, tbMsgDeDuplicateStates) -> { + if (!tbMsgDeDuplicateStates.isEmpty()) { + DeDuplicateData deDuplicateData = new DeDuplicateData(entityId); + TbMsgDeDuplicateState firstDeDuplicateState = getFirstDeDuplicateState(tbMsgDeDuplicateStates, null); + TbMsgDeDuplicateState lastStateInPack = firstDeDuplicateState; + long deDuplicationStartTs = firstDeDuplicateState.getTs(); + long deDuplicationEndTs = deDuplicationStartTs + deDuplicationInterval; + boolean hasNextDeduplicationPack = deDuplicationEndTs < deDuplicationTimeoutMs; + while (hasNextDeduplicationPack) { + long finalDeDuplicationStartTs = deDuplicationStartTs; + List statesPack = new ArrayList<>(); + for (TbMsgDeDuplicateState state : tbMsgDeDuplicateStates) { + if (state.getTs() >= finalDeDuplicationStartTs && state.getTs() < deDuplicationEndTs) { + statesPack.add(state); + if (state.getTs() > lastStateInPack.getTs()) { + lastStateInPack = state; + } + } + } + deDuplicateData.addDeDuplicatePack(statesPack, statesPack.indexOf(firstDeDuplicateState), statesPack.indexOf(lastStateInPack)); + if (deDuplicateData.getStatesCount() == tbMsgDeDuplicateStates.size()) { + hasNextDeduplicationPack = false; + } else { + firstDeDuplicateState = getFirstDeDuplicateState(tbMsgDeDuplicateStates, deDuplicationEndTs); + if (firstDeDuplicateState == null) { + hasNextDeduplicationPack = false; + } else { + deDuplicationStartTs = firstDeDuplicateState.getTs(); + deDuplicationEndTs = deDuplicationStartTs + deDuplicationInterval; + hasNextDeduplicationPack = deDuplicationEndTs < deDuplicationTimeoutMs; + } + } + } + result.add(deDuplicateData); + } + }); + return result; + } + } + + private TbMsgDeDuplicateState getFirstDeDuplicateState(List tbMsgDeDuplicateStates, Long previousPackEndTs) { + if (previousPackEndTs != null) { + tbMsgDeDuplicateStates = tbMsgDeDuplicateStates.stream().filter(state -> state.getTs() >= previousPackEndTs).collect(Collectors.toList()); + } + TbMsgDeDuplicateState first = null; + for (TbMsgDeDuplicateState state : tbMsgDeDuplicateStates) { + if (first == null || state.getTs() < first.getTs()) { + first = state; + } + } + return first; + } + + private TbMsg getOutputMsg(DeDuplicatePack pack) { + switch (config.getStrategy()) { + case FIRST: + return pack.getFirst().getTbMsg(); + case LAST: + return pack.getLast().getTbMsg(); + default: + EntityId originator = pack.getStates().get(0).getTbMsg().getOriginator(); + String queueName = pack.getStates().get(0).getTbMsg().getQueueName(); + String outMsgType = pack.getStates().get(0).getTbMsg().getType(); + String data = getMergedData(pack.getStates().stream() + .map(TbMsgDeDuplicateState::getTbMsg) + .collect(Collectors.toList())); + return TbMsg.newMsg(queueName, outMsgType, originator, getMetadata(), data); + } + } + + private void scheduleTickMsg(TbContext ctx) { + long curTs = System.currentTimeMillis(); + if (lastScheduledTs == 0L) { + lastScheduledTs = curTs; + } + lastScheduledTs = lastScheduledTs + deDuplicationInterval; + long curDelay = Math.max(0L, (lastScheduledTs - curTs)); + TbMsg tickMsg = ctx.newMsg(null, TB_MSG_DEDUPLICATION_TIMEOUT_MSG, ctx.getSelfId(), new TbMsgMetaData(), ""); + nextTickId = tickMsg.getId(); + ctx.tellSelf(tickMsg, curDelay); + } + + private String getMergedData(List msgs) { + ArrayNode mergedData = JacksonUtil.OBJECT_MAPPER.createArrayNode(); + msgs.forEach(msg -> { + ObjectNode msgNode = JacksonUtil.newObjectNode(); + msgNode.set("msg", JacksonUtil.toJsonNode(msg.getData())); + msgNode.set("metadata", JacksonUtil.valueToTree(msg.getMetaData().getData())); + mergedData.add(msgNode); + }); + return JacksonUtil.toString(mergedData); + } + + private TbMsgMetaData getMetadata() { + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("ts", String.valueOf(System.currentTimeMillis())); + return metaData; + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDeDuplicateNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/TbMsgDeDuplicateNodeConfiguration.java similarity index 93% rename from rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDeDuplicateNodeConfiguration.java rename to rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/TbMsgDeDuplicateNodeConfiguration.java index 6153591abe..e93bf25c23 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDeDuplicateNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/TbMsgDeDuplicateNodeConfiguration.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.rule.engine.delay; +package org.thingsboard.rule.engine.deduplicate; import lombok.Data; import org.thingsboard.rule.engine.api.NodeConfiguration; @@ -23,6 +23,7 @@ public class TbMsgDeDuplicateNodeConfiguration implements NodeConfigurationFIRST - return first message that arrived during de-duplication period.

" + - "LAST - return last message that arrived during de-duplication period.

" + - "ALL - return all messages as a single JSON array message. Where each element represents object with msg and metadata inner properties.

" + - "By default rule node De-duplicate messages by message originator, however, there is an option to de-duplicate messages independently from the incoming message's originator. " + - "In case of the De-duplicate strategy set to ALL and De-duplicate messages by message originator checkbox set to false, you must configure the Queue Name and Out Message Type for the output array message. " + - "Also in this case the output message originator will be set to the current tenant id.

", - icon = "pause", - uiResources = {"static/rulenode/rulenode-core-config.js"}, - configDirective = "tbActionNodeMsgDeDuplicateConfig" -) -@Slf4j -public class TbMsgDeDuplicateNode implements TbNode { - - private static final String TB_MSG_DEDUPLICATION_TIMEOUT_MSG = "TbMsgDeDuplicateNodeMsg"; - - private TbMsgDeDuplicateNodeConfiguration config; - - private Map> deDuplicationMap; - private long delay; - private long lastScheduledTs; - private boolean deDuplicateByOriginator; - private UUID nextTickId; - - - @Override - public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { - this.config = TbNodeUtils.convert(configuration, TbMsgDeDuplicateNodeConfiguration.class); - this.delay = TimeUnit.SECONDS.toMillis(config.getDelay()); - this.deDuplicateByOriginator = config.isDeDuplicateByOriginator(); - this.deDuplicationMap = new HashMap<>(); - scheduleTickMsg(ctx); - } - - @Override - public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { - if (TB_MSG_DEDUPLICATION_TIMEOUT_MSG.equals(msg.getType())) { - if (msg.getId().equals(nextTickId)) { - long deDuplicationTimeoutMs = System.currentTimeMillis(); - List keysToRemove = deDuplicationMap - .keySet() - .stream() - .filter(key -> key.startsWith(String.valueOf(lastScheduledTs)) || Long.parseLong(key.split("_")[0]) < deDuplicationTimeoutMs) - .collect(Collectors.toList()); - scheduleTickMsg(ctx); - keysToRemove.forEach(key -> { - List msgList = deDuplicationMap.get(key); - TbMsg outMsg; - switch (config.getStrategy()) { - case FIRST: - outMsg = msgList.get(0); - break; - case LAST: - outMsg = msgList.get(msgList.size() - 1); - break; - default: - EntityId originator; - String queueName; - String outMsgType; - if (deDuplicateByOriginator) { - String[] keyElements = key.split("_"); - originator = EntityIdFactory.getByTypeAndId(keyElements[1], keyElements[2]); - queueName = msgList.get(0).getQueueName(); - outMsgType = msgList.get(0).getType(); - } else { - originator = ctx.getTenantId(); - queueName = config.getQueueName(); - outMsgType = config.getOutMsgType(); - } - outMsg = TbMsg.newMsg(queueName, outMsgType, originator, getMetadata(), getMergedData(msgList)); - break; - } - ctx.enqueueForTellNext(outMsg, TbRelationTypes.SUCCESS, - () -> deDuplicationMap.remove(key), - throwable -> log.trace("Failed to enqueue de-duplication output message due to: ", throwable)); - }); - } else { - log.trace("[{}][{}] Received outdated tick msg: {}", ctx.getTenantId(), ctx.getSelfId(), msg.getId()); - } - } else { - String key = String.format("%d_%s", lastScheduledTs, getDeDuplicationId(ctx, msg)); - List deDuplicateMsgList = deDuplicationMap.computeIfAbsent(key, k -> new ArrayList<>()); - if (deDuplicateMsgList.size() < config.getMaxPendingMsgs()) { - deDuplicateMsgList.add(msg); - ctx.ack(msg); - } else { - ctx.tellFailure(msg, new RuntimeException("Max limit of pending messages reached for key: " + key)); - } - } - } - - private String getDeDuplicationId(TbContext ctx, TbMsg msg) { - return deDuplicateByOriginator ? - msg.getOriginator().getEntityType().name() + "_" + msg.getOriginator().getId() : - EntityType.RULE_NODE.name() + "_" + ctx.getSelfId().getId(); - } - - private void scheduleTickMsg(TbContext ctx) { - long curTs = System.currentTimeMillis(); - if (lastScheduledTs == 0L) { - lastScheduledTs = curTs; - } - lastScheduledTs = lastScheduledTs + delay; - long curDelay = Math.max(0L, (lastScheduledTs - curTs)); - TbMsg tickMsg = ctx.newMsg(null, TB_MSG_DEDUPLICATION_TIMEOUT_MSG, ctx.getSelfId(), new TbMsgMetaData(), ""); - nextTickId = tickMsg.getId(); - ctx.tellSelf(tickMsg, curDelay); - } - - private String getMergedData(List msgs) { - ArrayNode mergedData = JacksonUtil.OBJECT_MAPPER.createArrayNode(); - msgs.forEach(msg -> { - ObjectNode msgNode = JacksonUtil.newObjectNode(); - msgNode.set("msg", JacksonUtil.toJsonNode(msg.getData())); - msgNode.set("metadata", JacksonUtil.valueToTree(msg.getMetaData().getData())); - mergedData.add(msgNode); - }); - return JacksonUtil.toString(mergedData); - } - - private TbMsgMetaData getMetadata() { - TbMsgMetaData metaData = new TbMsgMetaData(); - metaData.putValue("ts", String.valueOf(System.currentTimeMillis())); - return metaData; - } - -} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbMsgDeDuplicateNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbMsgDeDuplicateNodeTest.java index dd7a8d8efd..954f8fcb52 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbMsgDeDuplicateNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbMsgDeDuplicateNodeTest.java @@ -18,6 +18,7 @@ package org.thingsboard.rule.engine.action; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.extern.slf4j.Slf4j; +import org.junit.Ignore; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -31,9 +32,9 @@ import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TbRelationTypes; -import org.thingsboard.rule.engine.delay.DeDuplicateStrategy; -import org.thingsboard.rule.engine.delay.TbMsgDeDuplicateNode; -import org.thingsboard.rule.engine.delay.TbMsgDeDuplicateNodeConfiguration; +import org.thingsboard.rule.engine.deduplicate.DeDuplicateStrategy; +import org.thingsboard.rule.engine.deduplicate.TbMsgDeDuplicateNode; +import org.thingsboard.rule.engine.deduplicate.TbMsgDeDuplicateNodeConfiguration; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleNodeId; @@ -43,12 +44,18 @@ import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.session.SessionMsgType; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.List; +import java.util.Random; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import static org.mockito.ArgumentMatchers.any; @@ -57,6 +64,7 @@ import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -70,7 +78,9 @@ public class TbMsgDeDuplicateNodeTest { private TbContext ctx; - private final ExecutorService executorService = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("de-duplication-node-test")); + private final ThingsBoardThreadFactory factory = ThingsBoardThreadFactory.forName("de-duplication-node-test"); + private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(factory); + private final int deDuplicationInterval = 1; private TenantId tenantId; @@ -79,14 +89,10 @@ public class TbMsgDeDuplicateNodeTest { private TbNodeConfiguration nodeConfiguration; private CountDownLatch awaitTellSelfLatch; - private CountDownLatch awaitAllMsgsProcessedLatch; - private int scheduleCount; @BeforeEach public void init() throws TbNodeException { ctx = mock(TbContext.class); - awaitTellSelfLatch = new CountDownLatch(1); - awaitAllMsgsProcessedLatch = new CountDownLatch(1); tenantId = TenantId.fromUUID(UUID.randomUUID()); RuleNodeId ruleNodeId = new RuleNodeId(UUID.randomUUID()); @@ -101,28 +107,32 @@ public class TbMsgDeDuplicateNodeTest { String data = (String) (invocationOnMock.getArguments())[4]; return TbMsg.newMsg(type, originator, metaData.copy(), data); }).when(ctx).newMsg(isNull(), eq(TB_MSG_DEDUPLICATION_TIMEOUT_MSG), nullable(EntityId.class), any(TbMsgMetaData.class), any(String.class)); + node = spy(new TbMsgDeDuplicateNode()); + config = new TbMsgDeDuplicateNodeConfiguration().defaultConfiguration(); + } - scheduleCount = 0; - + private void invokeTellSelf(int maxNumberOfInvocation, boolean incrementScheduleTimeout) { + AtomicLong scheduleTimeout = new AtomicLong(deDuplicationInterval); + AtomicInteger scheduleCount = new AtomicInteger(0); doAnswer((Answer) invocationOnMock -> { - scheduleCount++; - if (scheduleCount == 1) { + scheduleCount.getAndIncrement(); + if (scheduleCount.get() <= maxNumberOfInvocation) { TbMsg msg = (TbMsg) (invocationOnMock.getArguments())[0]; - executorService.submit(() -> { + executorService.schedule(() -> { try { - awaitAllMsgsProcessedLatch.await(); node.onMsg(ctx, msg); awaitTellSelfLatch.countDown(); } catch (ExecutionException | InterruptedException | TbNodeException e) { log.error("Failed to execute tellSelf method call due to: ", e); } - }); + }, scheduleTimeout.get(), TimeUnit.SECONDS); + if (incrementScheduleTimeout) { + scheduleTimeout.set(scheduleTimeout.get() * 3); + } } + return null; }).when(ctx).tellSelf(ArgumentMatchers.any(TbMsg.class), ArgumentMatchers.anyLong()); - - node = new TbMsgDeDuplicateNode(); - config = new TbMsgDeDuplicateNodeConfiguration().defaultConfiguration(); } @AfterEach @@ -132,170 +142,189 @@ public class TbMsgDeDuplicateNodeTest { } @Test - public void given_multipleMessages_thenVerifyOutputFirst() throws TbNodeException, ExecutionException, InterruptedException { + public void given_100_messages_then_verifyOutputFirst() throws TbNodeException, ExecutionException, InterruptedException { + int wantedNumberOfTellSelfInvocation = 2; + int msgCount = 100; + awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); + invokeTellSelf(wantedNumberOfTellSelfInvocation, false); + + config.setDelay(deDuplicationInterval); + config.setMaxPendingMsgs(msgCount); nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); node.init(ctx, nodeConfiguration); DeviceId deviceId = new DeviceId(UUID.randomUUID()); - List inputMsgs = createTbMsgs(deviceId); + long currentTimeMillis = System.currentTimeMillis(); + + List inputMsgs = getTbMsgs(deviceId, msgCount, currentTimeMillis, 500); for (TbMsg msg : inputMsgs) { node.onMsg(ctx, msg); } - awaitAllMsgsProcessedLatch.countDown(); - awaitTellSelfLatch.await(); - verify(ctx, times(inputMsgs.size())).ack(any()); - ArgumentCaptor newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); - ArgumentCaptor successCaptor = ArgumentCaptor.forClass(Runnable.class); - ArgumentCaptor> failureCaptor = ArgumentCaptor.forClass(Consumer.class); - verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture()); - Assertions.assertEquals(inputMsgs.get(0), newMsgCaptor.getValue()); - } + TbMsg msgToReject = createMsg(deviceId, inputMsgs.get(inputMsgs.size() - 1).getMetaDataTs() + 2); + node.onMsg(ctx, msgToReject); - @Test - public void given_moreMessagesThenAllowedInConfiguration_thenVerifyTellFailure() throws TbNodeException, ExecutionException, InterruptedException { - nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); - node.init(ctx, nodeConfiguration); - - DeviceId deviceId = new DeviceId(UUID.randomUUID()); - List inputMsgs = createTbMsgs(deviceId, 101); - for (TbMsg msg : inputMsgs) { - node.onMsg(ctx, msg); - } - awaitAllMsgsProcessedLatch.countDown(); awaitTellSelfLatch.await(); - verify(ctx, times(inputMsgs.size() -1)).ack(any()); - verify(ctx, times(1)).tellFailure(eq(inputMsgs.get(inputMsgs.size() -1)), any()); ArgumentCaptor newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); ArgumentCaptor successCaptor = ArgumentCaptor.forClass(Runnable.class); ArgumentCaptor> failureCaptor = ArgumentCaptor.forClass(Consumer.class); + + verify(ctx, times(msgCount)).ack(any()); + verify(ctx, times(1)).tellFailure(eq(msgToReject), any()); + verify(node, times(msgCount + wantedNumberOfTellSelfInvocation + 1)).onMsg(eq(ctx), any()); verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture()); Assertions.assertEquals(inputMsgs.get(0), newMsgCaptor.getValue()); } @Test - public void given_multipleMessages_thenVerifyOutputLast() throws TbNodeException, ExecutionException, InterruptedException { + public void given_100_messages_then_verifyOutputLast() throws TbNodeException, ExecutionException, InterruptedException { + int wantedNumberOfTellSelfInvocation = 2; + int msgCount = 100; + awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); + invokeTellSelf(wantedNumberOfTellSelfInvocation, false); + config.setStrategy(DeDuplicateStrategy.LAST); + config.setDelay(deDuplicationInterval); + config.setMaxPendingMsgs(msgCount); nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); node.init(ctx, nodeConfiguration); DeviceId deviceId = new DeviceId(UUID.randomUUID()); - List inputMsgs = createTbMsgs(deviceId); + long currentTimeMillis = System.currentTimeMillis(); + + List inputMsgs = getTbMsgs(deviceId, msgCount, currentTimeMillis, 500); + + int indexOfLastMsgInArray = inputMsgs.size() - 1; + int indexToSetMaxTs = new Random().nextInt(indexOfLastMsgInArray) + 1; + TbMsg currentMaxTsMsg = inputMsgs.get(indexOfLastMsgInArray); + TbMsg newLastMsgOfArray = inputMsgs.get(indexToSetMaxTs); + inputMsgs.set(indexOfLastMsgInArray, newLastMsgOfArray); + inputMsgs.set(indexToSetMaxTs, currentMaxTsMsg); + for (TbMsg msg : inputMsgs) { node.onMsg(ctx, msg); } - awaitAllMsgsProcessedLatch.countDown(); + + TbMsg msgToReject = createMsg(deviceId, inputMsgs.get(indexOfLastMsgInArray).getMetaDataTs() + 2); + node.onMsg(ctx, msgToReject); + awaitTellSelfLatch.await(); - verify(ctx, times(inputMsgs.size())).ack(any()); ArgumentCaptor newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); ArgumentCaptor successCaptor = ArgumentCaptor.forClass(Runnable.class); ArgumentCaptor> failureCaptor = ArgumentCaptor.forClass(Consumer.class); + + verify(ctx, times(msgCount)).ack(any()); + verify(ctx, times(1)).tellFailure(eq(msgToReject), any()); + verify(node, times(msgCount + wantedNumberOfTellSelfInvocation + 1)).onMsg(eq(ctx), any()); verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture()); - Assertions.assertEquals(inputMsgs.get(inputMsgs.size() - 1), newMsgCaptor.getValue()); + Assertions.assertEquals(currentMaxTsMsg, newMsgCaptor.getValue()); } @Test - public void given_multipleMessagesFromTwoOriginators_thenVerifyOutputAllForEachOriginator() throws TbNodeException, ExecutionException, InterruptedException { + public void given_100_messages_then_verifyOutputAll() throws TbNodeException, ExecutionException, InterruptedException { + int wantedNumberOfTellSelfInvocation = 2; + int msgCount = 100; + awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); + invokeTellSelf(wantedNumberOfTellSelfInvocation, false); + + config.setDelay(deDuplicationInterval); + config.setMaxPendingMsgs(msgCount); config.setStrategy(DeDuplicateStrategy.ALL); nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); node.init(ctx, nodeConfiguration); - DeviceId firstDeviceId = new DeviceId(UUID.randomUUID()); - List firstDeviceInputMsgs = createTbMsgs(firstDeviceId); - - DeviceId secondDeviceId = new DeviceId(UUID.randomUUID()); - List secondDeviceInputMsgs = createTbMsgs(secondDeviceId); - - List inputMsgs = new ArrayList<>(); - inputMsgs.addAll(firstDeviceInputMsgs); - inputMsgs.addAll(secondDeviceInputMsgs); + DeviceId deviceId = new DeviceId(UUID.randomUUID()); + long currentTimeMillis = System.currentTimeMillis(); + List inputMsgs = getTbMsgs(deviceId, msgCount, currentTimeMillis, 500); for (TbMsg msg : inputMsgs) { node.onMsg(ctx, msg); } - awaitAllMsgsProcessedLatch.countDown(); + awaitTellSelfLatch.await(); - verify(ctx, times(inputMsgs.size())).ack(any()); ArgumentCaptor newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); ArgumentCaptor successCaptor = ArgumentCaptor.forClass(Runnable.class); ArgumentCaptor> failureCaptor = ArgumentCaptor.forClass(Consumer.class); - verify(ctx, times(2)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture()); - List outMessages = newMsgCaptor.getAllValues(); - Assertions.assertEquals(2, outMessages.size()); - for (TbMsg tbMsg : outMessages) { - if (tbMsg.getOriginator().equals(firstDeviceId)) { - Assertions.assertEquals(getMergedData(firstDeviceInputMsgs), tbMsg.getData()); - } else { - Assertions.assertEquals(getMergedData(secondDeviceInputMsgs), tbMsg.getData()); - Assertions.assertEquals(secondDeviceId, tbMsg.getOriginator()); - } - } + verify(ctx, times(msgCount)).ack(any()); + verify(node, times(msgCount + wantedNumberOfTellSelfInvocation)).onMsg(eq(ctx), any()); + verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture()); + + Assertions.assertEquals(1, newMsgCaptor.getAllValues().size()); + TbMsg outMessage = newMsgCaptor.getAllValues().get(0); + Assertions.assertEquals(getMergedData(inputMsgs), outMessage.getData()); + Assertions.assertEquals(deviceId, outMessage.getOriginator()); } @Test - public void given_multipleMessagesFromTwoOriginators_thenVerifyOutputAll() throws TbNodeException, ExecutionException, InterruptedException { + public void given_100_messages_then_verifyOutput_2_packs() throws TbNodeException, ExecutionException, InterruptedException { + int wantedNumberOfTellSelfInvocation = 2; + int msgCount = 100; + awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); + invokeTellSelf(wantedNumberOfTellSelfInvocation, true); + + config.setDelay(deDuplicationInterval); + config.setMaxPendingMsgs(msgCount); config.setStrategy(DeDuplicateStrategy.ALL); - config.setDeDuplicateByOriginator(false); - config.setQueueName(HIGH_PRIORITY_QUEUE_NAME); - config.setOutMsgType(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()); nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); node.init(ctx, nodeConfiguration); - DeviceId firstDeviceId = new DeviceId(UUID.randomUUID()); - List firstDeviceInputMsgs = createTbMsgs(firstDeviceId, 50); - - DeviceId secondDeviceId = new DeviceId(UUID.randomUUID()); - List secondDeviceInputMsgs = createTbMsgs(secondDeviceId, 50); + DeviceId deviceId = new DeviceId(UUID.randomUUID()); + long currentTimeMillis = System.currentTimeMillis(); - List inputMsgs = new ArrayList<>(); - inputMsgs.addAll(firstDeviceInputMsgs); - inputMsgs.addAll(secondDeviceInputMsgs); + List firstMsgPack = getTbMsgs(deviceId, msgCount / 2, currentTimeMillis, 500); + for (TbMsg msg : firstMsgPack) { + node.onMsg(ctx, msg); + } + TbMsg firstMsgFromFirstPack = firstMsgPack.get(0); + long firstPackDeDuplicationPackEndTs = firstMsgFromFirstPack.getMetaDataTs() + TimeUnit.SECONDS.toMillis(deDuplicationInterval); - for (TbMsg msg : inputMsgs) { + List secondMsgPack = getTbMsgs(deviceId, msgCount / 2, firstPackDeDuplicationPackEndTs, 500); + for (TbMsg msg : secondMsgPack) { node.onMsg(ctx, msg); } - awaitAllMsgsProcessedLatch.countDown(); + awaitTellSelfLatch.await(); - verify(ctx, times(inputMsgs.size())).ack(any()); ArgumentCaptor newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); ArgumentCaptor successCaptor = ArgumentCaptor.forClass(Runnable.class); ArgumentCaptor> failureCaptor = ArgumentCaptor.forClass(Consumer.class); - verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture()); - Assertions.assertEquals(1, newMsgCaptor.getAllValues().size()); - TbMsg outMessage = newMsgCaptor.getAllValues().get(0); - Assertions.assertEquals(getMergedData(inputMsgs), outMessage.getData()); - Assertions.assertEquals(tenantId, outMessage.getOriginator()); - Assertions.assertEquals(config.getQueueName(), outMessage.getQueueName()); - Assertions.assertEquals(config.getOutMsgType(), outMessage.getType()); - } + verify(ctx, times(msgCount)).ack(any()); + verify(node, times(msgCount + wantedNumberOfTellSelfInvocation)).onMsg(eq(ctx), any()); + verify(ctx, times(2)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture()); - private List createTbMsgs(DeviceId deviceId) { - return createTbMsgs(deviceId, 100); + Assertions.assertEquals(2, newMsgCaptor.getAllValues().size()); + Assertions.assertEquals(getMergedData(firstMsgPack), newMsgCaptor.getAllValues().get(0).getData()); + Assertions.assertEquals(getMergedData(secondMsgPack), newMsgCaptor.getAllValues().get(1).getData()); } - private List createTbMsgs(DeviceId deviceId, int msgCount) { + private List getTbMsgs(DeviceId deviceId, int msgCount, long currentTimeMillis, int initTsStep) { List inputMsgs = new ArrayList<>(); + var ts = currentTimeMillis + initTsStep; for (int i = 0; i < msgCount; i++) { - ObjectNode dataNode = JacksonUtil.newObjectNode(); - dataNode.put("msgId", i); - dataNode.put("deviceId", deviceId.getId().toString()); - TbMsg tbMsg = TbMsg.newMsg( - MAIN_QUEUE_NAME, - SessionMsgType.POST_TELEMETRY_REQUEST.name(), - deviceId, - new TbMsgMetaData(), - JacksonUtil.toString(dataNode)); - inputMsgs.add(tbMsg); + inputMsgs.add(createMsg(deviceId, ts)); + ts += 2; } return inputMsgs; } + private TbMsg createMsg(DeviceId deviceId, long ts) { + ObjectNode dataNode = JacksonUtil.newObjectNode(); + dataNode.put("deviceId", deviceId.getId().toString()); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("ts", String.valueOf(ts)); + return TbMsg.newMsg( + MAIN_QUEUE_NAME, + SessionMsgType.POST_TELEMETRY_REQUEST.name(), + deviceId, + metaData, + JacksonUtil.toString(dataNode)); + } + private String getMergedData(List msgs) { ArrayNode mergedData = JacksonUtil.OBJECT_MAPPER.createArrayNode(); msgs.forEach(msg -> {