@ -21,22 +21,28 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueConsumer ;
import org.thingsboard.server.queue.TbQueueMsg ;
import javax.annotation.Nonnull ;
import java.io.IOException ;
import java.util.ArrayList ;
import java.util.Collections ;
import java.util.List ;
import java.util.Queue ;
import java.util.Set ;
import java.util.concurrent.locks.Lock ;
import java.util.concurrent.ConcurrentLinkedQueue ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.locks.ReentrantLock ;
import java.util.stream.Collectors ;
import static java.util.Collections.emptyList ;
@Slf4j
public abstract class AbstractTbQueueConsumerTemplate < R , T extends TbQueueMsg > implements TbQueueConsumer < T > {
private volatile boolean subscribed ;
protected volatile boolean stopped = false ;
protected volatile Set < TopicPartitionInfo > partitions ;
protected final Lock consumerLock = new ReentrantLock ( ) ;
protected final ReentrantLock consumerLock = new ReentrantLock ( ) ; //NonfairSync
final Queue < Set < TopicPartitionInfo > > subscribeQueue = new ConcurrentLinkedQueue < > ( ) ;
@Getter
private final String topic ;
@ -47,84 +53,101 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
@Override
public void subscribe ( ) {
consumerLock . lock ( ) ;
try {
partitions = Collections . singleton ( new TopicPartitionInfo ( topic , null , null , true ) ) ;
subscribed = false ;
} finally {
consumerLock . unlock ( ) ;
log . info ( "enqueue topic subscribe {} " , topic ) ;
if ( stopped ) {
log . error ( "trying subscribe, but consumer stopped for topic {}" , topic ) ;
return ;
}
subscribeQueue . add ( Collections . singleton ( new TopicPartitionInfo ( topic , null , null , true ) ) ) ;
}
@Override
public void subscribe ( Set < TopicPartitionInfo > partitions ) {
consumerLock . lock ( ) ;
try {
this . partitions = partitions ;
subscribed = false ;
} finally {
consumerLock . unlock ( ) ;
log . info ( "enqueue topics subscribe {} " , partitions ) ;
if ( stopped ) {
log . error ( "trying subscribe, but consumer stopped for topic {}" , topic ) ;
return ;
}
subscribeQueue . add ( partitions ) ;
}
@Override
public List < T > poll ( long durationInMillis ) {
List < R > records ;
long startNanos = System . nanoTime ( ) ;
if ( stopped ) {
return errorAndReturnEmpty ( ) ;
}
if ( ! subscribed & & partitions = = null ) {
try {
Thread . sleep ( durationInMillis ) ;
} catch ( InterruptedException e ) {
log . debug ( "Failed to await subscription" , e ) ;
return sleepAndReturnEmpty ( startNanos , durationInMillis ) ;
}
if ( consumerLock . isLocked ( ) ) {
log . error ( "poll. consumerLock is locked. will wait with no timeout. it looks like a race conditions or deadlock" , new RuntimeException ( "stacktrace" ) ) ;
}
consumerLock . lock ( ) ;
try {
while ( ! subscribeQueue . isEmpty ( ) ) {
subscribed = false ;
partitions = subscribeQueue . poll ( ) ;
}
if ( ! subscribed ) {
List < String > topicNames = partitions . stream ( ) . map ( TopicPartitionInfo : : getFullTopicName ) . collect ( Collectors . toList ( ) ) ;
doSubscribe ( topicNames ) ;
subscribed = true ;
}
} else {
long pollStartTs = System . currentTimeMillis ( ) ;
consumerLock . lock ( ) ;
records = partitions . isEmpty ( ) ? emptyList ( ) : doPoll ( durationInMillis ) ;
} finally {
consumerLock . unlock ( ) ;
}
if ( records . isEmpty ( ) ) { return sleepAndReturnEmpty ( startNanos , durationInMillis ) ; }
return decodeRecords ( records ) ;
}
@Nonnull
List < T > decodeRecords ( @Nonnull List < R > records ) {
List < T > result = new ArrayList < > ( records . size ( ) ) ;
records . forEach ( record - > {
try {
if ( ! subscribed ) {
List < String > topicNames = partitions . stream ( ) . map ( TopicPartitionInfo : : getFullTopicName ) . collect ( Collectors . toList ( ) ) ;
doSubscribe ( topicNames ) ;
subscribed = true ;
if ( record ! = null ) {
result . add ( decode ( record ) ) ;
}
} catch ( IOException e ) {
log . error ( "Failed decode record: [{}]" , record ) ;
throw new RuntimeException ( "Failed to decode record: " , e ) ;
}
} ) ;
return result ;
}
List < R > records ;
if ( partitions . isEmpty ( ) ) {
records = Collections . emptyList ( ) ;
} else {
records = doPoll ( durationInMillis ) ;
}
if ( ! records . isEmpty ( ) ) {
List < T > result = new ArrayList < > ( records . size ( ) ) ;
records . forEach ( record - > {
try {
if ( record ! = null ) {
result . add ( decode ( record ) ) ;
}
} catch ( IOException e ) {
log . error ( "Failed decode record: [{}]" , record ) ;
throw new RuntimeException ( "Failed to decode record: " , e ) ;
}
} ) ;
return result ;
} else {
long pollDuration = System . currentTimeMillis ( ) - pollStartTs ;
if ( pollDuration < durationInMillis ) {
try {
Thread . sleep ( durationInMillis - pollDuration ) ;
} catch ( InterruptedException e ) {
if ( ! stopped ) {
log . error ( "Failed to wait." , e ) ;
}
}
}
List < T > errorAndReturnEmpty ( ) {
log . error ( "poll invoked but consumer stopped for topic" + topic , new RuntimeException ( "stacktrace" ) ) ;
return emptyList ( ) ;
}
List < T > sleepAndReturnEmpty ( final long startNanos , final long durationInMillis ) {
long durationNanos = TimeUnit . MILLISECONDS . toNanos ( durationInMillis ) ;
long spentNanos = System . nanoTime ( ) - startNanos ;
if ( spentNanos < durationNanos ) {
try {
Thread . sleep ( Math . max ( TimeUnit . NANOSECONDS . toMillis ( durationNanos - spentNanos ) , 1 ) ) ;
} catch ( InterruptedException e ) {
if ( ! stopped ) {
log . error ( "Failed to wait" , e ) ;
}
} finally {
consumerLock . unlock ( ) ;
}
}
return Collections . emptyList ( ) ;
return emptyList ( ) ;
}
@Override
public void commit ( ) {
if ( consumerLock . isLocked ( ) ) {
log . error ( "commit. consumerLock is locked. will wait with no timeout. it looks like a race conditions or deadlock" , new RuntimeException ( "stacktrace" ) ) ;
}
consumerLock . lock ( ) ;
try {
doCommit ( ) ;
@ -135,6 +158,7 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
@Override
public void unsubscribe ( ) {
log . info ( "unsubscribe topic and stop consumer {}" , getTopic ( ) ) ;
stopped = true ;
consumerLock . lock ( ) ;
try {