From ca71ced6833231d564b3d2fd2c07e31a35442e42 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 16 Dec 2025 18:04:14 +0200 Subject: [PATCH] KafkaEdgeGrpcSession - move commit outside try/catch to propagate expection in case consumer already closed --- .../server/service/edge/rpc/KafkaEdgeGrpcSession.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java index 63669a8e3d..e6eaec23e4 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java @@ -82,16 +82,18 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession { edgeEvents.add(edgeEvent); } List downlinkMsgsPack = convertToDownlinkMsgsPack(edgeEvents); + boolean isInterrupted = true; try { - boolean isInterrupted = sendDownlinkMsgsPack(downlinkMsgsPack).get(); + isInterrupted = sendDownlinkMsgsPack(downlinkMsgsPack).get(); if (isInterrupted) { log.debug("[{}][{}] Send downlink messages task was interrupted", tenantId, edge.getId()); - } else { - consumer.commit(); } } catch (Exception e) { log.error("[{}][{}] Failed to process downlink messages", tenantId, edge.getId(), e); } + if (!isInterrupted) { + consumer.commit(); + } } @Override