@ -42,10 +42,10 @@ import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.edqs.EdqsQueue ;
import org.thingsboard.server.queue.edqs.EdqsQueueFactory ;
import org.thingsboard.server.queue.edqs.KafkaEdqsComponent ;
import org.thingsboard.server.queue.util.AfterStartUp ;
import java.util.Set ;
import java.util.UUID ;
import java.util.concurrent.CountDownLatch ;
import java.util.concurrent.ExecutorService ;
import java.util.concurrent.Executors ;
import java.util.concurrent.ScheduledExecutorService ;
@ -66,6 +66,8 @@ public class KafkaEdqsStateService implements EdqsStateService {
private QueueConsumerManager < TbProtoQueueMsg < ToEdqsMsg > > eventsConsumer ;
private EdqsProducer stateProducer ;
private boolean initialRestoreDone ;
private ExecutorService consumersExecutor ;
private ExecutorService mgmtExecutor ;
private ScheduledExecutorService scheduler ;
@ -76,7 +78,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
@PostConstruct
private void init ( ) {
consumersExecutor = Executors . newCachedThreadPool ( ThingsBoardThreadFactory . forName ( "edqs-backup- consumer" ) ) ;
consumersExecutor = Executors . newCachedThreadPool ( ThingsBoardThreadFactory . forName ( "edqs-consumer" ) ) ;
mgmtExecutor = ThingsBoardExecutors . newWorkStealingPool ( 4 , "edqs-backup-consumer-mgmt" ) ;
scheduler = ThingsBoardExecutors . newSingleThreadScheduledExecutor ( "edqs-backup-scheduler" ) ;
@ -92,8 +94,8 @@ public class KafkaEdqsStateService implements EdqsStateService {
if ( stateReadCount . incrementAndGet ( ) % 100000 = = 0 ) {
log . info ( "[state] Processed {} msgs" , stateReadCount . get ( ) ) ;
}
} catch ( Throwable t ) {
log . error ( "Failed to process message: {}" , queueMsg , t ) ;
} catch ( Exception e ) {
log . error ( "Failed to process message: {}" , queueMsg , e ) ; // TODO: do something about the error - e.g. reprocess
}
}
consumer . commit ( ) ;
@ -102,39 +104,48 @@ public class KafkaEdqsStateService implements EdqsStateService {
. consumerExecutor ( consumersExecutor )
. taskExecutor ( mgmtExecutor )
. scheduler ( scheduler )
. uncaughtErrorHandler ( edqsProcessor . getErrorHandler ( ) )
. build ( ) ;
eventsConsumer = QueueConsumerManager . < TbProtoQueueMsg < ToEdqsMsg > > builder ( ) // FIXME Slavik writes to the state while we read it, slows down the start
ExecutorService backupExecutor = ThingsBoardExecutors . newLimitedTasksExecutor ( 12 , 1000 , "events-to-backup-executor" ) ;
eventsConsumer = QueueConsumerManager . < TbProtoQueueMsg < ToEdqsMsg > > builder ( ) // FIXME Slavik writes to the state while we read it, slows down the start. maybe start backup consumer after restore is finished
. name ( "edqs-events-to-backup-consumer" )
. pollInterval ( config . getPollInterval ( ) )
. msgPackProcessor ( ( msgs , consumer ) - > {
CountDownLatch resultLatch = new CountDownLatch ( msgs . size ( ) ) ;
for ( TbProtoQueueMsg < ToEdqsMsg > queueMsg : msgs ) {
try {
ToEdqsMsg msg = queueMsg . getValue ( ) ;
log . trace ( "Processing message: {}" , msg ) ;
if ( msg . hasEventMsg ( ) ) {
EdqsEventMsg eventMsg = msg . getEventMsg ( ) ;
String key = eventMsg . getKey ( ) ;
if ( eventsReadCount . incrementAndGet ( ) % 100000 = = 0 ) {
log . info ( "[events-to-backup] Processed {} msgs" , eventsReadCount . get ( ) ) ;
}
if ( eventMsg . hasVersion ( ) ) {
if ( ! versionsStore . isNew ( key , eventMsg . getVersion ( ) ) ) {
return ;
backupExecutor . submit ( ( ) - > {
try {
ToEdqsMsg msg = queueMsg . getValue ( ) ;
log . trace ( "Processing message: {}" , msg ) ;
if ( msg . hasEventMsg ( ) ) {
EdqsEventMsg eventMsg = msg . getEventMsg ( ) ;
String key = eventMsg . getKey ( ) ;
int count = eventsReadCount . incrementAndGet ( ) ;
if ( count % 100000 = = 0 ) {
log . info ( "[events-to-backup] Processed {} msgs" , count ) ;
}
if ( eventMsg . hasVersion ( ) ) {
if ( ! versionsStore . isNew ( key , eventMsg . getVersion ( ) ) ) {
return ;
}
}
}
TenantId tenantId = getTenantId ( msg ) ;
ObjectType objectType = ObjectType . valueOf ( eventMsg . getObjectType ( ) ) ;
EdqsEventType eventType = EdqsEventType . valueOf ( eventMsg . getEventType ( ) ) ;
log . debug ( "[{}] Saving to backup [{}] [{}] [{}]" , tenantId , objectType , eventType , key ) ;
stateProducer . send ( tenantId , objectType , key , msg ) ;
TenantId tenantId = getTenantId ( msg ) ;
ObjectType objectType = ObjectType . valueOf ( eventMsg . getObjectType ( ) ) ;
EdqsEventType eventType = EdqsEventType . valueOf ( eventMsg . getEventType ( ) ) ;
log . debug ( "[{}] Saving to backup [{}] [{}] [{}]" , tenantId , objectType , eventType , key ) ;
stateProducer . send ( tenantId , objectType , key , msg ) ;
}
} catch ( Throwable t ) {
log . error ( "Failed to process message: {}" , queueMsg , t ) ;
} finally {
resultLatch . countDown ( ) ;
}
} catch ( Throwable t ) {
log . error ( "Failed to process message: {}" , queueMsg , t ) ;
}
} ) ;
}
resultLatch . await ( ) ;
consumer . commit ( ) ;
} )
. consumerCreator ( ( ) - > queueFactory . createEdqsMsgConsumer ( EdqsQueue . EVENTS , "events-to-backup-consumer-group" ) ) // shared by all instances consumer group
@ -149,22 +160,21 @@ public class KafkaEdqsStateService implements EdqsStateService {
. build ( ) ;
}
@AfterStartUp ( order = AfterStartUp . REGULAR_SERVICE )
public void afterStartUp ( ) {
eventsConsumer . subscribe ( ) ;
eventsConsumer . launch ( ) ;
}
@Override
public void restore ( Set < TopicPartitionInfo > partitions ) {
stateReadCount . set ( 0 ) ; //TODO Slavik: do not support remote mode in monolith setup
long startTs = System . currentTimeMillis ( ) ;
log . info ( "Restore started for partitions {}" , partitions . stream ( ) . map ( tpi - > tpi . getPartition ( ) . orElse ( - 1 ) ) . sorted ( ) . toList ( ) ) ;
stateConsumer . doUpdate ( partitions ) ; // calling blocking doUpdate instead of update
stateConsumer . awaitStop ( 0 ) ; // consumers should stop on their own because EdqsQueue.STATE.stopWhenRead is true, we just need to wait
log . info ( "Restore finished in {} ms. Processed {} msgs" , ( System . currentTimeMillis ( ) - startTs ) , stateReadCount . get ( ) ) ;
if ( ! initialRestoreDone ) {
initialRestoreDone = true ;
eventsConsumer . subscribe ( ) ;
eventsConsumer . launch ( ) ;
}
}
@Override
@ -172,6 +182,11 @@ public class KafkaEdqsStateService implements EdqsStateService {
// do nothing here, backup is done by events consumer
}
@Override
public boolean isReady ( ) {
return initialRestoreDone ;
}
private TenantId getTenantId ( ToEdqsMsg edqsMsg ) {
return TenantId . fromUUID ( new UUID ( edqsMsg . getTenantIdMSB ( ) , edqsMsg . getTenantIdLSB ( ) ) ) ;
}