@ -15,17 +15,18 @@
* /
package org.thingsboard.rule.engine.math ;
import com.datastax.oss.driver.api.core.uuid.Uuids ;
import com.google.common.util.concurrent.Futures ;
import org.junit.After ;
import lombok.extern.slf4j.Slf4j ;
import org.junit.Assert ;
import org.junit.Before ;
import org.junit.Test ;
import org.junit.runner.RunWith ;
import org.junit.jupiter.api.AfterEach ;
import org.junit.jupiter.api.BeforeEach ;
import org.junit.jupiter.api.Test ;
import org.junit.jupiter.api.extension.ExtendWith ;
import org.mockito.ArgumentCaptor ;
import org.mockito.Mock ;
import org.mockito.Mockito ;
import org.mockito.junit.MockitoJUnitRunner ;
import org.mockito.junit.jupiter.MockitoExtension ;
import org.mockito.verification.Timeout ;
import org.thingsboard.common.util.AbstractListeningExecutor ;
import org.thingsboard.common.util.JacksonUtil ;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService ;
@ -47,21 +48,34 @@ import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.timeseries.TimeseriesService ;
import java.util.Arrays ;
import java.util.List ;
import java.util.Optional ;
import java.util.UUID ;
import java.util.concurrent.CountDownLatch ;
import java.util.concurrent.TimeUnit ;
import static org.assertj.core.api.Assertions.assertThat ;
import static org.junit.jupiter.api.Assertions.assertThrows ;
import static org.mockito.ArgumentMatchers.any ;
import static org.mockito.ArgumentMatchers.anyDouble ;
import static org.mockito.ArgumentMatchers.anyString ;
import static org.mockito.ArgumentMatchers.argThat ;
import static org.mockito.ArgumentMatchers.eq ;
import static org.mockito.BDDMockito.willAnswer ;
import static org.mockito.Mockito.lenient ;
import static org.mockito.Mockito.never ;
import static org.mockito.Mockito.spy ;
import static org.mockito.Mockito.times ;
import static org.mockito.Mockito.verify ;
@RunWith ( MockitoJUnitRunner . class )
@ExtendWith ( MockitoExtension . class )
@Slf4j
public class TbMathNodeTest {
private EntityId originator = new DeviceId ( Uuids . timeBased ( ) ) ;
private TenantId tenantId = TenantId . fromUUID ( Uuids . timeBased ( ) ) ;
static final int RULE_DISPATCHER_POOL_SIZE = 2 ;
static final int DB_CALLBACK_POOL_SIZE = 3 ;
private final EntityId originator = DeviceId . fromString ( "ccd71696-0586-422d-940e-755a41ec3b0d" ) ;
private final TenantId tenantId = TenantId . fromUUID ( UUID . fromString ( "e7f46b23-0c7d-42f5-9b06-fc35ab17af8a" ) ) ;
@Mock
private TbContext ctx ;
@ -71,35 +85,41 @@ public class TbMathNodeTest {
private TimeseriesService tsService ;
@Mock
private RuleEngineTelemetryService telemetryService ;
private AbstractListeningExecutor dbExecutor ;
private AbstractListeningExecutor dbCallbackExecutor ;
private AbstractListeningExecutor ruleEngineDispatcherExecutor ;
@Before
@BeforeEach
public void before ( ) {
dbExecutor = new AbstractListeningExecutor ( ) {
dbCallback Executor = new AbstractListeningExecutor ( ) {
@Override
protected int getThreadPollSize ( ) {
return 3 ;
return DB_CALLBACK_POOL_SIZE ;
}
} ;
dbExecutor . init ( ) ;
initMocks ( ) ;
dbCallbackExecutor . init ( ) ;
ruleEngineDispatcherExecutor = new AbstractListeningExecutor ( ) {
@Override
protected int getThreadPollSize ( ) {
return RULE_DISPATCHER_POOL_SIZE ;
}
} ;
ruleEngineDispatcherExecutor . init ( ) ;
lenient ( ) . when ( ctx . getAttributesService ( ) ) . thenReturn ( attributesService ) ;
lenient ( ) . when ( ctx . getTelemetryService ( ) ) . thenReturn ( telemetryService ) ;
lenient ( ) . when ( ctx . getTimeseriesService ( ) ) . thenReturn ( tsService ) ;
lenient ( ) . when ( ctx . getTenantId ( ) ) . thenReturn ( tenantId ) ;
lenient ( ) . when ( ctx . getDbCallbackExecutor ( ) ) . thenReturn ( dbCallbackExecutor ) ;
}
@After
@AfterEach
public void after ( ) {
dbExecutor . destroy ( ) ;
ruleEngineDispatcherExecutor . executor ( ) . shutdownNow ( ) ;
dbCallbackExecutor . executor ( ) . shutdownNow ( ) ;
}
private void initMocks ( ) {
Mockito . reset ( ctx ) ;
Mockito . reset ( attributesService ) ;
Mockito . reset ( tsService ) ;
Mockito . reset ( telemetryService ) ;
lenient ( ) . when ( ctx . getAttributesService ( ) ) . thenReturn ( attributesService ) ;
lenient ( ) . when ( ctx . getTelemetryService ( ) ) . thenReturn ( telemetryService ) ;
lenient ( ) . when ( ctx . getTimeseriesService ( ) ) . thenReturn ( tsService ) ;
lenient ( ) . when ( ctx . getTenantId ( ) ) . thenReturn ( tenantId ) ;
lenient ( ) . when ( ctx . getDbCallbackExecutor ( ) ) . thenReturn ( dbExecutor ) ;
Mockito . clearInvocations ( ctx , attributesService , tsService , telemetryService ) ;
}
private TbMathNode initNode ( TbRuleNodeMathFunctionType operation , TbMathResult result , TbMathArgument . . . arguments ) {
@ -496,4 +516,56 @@ public class TbMathNodeTest {
} ) ;
Assert . assertNotNull ( thrown . getMessage ( ) ) ;
}
@Test
public void testExp4j_concurrent ( ) {
TbMathNode node = spy ( initNodeWithCustomFunction ( "2a+3b" ,
new TbMathResult ( TbMathArgumentType . MESSAGE_BODY , "result" , 2 , false , false , null ) ,
new TbMathArgument ( TbMathArgumentType . MESSAGE_BODY , "a" ) ,
new TbMathArgument ( TbMathArgumentType . MESSAGE_BODY , "b" )
) ) ;
EntityId originatorSlow = DeviceId . fromString ( "7f01170d-6bba-419c-b95c-2b4c3ba32f30" ) ;
EntityId originatorFast = DeviceId . fromString ( "c45360ff-7906-4102-a2ae-3495a86168d0" ) ;
CountDownLatch slowProcessingLatch = new CountDownLatch ( 1 ) ;
List < TbMsg > slowMsgList = List . of (
TbMsg . newMsg ( "TEST" , originatorSlow , new TbMsgMetaData ( ) , JacksonUtil . newObjectNode ( ) . put ( "a" , 2 ) . put ( "b" , 2 ) . toString ( ) ) ,
TbMsg . newMsg ( "TEST" , originatorSlow , new TbMsgMetaData ( ) , JacksonUtil . newObjectNode ( ) . put ( "a" , 2 ) . put ( "b" , 2 ) . toString ( ) )
) ;
List < TbMsg > fastMsgList = List . of (
TbMsg . newMsg ( "TEST" , originatorFast , new TbMsgMetaData ( ) , JacksonUtil . newObjectNode ( ) . put ( "a" , 2 ) . put ( "b" , 2 ) . toString ( ) ) ,
TbMsg . newMsg ( "TEST" , originatorFast , new TbMsgMetaData ( ) , JacksonUtil . newObjectNode ( ) . put ( "a" , 2 ) . put ( "b" , 2 ) . toString ( ) )
) ;
log . debug ( "rule-dispatcher [{}], db-callback [{}], slowMsg [{}], fastMsg [{}]" , RULE_DISPATCHER_POOL_SIZE , DB_CALLBACK_POOL_SIZE , slowMsgList . size ( ) , fastMsgList . size ( ) ) ;
willAnswer ( invocation - > {
TbContext ctx = invocation . getArgument ( 0 ) ;
TbMsg msg = invocation . getArgument ( 1 ) ;
log . debug ( "awaiting on slowProcessingLatch [{}]" , msg ) ;
try {
assertThat ( slowProcessingLatch . await ( 30 , TimeUnit . SECONDS ) ) . as ( "await on slowProcessingLatch" ) . isTrue ( ) ;
} catch ( InterruptedException e ) {
throw new RuntimeException ( e ) ;
}
return invocation . callRealMethod ( ) ;
} ) . given ( node ) . processMsgAsync ( eq ( ctx ) , argThat ( slowMsgList : : contains ) ) ;
// submit slow msg may block all rule engine dispatcher threads
slowMsgList . forEach ( msg - > ruleEngineDispatcherExecutor . executeAsync ( ( ) - > node . onMsg ( ctx , msg ) ) ) ;
// wait until dispatcher threads started with all slowMsg
verify ( node , new Timeout ( TimeUnit . SECONDS . toMillis ( 5 ) , times ( slowMsgList . size ( ) ) ) ) . onMsg ( eq ( ctx ) , argThat ( slowMsgList : : contains ) ) ;
// submit fast have to return immediately
fastMsgList . forEach ( msg - > ruleEngineDispatcherExecutor . executeAsync ( ( ) - > node . onMsg ( ctx , msg ) ) ) ;
// wait until all fast messages processed
verify ( ctx , new Timeout ( TimeUnit . SECONDS . toMillis ( 5 ) , times ( fastMsgList . size ( ) ) ) ) . tellSuccess ( any ( ) ) ;
slowProcessingLatch . countDown ( ) ;
verify ( ctx , new Timeout ( TimeUnit . SECONDS . toMillis ( 5 ) , times ( fastMsgList . size ( ) + slowMsgList . size ( ) ) ) ) . tellSuccess ( any ( ) ) ;
verify ( ctx , never ( ) ) . tellFailure ( any ( ) , any ( ) ) ;
}
}