Skip to content

HADOOP-15183 S3Guard store becomes inconsistent after partial failure of rename #843

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Map;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -108,20 +109,55 @@ public static <T> T awaitFuture(final Future<T> future,
*/
public static <T> T raiseInnerCause(final ExecutionException e)
throws IOException {
throw unwrapInnerException(e);
}

/**
* Extract the cause of a completion failure and rethrow it if an IOE
* or RTE.
* @param e exception.
* @param <T> type of return value.
* @return nothing, ever.
* @throws IOException either the inner IOException, or a wrapper around
* any non-Runtime-Exception
* @throws RuntimeException if that is the inner cause.
*/
public static <T> T raiseInnerCause(final CompletionException e)
throws IOException {
throw unwrapInnerException(e);
}

/**
* From the inner cause of an execution exception, extract the inner cause.
* If it is an RTE: throw immediately.
* If it is an IOE: Return.
* If it is a WrappedIOException: Unwrap and return
* Else: create a new IOException.
*
* Recursively handles wrapped Execution and Completion Exceptions in
* case something very complicated has happened.
* @param e exception.
* @return an IOException extracted or built from the cause.
* @throws RuntimeException if that is the inner cause.
*/
private static IOException unwrapInnerException(final Throwable e) {
Throwable cause = e.getCause();
if (cause instanceof IOException) {
throw (IOException) cause;
return (IOException) cause;
} else if (cause instanceof WrappedIOException){
throw ((WrappedIOException) cause).getCause();
return ((WrappedIOException) cause).getCause();
} else if (cause instanceof CompletionException){
return unwrapInnerException(cause);
} else if (cause instanceof ExecutionException){
return unwrapInnerException(cause);
} else if (cause instanceof RuntimeException){
throw (RuntimeException) cause;
} else if (cause != null) {
// other type: wrap with a new IOE
throw new IOException(cause);
return new IOException(cause);
} else {
// this only happens if somebody deliberately raises
// an ExecutionException
throw new IOException(e);
// this only happens if there was no cause.
return new IOException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1213,8 +1213,12 @@

<property>
<name>fs.s3a.connection.maximum</name>
<value>15</value>
<description>Controls the maximum number of simultaneous connections to S3.</description>
<value>72</value>
<description>Controls the maximum number of simultaneous connections to S3.
This must be bigger than the value of fs.s3a.threads.max so as to stop
threads being blocked waiting for new HTTPS connections.
Why not equal? The AWS SDK transfer manager also uses these connections.
</description>
</property>

<property>
Expand Down Expand Up @@ -1312,7 +1316,7 @@

<property>
<name>fs.s3a.threads.max</name>
<value>10</value>
<value>64</value>
<description>The total number of threads available in the filesystem for data
uploads *or any other queued filesystem operation*.</description>
</property>
Expand All @@ -1326,8 +1330,25 @@

<property>
<name>fs.s3a.max.total.tasks</name>
<value>5</value>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about sticking HADOOP-15729 in here? Essentially just providing 0 as the number of core threads to the executor, otherwise a lot of these threads never get cleaned up when idle. This manifests as a massive number of idle threads in long-running services like Impala and Hive Metastore when you've accessed many different buckets. I believe this patch would make the problem worse, so it's a good time to toss in the trivial fix and test it all together,

<description>The number of operations which can be queued for execution</description>
<value>32</value>
<description>The number of operations which can be queued for execution.
This is in addition to the number of active threads in fs.s3a.threads.max.
</description>
</property>

<property>
<name>fs.s3a.executor.capacity</name>
<value>16</value>
<description>The maximum number of submitted tasks which is a single
operation (e.g. rename(), delete()) may submit simultaneously for
execution -excluding the IO-heavy block uploads, whose capacity
is set in "fs.s3a.fast.upload.active.blocks"

All tasks are submitted to the shared thread pool whose size is
set in "fs.s3a.threads.max"; the value of capacity should be less than that
of the thread pool itself, as the goal is to stop a single operation
from overloading that thread pool.
</description>
</property>

<property>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ public void testRenamePopulatesFileAncestors() throws IOException {
* @param dst the destination root to move
* @param nestedPath the nested path to move
*/
private void validateAncestorsMoved(Path src, Path dst, String nestedPath)
protected void validateAncestorsMoved(Path src, Path dst, String nestedPath)
throws IOException {
assertIsDirectory(dst);
assertPathDoesNotExist("src path should not exist", path(src + nestedPath));
Expand Down
5 changes: 5 additions & 0 deletions hadoop-tools/hadoop-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,11 @@
<artifactId>aws-java-sdk-bundle</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,15 @@ private Constants() {
public static final String ASSUMED_ROLE_CREDENTIALS_DEFAULT =
SimpleAWSCredentialsProvider.NAME;


// the maximum number of tasks cached if all threads are already uploading
public static final String MAX_TOTAL_TASKS = "fs.s3a.max.total.tasks";

public static final int DEFAULT_MAX_TOTAL_TASKS = 32;

// number of simultaneous connections to s3
public static final String MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum";
public static final int DEFAULT_MAXIMUM_CONNECTIONS = 15;
public static final int DEFAULT_MAXIMUM_CONNECTIONS = 48;

// connect to s3 over ssl?
public static final String SECURE_CONNECTIONS =
Expand Down Expand Up @@ -194,10 +200,6 @@ private Constants() {
public static final String KEEPALIVE_TIME = "fs.s3a.threads.keepalivetime";
public static final int DEFAULT_KEEPALIVE_TIME = 60;

// the maximum number of tasks cached if all threads are already uploading
public static final String MAX_TOTAL_TASKS = "fs.s3a.max.total.tasks";
public static final int DEFAULT_MAX_TOTAL_TASKS = 5;

// size of each of or multipart pieces in bytes
public static final String MULTIPART_SIZE = "fs.s3a.multipart.size";
public static final long DEFAULT_MULTIPART_SIZE = 104857600; // 100 MB
Expand Down Expand Up @@ -283,6 +285,22 @@ private Constants() {
@InterfaceStability.Unstable
public static final int DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS = 4;

/**
* The capacity of executor queues for operations other than block
* upload, where {@link #FAST_UPLOAD_ACTIVE_BLOCKS} is used instead.
* This should be less than {@link #MAX_THREADS} for fair
* submission.
* Value: {@value}.
*/
public static final String EXECUTOR_CAPACITY = "fs.s3a.executor.capacity";

/**
* The capacity of executor queues for operations other than block
* upload, where {@link #FAST_UPLOAD_ACTIVE_BLOCKS} is used instead.
* Value: {@value}
*/
public static final int DEFAULT_EXECUTOR_CAPACITY = 16;

// Private | PublicRead | PublicReadWrite | AuthenticatedRead |
// LogDeliveryWrite | BucketOwnerRead | BucketOwnerFullControl
public static final String CANNED_ACL = "fs.s3a.acl.default";
Expand Down
Loading