Browse Source

fix(edge): deliver FEATURE_DISABLED rejection cause instead of masking it with channel-shutdown error

pull/15325/head
Andrii Landiak 1 week ago
parent
commit
7ff40f4011
  1. 19
      common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java

19
common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java

@ -47,6 +47,7 @@ import org.thingsboard.server.gen.edge.v1.UplinkResponseMsg;
import javax.net.ssl.SSLException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
@ -153,6 +154,14 @@ public class EdgeGrpcClient implements EdgeRpcClient {
Consumer<DownlinkMsg> onDownlink,
Consumer<Exception> onError) {
return new StreamObserver<>() {
// Set only when a connection rejection is handled in onNext(). The disconnect() we then
// issue shuts down the channel and provokes a generic stream onError; this flag lets that
// self-inflicted error be ignored instead of racing ahead of and masking the rejection cause.
// A genuine stream failure (onError with no preceding rejection) does not set it and behaves
// exactly as before.
private final AtomicBoolean terminated = new AtomicBoolean(false);
@Override
public void onNext(ResponseMsg responseMsg) {
if (responseMsg.hasConnectResponseMsg()) {
@ -166,6 +175,10 @@ public class EdgeGrpcClient implements EdgeRpcClient {
onEdgeUpdate.accept(connectResponseMsg.getConfiguration());
} else {
log.error("[{}] Failed to establish the connection! Code: {}. Error message: {}.", edgeKey, connectResponseMsg.getResponseCode(), connectResponseMsg.getErrorMsg());
// Claim the terminal-error slot before disconnecting: disconnect() shuts down the
// channel, which surfaces a generic stream onError that would otherwise be delivered
// first and mask the application-level cause reported below.
terminated.set(true);
try {
EdgeGrpcClient.this.disconnect(true);
} catch (InterruptedException e) {
@ -192,6 +205,12 @@ public class EdgeGrpcClient implements EdgeRpcClient {
@Override
public void onError(Throwable t) {
log.warn("[{}] Stream was terminated due to error:", edgeKey, t);
if (terminated.get()) {
// A connection rejection was already reported in onNext(); this is the channel-shutdown
// error provoked by our own disconnect(). Ignore it so it does not mask the real cause.
log.debug("[{}] Ignoring stream termination provoked by rejection handling.", edgeKey);
return;
}
try {
EdgeGrpcClient.this.disconnect(true);
} catch (InterruptedException e) {

Loading…
Cancel
Save