diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java index 3eecae7f7a..cc74d248d2 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java @@ -137,7 +137,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor ListenableFuture executeAsync(Callable task) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java index bee3896a5b..d8be34372e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java @@ -17,8 +17,10 @@ package org.thingsboard.server.dao.sql; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.transaction.support.TransactionTemplate; +import org.thingsboard.common.util.ListeningExecutor; import javax.sql.DataSource; import java.sql.SQLException; @@ -29,7 +31,8 @@ import java.sql.Statement; public abstract class JpaAbstractDaoListeningExecutorService { @Autowired - protected JpaExecutorService service; + @Qualifier("jpaExecutorService") + protected ListeningExecutor service; @Autowired protected DataSource dataSource; diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java index 0d35e08b5b..d9e77a34e1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java @@ -120,7 +120,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq public ListenableFuture findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { var aggParams = query.getAggParameters(); if (Aggregation.NONE.equals(aggParams.getAggregation()) || aggParams.getInterval() < 1) { - return Futures.immediateFuture(findAllAsyncWithLimit(entityId, query)); + return service.submit(() -> findAllWithLimit(entityId, query)); } else { List>> futures = new ArrayList<>(); var intervalType = aggParams.getIntervalType(); @@ -144,7 +144,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq } } - ReadTsKvQueryResult findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) { + ReadTsKvQueryResult findAllWithLimit(EntityId entityId, ReadTsKvQuery query) { Integer keyId = keyDictionaryDao.getOrSaveKeyId(query.getKey()); List tsKvEntities = tsKvRepository.findAllWithLimit( entityId.getId(), diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java index ae1a95c9ec..c273dd646a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java @@ -152,7 +152,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements var aggParams = query.getAggParameters(); var intervalType = aggParams.getIntervalType(); if (query.getAggregation() == Aggregation.NONE) { - return Futures.immediateFuture(findAllAsyncWithLimit(entityId, query)); + return service.submit(() -> findAllWithLimit(entityId, query)); } else if (IntervalType.MILLISECONDS.equals(intervalType)) { long startTs = query.getStartTs(); long endTs = Math.max(query.getStartTs() + 1, query.getEndTs()); @@ -179,7 +179,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements super.cleanup(systemTtl); } - private ReadTsKvQueryResult findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) { + private ReadTsKvQueryResult findAllWithLimit(EntityId entityId, ReadTsKvQuery query) { String strKey = query.getKey(); Integer keyId = keyDictionaryDao.getOrSaveKeyId(strKey); List timescaleTsKvEntities = tsKvRepository.findAllWithLimit( diff --git a/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java b/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java index 583fc4116d..c3d8394bd3 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDaoTest.java @@ -18,6 +18,8 @@ package org.thingsboard.server.dao.sqlts; import com.google.common.util.concurrent.Futures; import org.junit.Before; import org.junit.Test; +import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult; @@ -48,10 +50,11 @@ public class AbstractChunkedAggregationTimeseriesDaoTest { @Before public void setUp() throws Exception { tsDao = spy(AbstractChunkedAggregationTimeseriesDao.class); + ReflectionTestUtils.setField(tsDao, "service", DirectListeningExecutor.INSTANCE); Optional optionalListenableFuture = Optional.of(mock(TsKvEntry.class)); willReturn(Futures.immediateFuture(optionalListenableFuture)).given(tsDao).findAndAggregateAsync(any(), anyString(), anyLong(), anyLong(), anyLong(), any()); willReturn(Futures.immediateFuture(mock(ReadTsKvQueryResult.class))).given(tsDao).getReadTsKvQueryResultFuture(any(), any()); - willReturn(mock(ReadTsKvQueryResult.class)).given(tsDao).findAllAsyncWithLimit(any(), any()); + willReturn(mock(ReadTsKvQueryResult.class)).given(tsDao).findAllWithLimit(any(), any()); } @Test @@ -161,7 +164,7 @@ public class AbstractChunkedAggregationTimeseriesDaoTest { ReadTsKvQuery query = new BaseReadTsKvQuery(TEMP, 1, 3000, interval, LIMIT, COUNT, DESC); willCallRealMethod().given(tsDao).findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); tsDao.findAllAsync(SYS_TENANT_ID, SYS_TENANT_ID, query); - verify(tsDao, times(1)).findAllAsyncWithLimit(any(), any()); + verify(tsDao, times(1)).findAllWithLimit(any(), any()); verify(tsDao, times(0)).findAndAggregateAsync(any(), any(), anyLong(), anyLong(), anyLong(), any()); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java index b4d2951264..9a29f80b1c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java @@ -41,6 +41,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -177,15 +178,11 @@ public class TbMsgDeduplicationNode implements TbNode { } } if (resultMsg != null) { - String queueName1 = queueName != null ? queueName : resultMsg.getQueueName(); - deduplicationResults.add(TbMsg.newMsg() - .queueName(queueName1) - .type(resultMsg.getType()) - .originator(resultMsg.getOriginator()) - .customerId(resultMsg.getCustomerId()) - .copyMetaData(resultMsg.getMetaData()) - .data(resultMsg.getData()) - .build()); + var msgBuilder = resultMsg.copyWithNewCtx().id(UUID.randomUUID()); + if (queueName != null) { + msgBuilder.queueName(queueName); + } + deduplicationResults.add(msgBuilder.build()); } } packBoundsOpt = findValidPack(msgList, deduplicationTimeoutMs); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java index f0e4a3e40e..04288cf7f3 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java @@ -62,17 +62,9 @@ public class TbMsgDelayNode implements TbNode { if (msg.isTypeOf(TbMsgType.DELAY_TIMEOUT_SELF_MSG)) { TbMsg pendingMsg = pendingMsgs.remove(UUID.fromString(msg.getData())); if (pendingMsg != null) { - ctx.enqueueForTellNext( - TbMsg.newMsg() - .queueName(pendingMsg.getQueueName()) - .type(pendingMsg.getType()) - .originator(pendingMsg.getOriginator()) - .customerId(pendingMsg.getCustomerId()) - .copyMetaData(pendingMsg.getMetaData()) - .data(pendingMsg.getData()) - .build(), - TbNodeConnectionType.SUCCESS - ); + ctx.enqueueForTellNext(pendingMsg.copyWithNewCtx() + .id(UUID.randomUUID()) + .build(), TbNodeConnectionType.SUCCESS); } } else { if (pendingMsgs.size() < config.getMaxPendingMsgs()) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/external/TbAbstractExternalNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/external/TbAbstractExternalNode.java index e2c218f272..4efa462e37 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/external/TbAbstractExternalNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/external/TbAbstractExternalNode.java @@ -30,7 +30,7 @@ public abstract class TbAbstractExternalNode implements TbNode { protected void tellSuccess(TbContext ctx, TbMsg tbMsg) { if (forceAck) { - ctx.enqueueForTellNext(tbMsg.copyWithNewCtx(), TbNodeConnectionType.SUCCESS); + ctx.enqueueForTellNext(tbMsg.copyWithNewCtx().build(), TbNodeConnectionType.SUCCESS); } else { ctx.tellSuccess(tbMsg); } @@ -39,9 +39,9 @@ public abstract class TbAbstractExternalNode implements TbNode { protected void tellFailure(TbContext ctx, TbMsg tbMsg, Throwable t) { if (forceAck) { if (t == null) { - ctx.enqueueForTellNext(tbMsg.copyWithNewCtx(), TbNodeConnectionType.FAILURE); + ctx.enqueueForTellNext(tbMsg.copyWithNewCtx().build(), TbNodeConnectionType.FAILURE); } else { - ctx.enqueueForTellFailure(tbMsg.copyWithNewCtx(), t); + ctx.enqueueForTellFailure(tbMsg.copyWithNewCtx().build(), t); } } else { if (t == null) { @@ -55,7 +55,7 @@ public abstract class TbAbstractExternalNode implements TbNode { protected TbMsg ackIfNeeded(TbContext ctx, TbMsg msg) { if (forceAck) { ctx.ack(msg); - return msg.copyWithNewCtx(); + return msg.copyWithNewCtx().build(); } else { return msg; } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbClearAlarmNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbClearAlarmNodeTest.java index 529625eba3..b55cf63fab 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbClearAlarmNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbClearAlarmNodeTest.java @@ -27,7 +27,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.rule.engine.api.RuleEngineAlarmService; import org.thingsboard.rule.engine.api.ScriptEngine; import org.thingsboard.rule.engine.api.TbContext; @@ -84,7 +84,7 @@ class TbClearAlarmNodeTest { @BeforeEach void before() { - dbExecutor = new TestDbCallbackExecutor(); + dbExecutor = DirectListeningExecutor.INSTANCE; metadata = new TbMsgMetaData(); } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbCreateAlarmNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbCreateAlarmNodeTest.java index f5b9f1f836..de61222b8d 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbCreateAlarmNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbCreateAlarmNodeTest.java @@ -29,7 +29,7 @@ import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.rule.engine.api.RuleEngineAlarmService; import org.thingsboard.rule.engine.api.ScriptEngine; import org.thingsboard.rule.engine.api.TbContext; @@ -99,7 +99,7 @@ class TbCreateAlarmNodeTest { @BeforeEach void before() { - dbExecutor = new TestDbCallbackExecutor(); + dbExecutor = DirectListeningExecutor.INSTANCE; metadata = new TbMsgMetaData(); config = new TbCreateAlarmNodeConfiguration(); diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbCreateRelationNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbCreateRelationNodeTest.java index a0839a4920..60924002da 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbCreateRelationNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbCreateRelationNodeTest.java @@ -29,7 +29,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; @@ -142,7 +142,7 @@ public class TbCreateRelationNodeTest extends AbstractRuleNodeUpgradeTest { private final DeviceId originatorId = new DeviceId(UUID.fromString("860634b1-8a1e-4693-9ae8-e779c7f5f4da")); private final RuleNodeId ruleNodeId = new RuleNodeId(UUID.fromString("d05a0491-ee7a-484a-8c1b-91111ef39287")); - private final ListeningExecutor dbExecutor = new TestDbCallbackExecutor(); + private final ListeningExecutor dbExecutor = DirectListeningExecutor.INSTANCE; @Mock private TbContext ctxMock; diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbDeleteRelationNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbDeleteRelationNodeTest.java index ff4ecf6f74..a8087cc40b 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbDeleteRelationNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbDeleteRelationNodeTest.java @@ -29,7 +29,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; @@ -130,7 +130,7 @@ public class TbDeleteRelationNodeTest extends AbstractRuleNodeUpgradeTest { private final DeviceId originatorId = new DeviceId(UUID.fromString("574c9840-0885-4d12-be69-f557d7471a78")); - private final ListeningExecutor dbExecutor = new TestDbCallbackExecutor(); + private final ListeningExecutor dbExecutor = DirectListeningExecutor.INSTANCE; @Mock private TbContext ctxMock; diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java index 3acd94f477..cd47dd6c6f 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java @@ -41,7 +41,7 @@ import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; @@ -88,7 +88,7 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("ac4ca02e-2ae6-404a-8f7e-c4ae31c56aa7")); private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("64ad971e-9cfa-49e4-9f59-faa1a2350c6e")); - private final ListeningExecutor dbCallbackExecutor = new TestDbCallbackExecutor(); + private final ListeningExecutor dbCallbackExecutor = DirectListeningExecutor.INSTANCE; private TbSaveToCustomCassandraTableNode node; private TbSaveToCustomCassandraTableNodeConfiguration config; diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbUnassignFromCustomerNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbUnassignFromCustomerNodeTest.java index d676aef1d8..de891c13d4 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbUnassignFromCustomerNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbUnassignFromCustomerNodeTest.java @@ -28,7 +28,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; @@ -100,7 +100,7 @@ class TbUnassignFromCustomerNodeTest extends AbstractRuleNodeUpgradeTest { private final TenantId TENANT_ID = new TenantId(UUID.fromString("06fcc15f-2677-436d-a1cb-7754bd0bcccf")); - private final ListeningExecutor DB_EXECUTOR = new TestDbCallbackExecutor(); + private final ListeningExecutor DB_EXECUTOR = DirectListeningExecutor.INSTANCE; private static Stream givenUnsupportedOriginatorType_whenOnMsg_thenVerifyExceptionThrown() { return unsupportedEntityTypes.stream().flatMap(type -> Stream.of(Arguments.of(type))); diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/ai/TbAiNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/ai/TbAiNodeTest.java index 6acf820fc1..e8aad50463 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/ai/TbAiNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/ai/TbAiNodeTest.java @@ -39,7 +39,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.rule.engine.ai.TbResponseFormat.TbJsonResponseFormat; import org.thingsboard.rule.engine.ai.TbResponseFormat.TbJsonSchemaResponseFormat; import org.thingsboard.rule.engine.ai.TbResponseFormat.TbTextResponseFormat; @@ -164,7 +164,7 @@ class TbAiNodeTest { lenient().when(ctxMock.getTenantId()).thenReturn(tenantId); lenient().when(ctxMock.getAiModelService()).thenReturn(aiModelServiceMock); lenient().when(ctxMock.getAiChatModelService()).thenReturn(aiChatModelServiceMock); - lenient().when(ctxMock.getDbCallbackExecutor()).thenReturn(new TestDbCallbackExecutor()); + lenient().when(ctxMock.getDbCallbackExecutor()).thenReturn(DirectListeningExecutor.INSTANCE); lenient().when(ctxMock.getTbResourceDataCache()).thenReturn(tbResourceDataCacheMock); lenient().when(ctxMock.getResourceService()).thenReturn(resourceServiceMock); } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/aws/sns/TbSnsNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/aws/sns/TbSnsNodeTest.java index 850e2b4bdf..847a7f077c 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/aws/sns/TbSnsNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/aws/sns/TbSnsNodeTest.java @@ -32,7 +32,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.common.util.ListeningExecutor; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.server.common.data.id.DeviceId; @@ -59,7 +59,7 @@ import static org.mockito.BDDMockito.verifyNoMoreInteractions; class TbSnsNodeTest { private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("fccfdf2e-6a88-4a94-81dd-5cbb557019cf")); - private final ListeningExecutor executor = new TestDbCallbackExecutor(); + private final ListeningExecutor executor = DirectListeningExecutor.INSTANCE; private TbSnsNode node; private TbSnsNodeConfiguration config; diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/aws/sqs/TbSqsNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/aws/sqs/TbSqsNodeTest.java index a029a0bfbc..9f6649cb0b 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/aws/sqs/TbSqsNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/aws/sqs/TbSqsNodeTest.java @@ -33,7 +33,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.common.util.ListeningExecutor; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.rule.engine.aws.sqs.TbSqsNodeConfiguration.QueueType; @@ -63,7 +63,7 @@ import static org.mockito.BDDMockito.verifyNoMoreInteractions; class TbSqsNodeTest { private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("764de824-929f-4114-95ea-0ea0401ffa3d")); - private final ListeningExecutor executor = new TestDbCallbackExecutor(); + private final ListeningExecutor executor = DirectListeningExecutor.INSTANCE; private final String messageId = "msgId-1d186a16-80c7-44b3-a245-a1fc835f20c7"; private final String requestId = "reqId-bef0799b-dde9-4aa0-855b-86bbafaeaf31"; diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeTest.java new file mode 100644 index 0000000000..7e3220cdfb --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeTest.java @@ -0,0 +1,106 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.delay; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.RuleNodeId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.data.msg.TbNodeConnectionType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.TbMsgProcessingCtx; +import org.thingsboard.server.common.msg.queue.TbMsgCallback; + +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.lenient; + +@ExtendWith(MockitoExtension.class) +class TbMsgDelayNodeTest { + + final DeviceId deviceId = new DeviceId(UUID.fromString("5770153d-6ca2-4447-8a54-5d8a4538e052")); + final RuleNodeId ruleNodeId = new RuleNodeId(UUID.fromString("ee682a85-7f5a-4182-91bc-46e555138fe2")); + + TbMsgDelayNode node; + + @Mock + TbContext ctxMock; + + @BeforeEach + void setUp() throws TbNodeException { + node = new TbMsgDelayNode(); + var config = new TbMsgDelayNodeConfiguration().defaultConfiguration(); + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + lenient().when(ctxMock.getSelfId()).thenReturn(ruleNodeId); + } + + @Test + void shouldPreserveRuleNodeCounterAndResetCallbackWhenEnqueuingDelayedMsg() { + // GIVEN + int ruleNodeExecCounter = 5; + var originalMsg = TbMsg.newMsg() + .id(UUID.randomUUID()) + .type(TbMsgType.POST_TELEMETRY_REQUEST) + .originator(deviceId) + .metaData(TbMsgMetaData.EMPTY) + .data("{\"temperature\":42}") + .ctx(new TbMsgProcessingCtx(ruleNodeExecCounter)) + .build(); + + String originalMsgId = originalMsg.getId().toString(); + var tickMsg = TbMsg.newMsg() + .type(TbMsgType.DELAY_TIMEOUT_SELF_MSG) + .originator(ruleNodeId) + .metaData(TbMsgMetaData.EMPTY) + .data(originalMsgId) + .build(); + given(ctxMock.newMsg(null, TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, null, TbMsgMetaData.EMPTY, originalMsgId)).willReturn(tickMsg); + + node.onMsg(ctxMock, originalMsg); + + // WHEN + node.onMsg(ctxMock, tickMsg); + + // THEN + var msgCaptor = ArgumentCaptor.forClass(TbMsg.class); + then(ctxMock).should().enqueueForTellNext(msgCaptor.capture(), eq(TbNodeConnectionType.SUCCESS)); + + var enqueuedMsg = msgCaptor.getValue(); + assertThat(enqueuedMsg).usingRecursiveComparison() + .ignoringFields("id", "ts", "callback") + .isEqualTo(originalMsg); + + assertThat(enqueuedMsg.getId()).isNotNull().isNotEqualTo(originalMsg.getId()); + assertThat(enqueuedMsg.getAndIncrementRuleNodeCounter()).isEqualTo(ruleNodeExecCounter); + assertThat(enqueuedMsg.getCallback()).isSameAs(TbMsgCallback.EMPTY); + } + +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbCheckAlarmStatusNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbCheckAlarmStatusNodeTest.java index ac94fcef29..d8a302ae92 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbCheckAlarmStatusNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbCheckAlarmStatusNodeTest.java @@ -21,7 +21,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.rule.engine.api.RuleEngineAlarmService; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; @@ -53,7 +53,7 @@ class TbCheckAlarmStatusNodeTest { private static final TenantId TENANT_ID = new TenantId(UUID.randomUUID()); private static final DeviceId DEVICE_ID = new DeviceId(UUID.randomUUID()); private static final AlarmId ALARM_ID = new AlarmId(UUID.randomUUID()); - private static final TestDbCallbackExecutor DB_EXECUTOR = new TestDbCallbackExecutor(); + private static final DirectListeningExecutor DB_EXECUTOR = DirectListeningExecutor.INSTANCE; private TbCheckAlarmStatusNode node; diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbCheckRelationNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbCheckRelationNodeTest.java index a38cfa2a4e..e6011c74d8 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbCheckRelationNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbCheckRelationNodeTest.java @@ -24,7 +24,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.mockito.ArgumentCaptor; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; @@ -65,7 +65,7 @@ class TbCheckRelationNodeTest extends AbstractRuleNodeUpgradeTest { private final TenantId TENANT_ID = new TenantId(UUID.randomUUID()); private final DeviceId ORIGINATOR_ID = new DeviceId(UUID.randomUUID()); - private final TestDbCallbackExecutor DB_EXECUTOR = new TestDbCallbackExecutor(); + private final DirectListeningExecutor DB_EXECUTOR = DirectListeningExecutor.INSTANCE; private final TbMsg EMPTY_POST_ATTRIBUTES_MSG = TbMsg.newMsg() .type(TbMsgType.POST_ATTRIBUTES_REQUEST) .originator(ORIGINATOR_ID) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNodeTest.java index 70203437a8..7ce1c9977c 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNodeTest.java @@ -32,7 +32,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; @@ -65,7 +65,7 @@ import static org.mockito.BDDMockito.willThrow; class TbPubSubNodeTest { private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("d29849c2-3f21-48e2-8557-74cdd6403290")); - private final ListeningExecutor executor = new TestDbCallbackExecutor(); + private final ListeningExecutor executor = DirectListeningExecutor.INSTANCE; private TbPubSubNode node; private TbPubSubNodeConfiguration config; diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeTest.java index 63a7d8297a..5df7e1be7b 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/kafka/TbKafkaNodeTest.java @@ -40,7 +40,7 @@ import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; @@ -79,7 +79,7 @@ public class TbKafkaNodeTest extends AbstractRuleNodeUpgradeTest { private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("5f2eac08-bd1f-4635-a6c2-437369f996cf")); private final RuleNodeId RULE_NODE_ID = new RuleNodeId(UUID.fromString("d46bb666-ecab-4d89-a28f-5abdca23ac29")); - private final ListeningExecutor executor = new TestDbCallbackExecutor(); + private final ListeningExecutor executor = DirectListeningExecutor.INSTANCE; private final long OFFSET = 1; private final int PARTITION = 0; diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNodeTest.java index 928af5570f..f32662dfbc 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/CalculateDeltaNodeTest.java @@ -30,10 +30,10 @@ import org.mockito.Mock; import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; import org.thingsboard.common.util.AbstractListeningExecutor; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; @@ -91,7 +91,7 @@ public class CalculateDeltaNodeTest extends AbstractRuleNodeUpgradeTest { private final DeviceId DUMMY_DEVICE_ORIGINATOR = new DeviceId(UUID.fromString("2ba3ded4-882b-40cf-999a-89da9ccd58f9")); private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("3842e740-0d89-43a9-8d52-ae44023847ba")); - private final ListeningExecutor DB_EXECUTOR = new TestDbCallbackExecutor(); + private final ListeningExecutor DB_EXECUTOR = DirectListeningExecutor.INSTANCE; private static final int RULE_DISPATCHER_POOL_SIZE = 2; private static final int DB_CALLBACK_POOL_SIZE = 3; diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java index 18a0a009d2..a23bfd360f 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java @@ -28,7 +28,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; @@ -85,7 +85,7 @@ public class TbGetCustomerAttributeNodeTest { private final DeviceId DUMMY_DEVICE_ORIGINATOR = new DeviceId(UUID.randomUUID()); private final TenantId TENANT_ID = TenantId.fromUUID(UUID.randomUUID()); private final CustomerId CUSTOMER_ID = new CustomerId(UUID.randomUUID()); - private final ListeningExecutor DB_EXECUTOR = new TestDbCallbackExecutor(); + private final ListeningExecutor DB_EXECUTOR = DirectListeningExecutor.INSTANCE; @Mock private TbContext ctxMock; diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerDetailsNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerDetailsNodeTest.java index dad2a2ef47..5e9726ec97 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerDetailsNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerDetailsNodeTest.java @@ -26,7 +26,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; @@ -77,7 +77,7 @@ public class TbGetCustomerDetailsNodeTest { private static final DeviceId DUMMY_DEVICE_ORIGINATOR = new DeviceId(UUID.randomUUID()); private static final TenantId TENANT_ID = new TenantId(UUID.randomUUID()); - private static final ListeningExecutor DB_EXECUTOR = new TestDbCallbackExecutor(); + private static final ListeningExecutor DB_EXECUTOR = DirectListeningExecutor.INSTANCE; @Mock private TbContext ctxMock; @Mock diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetDeviceAttrNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetDeviceAttrNodeTest.java index 6fd15d45e7..4f267c0dfb 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetDeviceAttrNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetDeviceAttrNodeTest.java @@ -26,7 +26,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; @@ -62,7 +62,7 @@ public class TbGetDeviceAttrNodeTest extends AbstractRuleNodeUpgradeTest { private final TenantId TENANT_ID = new TenantId(UUID.fromString("5aea576c-66c4-4732-86b8-dc6bfcde7443")); private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("40b6b393-6ddf-47f9-973a-18550ca70384")); - private final ListeningExecutor executor = new TestDbCallbackExecutor(); + private final ListeningExecutor executor = DirectListeningExecutor.INSTANCE; private TbGetDeviceAttrNode node; diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetOriginatorFieldsNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetOriginatorFieldsNodeTest.java index 6c2d37395c..fff7437fdf 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetOriginatorFieldsNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetOriginatorFieldsNodeTest.java @@ -25,7 +25,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; @@ -60,7 +60,7 @@ public class TbGetOriginatorFieldsNodeTest { private static final DeviceId DUMMY_DEVICE_ORIGINATOR = new DeviceId(UUID.randomUUID()); private static final TenantId DUMMY_TENANT_ID = new TenantId(UUID.randomUUID()); - private static final ListeningExecutor DB_EXECUTOR = new TestDbCallbackExecutor(); + private static final ListeningExecutor DB_EXECUTOR = DirectListeningExecutor.INSTANCE; @Mock private TbContext ctxMock; @Mock diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNodeTest.java index 14ced06c1b..a31d23db55 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNodeTest.java @@ -28,7 +28,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; @@ -92,7 +92,7 @@ public class TbGetRelatedAttributeNodeTest { private static final EntityId DUMMY_DEVICE_ORIGINATOR = new DeviceId(UUID.randomUUID()); private static final TenantId TENANT_ID = new TenantId(UUID.randomUUID()); - private static final ListeningExecutor DB_EXECUTOR = new TestDbCallbackExecutor(); + private static final ListeningExecutor DB_EXECUTOR = DirectListeningExecutor.INSTANCE; @Mock private TbContext ctxMock; @Mock diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java index a92978e929..ce3a1b5b22 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNodeTest.java @@ -29,7 +29,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; @@ -72,7 +72,7 @@ public class TbGetTelemetryNodeTest extends AbstractRuleNodeUpgradeTest { private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("5738401b-9dba-422b-b656-a62fe7431917")); private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("8a8fd749-b2ec-488b-a6c6-fc66614d8686")); - private final ListeningExecutor executor = new TestDbCallbackExecutor(); + private final ListeningExecutor executor = DirectListeningExecutor.INSTANCE; private TbGetTelemetryNode node; private TbGetTelemetryNodeConfiguration config; diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNodeTest.java index 320339ead4..88db3dc0cd 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNodeTest.java @@ -27,7 +27,7 @@ import org.mockito.ArgumentMatcher; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; @@ -71,7 +71,7 @@ public class TbGetTenantAttributeNodeTest { private static final DeviceId DUMMY_DEVICE_ORIGINATOR = new DeviceId(UUID.randomUUID()); private static final TenantId TENANT_ID = new TenantId(UUID.randomUUID()); - private static final TestDbCallbackExecutor DB_EXECUTOR = new TestDbCallbackExecutor(); + private static final DirectListeningExecutor DB_EXECUTOR = DirectListeningExecutor.INSTANCE; @Mock private TbContext ctxMock; @Mock diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNodeTest.java index db5e430a73..b8436f3304 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNodeTest.java @@ -36,7 +36,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; @@ -78,7 +78,7 @@ public class TbRabbitMqNodeTest { ); private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("b3d6f9dd-15cc-4e61-acc0-13197a090406")); - private final ListeningExecutor executor = new TestDbCallbackExecutor(); + private final ListeningExecutor executor = DirectListeningExecutor.INSTANCE; private TbRabbitMqNode node; private TbRabbitMqNodeConfiguration config; diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java index e31e5428bd..a41bad76cc 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java @@ -30,7 +30,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.rule.engine.api.RuleEngineAlarmService; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; @@ -89,7 +89,7 @@ public class TbChangeOriginatorNodeTest { private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("990605a4-db46-4ed4-942f-e18200453571")); private final AssetId ASSET_ID = new AssetId(UUID.fromString("55de3f10-1b55-4950-b711-ed132896b260")); - private final ListeningExecutor dbExecutor = new TestDbCallbackExecutor(); + private final ListeningExecutor dbExecutor = DirectListeningExecutor.INSTANCE; private TbChangeOriginatorNode node; private TbChangeOriginatorNodeConfiguration config; diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java index 1baae125ef..ec71819878 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java @@ -47,6 +47,8 @@ import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.TbMsgProcessingCtx; +import org.thingsboard.server.common.msg.queue.TbMsgCallback; import java.util.ArrayList; import java.util.List; @@ -79,6 +81,8 @@ public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest { private TbContext ctx; + private static final int RULE_NODE_EXEC_COUNTER = 5; + private final ScheduledExecutorService executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("de-duplication-node-test"); private final int deduplicationInterval = 1; @@ -204,6 +208,8 @@ public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest { Assertions.assertEquals(firstMsg.getData(), actualMsg.getData()); Assertions.assertEquals(firstMsg.getMetaData(), actualMsg.getMetaData()); Assertions.assertEquals(firstMsg.getType(), actualMsg.getType()); + Assertions.assertEquals(RULE_NODE_EXEC_COUNTER, actualMsg.getAndIncrementRuleNodeCounter()); + Assertions.assertSame(TbMsgCallback.EMPTY, actualMsg.getCallback()); if (queueName == null) { Assertions.assertEquals(firstMsg.getQueueName(), actualMsg.getQueueName()); @@ -257,6 +263,8 @@ public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest { Assertions.assertEquals(msgWithLatestTs.getData(), actualMsg.getData()); Assertions.assertEquals(msgWithLatestTs.getMetaData(), actualMsg.getMetaData()); Assertions.assertEquals(msgWithLatestTs.getType(), actualMsg.getType()); + Assertions.assertEquals(RULE_NODE_EXEC_COUNTER, actualMsg.getAndIncrementRuleNodeCounter()); + Assertions.assertSame(TbMsgCallback.EMPTY, actualMsg.getCallback()); } @Test @@ -402,6 +410,8 @@ public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest { Assertions.assertEquals(msgWithLatestTsInFirstPack.getData(), actualMsg.getData()); Assertions.assertEquals(msgWithLatestTsInFirstPack.getMetaData(), actualMsg.getMetaData()); Assertions.assertEquals(msgWithLatestTsInFirstPack.getType(), actualMsg.getType()); + Assertions.assertEquals(RULE_NODE_EXEC_COUNTER, actualMsg.getAndIncrementRuleNodeCounter()); + Assertions.assertSame(TbMsgCallback.EMPTY, actualMsg.getCallback()); // verify that newMsg is called but content of messages is the same as in the last msg for the second pack. actualMsg = resultMsgs.get(1); @@ -411,6 +421,8 @@ public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest { Assertions.assertEquals(msgWithLatestTsInSecondPack.getData(), actualMsg.getData()); Assertions.assertEquals(msgWithLatestTsInSecondPack.getMetaData(), actualMsg.getMetaData()); Assertions.assertEquals(msgWithLatestTsInSecondPack.getType(), actualMsg.getType()); + Assertions.assertEquals(RULE_NODE_EXEC_COUNTER, actualMsg.getAndIncrementRuleNodeCounter()); + Assertions.assertSame(TbMsgCallback.EMPTY, actualMsg.getCallback()); } @Test @@ -539,6 +551,7 @@ public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest { .originator(deviceId) .copyMetaData(metaData) .data(JacksonUtil.toString(dataNode)) + .ctx(new TbMsgProcessingCtx(RULE_NODE_EXEC_COUNTER)) .build(); } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesFieldsAsyncLoaderTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesFieldsAsyncLoaderTest.java index 7d13b0aa46..d1bcc4559e 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesFieldsAsyncLoaderTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesFieldsAsyncLoaderTest.java @@ -22,8 +22,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.common.util.ListeningExecutor; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; import org.thingsboard.rule.engine.api.RuleEngineAlarmService; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeException; @@ -76,7 +76,7 @@ import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) public class EntitiesFieldsAsyncLoaderTest { - private static final ListeningExecutor DB_EXECUTOR = new TestDbCallbackExecutor(); + private static final ListeningExecutor DB_EXECUTOR = DirectListeningExecutor.INSTANCE; private static EnumSet SUPPORTED_ENTITY_TYPES; private static UUID RANDOM_UUID; private static TenantId TENANT_ID; diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesRelatedDeviceIdAsyncLoaderTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesRelatedDeviceIdAsyncLoaderTest.java index 200b355fcb..23e98dfc58 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesRelatedDeviceIdAsyncLoaderTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesRelatedDeviceIdAsyncLoaderTest.java @@ -21,7 +21,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.thingsboard.common.util.ListeningExecutor; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.data.DeviceRelationsQuery; import org.thingsboard.server.common.data.Device; @@ -49,7 +49,7 @@ public class EntitiesRelatedDeviceIdAsyncLoaderTest { private static final EntityId DUMMY_ORIGINATOR = new DeviceId(UUID.randomUUID()); private static final TenantId TENANT_ID = new TenantId(UUID.randomUUID()); - private static final ListeningExecutor DB_EXECUTOR = new TestDbCallbackExecutor(); + private static final ListeningExecutor DB_EXECUTOR = DirectListeningExecutor.INSTANCE; @Mock private TbContext ctxMock; @Mock diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoaderTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoaderTest.java index e61f51ea96..1131586a07 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoaderTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoaderTest.java @@ -21,7 +21,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentMatchers; import org.thingsboard.common.util.ListeningExecutor; -import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.data.RelationsQuery; import org.thingsboard.server.common.data.Device; @@ -56,7 +56,7 @@ public class EntitiesRelatedEntityIdAsyncLoaderTest { private static final EntityId ASSET_ORIGINATOR_ID = new AssetId(UUID.randomUUID()); private static final TenantId TENANT_ID = new TenantId(UUID.randomUUID()); - private static final ListeningExecutor DB_EXECUTOR = new TestDbCallbackExecutor(); + private static final ListeningExecutor DB_EXECUTOR = DirectListeningExecutor.INSTANCE; private TbContext ctxMock; private RelationService relationServiceMock; diff --git a/ui-ngx/package.json b/ui-ngx/package.json index 80e67c1fb6..f8a7a8b953 100644 --- a/ui-ngx/package.json +++ b/ui-ngx/package.json @@ -142,6 +142,6 @@ "esbuild": "0.25.9", "rollup": "4.52.3", "jquery.terminal/**/form-data": ">=4.0.4", - "js-beautify/**/minimatch": "^9.0.6" + "js-beautify/**/minimatch": "^9.0.7" } } diff --git a/ui-ngx/src/app/modules/home/components/widget/lib/maps/image-map.ts b/ui-ngx/src/app/modules/home/components/widget/lib/maps/image-map.ts index 15079c8c0e..3b8a8f04c5 100644 --- a/ui-ngx/src/app/modules/home/components/widget/lib/maps/image-map.ts +++ b/ui-ngx/src/app/modules/home/components/widget/lib/maps/image-map.ts @@ -83,8 +83,6 @@ export class TbImageMap extends TbMap { this.onResize(true); } else { this.onResize(); - this.initMapSubject.next(this.map); - this.initMapSubject.complete(); } }); return this.initMapSubject.asObservable(); @@ -227,6 +225,8 @@ export class TbImageMap extends TbMap { attributionControl: false }); this.updateMaxBounds(updateImage); + this.initMapSubject.next(this.map); + this.initMapSubject.complete(); } } diff --git a/ui-ngx/src/app/shared/components/toast.directive.ts b/ui-ngx/src/app/shared/components/toast.directive.ts index ac3d284474..773a6d298e 100644 --- a/ui-ngx/src/app/shared/components/toast.directive.ts +++ b/ui-ngx/src/app/shared/components/toast.directive.ts @@ -135,8 +135,10 @@ export class ToastDirective implements AfterViewInit, OnDestroy { notification: notificationMessage, panelClass, destroyToastComponent: () => { - this.viewContainerRef.detach(0); - this.toastComponentRef.destroy(); + if (this.toastComponentRef) { + this.viewContainerRef.detach(0); + this.toastComponentRef.destroy(); + } } }; const providers: StaticProvider[] = [ diff --git a/ui-ngx/yarn.lock b/ui-ngx/yarn.lock index 627a0706cb..50d1a0df4e 100644 --- a/ui-ngx/yarn.lock +++ b/ui-ngx/yarn.lock @@ -7732,24 +7732,24 @@ minimalistic-assert@^1.0.0: resolved "https://registry.yarnpkg.com/minimalistic-assert/-/minimalistic-assert-1.0.1.tgz#2e194de044626d4a10e7f7fbc00ce73e83e4d5c7" integrity sha512-UtJcAD4yEaGtjPezWuO9wC4nwUnVH/8/Im3yEHQP4b67cXlD/Qr9hdITCU1xDbSEXg2XKNaP8jsReV7vQd00/A== -minimatch@9.0.1, minimatch@^9.0.4, minimatch@^9.0.5, minimatch@^9.0.6: - version "9.0.6" - resolved "https://registry.yarnpkg.com/minimatch/-/minimatch-9.0.6.tgz#a7e3bccfcb3d78ec1bf8d51c9ba749080237a5c8" - integrity sha512-kQAVowdR33euIqeA0+VZTDqU+qo1IeVY+hrKYtZMio3Pg0P0vuh/kwRylLUddJhB6pf3q/botcOvRtx4IN1wqQ== +minimatch@9.0.1, minimatch@^9.0.4, minimatch@^9.0.5, minimatch@^9.0.7: + version "9.0.7" + resolved "https://registry.yarnpkg.com/minimatch/-/minimatch-9.0.7.tgz#d76c4d0b3b527877016d6cc1b9922fc8e0ffe7b0" + integrity sha512-MOwgjc8tfrpn5QQEvjijjmDVtMw2oL88ugTevzxQnzRLm6l3fVEF2gzU0kYeYYKD8C66+IdGX6peJ4MyUlUnPg== dependencies: brace-expansion "^5.0.2" minimatch@^10.0.3, minimatch@^10.1.1: - version "10.2.2" - resolved "https://registry.yarnpkg.com/minimatch/-/minimatch-10.2.2.tgz#361603ee323cfb83496fea2ae17cc44ea4e1f99f" - integrity sha512-+G4CpNBxa5MprY+04MbgOw1v7So6n5JY166pFi9KfYwT78fxScCeSNQSNzp6dpPSW2rONOps6Ocam1wFhCgoVw== + version "10.2.3" + resolved "https://registry.yarnpkg.com/minimatch/-/minimatch-10.2.3.tgz#c0ef582f21071b0123a5bbf275252ebda921fbf6" + integrity sha512-Rwi3pnapEqirPSbWbrZaa6N3nmqq4Xer/2XooiOKyV3q12ML06f7MOuc5DVH8ONZIFhwIYQ3yzPH4nt7iWHaTg== dependencies: brace-expansion "^5.0.2" minimatch@^3.1.2: - version "3.1.3" - resolved "https://registry.yarnpkg.com/minimatch/-/minimatch-3.1.3.tgz#6a5cba9b31f503887018f579c89f81f61162e624" - integrity sha512-M2GCs7Vk83NxkUyQV1bkABc4yxgz9kILhHImZiBPAZ9ybuvCb0/H7lEl5XvIg3g+9d4eNotkZA5IWwYl0tibaA== + version "3.1.4" + resolved "https://registry.yarnpkg.com/minimatch/-/minimatch-3.1.4.tgz#89d910ea3970a77ac8edfd30340ccd038b758079" + integrity sha512-twmL+S8+7yIsE9wsqgzU3E8/LumN3M3QELrBZ20OdmQ9jB2JvW5oZtBEmft84k/Gs5CG9mqtWc6Y9vW+JEzGxw== dependencies: brace-expansion "^1.1.7"