@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
*
* http : //www.apache.org/licenses/LICENSE-2.0
* http : //www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an "AS IS" BASIS ,
@ -36,11 +36,16 @@ import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
import org.thingsboard.server.queue.discovery.ServiceType ;
import org.thingsboard.server.gen.transport.TransportProtos ;
import org.thingsboard.server.queue.provider.TbRuleEngineQueueProvider ;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingDecision ;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult ;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategy ;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategyFactory ;
import javax.annotation.PostConstruct ;
import javax.annotation.PreDestroy ;
import java.util.List ;
import java.util.UUID ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.ConcurrentMap ;
import java.util.concurrent.CountDownLatch ;
import java.util.concurrent.ExecutorService ;
@ -64,10 +69,12 @@ public class DefaultTbRuleEngineConsumerService implements TbRuleEngineConsumerS
private final ActorSystemContext actorContext ;
private final TbQueueConsumer < TbProtoQueueMsg < TransportProtos . ToRuleEngineMsg > > consumer ;
private final TbCoreConsumerStats stats = new TbCoreConsumerStats ( ) ;
private final TbRuleEngineProcessingStrategyFactory factory ;
private volatile ExecutorService mainConsumerExecutor ;
private volatile boolean stopped = false ;
public DefaultTbRuleEngineConsumerService ( TbRuleEngineQueueProvider tbRuleEngineQueueProvider , ActorSystemContext actorContext ) {
public DefaultTbRuleEngineConsumerService ( TbRuleEngineProcessingStrategyFactory factory , TbRuleEngineQueueProvider tbRuleEngineQueueProvider , ActorSystemContext actorContext ) {
this . factory = factory ;
this . consumer = tbRuleEngineQueueProvider . getToRuleEngineMsgConsumer ( ) ;
this . actorContext = actorContext ;
}
@ -75,6 +82,7 @@ public class DefaultTbRuleEngineConsumerService implements TbRuleEngineConsumerS
@PostConstruct
public void init ( ) {
this . mainConsumerExecutor = Executors . newSingleThreadExecutor ( ThingsBoardThreadFactory . forName ( "tb-core-consumer" ) ) ;
this . factory . newInstance ( ) ;
}
@Override
@ -94,29 +102,46 @@ public class DefaultTbRuleEngineConsumerService implements TbRuleEngineConsumerS
if ( msgs . isEmpty ( ) ) {
continue ;
}
ConcurrentMap < UUID , TbProtoQueueMsg < TransportProtos . ToRuleEngineMsg > > ackMap = msgs . stream ( ) . collect (
Collectors . toConcurrentMap ( s - > UUID . randomUUID ( ) , Function . identity ( ) ) ) ;
CountDownLatch processingTimeoutLatch = new CountDownLatch ( 1 ) ;
ackMap . forEach ( ( id , msg ) - > {
TbMsgCallback callback = new MsgPackCallback < > ( id , processingTimeoutLatch , ackMap ) ;
try {
TransportProtos . ToRuleEngineMsg toRuleEngineMsg = msg . getValue ( ) ;
TenantId tenantId = new TenantId ( new UUID ( toRuleEngineMsg . getTenantIdMSB ( ) , toRuleEngineMsg . getTenantIdLSB ( ) ) ) ;
if ( toRuleEngineMsg . getTbMsg ( ) ! = null & & ! toRuleEngineMsg . getTbMsg ( ) . isEmpty ( ) ) {
forwardToRuleEngineActor ( tenantId , toRuleEngineMsg . getTbMsg ( ) , callback ) ;
} else {
callback . onSuccess ( ) ;
TbRuleEngineProcessingStrategy strategy = factory . newInstance ( ) ;
TbRuleEngineProcessingDecision decision = null ;
boolean firstAttempt = true ;
while ( ! stopped & & ( firstAttempt | | ! decision . isCommit ( ) ) ) {
ConcurrentMap < UUID , TbProtoQueueMsg < TransportProtos . ToRuleEngineMsg > > allMap ;
if ( firstAttempt ) {
allMap = msgs . stream ( ) . collect (
Collectors . toConcurrentMap ( s - > UUID . randomUUID ( ) , Function . identity ( ) ) ) ;
firstAttempt = false ;
} else {
allMap = decision . getReprocessMap ( ) ;
}
ConcurrentMap < UUID , TbProtoQueueMsg < TransportProtos . ToRuleEngineMsg > > successMap = new ConcurrentHashMap < > ( ) ;
ConcurrentMap < UUID , TbProtoQueueMsg < TransportProtos . ToRuleEngineMsg > > failedMap = new ConcurrentHashMap < > ( ) ;
CountDownLatch processingTimeoutLatch = new CountDownLatch ( 1 ) ;
allMap . forEach ( ( id , msg ) - > {
TbMsgCallback callback = new MsgPackCallback < > ( id , processingTimeoutLatch , allMap , successMap , failedMap ) ;
try {
TransportProtos . ToRuleEngineMsg toRuleEngineMsg = msg . getValue ( ) ;
TenantId tenantId = new TenantId ( new UUID ( toRuleEngineMsg . getTenantIdMSB ( ) , toRuleEngineMsg . getTenantIdLSB ( ) ) ) ;
if ( toRuleEngineMsg . getTbMsg ( ) ! = null & & ! toRuleEngineMsg . getTbMsg ( ) . isEmpty ( ) ) {
forwardToRuleEngineActor ( tenantId , toRuleEngineMsg . getTbMsg ( ) , callback ) ;
} else {
callback . onSuccess ( ) ;
}
} catch ( Throwable e ) {
callback . onFailure ( e ) ;
}
} catch ( Throwable e ) {
callback . onFailure ( e ) ;
} ) ;
boolean timeout = false ;
if ( ! processingTimeoutLatch . await ( packProcessingTimeout , TimeUnit . MILLISECONDS ) ) {
timeout = true ;
}
} ) ;
if ( ! processingTimeoutLatch . await ( packProcessingTimeout , TimeUnit . MILLISECONDS ) ) {
ackMap . forEach ( ( id , msg ) - > log . warn ( "[{}] Timeout to process message: {}" , id , msg . getValue ( ) ) ) ;
decision = strategy . analyze ( new TbRuleEngineProcessingResult ( timeout , allMap , successMap , failedMap ) ) ;
}
consumer . commit ( ) ;
} catch ( Exception e ) {
log . warn ( "Failed to obtain messages from queue." , e ) ;
log . warn ( "Failed to process messages from queue." , e ) ;
try {
Thread . sleep ( pollDuration ) ;
} catch ( InterruptedException e2 ) {