From ee4b27f96e8156c6a1c4b825484d238055cecf5f Mon Sep 17 00:00:00 2001 From: Tartarus0zm Date: Wed, 29 Jul 2020 18:56:28 +0800 Subject: [PATCH] [FLINK-18663][rest] Improve exception handling - ensure that request finalization runs even if handleException throws an exception - catch NPE in handleException, which occurs if the client closes the connection --- .../runtime/rest/handler/AbstractHandler.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java index 40094bb127e10..f61cac05db5f5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java @@ -57,6 +57,7 @@ import java.util.Arrays; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.function.Function; /** * Super class for netty-based handlers that work with {@link RequestBody}. @@ -177,13 +178,17 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe final FileUploads finalUploadedFiles = uploadedFiles; requestProcessingFuture + .handle((Void ignored, Throwable throwable) -> { + if (throwable != null) { + return handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest); + } + return CompletableFuture.completedFuture(null); + }).thenCompose(Function.identity()) .whenComplete((Void ignored, Throwable throwable) -> { if (throwable != null) { - handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest) - .whenComplete((Void ignored2, Throwable throwable2) -> finalizeRequestProcessing(finalUploadedFiles)); - } else { - finalizeRequestProcessing(finalUploadedFiles); + log.warn("An exception occurred while handling another exception.", throwable); } + finalizeRequestProcessing(finalUploadedFiles); }); } catch (Throwable e) { final FileUploads finalUploadedFiles = uploadedFiles; @@ -199,6 +204,10 @@ private void finalizeRequestProcessing(FileUploads uploadedFiles) { private CompletableFuture handleException(Throwable throwable, ChannelHandlerContext ctx, HttpRequest httpRequest) { FlinkHttpObjectAggregator flinkHttpObjectAggregator = ctx.pipeline().get(FlinkHttpObjectAggregator.class); + if (flinkHttpObjectAggregator == null) { + log.warn("The connection was unexpectedly closed by the client."); + return CompletableFuture.completedFuture(null); + } int maxLength = flinkHttpObjectAggregator.maxContentLength() - OTHER_RESP_PAYLOAD_OVERHEAD; if (throwable instanceof RestHandlerException) { RestHandlerException rhe = (RestHandlerException) throwable;