Browse Source

KafkaEdgeGrpcSession - move commit outside try/catch to propagate expection in case consumer already closed

pull/14616/head
Volodymyr Babak 6 months ago
parent
commit
ca71ced683
  1. 8
      application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java

8
application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java

@ -82,16 +82,18 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession {
edgeEvents.add(edgeEvent);
}
List<DownlinkMsg> downlinkMsgsPack = convertToDownlinkMsgsPack(edgeEvents);
boolean isInterrupted = true;
try {
boolean isInterrupted = sendDownlinkMsgsPack(downlinkMsgsPack).get();
isInterrupted = sendDownlinkMsgsPack(downlinkMsgsPack).get();
if (isInterrupted) {
log.debug("[{}][{}] Send downlink messages task was interrupted", tenantId, edge.getId());
} else {
consumer.commit();
}
} catch (Exception e) {
log.error("[{}][{}] Failed to process downlink messages", tenantId, edge.getId(), e);
}
if (!isInterrupted) {
consumer.commit();
}
}
@Override

Loading…
Cancel
Save