|
|
|
@ -136,10 +136,11 @@ class DefaultTbContextTest { |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
public void givenMsgWithCurrentNodeAlreadyInStack_whenInput_thenTellFailureToPreventLoop() { |
|
|
|
// GIVEN - simulate the second iteration: current node is already in the return stack,
|
|
|
|
// which happens when forwardMsgToDefaultRuleChain=true and the device's default rule chain
|
|
|
|
// is the same as the rule chain containing this node
|
|
|
|
public void givenDefaultCapAndCurrentNodeAlreadyInStack_whenInput_thenTellFailure() { |
|
|
|
// GIVEN - default cap = 1: any revisit of the same (chain, node) within one message fails.
|
|
|
|
// Simulates the second visit of an input node whose first push is still in the stack.
|
|
|
|
given(mainCtxMock.getRuleChainInputLoopMaxVisits()).willReturn(1); |
|
|
|
|
|
|
|
var callbackMock = mock(TbMsgCallback.class); |
|
|
|
given(callbackMock.isMsgValid()).willReturn(true); |
|
|
|
|
|
|
|
@ -157,7 +158,6 @@ class DefaultTbContextTest { |
|
|
|
.data(TbMsg.EMPTY_STRING) |
|
|
|
.callback(callbackMock) |
|
|
|
.build(); |
|
|
|
// Push current node into the stack - simulates a previous iteration that already forwarded to this rule chain
|
|
|
|
msg.pushToStack(RULE_CHAIN_ID, RULE_NODE_ID); |
|
|
|
|
|
|
|
var targetRuleChainId = new RuleChainId(UUID.randomUUID()); |
|
|
|
@ -165,16 +165,215 @@ class DefaultTbContextTest { |
|
|
|
// WHEN
|
|
|
|
defaultTbContext.input(msg, targetRuleChainId); |
|
|
|
|
|
|
|
// THEN - loop detected: tellFailure is called and no message is enqueued
|
|
|
|
// THEN - failure with the unified visit-limit message that names the property
|
|
|
|
ArgumentCaptor<TbActorMsg> tellCaptor = ArgumentCaptor.forClass(TbActorMsg.class); |
|
|
|
then(chainActorMock).should().tell(tellCaptor.capture()); |
|
|
|
TbActorMsg capturedTellMsg = tellCaptor.getValue(); |
|
|
|
assertThat(capturedTellMsg).isInstanceOf(RuleNodeToRuleChainTellNextMsg.class); |
|
|
|
RuleNodeToRuleChainTellNextMsg failureMsg = (RuleNodeToRuleChainTellNextMsg) capturedTellMsg; |
|
|
|
assertThat(failureMsg.getRelationTypes()).containsOnly(TbNodeConnectionType.FAILURE); |
|
|
|
assertThat(failureMsg.getFailureMessage()).containsIgnoringCase("loop"); |
|
|
|
assertThat(failureMsg.getFailureMessage()).contains("visit limit 1 reached"); |
|
|
|
assertThat(failureMsg.getFailureMessage()).contains("TB_RULE_CHAIN_INPUT_LOOP_MAX_VISITS"); |
|
|
|
assertThat(failureMsg.getFailureMessage()).contains("actors.rule.chain.input_loop_max_visits"); |
|
|
|
|
|
|
|
then(mainCtxMock).should().getRuleChainInputLoopMaxVisits(); |
|
|
|
then(mainCtxMock).shouldHaveNoMoreInteractions(); |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
public void givenVisitsBelowCap_whenInput_thenEnqueue() { |
|
|
|
// GIVEN - admin raised cap to 5; this is the first visit of the input node for the message
|
|
|
|
given(mainCtxMock.getRuleChainInputLoopMaxVisits()).willReturn(5); |
|
|
|
var tpi = resolve(null); |
|
|
|
given(mainCtxMock.resolve(eq(ServiceType.TB_RULE_ENGINE), nullable(String.class), eq(TENANT_ID), eq(TENANT_ID))).willReturn(tpi); |
|
|
|
var clusterService = mock(TbClusterService.class); |
|
|
|
given(mainCtxMock.getClusterService()).willReturn(clusterService); |
|
|
|
|
|
|
|
var callbackMock = mock(TbMsgCallback.class); |
|
|
|
given(callbackMock.isMsgValid()).willReturn(true); |
|
|
|
|
|
|
|
var ruleNode = new RuleNode(RULE_NODE_ID); |
|
|
|
ruleNode.setRuleChainId(RULE_CHAIN_ID); |
|
|
|
ruleNode.setDebugSettings(DebugSettings.off()); |
|
|
|
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); |
|
|
|
given(nodeCtxMock.getSelf()).willReturn(ruleNode); |
|
|
|
|
|
|
|
var msg = TbMsg.newMsg() |
|
|
|
.type(TbMsgType.POST_TELEMETRY_REQUEST) |
|
|
|
.originator(TENANT_ID) |
|
|
|
.copyMetaData(TbMsgMetaData.EMPTY) |
|
|
|
.data(TbMsg.EMPTY_STRING) |
|
|
|
.callback(callbackMock) |
|
|
|
.build(); |
|
|
|
var targetRuleChainId = new RuleChainId(UUID.randomUUID()); |
|
|
|
|
|
|
|
then(mainCtxMock).shouldHaveNoInteractions(); // no message was enqueued
|
|
|
|
// WHEN
|
|
|
|
defaultTbContext.input(msg, targetRuleChainId); |
|
|
|
|
|
|
|
// THEN
|
|
|
|
then(clusterService).should().pushMsgToRuleEngine(eq(tpi), eq(msg.getId()), any(), any()); |
|
|
|
then(chainActorMock).shouldHaveNoInteractions(); |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
public void givenVisitsAtCap_whenInput_thenTellFailure() { |
|
|
|
// GIVEN - cap=2, this input node has already been pushed twice for this message
|
|
|
|
given(mainCtxMock.getRuleChainInputLoopMaxVisits()).willReturn(2); |
|
|
|
|
|
|
|
var callbackMock = mock(TbMsgCallback.class); |
|
|
|
given(callbackMock.isMsgValid()).willReturn(true); |
|
|
|
|
|
|
|
var ruleNode = new RuleNode(RULE_NODE_ID); |
|
|
|
ruleNode.setRuleChainId(RULE_CHAIN_ID); |
|
|
|
ruleNode.setDebugSettings(DebugSettings.off()); |
|
|
|
given(nodeCtxMock.getSelf()).willReturn(ruleNode); |
|
|
|
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); |
|
|
|
given(nodeCtxMock.getChainActor()).willReturn(chainActorMock); |
|
|
|
|
|
|
|
var msg = TbMsg.newMsg() |
|
|
|
.type(TbMsgType.POST_TELEMETRY_REQUEST) |
|
|
|
.originator(TENANT_ID) |
|
|
|
.copyMetaData(TbMsgMetaData.EMPTY) |
|
|
|
.data(TbMsg.EMPTY_STRING) |
|
|
|
.callback(callbackMock) |
|
|
|
.build(); |
|
|
|
// two prior visits of THIS node + an unrelated entry that must NOT contribute to the count
|
|
|
|
msg.pushToStack(RULE_CHAIN_ID, RULE_NODE_ID); |
|
|
|
msg.pushToStack(new RuleChainId(UUID.randomUUID()), new RuleNodeId(UUID.randomUUID())); |
|
|
|
msg.pushToStack(RULE_CHAIN_ID, RULE_NODE_ID); |
|
|
|
|
|
|
|
var targetRuleChainId = new RuleChainId(UUID.randomUUID()); |
|
|
|
|
|
|
|
// WHEN
|
|
|
|
defaultTbContext.input(msg, targetRuleChainId); |
|
|
|
|
|
|
|
// THEN - cap failure: tellFailure with message naming the limit, ENV name and yaml key
|
|
|
|
ArgumentCaptor<TbActorMsg> tellCaptor = ArgumentCaptor.forClass(TbActorMsg.class); |
|
|
|
then(chainActorMock).should().tell(tellCaptor.capture()); |
|
|
|
TbActorMsg capturedTellMsg = tellCaptor.getValue(); |
|
|
|
assertThat(capturedTellMsg).isInstanceOf(RuleNodeToRuleChainTellNextMsg.class); |
|
|
|
RuleNodeToRuleChainTellNextMsg failureMsg = (RuleNodeToRuleChainTellNextMsg) capturedTellMsg; |
|
|
|
assertThat(failureMsg.getRelationTypes()).containsOnly(TbNodeConnectionType.FAILURE); |
|
|
|
assertThat(failureMsg.getFailureMessage()).contains("visit limit 2 reached"); |
|
|
|
assertThat(failureMsg.getFailureMessage()).contains("TB_RULE_CHAIN_INPUT_LOOP_MAX_VISITS"); |
|
|
|
assertThat(failureMsg.getFailureMessage()).contains("actors.rule.chain.input_loop_max_visits"); |
|
|
|
|
|
|
|
then(mainCtxMock).should().getRuleChainInputLoopMaxVisits(); |
|
|
|
then(mainCtxMock).shouldHaveNoMoreInteractions(); |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
public void givenCurrentNodeRepeatedBelowCap_whenInput_thenEnqueue() { |
|
|
|
// GIVEN - customer's pagination case: same RuleChainInputNode visited three times already.
|
|
|
|
// With cap > occurrences it must proceed.
|
|
|
|
given(mainCtxMock.getRuleChainInputLoopMaxVisits()).willReturn(10); |
|
|
|
var tpi = resolve(null); |
|
|
|
given(mainCtxMock.resolve(eq(ServiceType.TB_RULE_ENGINE), nullable(String.class), eq(TENANT_ID), eq(TENANT_ID))).willReturn(tpi); |
|
|
|
var clusterService = mock(TbClusterService.class); |
|
|
|
given(mainCtxMock.getClusterService()).willReturn(clusterService); |
|
|
|
|
|
|
|
var callbackMock = mock(TbMsgCallback.class); |
|
|
|
given(callbackMock.isMsgValid()).willReturn(true); |
|
|
|
|
|
|
|
var ruleNode = new RuleNode(RULE_NODE_ID); |
|
|
|
ruleNode.setRuleChainId(RULE_CHAIN_ID); |
|
|
|
ruleNode.setDebugSettings(DebugSettings.off()); |
|
|
|
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); |
|
|
|
given(nodeCtxMock.getSelf()).willReturn(ruleNode); |
|
|
|
|
|
|
|
var msg = TbMsg.newMsg() |
|
|
|
.type(TbMsgType.POST_TELEMETRY_REQUEST) |
|
|
|
.originator(TENANT_ID) |
|
|
|
.copyMetaData(TbMsgMetaData.EMPTY) |
|
|
|
.data(TbMsg.EMPTY_STRING) |
|
|
|
.callback(callbackMock) |
|
|
|
.build(); |
|
|
|
// three prior iterations through the same input node — count 3 < cap 10
|
|
|
|
msg.pushToStack(RULE_CHAIN_ID, RULE_NODE_ID); |
|
|
|
msg.pushToStack(RULE_CHAIN_ID, RULE_NODE_ID); |
|
|
|
msg.pushToStack(RULE_CHAIN_ID, RULE_NODE_ID); |
|
|
|
|
|
|
|
var targetRuleChainId = new RuleChainId(UUID.randomUUID()); |
|
|
|
|
|
|
|
// WHEN
|
|
|
|
defaultTbContext.input(msg, targetRuleChainId); |
|
|
|
|
|
|
|
// THEN
|
|
|
|
then(clusterService).should().pushMsgToRuleEngine(eq(tpi), eq(msg.getId()), any(), any()); |
|
|
|
then(chainActorMock).shouldHaveNoInteractions(); |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
public void givenNegativeCapAndCurrentNodeAlreadyInStack_whenInput_thenTellFailure() { |
|
|
|
// GIVEN - misconfiguration: admin set a non-positive value. Per yaml contract, values < 1
|
|
|
|
// are clamped to 1 — strict behavior is preserved instead of failing at startup.
|
|
|
|
given(mainCtxMock.getRuleChainInputLoopMaxVisits()).willReturn(-3); |
|
|
|
|
|
|
|
var callbackMock = mock(TbMsgCallback.class); |
|
|
|
given(callbackMock.isMsgValid()).willReturn(true); |
|
|
|
|
|
|
|
var ruleNode = new RuleNode(RULE_NODE_ID); |
|
|
|
ruleNode.setRuleChainId(RULE_CHAIN_ID); |
|
|
|
ruleNode.setDebugSettings(DebugSettings.off()); |
|
|
|
given(nodeCtxMock.getSelf()).willReturn(ruleNode); |
|
|
|
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); |
|
|
|
given(nodeCtxMock.getChainActor()).willReturn(chainActorMock); |
|
|
|
|
|
|
|
var msg = TbMsg.newMsg() |
|
|
|
.type(TbMsgType.POST_TELEMETRY_REQUEST) |
|
|
|
.originator(TENANT_ID) |
|
|
|
.copyMetaData(TbMsgMetaData.EMPTY) |
|
|
|
.data(TbMsg.EMPTY_STRING) |
|
|
|
.callback(callbackMock) |
|
|
|
.build(); |
|
|
|
msg.pushToStack(RULE_CHAIN_ID, RULE_NODE_ID); |
|
|
|
|
|
|
|
var targetRuleChainId = new RuleChainId(UUID.randomUUID()); |
|
|
|
|
|
|
|
// WHEN
|
|
|
|
defaultTbContext.input(msg, targetRuleChainId); |
|
|
|
|
|
|
|
// THEN - clamped cap = 1, count = 1, fails with limit 1 in the message
|
|
|
|
ArgumentCaptor<TbActorMsg> tellCaptor = ArgumentCaptor.forClass(TbActorMsg.class); |
|
|
|
then(chainActorMock).should().tell(tellCaptor.capture()); |
|
|
|
TbActorMsg capturedTellMsg = tellCaptor.getValue(); |
|
|
|
RuleNodeToRuleChainTellNextMsg failureMsg = (RuleNodeToRuleChainTellNextMsg) capturedTellMsg; |
|
|
|
assertThat(failureMsg.getFailureMessage()).contains("visit limit 1 reached"); |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
public void givenDefaultCapAndFreshMessage_whenInput_thenEnqueue() { |
|
|
|
// GIVEN - default cap = 1, fresh message (count = 0): first visit must pass.
|
|
|
|
given(mainCtxMock.getRuleChainInputLoopMaxVisits()).willReturn(1); |
|
|
|
var tpi = resolve(null); |
|
|
|
given(mainCtxMock.resolve(eq(ServiceType.TB_RULE_ENGINE), nullable(String.class), eq(TENANT_ID), eq(TENANT_ID))).willReturn(tpi); |
|
|
|
var clusterService = mock(TbClusterService.class); |
|
|
|
given(mainCtxMock.getClusterService()).willReturn(clusterService); |
|
|
|
|
|
|
|
var callbackMock = mock(TbMsgCallback.class); |
|
|
|
given(callbackMock.isMsgValid()).willReturn(true); |
|
|
|
|
|
|
|
var ruleNode = new RuleNode(RULE_NODE_ID); |
|
|
|
ruleNode.setRuleChainId(RULE_CHAIN_ID); |
|
|
|
ruleNode.setDebugSettings(DebugSettings.off()); |
|
|
|
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); |
|
|
|
given(nodeCtxMock.getSelf()).willReturn(ruleNode); |
|
|
|
|
|
|
|
var msg = TbMsg.newMsg() |
|
|
|
.type(TbMsgType.POST_TELEMETRY_REQUEST) |
|
|
|
.originator(TENANT_ID) |
|
|
|
.copyMetaData(TbMsgMetaData.EMPTY) |
|
|
|
.data(TbMsg.EMPTY_STRING) |
|
|
|
.callback(callbackMock) |
|
|
|
.build(); |
|
|
|
var targetRuleChainId = new RuleChainId(UUID.randomUUID()); |
|
|
|
|
|
|
|
// WHEN
|
|
|
|
defaultTbContext.input(msg, targetRuleChainId); |
|
|
|
|
|
|
|
// THEN
|
|
|
|
then(clusterService).should().pushMsgToRuleEngine(eq(tpi), eq(msg.getId()), any(), any()); |
|
|
|
then(chainActorMock).shouldHaveNoInteractions(); |
|
|
|
} |
|
|
|
|
|
|
|
@MethodSource |
|
|
|
|