Browse Source

minor improvements & code refactoring

pull/7893/head
ShvaykaD 3 years ago
parent
commit
f8f45e2ceb
  1. 17
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/TbMsgDeDuplicateNode.java
  2. 4
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/TbMsgDeDuplicateNodeConfiguration.java
  3. 29
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbMsgDeDuplicateNodeTest.java

17
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/TbMsgDeDuplicateNode.java

@ -74,7 +74,7 @@ public class TbMsgDeDuplicateNode implements TbNode {
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbMsgDeDuplicateNodeConfiguration.class);
this.deDuplicationInterval = TimeUnit.SECONDS.toMillis(config.getDelay());
this.deDuplicationInterval = TimeUnit.SECONDS.toMillis(config.getInterval());
this.deDuplicateByOriginator = config.isDeDuplicateByOriginator();
this.deDuplicateStateMap = new HashMap<>();
scheduleTickMsg(ctx);
@ -84,9 +84,9 @@ public class TbMsgDeDuplicateNode implements TbNode {
public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
if (TB_MSG_DEDUPLICATION_TIMEOUT_MSG.equals(msg.getType())) {
if (msg.getId().equals(nextTickId)) {
List<DeDuplicateData> deDuplicateDataList = getDeDuplicateDataList();
if (!deDuplicateDataList.isEmpty()) {
if (deDuplicateByOriginator) {
if (deDuplicateByOriginator) {
List<DeDuplicateData> deDuplicateDataList = getDeDuplicateDataList();
if (!deDuplicateDataList.isEmpty()) {
deDuplicateDataList.forEach(deDuplicateData -> {
EntityId entityId = deDuplicateData.getEntityId();
List<DeDuplicatePack> deDuplicateStates = deDuplicateData.getDeDuplicateStates();
@ -117,9 +117,9 @@ public class TbMsgDeDuplicateNode implements TbNode {
});
}));
});
} else {
// todo implement
}
} else {
// todo implement
}
scheduleTickMsg(ctx);
} else {
@ -138,6 +138,11 @@ public class TbMsgDeDuplicateNode implements TbNode {
}
}
@Override
public void destroy() {
deDuplicateStateMap.clear();
}
private List<DeDuplicateData> getDeDuplicateDataList() {
if (deDuplicateStateMap.isEmpty()) {
return Collections.emptyList();

4
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplicate/TbMsgDeDuplicateNodeConfiguration.java

@ -21,7 +21,7 @@ import org.thingsboard.rule.engine.api.NodeConfiguration;
@Data
public class TbMsgDeDuplicateNodeConfiguration implements NodeConfiguration<TbMsgDeDuplicateNodeConfiguration> {
private int delay;
private int interval;
private int maxPendingMsgs;
private int maxRetries;
private boolean deDuplicateByOriginator;
@ -35,7 +35,7 @@ public class TbMsgDeDuplicateNodeConfiguration implements NodeConfiguration<TbMs
@Override
public TbMsgDeDuplicateNodeConfiguration defaultConfiguration() {
TbMsgDeDuplicateNodeConfiguration configuration = new TbMsgDeDuplicateNodeConfiguration();
configuration.setDelay(60);
configuration.setInterval(60);
configuration.setMaxPendingMsgs(100);
configuration.setMaxRetries(3);
configuration.setDeDuplicateByOriginator(true);

29
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbMsgDeDuplicateNodeTest.java

@ -18,7 +18,6 @@ package org.thingsboard.rule.engine.action;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
import org.junit.Ignore;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@ -44,8 +43,6 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
import java.util.UUID;
@ -111,7 +108,11 @@ public class TbMsgDeDuplicateNodeTest {
config = new TbMsgDeDuplicateNodeConfiguration().defaultConfiguration();
}
private void invokeTellSelf(int maxNumberOfInvocation, boolean incrementScheduleTimeout) {
private void invokeTellSelf(int maxNumberOfInvocation) {
invokeTellSelf(maxNumberOfInvocation, false, 0);
}
private void invokeTellSelf(int maxNumberOfInvocation, boolean delayScheduleTimeout, int delayMultiplier) {
AtomicLong scheduleTimeout = new AtomicLong(deDuplicationInterval);
AtomicInteger scheduleCount = new AtomicInteger(0);
doAnswer((Answer<Void>) invocationOnMock -> {
@ -126,8 +127,8 @@ public class TbMsgDeDuplicateNodeTest {
log.error("Failed to execute tellSelf method call due to: ", e);
}
}, scheduleTimeout.get(), TimeUnit.SECONDS);
if (incrementScheduleTimeout) {
scheduleTimeout.set(scheduleTimeout.get() * 3);
if (delayScheduleTimeout) {
scheduleTimeout.set(scheduleTimeout.get() * delayMultiplier);
}
}
@ -146,9 +147,9 @@ public class TbMsgDeDuplicateNodeTest {
int wantedNumberOfTellSelfInvocation = 2;
int msgCount = 100;
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation);
invokeTellSelf(wantedNumberOfTellSelfInvocation, false);
invokeTellSelf(wantedNumberOfTellSelfInvocation);
config.setDelay(deDuplicationInterval);
config.setInterval(deDuplicationInterval);
config.setMaxPendingMsgs(msgCount);
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
node.init(ctx, nodeConfiguration);
@ -182,10 +183,10 @@ public class TbMsgDeDuplicateNodeTest {
int wantedNumberOfTellSelfInvocation = 2;
int msgCount = 100;
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation);
invokeTellSelf(wantedNumberOfTellSelfInvocation, false);
invokeTellSelf(wantedNumberOfTellSelfInvocation);
config.setStrategy(DeDuplicateStrategy.LAST);
config.setDelay(deDuplicationInterval);
config.setInterval(deDuplicationInterval);
config.setMaxPendingMsgs(msgCount);
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
node.init(ctx, nodeConfiguration);
@ -227,9 +228,9 @@ public class TbMsgDeDuplicateNodeTest {
int wantedNumberOfTellSelfInvocation = 2;
int msgCount = 100;
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation);
invokeTellSelf(wantedNumberOfTellSelfInvocation, false);
invokeTellSelf(wantedNumberOfTellSelfInvocation);
config.setDelay(deDuplicationInterval);
config.setInterval(deDuplicationInterval);
config.setMaxPendingMsgs(msgCount);
config.setStrategy(DeDuplicateStrategy.ALL);
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
@ -264,9 +265,9 @@ public class TbMsgDeDuplicateNodeTest {
int wantedNumberOfTellSelfInvocation = 2;
int msgCount = 100;
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation);
invokeTellSelf(wantedNumberOfTellSelfInvocation, true);
invokeTellSelf(wantedNumberOfTellSelfInvocation, true, 3);
config.setDelay(deDuplicationInterval);
config.setInterval(deDuplicationInterval);
config.setMaxPendingMsgs(msgCount);
config.setStrategy(DeDuplicateStrategy.ALL);
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));

Loading…
Cancel
Save