From 0723988061a74ddf06db72db7903bfaecfa51ab4 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Fri, 26 Oct 2018 18:46:27 +0300 Subject: [PATCH] Fixed Shutdown behaviour --- .../server/service/rpc/DefaultDeviceRpcService.java | 6 +++--- .../server/dao/util/AbstractBufferedRateExecutor.java | 9 +++------ 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java index 64f5213d65..7aa2505d4b 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java @@ -90,7 +90,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService { @Override public void processRestAPIRpcRequestToRuleEngine(ToDeviceRpcRequest request, Consumer responseConsumer) { - log.trace("[{}] Processing local rpc call to rule engine [{}]", request.getTenantId(), request.getDeviceId()); + log.trace("[{}][{}] Processing local rpc call to rule engine [{}]", request.getTenantId(), request.getId(), request.getDeviceId()); UUID requestId = request.getId(); localToRuleEngineRpcRequests.put(requestId, responseConsumer); sendRpcRequestToRuleEngine(request); @@ -110,7 +110,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService { @Override public void processRpcRequestToDevice(ToDeviceRpcRequest request, Consumer responseConsumer) { - log.trace("[{}] Processing local rpc call to device [{}]", request.getTenantId(), request.getDeviceId()); + log.trace("[{}][{}] Processing local rpc call to device [{}]", request.getTenantId(), request.getId(), request.getDeviceId()); UUID requestId = request.getId(); localToDeviceRpcRequests.put(requestId, responseConsumer); sendRpcRequestToDevice(request); @@ -119,7 +119,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService { @Override public void processRpcResponseFromDevice(FromDeviceRpcResponse response) { - log.trace("[{}] response to request: [{}]", this.hashCode(), response.getId()); + log.trace("response to request: [{}]", response.getId()); if (routingService.getCurrentServer().equals(response.getServerAddress())) { UUID requestId = response.getId(); Consumer consumer = localToDeviceRpcRequests.remove(requestId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java index 997142ece1..96d3870dcd 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java @@ -26,7 +26,6 @@ import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -46,7 +45,6 @@ public abstract class AbstractBufferedRateExecutor taskCtx = null; try { if (curLvl <= concurrencyLimit) { - taskCtx = queue.poll(1, TimeUnit.SECONDS); - if (taskCtx == null) { - continue; - } + taskCtx = queue.take(); final AsyncTaskContext finalTaskCtx = taskCtx; logTask("Processing", finalTaskCtx); concurrencyLevel.incrementAndGet(); @@ -151,6 +146,8 @@ public abstract class AbstractBufferedRateExecutor