@ -25,7 +25,6 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.ObjectType ;
import org.thingsboard.server.common.data.edqs.EdqsEventType ;
import org.thingsboard.server.common.data.id.TenantId ;
import org.thingsboard.server.common.data.queue.QueueConfig ;
import org.thingsboard.server.common.msg.queue.ServiceType ;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo ;
import org.thingsboard.server.edqs.processor.EdqsProcessor ;
@ -34,8 +33,9 @@ import org.thingsboard.server.edqs.util.VersionsStore;
import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg ;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg ;
import org.thingsboard.server.queue.common.TbProtoQueueMsg ;
import org.thingsboard.server.queue.common.consumer.Main QueueConsumerManager ;
import org.thingsboard.server.queue.common.consumer.Partitioned QueueConsumerManager ;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager ;
import org.thingsboard.server.queue.common.consumer.QueueStateService ;
import org.thingsboard.server.queue.discovery.QueueKey ;
import org.thingsboard.server.queue.edqs.EdqsConfig ;
import org.thingsboard.server.queue.edqs.EdqsQueue ;
@ -44,7 +44,6 @@ import org.thingsboard.server.queue.edqs.KafkaEdqsComponent;
import java.util.Set ;
import java.util.UUID ;
import java.util.concurrent.CountDownLatch ;
import java.util.concurrent.ExecutorService ;
import java.util.concurrent.Executors ;
import java.util.concurrent.ScheduledExecutorService ;
@ -54,19 +53,17 @@ import java.util.concurrent.atomic.AtomicInteger;
@RequiredArgsConstructor
@KafkaEdqsComponent
@Slf4j
public class KafkaEdqsStateService implements EdqsStateService {
public class KafkaEdqsStateService extends QueueStateService < TbProtoQueueMsg < ToEdqsMsg > , TbProtoQueueMsg < ToEdqsMsg > > implements EdqsStateService {
private final EdqsConfig config ;
private final EdqsPartitionService partitionService ;
private final EdqsQueueFactory queueFactory ;
private final EdqsProcessor edqsProcessor ;
private Main QueueConsumerManager< TbProtoQueueMsg < ToEdqsMsg > , QueueConfig > stateConsumer ;
private QueueConsumerManager < TbProtoQueueMsg < ToEdqsMsg > > eventsConsumer ;
private Partitioned QueueConsumerManager< TbProtoQueueMsg < ToEdqsMsg > > stateConsumer ;
private QueueConsumerManager < TbProtoQueueMsg < ToEdqsMsg > > eventsToBackup Consumer ;
private EdqsProducer stateProducer ;
private boolean initialRestoreDone ;
private ExecutorService consumersExecutor ;
private ExecutorService mgmtExecutor ;
private ScheduledExecutorService scheduler ;
@ -81,11 +78,14 @@ public class KafkaEdqsStateService implements EdqsStateService {
mgmtExecutor = ThingsBoardExecutors . newWorkStealingPool ( 4 , "edqs-backup-consumer-mgmt" ) ;
scheduler = ThingsBoardExecutors . newSingleThreadScheduledExecutor ( "edqs-backup-scheduler" ) ;
stateConsumer = Main QueueConsumerManager. < TbProtoQueueMsg < ToEdqsMsg > , QueueConfig > builder ( ) // FIXME Slavik: if topic is empty
stateConsumer = Partitioned QueueConsumerManager. < TbProtoQueueMsg < ToEdqsMsg > > create ( ) // FIXME Slavik: if topic is empty
. queueKey ( new QueueKey ( ServiceType . EDQS , EdqsQueue . STATE . getTopic ( ) ) )
. config ( QueueConfig . of ( true , config . getPollInterval ( ) ) )
. pollInterval ( config . getPollInterval ( ) )
. msgPackProcessor ( ( msgs , consumer , config ) - > {
for ( TbProtoQueueMsg < ToEdqsMsg > queueMsg : msgs ) {
if ( consumer . isStopped ( ) ) {
return ;
}
try {
ToEdqsMsg msg = queueMsg . getValue ( ) ;
log . trace ( "Processing message: {}" , msg ) ;
@ -94,7 +94,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
log . info ( "[state] Processed {} msgs" , stateReadCount . get ( ) ) ;
}
} catch ( Exception e ) {
log . error ( "Failed to process message: {}" , queueMsg , e ) ; // TODO: do something about the error - e.g. reprocess
log . error ( "Failed to process message: {}" , queueMsg , e ) ;
}
}
consumer . commit ( ) ;
@ -105,46 +105,43 @@ public class KafkaEdqsStateService implements EdqsStateService {
. scheduler ( scheduler )
. uncaughtErrorHandler ( edqsProcessor . getErrorHandler ( ) )
. build ( ) ;
super . init ( stateConsumer , edqsProcessor . getEventsConsumer ( ) ) ;
ExecutorService backupExecutor = ThingsBoardExecutors . newLimitedTasksExecutor ( 12 , 1000 , "events-to-backup-executor" ) ;
eventsConsumer = QueueConsumerManager . < TbProtoQueueMsg < ToEdqsMsg > > builder ( ) // FIXME Slavik writes to the state while we read it, slows down the start. maybe start backup consumer after restore is finished
eventsToBackupConsumer = QueueConsumerManager . < TbProtoQueueMsg < ToEdqsMsg > > builder ( )
. name ( "edqs-events-to-backup-consumer" )
. pollInterval ( config . getPollInterval ( ) )
. msgPackProcessor ( ( msgs , consumer ) - > {
CountDownLatch resultLatch = new CountDownLatch ( msgs . size ( ) ) ;
for ( TbProtoQueueMsg < ToEdqsMsg > queueMsg : msgs ) {
backupExecutor . submit ( ( ) - > {
try {
ToEdqsMsg msg = queueMsg . getValue ( ) ;
log . trace ( "Processing message: {}" , msg ) ;
if ( msg . hasEventMsg ( ) ) {
EdqsEventMsg eventMsg = msg . getEventMsg ( ) ;
String key = eventMsg . getKey ( ) ;
int count = eventsReadCount . incrementAndGet ( ) ;
if ( count % 100000 = = 0 ) {
log . info ( "[events-to-backup] Processed {} msgs" , count ) ;
}
if ( eventMsg . hasVersion ( ) ) {
if ( ! versionsStore . isNew ( key , eventMsg . getVersion ( ) ) ) {
return ;
}
}
if ( consumer . isStopped ( ) ) {
return ;
}
try {
ToEdqsMsg msg = queueMsg . getValue ( ) ;
log . trace ( "Processing message: {}" , msg ) ;
TenantId tenantId = getTenantId ( msg ) ;
ObjectType objectType = ObjectType . valueOf ( eventMsg . getObjectType ( ) ) ;
EdqsEventType eventType = EdqsEventType . valueOf ( eventMsg . getEventType ( ) ) ;
log . debug ( "[{}] Saving to backup [{}] [{}] [{}]" , tenantId , objectType , eventType , key ) ;
stateProducer . send ( tenantId , objectType , key , msg ) ;
if ( msg . hasEventMsg ( ) ) {
EdqsEventMsg eventMsg = msg . getEventMsg ( ) ;
String key = eventMsg . getKey ( ) ;
int count = eventsReadCount . incrementAndGet ( ) ;
if ( count % 100000 = = 0 ) {
log . info ( "[events-to-backup] Processed {} msgs" , count ) ;
}
if ( eventMsg . hasVersion ( ) ) {
if ( ! versionsStore . isNew ( key , eventMsg . getVersion ( ) ) ) {
continue ;
}
}
} catch ( Throwable t ) {
log . error ( "Failed to process message: {}" , queueMsg , t ) ;
} finally {
resultLatch . countDown ( ) ;
TenantId tenantId = getTenantId ( msg ) ;
ObjectType objectType = ObjectType . valueOf ( eventMsg . getObjectType ( ) ) ;
EdqsEventType eventType = EdqsEventType . valueOf ( eventMsg . getEventType ( ) ) ;
log . debug ( "[{}] Saving to backup [{}] [{}] [{}]" , tenantId , objectType , eventType , key ) ;
stateProducer . send ( tenantId , objectType , key , msg ) ;
}
} ) ;
} catch ( Throwable t ) {
log . error ( "Failed to process message: {}" , queueMsg , t ) ;
}
}
resultLatch . await ( ) ;
consumer . commit ( ) ;
} )
. consumerCreator ( ( ) - > queueFactory . createEdqsMsgConsumer ( EdqsQueue . EVENTS , "events-to-backup-consumer-group" ) ) // shared by all instances consumer group
@ -160,20 +157,12 @@ public class KafkaEdqsStateService implements EdqsStateService {
}
@Override
public void restore ( Set < TopicPartitionInfo > partitions ) {
stateReadCount . set ( 0 ) ; //TODO Slavik: do not support remote mode in monolith setup
long startTs = System . currentTimeMillis ( ) ;
log . info ( "Restore started for partitions {}" , partitions . stream ( ) . map ( tpi - > tpi . getPartition ( ) . orElse ( - 1 ) ) . sorted ( ) . toList ( ) ) ;
stateConsumer . doUpdate ( partitions ) ; // calling blocking doUpdate instead of update
stateConsumer . awaitStop ( 0 ) ; // consumers should stop on their own because EdqsQueue.STATE.stopWhenRead is true, we just need to wait
log . info ( "Restore finished in {} ms. Processed {} msgs" , ( System . currentTimeMillis ( ) - startTs ) , stateReadCount . get ( ) ) ;
if ( ! initialRestoreDone ) {
initialRestoreDone = true ;
eventsConsumer . subscribe ( ) ;
eventsConsumer . launch ( ) ;
public void process ( Set < TopicPartitionInfo > partitions ) {
if ( getPartitions ( ) = = null ) {
eventsToBackupConsumer . subscribe ( ) ;
eventsToBackupConsumer . launch ( ) ;
}
super . update ( partitions ) ;
}
@Override
@ -194,7 +183,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
private void preDestroy ( ) {
stateConsumer . stop ( ) ;
stateConsumer . awaitStop ( ) ;
eventsConsumer . stop ( ) ;
eventsToBackup Consumer . stop ( ) ;
stateProducer . stop ( ) ;
consumersExecutor . shutdownNow ( ) ;