Browse Source

Merge pull request #15100 from dskarzh/fix/preserve-rule-node-counter-on-msg-copy

Fixed preservation of rule node execution counter in delay and deduplication nodes
pull/15109/head
Viacheslav Klimov 4 months ago
committed by GitHub
parent
commit
884cb7cf25
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 4
      application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
  2. 5
      common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
  3. 15
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java
  4. 14
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java
  5. 8
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/external/TbAbstractExternalNode.java
  6. 106
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeTest.java
  7. 13
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java

4
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java

@ -137,7 +137,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
defaultCtx.tellFailure(msg.getMsg(), e);
}
} else {
tbMsg.getCallback().onFailure(new RuleNodeException("Message is processed by more then " + maxRuleNodeExecutionsPerMessage + " rule nodes!", ruleChainName, ruleNode));
tbMsg.getCallback().onFailure(new RuleNodeException("Message is processed by more than " + maxRuleNodeExecutionsPerMessage + " rule nodes!", ruleChainName, ruleNode));
}
}
@ -160,7 +160,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
msg.getCtx().tellFailure(msg.getMsg(), e);
}
} else {
tbMsg.getCallback().onFailure(new RuleNodeException("Message is processed by more then " + maxRuleNodeExecutionsPerMessage + " rule nodes!", ruleChainName, ruleNode));
tbMsg.getCallback().onFailure(new RuleNodeException("Message is processed by more than " + maxRuleNodeExecutionsPerMessage + " rule nodes!", ruleChainName, ruleNode));
}
}
}

5
common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java

@ -110,11 +110,10 @@ public final class TbMsg implements Serializable {
.build();
}
public TbMsg copyWithNewCtx() {
public TbMsgBuilder copyWithNewCtx() {
return copy()
.ctx(ctx.copy())
.callback(TbMsgCallback.EMPTY)
.build();
.callback(TbMsgCallback.EMPTY);
}
private TbMsg(String queueName, UUID id, long ts, TbMsgType internalType, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data,

15
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java

@ -41,6 +41,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -177,15 +178,11 @@ public class TbMsgDeduplicationNode implements TbNode {
}
}
if (resultMsg != null) {
String queueName1 = queueName != null ? queueName : resultMsg.getQueueName();
deduplicationResults.add(TbMsg.newMsg()
.queueName(queueName1)
.type(resultMsg.getType())
.originator(resultMsg.getOriginator())
.customerId(resultMsg.getCustomerId())
.copyMetaData(resultMsg.getMetaData())
.data(resultMsg.getData())
.build());
var msgBuilder = resultMsg.copyWithNewCtx().id(UUID.randomUUID());
if (queueName != null) {
msgBuilder.queueName(queueName);
}
deduplicationResults.add(msgBuilder.build());
}
}
packBoundsOpt = findValidPack(msgList, deduplicationTimeoutMs);

14
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java

