Skip to content

Commit

Permalink
AWS: Call abortUpload only once when any of the completable future fa…
Browse files Browse the repository at this point in the history
…ils (apache#5366)

Co-authored-by: Prashant Singh <psinghvk@amazon.com>
  • Loading branch information
singhpk234 and Prashant Singh authored Aug 1, 2022
1 parent 00e0f7b commit aae6155
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 9 deletions.
23 changes: 17 additions & 6 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -337,8 +338,10 @@ private void uploadParts() {
}

if (thrown != null) {
// Exception observed here will be thrown as part of
// CompletionException
// when we will join completable futures.
LOG.error("Failed to upload part: {}", uploadRequest, thrown);
abortUpload();
}
});

Expand All @@ -349,11 +352,19 @@ private void uploadParts() {
private void completeMultiPartUpload() {
Preconditions.checkState(closed, "Complete upload called on open stream: " + location);

List<CompletedPart> completedParts =
multiPartMap.values().stream()
.map(CompletableFuture::join)
.sorted(Comparator.comparing(CompletedPart::partNumber))
.collect(Collectors.toList());
List<CompletedPart> completedParts;
try {
completedParts =
multiPartMap.values().stream()
.map(CompletableFuture::join)
.sorted(Comparator.comparing(CompletedPart::partNumber))
.collect(Collectors.toList());
} catch (CompletionException ce) {
// cancel the remaining futures.
multiPartMap.values().forEach(c -> c.cancel(true));
abortUpload();
throw ce;
}

CompleteMultipartUploadRequest request =
CompleteMultipartUploadRequest.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static org.junit.Assert.fail;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
Expand Down Expand Up @@ -136,7 +135,7 @@ public void testAbortAfterFailedPartUpload() {
.isInstanceOf(mockException.getClass())
.hasMessageContaining(mockException.getMessage());

verify(s3mock, atLeastOnce()).abortMultipartUpload((AbortMultipartUploadRequest) any());
verify(s3mock, times(1)).abortMultipartUpload((AbortMultipartUploadRequest) any());
}

@Test
Expand All @@ -156,7 +155,7 @@ public void testAbortMultipart() {
.isInstanceOf(mockException.getClass())
.hasMessageContaining(mockException.getMessage());

verify(s3mock).abortMultipartUpload((AbortMultipartUploadRequest) any());
verify(s3mock, times(1)).abortMultipartUpload((AbortMultipartUploadRequest) any());
}

@Test
Expand Down

0 comments on commit aae6155

Please sign in to comment.