|
|
|
@ -84,17 +84,17 @@ public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> { |
|
|
|
} |
|
|
|
} |
|
|
|
} catch (Throwable t) { |
|
|
|
try { |
|
|
|
stats.incrementFailed(entities.size()); |
|
|
|
entities.forEach(entityFutureWrapper -> entityFutureWrapper.getFuture().setException(t)); |
|
|
|
} catch (Throwable th) { |
|
|
|
log.error("[{}] Failed to set future exception", logName, th); |
|
|
|
} |
|
|
|
if (t instanceof InterruptedException) { |
|
|
|
log.info("[{}] Queue polling was interrupted", logName); |
|
|
|
break; |
|
|
|
} else { |
|
|
|
log.error("[{}] Failed to save {} entities", logName, entities.size(), t); |
|
|
|
try { |
|
|
|
stats.incrementFailed(entities.size()); |
|
|
|
entities.forEach(entityFutureWrapper -> entityFutureWrapper.getFuture().setException(t)); |
|
|
|
} catch (Throwable th) { |
|
|
|
log.error("[{}] Failed to set future exception", logName, th); |
|
|
|
} |
|
|
|
} |
|
|
|
} finally { |
|
|
|
entities.clear(); |
|
|
|
|