HADOOP-16490. Improve S3Guard handling of FNFEs in copy #1229

@@ -458,7 +458,7 @@ public void displayError(Exception e) {
if (e instanceof InterruptedIOException) {
throw new CommandInterruptException();
}

LOG.debug("{} failure", getName(), e);
String errorMessage = e.getLocalizedMessage();
if (errorMessage == null) {
// this is an unexpected condition, so dump the whole exception since
@@ -30,6 +30,9 @@
import java.util.Map.Entry;
import java.util.NoSuchElementException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -58,7 +61,11 @@
* a source and resolved target. Sources are resolved as children of
* a destination directory.
*/
abstract class CommandWithDestination extends FsCommand {

protected static final Logger LOG = LoggerFactory.getLogger(
CommandWithDestination.class);

protected PathData dst;
private boolean overwrite = false;
private boolean verifyChecksum = true;
@@ -220,6 +227,7 @@ protected void processArguments(LinkedList<PathData> args)
}
} else if (dst.exists) {
if (!dst.stat.isDirectory() && !overwrite) {
LOG.debug("Destination file exists: {}", dst.stat);
throw new PathExistsException(dst.toString());
}
} else if (!dst.parentExists()) {
@@ -407,6 +415,7 @@ protected void copyStreamToTarget(InputStream in, PathData target)
targetFs.setWriteChecksum(writeChecksum);
targetFs.writeStreamToFile(in, tempTarget, lazyPersist, direct);
if (!direct) {
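// register the temp file for deletion when the filesystem closes,
// so it is still cleaned up if the rename below never completes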
targetFs.deleteOnExit(tempTarget.path);
targetFs.rename(tempTarget, target);
}
} finally {
@@ -484,6 +493,15 @@ void writeStreamToFile(InputStream in, PathData target,
try {
out = create(target, lazyPersist, direct);
IOUtils.copyBytes(in, out, getConf(), true);
} catch (IOException e) {
// failure: clean up if we got as far as creating the file
if (!direct && out != null) {
try {
fs.delete(target.path, false);
} catch (IOException ignored) {
}
}
throw e;
} finally {
IOUtils.closeStream(out); // just in case copyBytes didn't
}
@@ -493,37 +511,31 @@ void writeStreamToFile(InputStream in, PathData target,
FSDataOutputStream create(PathData item, boolean lazyPersist,
boolean direct)
throws IOException {
try {
if (lazyPersist) {
long defaultBlockSize;
try {
defaultBlockSize = getDefaultBlockSize();
} catch (NotInMountpointException ex) {
// ViewFileSystem#getDefaultBlockSize() throws an exception as it
// needs a target FS to retrieve the default block size from.
// Hence, for ViewFs, we should call getDefaultBlockSize with the
// target path.
defaultBlockSize = getDefaultBlockSize(item.path);
}

EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE, LAZY_PERSIST);
return create(item.path,
FsPermission.getFileDefault().applyUMask(
FsPermission.getUMask(getConf())),
createFlags,
getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT),
(short) 1,
defaultBlockSize,
null,
null);
} else {
return create(item.path, true);
}
} finally { // might have been created but stream was interrupted
if (!direct) {
deleteOnExit(item.path);
if (lazyPersist) {
long defaultBlockSize;
try {
defaultBlockSize = getDefaultBlockSize();
} catch (NotInMountpointException ex) {
// ViewFileSystem#getDefaultBlockSize() throws an exception as it
// needs a target FS to retrieve the default block size from.
// Hence, for ViewFs, we should call getDefaultBlockSize with the
// target path.
defaultBlockSize = getDefaultBlockSize(item.path);
}

EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE, LAZY_PERSIST);
return create(item.path,
FsPermission.getFileDefault().applyUMask(
FsPermission.getUMask(getConf())),
createFlags,
getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT),
(short) 1,
defaultBlockSize,
null,
null);
} else {
return create(item.path, true);
}
}
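
Taken together, the shell-side changes above replace the old delete-on-exit registration inside create() with explicit cleanup along the copy path: the temp file is registered for delete-on-exit only just before the rename, and a failed write now deletes the partially written temp file before rethrowing. Below is a minimal sketch of that pattern against the public FileSystem API; the class and method names are illustrative, not the PR's actual shell code.

```java
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class CopyViaTempSketch {

  /**
   * Copy a stream to {@code target} via a temporary file, cleaning up the
   * temp file if the write fails. Illustrative only.
   */
  static void copyViaTemp(FileSystem fs, InputStream in, Path temp, Path target,
      Configuration conf) throws IOException {
    FSDataOutputStream out = null;
    try {
      out = fs.create(temp, true);
      IOUtils.copyBytes(in, out, conf, true); // closes both streams on success
    } catch (IOException e) {
      // the write failed after the temp file was created: best-effort cleanup
      if (out != null) {
        try {
          fs.delete(temp, false);
        } catch (IOException ignored) {
          // keep the original failure as the primary exception
        }
      }
      throw e;
    } finally {
      IOUtils.closeStream(out); // in case copyBytes didn't close it
    }
    // belt-and-braces: if the rename never completes, the temp file is
    // removed when the filesystem is finally closed
    fs.deleteOnExit(temp);
    if (!fs.rename(temp, target)) {
      throw new IOException("Failed to rename " + temp + " to " + target);
    }
  }
}
```

Note that nothing in this path probes the temp or final path with exists(); that is what the updated tests below assert, since on S3 a premature existence check can push a 404 into the load balancer cache.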

@@ -1663,7 +1663,7 @@
<value>7</value>
<description>
Number of times to retry any repeatable S3 client request on failure,
excluding throttling requests.
excluding throttling requests and S3Guard inconsistency resolution.
</description>
</property>

@@ -1672,7 +1672,7 @@
<value>500ms</value>
<description>
Initial retry interval when retrying operations for any reason other
than S3 throttle errors.
than S3 throttle errors and S3Guard inconsistency resolution.
</description>
</property>

@@ -1692,6 +1692,27 @@
</description>
</property>

<property>
<name>fs.s3a.s3guard.consistency.retry.limit</name>
<value>7</value>
<description>
Number of times to retry read/open/copy requests when S3Guard
believes a specific version of a file to be available, but the
S3 request finds no version of the file, or a different version.
</description>
</property>

<property>
<name>fs.s3a.s3guard.consistency.retry.interval</name>
<value>2s</value>
<description>
Initial interval between attempts to retry operations while waiting for S3
to become consistent with the S3Guard data.
An exponential back-off is used here: every failure doubles the delay.
</description>
</property>
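
The two settings above describe a bounded exponential back-off: the first retry waits fs.s3a.s3guard.consistency.retry.interval, and every further failure doubles the wait, up to fs.s3a.s3guard.consistency.retry.limit attempts. The loop below is an illustrative sketch of that policy only, not the retry code S3A itself uses.

```java
// Illustrative sketch: exponential back-off shaped by the two settings above.
public final class ConsistencyRetrySketch {

  public static void main(String[] args) throws InterruptedException {
    final int retryLimit = 7;            // fs.s3a.s3guard.consistency.retry.limit
    final long initialDelayMs = 2_000L;  // fs.s3a.s3guard.consistency.retry.interval = 2s

    long delay = initialDelayMs;
    for (int attempt = 1; attempt <= retryLimit; attempt++) {
      if (objectIsConsistent()) {
        System.out.println("S3 caught up with S3Guard on attempt " + attempt);
        return;
      }
      System.out.printf("attempt %d found no/old version; sleeping %d ms%n", attempt, delay);
      Thread.sleep(delay);
      delay *= 2;                        // every failure doubles the delay
    }
    throw new IllegalStateException(
        "S3 still inconsistent with S3Guard after " + retryLimit + " attempts");
  }

  // Stand-in for the real probe, e.g. a GET/HEAD that matches the expected version.
  private static boolean objectIsConsistent() {
    return false;
  }
}
```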

<property>
<name>fs.s3a.committer.name</name>
<value>file</value>
@@ -18,6 +18,7 @@

package org.apache.hadoop.fs.shell;

import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

@@ -77,10 +78,19 @@ public void testCopyStreamTarget() throws Exception {
when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);

tryCopyStream(in, true);
verify(in).close();
verify(out, times(2)).close();
// no data was written.
verify(out, never()).write(any(byte[].class), anyInt(), anyInt());
verify(mockFs, never()).delete(eq(path), anyBoolean());
verify(mockFs).rename(eq(tmpPath), eq(path));
verify(mockFs, never()).delete(eq(tmpPath), anyBoolean());
verify(mockFs, never()).close();
// temp path never had its existence checked. This is critical for S3 as it
// avoids the successful path accidentally getting a 404 into the S3 load
// balancer cache
verify(mockFs, never()).exists(eq(tmpPath));
verify(mockFs, never()).exists(eq(path));
}

@Test
@@ -110,6 +120,31 @@ public void testInterruptedCreate() throws Exception {
FSDataInputStream in = mock(FSDataInputStream.class);

tryCopyStream(in, false);
verify(mockFs, never()).rename(any(Path.class), any(Path.class));
verify(mockFs, never()).delete(eq(tmpPath), anyBoolean());
verify(mockFs, never()).delete(eq(path), anyBoolean());
verify(mockFs, never()).close();
}

/**
* Create a file but fail in the write.
* The copy operation should attempt to clean up by
* closing the output stream, then deleting the temporary file.
*/
@Test
public void testFailedWrite() throws Exception {
FSDataOutputStream out = mock(FSDataOutputStream.class);
doThrow(new IOException("mocked"))
.when(out).write(any(byte[].class), anyInt(), anyInt());
whenFsCreate().thenReturn(out);
when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
FSInputStream in = mock(FSInputStream.class);
doReturn(0)
.when(in).read(any(byte[].class), anyInt(), anyInt());
Throwable thrown = tryCopyStream(in, false);
assertExceptionContains("mocked", thrown);
verify(in).close();
verify(out, times(2)).close();
verify(mockFs).delete(eq(tmpPath), anyBoolean());
verify(mockFs, never()).rename(any(Path.class), any(Path.class));
verify(mockFs, never()).delete(eq(path), anyBoolean());
@@ -155,14 +190,21 @@ private OngoingStubbing<FSDataOutputStream> whenFsCreate() throws IOException {
anyBoolean(), anyInt(), anyShort(), anyLong(), any()));
}

private void tryCopyStream(InputStream in, boolean shouldPass) {
private Throwable tryCopyStream(InputStream in, boolean shouldPass) {
try {
cmd.copyStreamToTarget(new FSDataInputStream(in), target);
return null;
} catch (InterruptedIOException e) {
assertFalse("copy failed", shouldPass);
if (shouldPass) {
throw new AssertionError("copy failed", e);
}
return e;
} catch (Throwable e) {
assertFalse(e.getMessage(), shouldPass);
}
if (shouldPass) {
throw new AssertionError(e.getMessage(), e);
}
return e;
}
}

static class MockFileSystem extends FilterFileSystem {
@@ -183,4 +225,4 @@ public Configuration getConf() {
return conf;
}
}
}
}
@@ -761,4 +761,29 @@ private Constants() {
* Default change detection require version: true.
*/
public static final boolean CHANGE_DETECT_REQUIRE_VERSION_DEFAULT = true;

/**
* Number of times to retry read/open/copy operations when S3Guard
* expects a version of a file which the S3 request does not return: {@value}.
*/
public static final String S3GUARD_CONSISTENCY_RETRY_LIMIT =
"fs.s3a.s3guard.consistency.retry.limit";

/**
* Default retry limit: {@value}.
*/
public static final int S3GUARD_CONSISTENCY_RETRY_LIMIT_DEFAULT = 7;

/**
* Initial retry interval: {@value}.
*/
public static final String S3GUARD_CONSISTENCY_RETRY_INTERVAL =
"fs.s3a.s3guard.consistency.retry.interval";

/**
* Default initial retry interval: {@value}.
*/
public static final String S3GUARD_CONSISTENCY_RETRY_INTERVAL_DEFAULT =
"2s";

}
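
A hedged sketch of how the new constants can be resolved from a Configuration follows. It assumes the getTimeDuration overload that accepts a string default (present in recent Hadoop releases); how S3A itself reads these values is not part of this diff.

```java
// Sketch: resolving the new S3Guard consistency-retry tuning options.
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;

import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_CONSISTENCY_RETRY_INTERVAL;
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_CONSISTENCY_RETRY_INTERVAL_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_CONSISTENCY_RETRY_LIMIT;
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_CONSISTENCY_RETRY_LIMIT_DEFAULT;

public class ConsistencyRetryOptionsSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    int limit = conf.getInt(S3GUARD_CONSISTENCY_RETRY_LIMIT,
        S3GUARD_CONSISTENCY_RETRY_LIMIT_DEFAULT);
    // assumed overload: getTimeDuration(name, String default, unit)
    long intervalMs = conf.getTimeDuration(S3GUARD_CONSISTENCY_RETRY_INTERVAL,
        S3GUARD_CONSISTENCY_RETRY_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
    System.out.println("retry limit=" + limit
        + ", initial interval=" + intervalMs + "ms");
  }
}
```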
@@ -25,7 +25,7 @@
/**
* Indicates the S3 object is out of sync with the expected version. Thrown in
* cases such as when the object is updated while an {@link S3AInputStream} is
* open.
* open, or when an expected file was never found.
*/
@SuppressWarnings("serial")
@InterfaceAudience.Public
@@ -35,6 +35,20 @@ public class RemoteFileChangedException extends PathIOException {
public static final String PRECONDITIONS_FAILED =
"Constraints of request were unsatisfiable";

/**
* While trying to get information on a file known to S3Guard, the
* file never became visible in S3.
*/
public static final String FILE_NEVER_FOUND =
"File to rename not found on guarded S3 store after repeated attempts";

/**
* The file wasn't found during rename after a single attempt: the
* unguarded codepath.
*/
public static final String FILE_NOT_FOUND_SINGLE_ATTEMPT =
"File to rename not found on unguarded S3 store";

/**
* Constructs a RemoteFileChangedException.
*
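
Finally, a hypothetical illustration of how the new message constants could be used once the consistency retries are exhausted. It assumes the existing (path, operation, message) constructor of RemoteFileChangedException; the helper and its arguments are invented for the example and are not part of this diff.

```java
import org.apache.hadoop.fs.s3a.RemoteFileChangedException;

public class RemoteFileChangedSketch {

  /** Hypothetical helper: give up on a copy whose source never appeared in S3. */
  static void failCopy(String srcKey, int attempts) throws RemoteFileChangedException {
    throw new RemoteFileChangedException(srcKey, "copy",
        RemoteFileChangedException.FILE_NEVER_FOUND + " (" + attempts + " attempts)");
  }
}
```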