Skip to content

Commit

Permalink
[FLINK-18663][rest] Improve exception handling
Browse files Browse the repository at this point in the history
- ensure that request finalization runs even if handleException throws an exception
- catch NPE in handleException, which occurs if the client closes the connection
  • Loading branch information
Tartarus0zm authored and zentol committed Aug 3, 2020
1 parent 7d39a2c commit ee4b27f
Showing 1 changed file with 13 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/**
* Super class for netty-based handlers that work with {@link RequestBody}.
Expand Down Expand Up @@ -177,13 +178,17 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe

final FileUploads finalUploadedFiles = uploadedFiles;
requestProcessingFuture
.handle((Void ignored, Throwable throwable) -> {
if (throwable != null) {
return handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest);
}
return CompletableFuture.<Void>completedFuture(null);
}).thenCompose(Function.identity())
.whenComplete((Void ignored, Throwable throwable) -> {
if (throwable != null) {
handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest)
.whenComplete((Void ignored2, Throwable throwable2) -> finalizeRequestProcessing(finalUploadedFiles));
} else {
finalizeRequestProcessing(finalUploadedFiles);
log.warn("An exception occurred while handling another exception.", throwable);
}
finalizeRequestProcessing(finalUploadedFiles);
});
} catch (Throwable e) {
final FileUploads finalUploadedFiles = uploadedFiles;
Expand All @@ -199,6 +204,10 @@ private void finalizeRequestProcessing(FileUploads uploadedFiles) {

private CompletableFuture<Void> handleException(Throwable throwable, ChannelHandlerContext ctx, HttpRequest httpRequest) {
FlinkHttpObjectAggregator flinkHttpObjectAggregator = ctx.pipeline().get(FlinkHttpObjectAggregator.class);
if (flinkHttpObjectAggregator == null) {
log.warn("The connection was unexpectedly closed by the client.");
return CompletableFuture.completedFuture(null);
}
int maxLength = flinkHttpObjectAggregator.maxContentLength() - OTHER_RESP_PAYLOAD_OVERHEAD;
if (throwable instanceof RestHandlerException) {
RestHandlerException rhe = (RestHandlerException) throwable;
Expand Down

0 comments on commit ee4b27f

Please sign in to comment.