@ -62,17 +62,9 @@ public class TbMsgDelayNode implements TbNode {
if (msg.isTypeOf(TbMsgType.DELAY_TIMEOUT_SELF_MSG)) {
TbMsg pendingMsg = pendingMsgs.remove(UUID.fromString(msg.getData()));
if (pendingMsg != null) {
ctx.enqueueForTellNext(
TbMsg.newMsg()
.queueName(pendingMsg.getQueueName())
.type(pendingMsg.getType())
.originator(pendingMsg.getOriginator())
.customerId(pendingMsg.getCustomerId())
.copyMetaData(pendingMsg.getMetaData())
.data(pendingMsg.getData())
.build(),
TbNodeConnectionType.SUCCESS
);
ctx.enqueueForTellNext(pendingMsg.copyWithNewCtx()
.id(UUID.randomUUID())
.build(), TbNodeConnectionType.SUCCESS);
}
} else {
if (pendingMsgs.size() < config.getMaxPendingMsgs()) {

8
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/external/TbAbstractExternalNode.java

@ -30,7 +30,7 @@ public abstract class TbAbstractExternalNode implements TbNode {
protected void tellSuccess(TbContext ctx, TbMsg tbMsg) {
if (forceAck) {
ctx.enqueueForTellNext(tbMsg.copyWithNewCtx(), TbNodeConnectionType.SUCCESS);
ctx.enqueueForTellNext(tbMsg.copyWithNewCtx().build(), TbNodeConnectionType.SUCCESS);
} else {
ctx.tellSuccess(tbMsg);
}
@ -39,9 +39,9 @@ public abstract class TbAbstractExternalNode implements TbNode {
protected void tellFailure(TbContext ctx, TbMsg tbMsg, Throwable t) {
if (forceAck) {
if (t == null) {
ctx.enqueueForTellNext(tbMsg.copyWithNewCtx(), TbNodeConnectionType.FAILURE);
ctx.enqueueForTellNext(tbMsg.copyWithNewCtx().build(), TbNodeConnectionType.FAILURE);
} else {
ctx.enqueueForTellFailure(tbMsg.copyWithNewCtx(), t);
ctx.enqueueForTellFailure(tbMsg.copyWithNewCtx().build(), t);
}
} else {
if (t == null) {
@ -55,7 +55,7 @@ public abstract class TbAbstractExternalNode implements TbNode {
protected TbMsg ackIfNeeded(TbContext ctx, TbMsg msg) {
if (forceAck) {
ctx.ack(msg);
return msg.copyWithNewCtx();
return msg.copyWithNewCtx().build();
} else {
return msg;
}

106
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeTest.java

@ -0,0 +1,106 @@
/**
* Copyright © 2016-2026 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.delay;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.TbMsgProcessingCtx;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.lenient;
@ExtendWith(MockitoExtension.class)
class TbMsgDelayNodeTest {
final DeviceId deviceId = new DeviceId(UUID.fromString("5770153d-6ca2-4447-8a54-5d8a4538e052"));
final RuleNodeId ruleNodeId = new RuleNodeId(UUID.fromString("ee682a85-7f5a-4182-91bc-46e555138fe2"));
TbMsgDelayNode node;
@Mock
TbContext ctxMock;
@BeforeEach
void setUp() throws TbNodeException {
node = new TbMsgDelayNode();
var config = new TbMsgDelayNodeConfiguration().defaultConfiguration();
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
lenient().when(ctxMock.getSelfId()).thenReturn(ruleNodeId);
}
@Test
void shouldPreserveRuleNodeCounterAndResetCallbackWhenEnqueuingDelayedMsg() {
// GIVEN
int ruleNodeExecCounter = 5;
var originalMsg = TbMsg.newMsg()
.id(UUID.randomUUID())
.type(TbMsgType.POST_TELEMETRY_REQUEST)
.originator(deviceId)
.metaData(TbMsgMetaData.EMPTY)
.data("{\"temperature\":42}")
.ctx(new TbMsgProcessingCtx(ruleNodeExecCounter))
.build();
String originalMsgId = originalMsg.getId().toString();
var tickMsg = TbMsg.newMsg()
.type(TbMsgType.DELAY_TIMEOUT_SELF_MSG)
.originator(ruleNodeId)
.metaData(TbMsgMetaData.EMPTY)
.data(originalMsgId)
.build();
given(ctxMock.newMsg(null, TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, null, TbMsgMetaData.EMPTY, originalMsgId)).willReturn(tickMsg);
node.onMsg(ctxMock, originalMsg);
// WHEN
node.onMsg(ctxMock, tickMsg);
// THEN
var msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
then(ctxMock).should().enqueueForTellNext(msgCaptor.capture(), eq(TbNodeConnectionType.SUCCESS));
var enqueuedMsg = msgCaptor.getValue();
assertThat(enqueuedMsg).usingRecursiveComparison()
.ignoringFields("id", "ts", "callback")
.isEqualTo(originalMsg);
assertThat(enqueuedMsg.getId()).isNotNull().isNotEqualTo(originalMsg.getId());
assertThat(enqueuedMsg.getAndIncrementRuleNodeCounter()).isEqualTo(ruleNodeExecCounter);
assertThat(enqueuedMsg.getCallback()).isSameAs(TbMsgCallback.EMPTY);
}
}

13
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java

@ -47,6 +47,8 @@ import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.TbMsgProcessingCtx;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import java.util.ArrayList;
import java.util.List;
@ -79,6 +81,8 @@ public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest {
private TbContext ctx;
private static final int RULE_NODE_EXEC_COUNTER = 5;
private final ScheduledExecutorService executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("de-duplication-node-test");
private final int deduplicationInterval = 1;
@ -204,6 +208,8 @@ public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest {
Assertions.assertEquals(firstMsg.getData(), actualMsg.getData());
Assertions.assertEquals(firstMsg.getMetaData(), actualMsg.getMetaData());
Assertions.assertEquals(firstMsg.getType(), actualMsg.getType());
Assertions.assertEquals(RULE_NODE_EXEC_COUNTER, actualMsg.getAndIncrementRuleNodeCounter());
Assertions.assertSame(TbMsgCallback.EMPTY, actualMsg.getCallback());
if (queueName == null) {
Assertions.assertEquals(firstMsg.getQueueName(), actualMsg.getQueueName());
@ -257,6 +263,8 @@ public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest {
Assertions.assertEquals(msgWithLatestTs.getData(), actualMsg.getData());
Assertions.assertEquals(msgWithLatestTs.getMetaData(), actualMsg.getMetaData());
Assertions.assertEquals(msgWithLatestTs.getType(), actualMsg.getType());
Assertions.assertEquals(RULE_NODE_EXEC_COUNTER, actualMsg.getAndIncrementRuleNodeCounter());
Assertions.assertSame(TbMsgCallback.EMPTY, actualMsg.getCallback());
}
@Test
@ -402,6 +410,8 @@ public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest {
Assertions.assertEquals(msgWithLatestTsInFirstPack.getData(), actualMsg.getData());
Assertions.assertEquals(msgWithLatestTsInFirstPack.getMetaData(), actualMsg.getMetaData());
Assertions.assertEquals(msgWithLatestTsInFirstPack.getType(), actualMsg.getType());
Assertions.assertEquals(RULE_NODE_EXEC_COUNTER, actualMsg.getAndIncrementRuleNodeCounter());
Assertions.assertSame(TbMsgCallback.EMPTY, actualMsg.getCallback());
// verify that newMsg is called but content of messages is the same as in the last msg for the second pack.
actualMsg = resultMsgs.get(1);
@ -411,6 +421,8 @@ public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest {
Assertions.assertEquals(msgWithLatestTsInSecondPack.getData(), actualMsg.getData());
Assertions.assertEquals(msgWithLatestTsInSecondPack.getMetaData(), actualMsg.getMetaData());
Assertions.assertEquals(msgWithLatestTsInSecondPack.getType(), actualMsg.getType());
Assertions.assertEquals(RULE_NODE_EXEC_COUNTER, actualMsg.getAndIncrementRuleNodeCounter());
Assertions.assertSame(TbMsgCallback.EMPTY, actualMsg.getCallback());
}
@Test
@ -539,6 +551,7 @@ public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest {
.originator(deviceId)
.copyMetaData(metaData)
.data(JacksonUtil.toString(dataNode))
.ctx(new TbMsgProcessingCtx(RULE_NODE_EXEC_COUNTER))
.build();
}

Loading…
Cancel
Save