From 16fdfc518d2a4cee00784fdf1633ff31ae1da992 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 15 Aug 2023 14:43:56 +0200 Subject: [PATCH] TbMathNode: test added for concurrent calls by the same originator utilizing the whole rule-dispatcher pool. 1 failed. non-blocking implementation wanted; Additional refactoring: JUnit5 and mock init --- .../rule/engine/math/TbMathNodeTest.java | 124 ++++++++++++++---- 1 file changed, 98 insertions(+), 26 deletions(-) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java index 69d1b46dbe..4c36d51a7d 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java @@ -15,17 +15,18 @@ */ package org.thingsboard.rule.engine.math; -import com.datastax.oss.driver.api.core.uuid.Uuids; import com.google.common.util.concurrent.Futures; -import org.junit.After; +import lombok.extern.slf4j.Slf4j; import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.verification.Timeout; import org.thingsboard.common.util.AbstractListeningExecutor; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; @@ -47,21 +48,34 @@ import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import java.util.Arrays; +import java.util.List; import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.willAnswer; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -@RunWith(MockitoJUnitRunner.class) +@ExtendWith(MockitoExtension.class) +@Slf4j public class TbMathNodeTest { - private EntityId originator = new DeviceId(Uuids.timeBased()); - private TenantId tenantId = TenantId.fromUUID(Uuids.timeBased()); + static final int RULE_DISPATCHER_POOL_SIZE = 2; + static final int DB_CALLBACK_POOL_SIZE = 3; + private final EntityId originator = DeviceId.fromString("ccd71696-0586-422d-940e-755a41ec3b0d"); + private final TenantId tenantId = TenantId.fromUUID(UUID.fromString("e7f46b23-0c7d-42f5-9b06-fc35ab17af8a")); @Mock private TbContext ctx; @@ -71,35 +85,41 @@ public class TbMathNodeTest { private TimeseriesService tsService; @Mock private RuleEngineTelemetryService telemetryService; - private AbstractListeningExecutor dbExecutor; + private AbstractListeningExecutor dbCallbackExecutor; + private AbstractListeningExecutor ruleEngineDispatcherExecutor; - @Before + @BeforeEach public void before() { - dbExecutor = new AbstractListeningExecutor() { + dbCallbackExecutor = new AbstractListeningExecutor() { @Override protected int getThreadPollSize() { - return 3; + return DB_CALLBACK_POOL_SIZE; } }; - dbExecutor.init(); - initMocks(); + dbCallbackExecutor.init(); + ruleEngineDispatcherExecutor = new AbstractListeningExecutor() { + @Override + protected int getThreadPollSize() { + return RULE_DISPATCHER_POOL_SIZE; + } + }; + ruleEngineDispatcherExecutor.init(); + + lenient().when(ctx.getAttributesService()).thenReturn(attributesService); + lenient().when(ctx.getTelemetryService()).thenReturn(telemetryService); + lenient().when(ctx.getTimeseriesService()).thenReturn(tsService); + lenient().when(ctx.getTenantId()).thenReturn(tenantId); + lenient().when(ctx.getDbCallbackExecutor()).thenReturn(dbCallbackExecutor); } - @After + @AfterEach public void after() { - dbExecutor.destroy(); + ruleEngineDispatcherExecutor.executor().shutdownNow(); + dbCallbackExecutor.executor().shutdownNow(); } private void initMocks() { - Mockito.reset(ctx); - Mockito.reset(attributesService); - Mockito.reset(tsService); - Mockito.reset(telemetryService); - lenient().when(ctx.getAttributesService()).thenReturn(attributesService); - lenient().when(ctx.getTelemetryService()).thenReturn(telemetryService); - lenient().when(ctx.getTimeseriesService()).thenReturn(tsService); - lenient().when(ctx.getTenantId()).thenReturn(tenantId); - lenient().when(ctx.getDbCallbackExecutor()).thenReturn(dbExecutor); + Mockito.clearInvocations(ctx, attributesService, tsService, telemetryService); } private TbMathNode initNode(TbRuleNodeMathFunctionType operation, TbMathResult result, TbMathArgument... arguments) { @@ -496,4 +516,56 @@ public class TbMathNodeTest { }); Assert.assertNotNull(thrown.getMessage()); } + + @Test + public void testExp4j_concurrent() { + TbMathNode node = spy(initNodeWithCustomFunction("2a+3b", + new TbMathResult(TbMathArgumentType.MESSAGE_BODY, "result", 2, false, false, null), + new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a"), + new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "b") + )); + EntityId originatorSlow = DeviceId.fromString("7f01170d-6bba-419c-b95c-2b4c3ba32f30"); + EntityId originatorFast = DeviceId.fromString("c45360ff-7906-4102-a2ae-3495a86168d0"); + CountDownLatch slowProcessingLatch = new CountDownLatch(1); + + List slowMsgList = List.of( + TbMsg.newMsg("TEST", originatorSlow, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString()), + TbMsg.newMsg("TEST", originatorSlow, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString()) + ); + List fastMsgList = List.of( + TbMsg.newMsg("TEST", originatorFast, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString()), + TbMsg.newMsg("TEST", originatorFast, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString()) + ); + + log.debug("rule-dispatcher [{}], db-callback [{}], slowMsg [{}], fastMsg [{}]", RULE_DISPATCHER_POOL_SIZE, DB_CALLBACK_POOL_SIZE, slowMsgList.size(), fastMsgList.size()); + + willAnswer(invocation -> { + TbContext ctx = invocation.getArgument(0); + TbMsg msg = invocation.getArgument(1); + log.debug("awaiting on slowProcessingLatch [{}]", msg); + try { + assertThat(slowProcessingLatch.await(30, TimeUnit.SECONDS)).as("await on slowProcessingLatch").isTrue(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return invocation.callRealMethod(); + }).given(node).processMsgAsync(eq(ctx), argThat(slowMsgList::contains)); + + // submit slow msg may block all rule engine dispatcher threads + slowMsgList.forEach(msg -> ruleEngineDispatcherExecutor.executeAsync(() -> node.onMsg(ctx, msg))); + // wait until dispatcher threads started with all slowMsg + verify(node, new Timeout(TimeUnit.SECONDS.toMillis(5), times(slowMsgList.size()))).onMsg(eq(ctx), argThat(slowMsgList::contains)); + + // submit fast have to return immediately + fastMsgList.forEach(msg -> ruleEngineDispatcherExecutor.executeAsync(() -> node.onMsg(ctx, msg))); + // wait until all fast messages processed + verify(ctx, new Timeout(TimeUnit.SECONDS.toMillis(5), times(fastMsgList.size()))).tellSuccess(any()); + + slowProcessingLatch.countDown(); + + verify(ctx, new Timeout(TimeUnit.SECONDS.toMillis(5), times(fastMsgList.size() + slowMsgList.size()))).tellSuccess(any()); + + verify(ctx, never()).tellFailure(any(), any()); + } + }