@ -23,11 +23,14 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service ;
import org.thingsboard.common.util.ThingsBoardThreadFactory ;
import org.thingsboard.server.actors.ActorSystemContext ;
import org.thingsboard.server.common.data.id.EntityId ;
import org.thingsboard.server.common.data.id.EntityIdFactory ;
import org.thingsboard.server.common.data.id.QueueId ;
import org.thingsboard.server.common.data.id.TenantId ;
import org.thingsboard.server.common.data.queue.Queue ;
import org.thingsboard.server.common.data.rpc.RpcError ;
import org.thingsboard.server.common.msg.TbMsg ;
import org.thingsboard.server.common.msg.gen.MsgProtos ;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg ;
import org.thingsboard.server.common.msg.queue.RuleEngineException ;
import org.thingsboard.server.common.msg.queue.RuleNodeInfo ;
@ -42,12 +45,14 @@ import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.gen.transport.TransportProtos ;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg ;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg ;
import org.thingsboard.server.queue.TbQueueAdmin ;
import org.thingsboard.server.queue.TbQueueConsumer ;
import org.thingsboard.server.queue.common.TbProtoQueueMsg ;
import org.thingsboard.server.queue.discovery.PartitionService ;
import org.thingsboard.server.queue.discovery.QueueKey ;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider ;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent ;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider ;
import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory ;
import org.thingsboard.server.queue.util.DataDecodingEncodingService ;
import org.thingsboard.server.queue.util.TbRuleEngineComponent ;
@ -107,13 +112,14 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
private final TbRuleEngineDeviceRpcService tbDeviceRpcService ;
private final TbServiceInfoProvider serviceInfoProvider ;
private final QueueService queueService ;
// private final TenantId tenantId;
private final TbQueueProducerProvider producerProvider ;
private final TbQueueAdmin queueAdmin ;
private final ConcurrentMap < QueueKey , TbQueueConsumer < TbProtoQueueMsg < ToRuleEngineMsg > > > consumers = new ConcurrentHashMap < > ( ) ;
private final ConcurrentMap < QueueKey , Queue > consumerConfigurations = new ConcurrentHashMap < > ( ) ;
private final ConcurrentMap < QueueKey , TbRuleEngineConsumerStats > consumerStats = new ConcurrentHashMap < > ( ) ;
private final ConcurrentMap < QueueKey , TbTopicWithConsumerPerPartition > topicsConsumerPerPartition = new ConcurrentHashMap < > ( ) ;
final ExecutorService submitExecutor = Executors . newSingleThreadExecutor ( ThingsBoardThreadFactory . forName ( "tb-rule-engine-consumer-submit" ) ) ;
final ScheduledExecutorService repartitionExecutor = Executors . newScheduledThreadPool ( 1 , ThingsBoardThreadFactory . forName ( "tb-rule-engine-consumer-repartition" ) ) ;
final ScheduledExecutorService repartitionExecutor = Executors . newScheduledThreadPool ( 2 , ThingsBoardThreadFactory . forName ( "tb-rule-engine-consumer-repartition" ) ) ;
public DefaultTbRuleEngineConsumerService ( TbRuleEngineProcessingStrategyFactory processingStrategyFactory ,
TbRuleEngineSubmitStrategyFactory submitStrategyFactory ,
@ -128,7 +134,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
TbTenantProfileCache tenantProfileCache ,
TbApiUsageStateService apiUsageStateService ,
PartitionService partitionService , ApplicationEventPublisher eventPublisher ,
TbServiceInfoProvider serviceInfoProvider , QueueService queueService ) {
TbServiceInfoProvider serviceInfoProvider , QueueService queueService ,
TbQueueProducerProvider producerProvider , TbQueueAdmin queueAdmin ) {
super ( actorContext , encodingService , tenantProfileCache , deviceProfileCache , assetProfileCache , apiUsageStateService , partitionService , eventPublisher , tbRuleEngineQueueFactory . createToRuleEngineNotificationsMsgConsumer ( ) , Optional . empty ( ) ) ;
this . statisticsService = statisticsService ;
this . tbRuleEngineQueueFactory = tbRuleEngineQueueFactory ;
@ -138,6 +145,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
this . statsFactory = statsFactory ;
this . serviceInfoProvider = serviceInfoProvider ;
this . queueService = queueService ;
this . producerProvider = producerProvider ;
this . queueAdmin = queueAdmin ;
}
@PostConstruct
@ -230,7 +239,6 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
launchConsumer ( consumer , consumerConfigurations . get ( queueKey ) , consumerStats . get ( queueKey ) , "" + queueKey + "-" + tpi . getPartition ( ) . orElse ( - 999999 ) ) ;
consumer . subscribe ( Collections . singleton ( tpi ) ) ;
} ) ;
} finally {
tbTopicWithConsumerPerPartition . getLock ( ) . unlock ( ) ;
}
@ -264,7 +272,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
void consumerLoop ( TbQueueConsumer < TbProtoQueueMsg < ToRuleEngineMsg > > consumer , org . thingsboard . server . common . data . queue . Queue configuration , TbRuleEngineConsumerStats stats , String threadSuffix ) {
updateCurrentThreadName ( threadSuffix ) ;
while ( ! stopped & & ! consumer . isStopped ( ) ) {
while ( ! stopped & & ! consumer . isStopped ( ) & & ! consumer . isDeleted ( ) ) {
try {
List < TbProtoQueueMsg < ToRuleEngineMsg > > msgs = consumer . poll ( configuration . getPollInterval ( ) ) ;
if ( msgs . isEmpty ( ) ) {
@ -314,6 +322,10 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
}
}
}
if ( consumer . isDeleted ( ) ) {
processQueueDeletion ( configuration , consumer ) ;
}
log . info ( "TB Rule Engine Consumer stopped." ) ;
}
@ -448,22 +460,22 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
TenantId tenantId = new TenantId ( new UUID ( queueDeleteMsg . getTenantIdMSB ( ) , queueDeleteMsg . getTenantIdLSB ( ) ) ) ;
QueueKey queueKey = new QueueKey ( ServiceType . TB_RULE_ENGINE , queueDeleteMsg . getQueueName ( ) , tenantId ) ;
partitionService . removeQueue ( queueDeleteMsg ) ;
Queue queue = consumerConfigurations . remove ( queueKey ) ;
if ( queue ! = null ) {
if ( queue . isConsumerPerPartition ( ) ) {
TbTopicWithConsumerPerPartition tbTopicWithConsumerPerPartition = topicsConsumerPerPartition . remove ( queueKey ) ;
if ( tbTopicWithConsumerPerPartition ! = null ) {
tbTopicWithConsumerPerPartition . getConsumers ( ) . values ( ) . forEach ( TbQueueConsumer : : unsubscrib e) ;
tbTopicWithConsumerPerPartition . getConsumers ( ) . values ( ) . forEach ( TbQueueConsumer : : onQueueDelet e) ;
tbTopicWithConsumerPerPartition . getConsumers ( ) . clear ( ) ;
}
} else {
TbQueueConsumer < TbProtoQueueMsg < ToRuleEngineMsg > > consumer = consumers . remove ( queueKey ) ;
if ( consumer ! = null ) {
consumer . unsubscrib e( ) ;
consumer . onQueueDelet e( ) ;
}
}
}
partitionService . removeQueue ( queueDeleteMsg ) ;
}
private void forwardToRuleEngineActor ( String queueName , TenantId tenantId , ToRuleEngineMsg toRuleEngineMsg , TbMsgCallback callback ) {
@ -482,6 +494,49 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
actorContext . tell ( msg ) ;
}
private void processQueueDeletion ( Queue queue , TbQueueConsumer < TbProtoQueueMsg < ToRuleEngineMsg > > consumer ) {
long startTs = System . currentTimeMillis ( ) ;
long timeout = TimeUnit . SECONDS . toMillis ( 30 ) ;
try {
int n = 0 ;
while ( ( System . currentTimeMillis ( ) - startTs < = timeout ) ) {
List < TbProtoQueueMsg < ToRuleEngineMsg > > msgs = consumer . poll ( queue . getPollInterval ( ) ) ;
if ( ! msgs . isEmpty ( ) ) {
for ( TbProtoQueueMsg < ToRuleEngineMsg > msg : msgs ) {
try {
MsgProtos . TbMsgProto tbMsgProto = MsgProtos . TbMsgProto . parseFrom ( msg . getValue ( ) . getTbMsg ( ) . toByteArray ( ) ) ;
EntityId originator = EntityIdFactory . getByTypeAndUuid ( tbMsgProto . getEntityType ( ) , new UUID ( tbMsgProto . getEntityIdMSB ( ) , tbMsgProto . getEntityIdLSB ( ) ) ) ;
TopicPartitionInfo tpi = partitionService . resolve ( ServiceType . TB_RULE_ENGINE , queue . getName ( ) , TenantId . SYS_TENANT_ID , originator ) ;
producerProvider . getRuleEngineMsgProducer ( ) . send ( tpi , msg , null ) ;
n + + ;
} catch ( Throwable e ) {
log . debug ( "Failed to move message to system {}: {}" , consumer . getTopic ( ) , msg , e ) ;
}
}
consumer . commit ( ) ;
} else {
break ;
}
}
if ( n > 0 ) {
log . info ( "Moved {} messages from {} to system {}" , n , consumer . getFullTopicNames ( ) , consumer . getTopic ( ) ) ;
}
consumer . unsubscribe ( ) ;
for ( String topic : consumer . getFullTopicNames ( ) ) {
try {
queueAdmin . deleteTopic ( topic ) ;
log . info ( "Deleted topic {}" , topic ) ;
} catch ( Exception e ) {
log . error ( "Failed to delete topic {} after unsubscribing" , topic , e ) ;
}
}
} catch ( Exception e ) {
log . error ( "Failed to process deletion of {} ({})" , consumer . getTopic ( ) , queue . getTenantId ( ) , e ) ;
}
}
@Scheduled ( fixedDelayString = "${queue.rule-engine.stats.print-interval-ms}" )
public void printStats ( ) {
if ( statsEnabled ) {