Browse Source

TbSqlBlockingQueue failover improvement

pull/9796/head
Sergey Matvienko 3 years ago
parent
commit
4c8cf69632
  1. 18
      dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java

18
dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java

@ -54,7 +54,7 @@ public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
String logName = params.getLogName();
int batchSize = params.getBatchSize();
long maxDelay = params.getMaxDelay();
List<TbSqlQueueElement<E>> entities = new ArrayList<>(batchSize);
final List<TbSqlQueueElement<E>> entities = new ArrayList<>(batchSize);
while (!Thread.interrupted()) {
try {
long currentTs = System.currentTimeMillis();
@ -83,19 +83,23 @@ public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
Thread.sleep(remainingDelay);
}
}
} catch (Exception e) {
stats.incrementFailed(entities.size());
entities.forEach(entityFutureWrapper -> entityFutureWrapper.getFuture().setException(e));
if (e instanceof InterruptedException) {
} catch (Throwable t) {
log.error("[{}] Failed to save {} entities", logName, entities.size(), t);
try {
stats.incrementFailed(entities.size());
entities.forEach(entityFutureWrapper -> entityFutureWrapper.getFuture().setException(t));
} catch (Throwable th) {
log.error("[{}] Failed to set future exception", logName, th);
}
if (t instanceof InterruptedException) {
log.info("[{}] Queue polling was interrupted", logName);
break;
} else {
log.error("[{}] Failed to save {} entities", logName, entities.size(), e);
}
} finally {
entities.clear();
}
}
log.info("[{}] Queue polling completed", logName);
});
logExecutor.scheduleAtFixedRate(() -> {

Loading…
Cancel
Save