diff --git a/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java b/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java index 1fae79cb28..fcff055f15 100644 --- a/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java +++ b/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java @@ -47,6 +47,7 @@ import org.thingsboard.server.gen.edge.v1.UplinkResponseMsg; import javax.net.ssl.SSLException; import java.net.InetSocketAddress; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; @@ -153,6 +154,14 @@ public class EdgeGrpcClient implements EdgeRpcClient { Consumer onDownlink, Consumer onError) { return new StreamObserver<>() { + + // Set only when a connection rejection is handled in onNext(). The disconnect() we then + // issue shuts down the channel and provokes a generic stream onError; this flag lets that + // self-inflicted error be ignored instead of racing ahead of and masking the rejection cause. + // A genuine stream failure (onError with no preceding rejection) does not set it and behaves + // exactly as before. + private final AtomicBoolean terminated = new AtomicBoolean(false); + @Override public void onNext(ResponseMsg responseMsg) { if (responseMsg.hasConnectResponseMsg()) { @@ -166,6 +175,10 @@ public class EdgeGrpcClient implements EdgeRpcClient { onEdgeUpdate.accept(connectResponseMsg.getConfiguration()); } else { log.error("[{}] Failed to establish the connection! Code: {}. Error message: {}.", edgeKey, connectResponseMsg.getResponseCode(), connectResponseMsg.getErrorMsg()); + // Claim the terminal-error slot before disconnecting: disconnect() shuts down the + // channel, which surfaces a generic stream onError that would otherwise be delivered + // first and mask the application-level cause reported below. + terminated.set(true); try { EdgeGrpcClient.this.disconnect(true); } catch (InterruptedException e) { @@ -192,6 +205,12 @@ public class EdgeGrpcClient implements EdgeRpcClient { @Override public void onError(Throwable t) { log.warn("[{}] Stream was terminated due to error:", edgeKey, t); + if (terminated.get()) { + // A connection rejection was already reported in onNext(); this is the channel-shutdown + // error provoked by our own disconnect(). Ignore it so it does not mask the real cause. + log.debug("[{}] Ignoring stream termination provoked by rejection handling.", edgeKey); + return; + } try { EdgeGrpcClient.this.disconnect(true); } catch (InterruptedException e) {