|
|
|
@ -131,7 +131,7 @@ public class PartitionedQueueResponseTemplate<Request extends TbQueueMsg, Respon |
|
|
|
@Override |
|
|
|
public void onFailure(Throwable t) { |
|
|
|
log.error("[{}] Failed to send response {}", requestId, response, t); |
|
|
|
sendErrorResponse(requestId, responseTopic, request, t); |
|
|
|
sendErrorResponse(requestId, tpi, request, t); |
|
|
|
stats.incrementFailed(); |
|
|
|
} |
|
|
|
}); |
|
|
|
@ -158,12 +158,11 @@ public class PartitionedQueueResponseTemplate<Request extends TbQueueMsg, Respon |
|
|
|
consumer.commit(); |
|
|
|
} |
|
|
|
|
|
|
|
private void sendErrorResponse(UUID requestId, String responseTopic, Request request, Throwable cause) { |
|
|
|
private void sendErrorResponse(UUID requestId, TopicPartitionInfo tpi, Request request, Throwable cause) { |
|
|
|
Response errorResponseMsg = handler.constructErrorResponseMsg(request, cause); |
|
|
|
|
|
|
|
if (errorResponseMsg != null) { |
|
|
|
errorResponseMsg.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId)); |
|
|
|
TopicPartitionInfo tpi = TopicPartitionInfo.builder().topic(responseTopic).build(); |
|
|
|
responseProducer.send(tpi, errorResponseMsg, null); |
|
|
|
} |
|
|
|
} |
|
|
|
|