Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@

package software.amazon.nio.spi.s3;

import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static software.amazon.nio.spi.s3.Containers.localStackConnectionEndpoint;

import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Paths;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static software.amazon.nio.spi.s3.Containers.localStackConnectionEndpoint;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

@DisplayName("Files$newDirectoryStream()")
public class FilesDirectoryStreamTest {
Expand All @@ -30,7 +29,7 @@ class DirectoryDoesNotExist {
public void whenBucketNotFound() {
final var path = Paths.get(URI.create(localStackConnectionEndpoint() + "/does-not-exist/some-directory"));
assertThatThrownBy(() -> Files.newDirectoryStream(path, p -> true))
.isInstanceOf(NoSuchFileException.class);
.isInstanceOf(IOException.class);
}
}

Expand Down
22 changes: 14 additions & 8 deletions src/main/java/software/amazon/nio/spi/s3/S3DirectoryStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import java.nio.file.Path;
import java.util.Iterator;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.model.CommonPrefix;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;
Expand Down Expand Up @@ -53,20 +55,24 @@ public void close() {
* @param finalDirName the directory name that will be streamed.
* @param listObjectsV2Publisher the publisher that returns objects and common prefixes that are iterated on.
* @return an iterator for {@code Path}s constructed from the {@code ListObjectsV2Publisher}s responses.
* @throws SdkException if there is an error with S3 access. This is an unchecked Exception
*/
private Iterator<Path> pathIteratorForPublisher(
final DirectoryStream.Filter<? super Path> filter,
final FileSystem fs, String finalDirName,
final ListObjectsV2Publisher listObjectsV2Publisher) {
final var prefixPublisher = listObjectsV2Publisher.commonPrefixes().map(CommonPrefix::prefix);
final var keysPublisher = listObjectsV2Publisher.contents().map(S3Object::key);
final ListObjectsV2Publisher listObjectsV2Publisher) throws SdkException {

final Publisher<String> prefixPublisher =
listObjectsV2Publisher.commonPrefixes().map(CommonPrefix::prefix);
final Publisher<String> keysPublisher =
listObjectsV2Publisher.contents().map(S3Object::key);

return Flowable.concat(prefixPublisher, keysPublisher)
.map(fs::getPath)
.filter(path -> !isEqualToParent(finalDirName, path)) // including the parent will induce loops
.filter(path -> tryAccept(filter, path))
.blockingStream()
.iterator();
.map(fs::getPath)
.filter(path -> !isEqualToParent(finalDirName, path)) // including the parent will induce loops
.filter(path -> tryAccept(filter, path))
.blockingIterable()
.iterator();
}


Expand Down
24 changes: 16 additions & 8 deletions src/main/java/software/amazon/nio/spi/s3/S3FileSystemProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -68,6 +67,7 @@
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Response;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedCopy;
Expand Down Expand Up @@ -252,12 +252,20 @@ public DirectoryStream<Path> newDirectoryStream(Path dir, DirectoryStream.Filter

try {
return new S3DirectoryStream(s3Path.getFileSystem(), s3Path.bucketName(), dirName, filter);
} catch (CompletionException e) {
if (e.getCause() instanceof NoSuchBucketException) {
throw new NoSuchFileException("Bucket '" + s3Path.bucketName() + "' not found", s3Path.toString(),
e.getMessage());
} catch (RuntimeException e) {
if (e.getCause() instanceof ExecutionException) {
var cause = (Exception) e.getCause().getCause();
if (cause instanceof NoSuchBucketException) {
throw new FileSystemNotFoundException("Bucket '" + s3Path.bucketName() + "' not found: NoSuchBucket");
}
if (cause instanceof S3Exception && ((S3Exception) cause).statusCode() == 403) {
throw new AccessDeniedException("Access to bucket '" + s3Path.bucketName() + "' denied", s3Path.toString(),
cause.getMessage());
}
throw new IOException(cause.getMessage(), cause);
}
throw e;

throw new IOException(e.getMessage(), e);
}
}

Expand All @@ -283,8 +291,8 @@ public void createDirectory(Path dir, FileAttribute<?>... attrs) throws IOExcept
var timeOut = TIMEOUT_TIME_LENGTH_1;
final var unit = MINUTES;

try {
s3Directory.getFileSystem().client().putObject(
try (S3AsyncClient client = s3Directory.getFileSystem().client()) {
client.putObject(
PutObjectRequest.builder()
.bucket(s3Directory.bucketName())
.key(directoryKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,26 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static software.amazon.nio.spi.s3.S3Matchers.anyConsumer;

import java.io.IOException;
import java.net.URI;
import java.nio.file.AccessDeniedException;
import java.nio.file.AccessMode;
import java.nio.file.DirectoryStream;
import java.nio.file.FileAlreadyExistsException;
Expand All @@ -31,7 +44,7 @@
import java.time.Instant;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -54,9 +67,11 @@
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;

Expand Down Expand Up @@ -184,6 +199,56 @@ public void newDirectoryStream() throws IOException {
assertThat(stream).hasSize(2);
}

@Test
public void newDirectoryStreamS3AccessDeniedException() {
when(mockClient.listObjectsV2Paginator(anyConsumer())).thenThrow(
// this is what is thrown by the paginator in the case that access is denied
new RuntimeException(new ExecutionException(
S3Exception.builder()
.statusCode(403)
.message("AccessDenied")
.build()
))
);

assertThatThrownBy(() -> provider.newDirectoryStream(fs.getPath(pathUri+"/"), entry -> true))
.isInstanceOf(AccessDeniedException.class)
.hasMessage("Access to bucket 'foo' denied -> /baa/: AccessDenied");
}

@Test
public void newDirectoryStreamS3BucketNotFoundException() {
when(mockClient.listObjectsV2Paginator(anyConsumer())).thenThrow(
// this is what is thrown by the paginator in the case that a bucket doesn't exist
new RuntimeException(new ExecutionException(
NoSuchBucketException.builder()
.statusCode(404)
.message("NoSuchBucket")
.build()
))
);

assertThatThrownBy(() -> provider.newDirectoryStream(fs.getPath(pathUri+"/"), entry -> true))
.isInstanceOf(FileSystemNotFoundException.class)
.hasMessage("Bucket 'foo' not found: NoSuchBucket");
}

@Test
public void newDirectoryStreamOtherExceptionsBecomeIOExceptions() {
when(mockClient.listObjectsV2Paginator(anyConsumer())).thenThrow(
new RuntimeException(new ExecutionException(
S3Exception.builder()
.statusCode(500)
.message("software.amazon.awssdk.services.s3.model.S3Exception: InternalError")
.build()
))
);

assertThatThrownBy(() -> provider.newDirectoryStream(fs.getPath(pathUri+"/"), entry -> true))
.isInstanceOf(IOException.class)
.hasMessage("software.amazon.awssdk.services.s3.model.S3Exception: InternalError");
}

@Test
public void newDirectoryStreamFiltersSelf() throws IOException {
final var publisher = new ListObjectsV2Publisher(mockClient, ListObjectsV2Request.builder().build());
Expand Down Expand Up @@ -221,12 +286,6 @@ public void newDirectoryStreamFilters() throws IOException {
}
}

private int countDirStreamItems(DirectoryStream<Path> stream) {
var count = new AtomicInteger(0);
stream.iterator().forEachRemaining(item -> count.incrementAndGet());
return count.get();
}

@Test
public void createDirectory() throws Exception {
when(mockClient.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class))).thenReturn(CompletableFuture.supplyAsync(() ->
Expand Down Expand Up @@ -390,15 +449,15 @@ public void checkAccessWithoutException() throws Exception {
}

@Test
public void checkAccessWithExceptionHeadObject() throws Exception {
public void checkAccessWithExceptionHeadObject() {
when(mockClient.headObject(anyConsumer())).thenReturn(CompletableFuture.failedFuture(new IOException()));

var foo = fs.getPath("/foo");
assertThrows(IOException.class, () -> provider.checkAccess(foo, AccessMode.READ));
}

@Test
public void checkAccessWithExceptionListObjectsV2() throws Exception {
public void checkAccessWithExceptionListObjectsV2() {
when(mockClient.listObjectsV2(anyConsumer())).thenReturn(CompletableFuture.failedFuture(new IOException()));

var foo = fs.getPath("/dir/");
Expand All @@ -419,7 +478,7 @@ public void getFileAttributeView() {
var foo = fs.getPath("/foo");
final var fileAttributeView = provider.getFileAttributeView(foo, BasicFileAttributeView.class);
assertNotNull(fileAttributeView);
assertTrue(fileAttributeView instanceof S3BasicFileAttributeView);
assertInstanceOf(S3BasicFileAttributeView.class, fileAttributeView);
}

@Test
Expand Down