Browse Source

sleep(poolInterval) replaced with parkNanos(1) for DefaultTbQueueRequestTemplate

pull/4753/head
Sergey Matvienko 5 years ago
parent
commit
eee2d83ed5
  1. 10
      application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
  2. 10
      common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java

10
application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java

@ -102,15 +102,7 @@ public class DefaultActorService extends TbApplicationEventListener<PartitionCha
} }
private ExecutorService initDispatcherExecutor(String dispatcherName, int poolSize) { private ExecutorService initDispatcherExecutor(String dispatcherName, int poolSize) {
if (poolSize == 0) { return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1, ThingsBoardThreadFactory.forName(dispatcherName));
int cores = Runtime.getRuntime().availableProcessors();
poolSize = Math.max(1, cores / 2);
}
if (poolSize == 1) {
return Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(dispatcherName));
} else {
return ThingsBoardExecutors.newWorkStealingPool(poolSize, dispatcherName);
}
} }
@EventListener(ApplicationReadyEvent.class) @EventListener(ApplicationReadyEvent.class)

10
common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java

@ -41,6 +41,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@Slf4j @Slf4j
@ -97,7 +98,7 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
try { try {
fetchAndProcessResponses(); fetchAndProcessResponses();
} catch (Throwable e) { } catch (Throwable e) {
log.warn("Failed to obtain responses from queue. Going to sleep " + pollInterval + "ms", e); log.warn("Failed to obtain and process responses from queue. Going to sleep " + pollInterval + "ms", e);
sleep(); sleep();
} }
} }
@ -149,11 +150,8 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
} }
void sleep() { void sleep() {
try { Thread.yield();
Thread.sleep(pollInterval); LockSupport.parkNanos(1);
} catch (InterruptedException e2) {
log.trace("Failed to wait until the server has capacity to handle new responses", e2);
}
} }
void setTimeoutException(UUID key, ResponseMetaData<Response> staleRequest, long currentNs) { void setTimeoutException(UUID key, ResponseMetaData<Response> staleRequest, long currentNs) {

Loading…
Cancel
Save