Skip to content

Commit 757207b

Browse files
committed
HADOOP-18925. S3A: option to enable/disable CopyFromLocalOperation
Add a new option fs.s3a.optimized.copy.from.local.enabled This will enable (default) or disable the optimized CopyFromLocalOperation upload operation when copyFromLocalFile() is invoked. When false the superclass implementation is used; duration statistics are still collected, though audit span entries in logs will be for the individual fs operations, not the overall operation. ITestS3ACopyFromLocalFile parameterized to run with/without option. This is for debugging. Change-Id: I208b8babd9c9951acbd603d811b4f5f94620892d
1 parent cf3a4b3 commit 757207b

File tree

4 files changed

+138
-29
lines changed

4 files changed

+138
-29
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1347,4 +1347,17 @@ private Constants() {
13471347
*/
13481348
public static final boolean DIRECTORY_OPERATIONS_PURGE_UPLOADS_DEFAULT = false;
13491349

1350+
1351+
/**
1352+
* Is the higher performance copy from local file to S3 enabled?
1353+
* This switch allows for it to be disabled if there are problems.
1354+
* Value: {@value}.
1355+
*/
1356+
public static final String OPTIMIZED_COPY_FROM_LOCAL = "fs.s3a.optimized.copy.from.local.enabled";
1357+
1358+
/**
1359+
* Default value for {@link #OPTIMIZED_COPY_FROM_LOCAL}.
1360+
* Value: {@value}.
1361+
*/
1362+
public static final boolean OPTIMIZED_COPY_FROM_LOCAL_DEFAULT = true;
13501363
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 64 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
462462
*/
463463
private String scheme = FS_S3A;
464464

465+
/**
466+
* Flag to indicate that the higher performance copyFromLocalFile implementation
467+
* should be used.
468+
*/
469+
private boolean optimizedCopyFromLocal;
470+
465471
/** Add any deprecated keys. */
466472
@SuppressWarnings("deprecation")
467473
private static void addDeprecatedKeys() {
@@ -696,6 +702,9 @@ public void initialize(URI name, Configuration originalConf)
696702
AWS_S3_VECTOR_ACTIVE_RANGE_READS, DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS, 1);
697703
vectoredIOContext = populateVectoredIOContext(conf);
698704
scheme = (this.uri != null && this.uri.getScheme() != null) ? this.uri.getScheme() : FS_S3A;
705+
optimizedCopyFromLocal = conf.getBoolean(OPTIMIZED_COPY_FROM_LOCAL,
706+
OPTIMIZED_COPY_FROM_LOCAL_DEFAULT);
707+
LOG.debug("Using optimized copyFromLocal implementation: {}", optimizedCopyFromLocal);
699708
} catch (SdkException e) {
700709
// amazon client exception: stop all services then throw the translation
701710
cleanupWithLogger(LOG, span);
@@ -4021,45 +4030,69 @@ private boolean s3Exists(final Path path, final Set<StatusProbeEnum> probes)
40214030
* the given dst name.
40224031
*
40234032
* This version doesn't need to create a temporary file to calculate the md5.
4024-
* Sadly this doesn't seem to be used by the shell cp :(
4033+
* If {@link Constants#OPTIMIZED_COPY_FROM_LOCAL} is set to false,
4034+
* the superclass implementation is used.
40254035
*
4026-
* delSrc indicates if the source should be removed
40274036
* @param delSrc whether to delete the src
40284037
* @param overwrite whether to overwrite an existing file
40294038
* @param src path
40304039
* @param dst path
40314040
* @throws IOException IO problem
40324041
* @throws FileAlreadyExistsException the destination file exists and
40334042
* overwrite==false
4034-
* @throws SdkException failure in the AWS SDK
40354043
*/
40364044
@Override
40374045
@AuditEntryPoint
40384046
public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
40394047
Path dst) throws IOException {
40404048
checkNotClosed();
4041-
LOG.debug("Copying local file from {} to {}", src, dst);
4042-
trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst,
4043-
() -> new CopyFromLocalOperation(
4044-
createStoreContext(),
4045-
src,
4046-
dst,
4047-
delSrc,
4048-
overwrite,
4049-
createCopyFromLocalCallbacks()).execute());
4049+
LOG.debug("Copying local file from {} to {} (delSrc={} overwrite={}",
4050+
src, dst, delSrc, overwrite);
4051+
if (optimizedCopyFromLocal) {
4052+
trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, () ->
4053+
new CopyFromLocalOperation(
4054+
createStoreContext(),
4055+
src,
4056+
dst,
4057+
delSrc,
4058+
overwrite,
4059+
createCopyFromLocalCallbacks(getActiveAuditSpan()))
4060+
.execute());
4061+
} else {
4062+
// call the superclass, but still count statistics.
4063+
// there is no overall span here, as each FS API call will
4064+
// be in its own span.
4065+
LOG.debug("Using base copyFromLocalFile implementation");
4066+
trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, () -> {
4067+
super.copyFromLocalFile(delSrc, overwrite, src, dst);
4068+
return null;
4069+
});
4070+
}
40504071
}
40514072

4073+
/**
4074+
* Create the CopyFromLocalCallbacks;
4075+
* protected to assist in mocking.
4076+
* @param span audit span.
4077+
* @return the callbacks
4078+
* @throws IOException failure to get the local fs.
4079+
*/
40524080
protected CopyFromLocalOperation.CopyFromLocalOperationCallbacks
4053-
createCopyFromLocalCallbacks() throws IOException {
4081+
createCopyFromLocalCallbacks(final AuditSpanS3A span) throws IOException {
40544082
LocalFileSystem local = getLocal(getConf());
4055-
return new CopyFromLocalCallbacksImpl(local);
4083+
return new CopyFromLocalCallbacksImpl(span, local);
40564084
}
40574085

40584086
protected final class CopyFromLocalCallbacksImpl implements
40594087
CopyFromLocalOperation.CopyFromLocalOperationCallbacks {
4088+
4089+
/** Span to use for all operations. */
4090+
private final AuditSpanS3A span;
40604091
private final LocalFileSystem local;
40614092

4062-
private CopyFromLocalCallbacksImpl(LocalFileSystem local) {
4093+
private CopyFromLocalCallbacksImpl(final AuditSpanS3A span,
4094+
LocalFileSystem local) {
4095+
this.span = span;
40634096
this.local = local;
40644097
}
40654098

@@ -4081,20 +4114,18 @@ public boolean deleteLocal(Path path, boolean recursive) throws IOException {
40814114

40824115
@Override
40834116
public void copyLocalFileFromTo(File file, Path from, Path to) throws IOException {
4084-
trackDurationAndSpan(
4085-
OBJECT_PUT_REQUESTS,
4086-
to,
4087-
() -> {
4088-
final String key = pathToKey(to);
4089-
Progressable progress = null;
4090-
PutObjectRequest.Builder putObjectRequestBuilder =
4091-
newPutObjectRequestBuilder(key, file.length(), false);
4092-
S3AFileSystem.this.invoker.retry("putObject(" + "" + ")", to.toString(), true,
4093-
() -> executePut(putObjectRequestBuilder.build(), progress, putOptionsForPath(to),
4094-
file));
4095-
4096-
return null;
4097-
});
4117+
// the duration of the put is measured, but the active span is the
4118+
// constructor-supplied one -this ensures all audit log events are grouped correctly
4119+
span.activate();
4120+
trackDuration(getDurationTrackerFactory(), OBJECT_PUT_REQUESTS.getSymbol(), () -> {
4121+
final String key = pathToKey(to);
4122+
PutObjectRequest.Builder putObjectRequestBuilder =
4123+
newPutObjectRequestBuilder(key, file.length(), false);
4124+
final String dest = to.toString();
4125+
S3AFileSystem.this.invoker.retry("putObject(" + dest + ")", dest, true, () ->
4126+
executePut(putObjectRequestBuilder.build(), null, putOptionsForPath(to), file));
4127+
return null;
4128+
});
40984129
}
40994130

41004131
@Override
@@ -5399,6 +5430,10 @@ public boolean hasPathCapability(final Path path, final String capability)
53995430
case FS_S3A_CREATE_PERFORMANCE_ENABLED:
54005431
return performanceCreation;
54015432

5433+
// is the optimized copy from local enabled.
5434+
case OPTIMIZED_COPY_FROM_LOCAL:
5435+
return optimizedCopyFromLocal;
5436+
54025437
default:
54035438
return super.hasPathCapability(p, cap);
54045439
}

hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1544,3 +1544,13 @@ software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP
15441544

15451545
When this happens, try to set `fs.s3a.connection.request.timeout` to a larger value or disable it
15461546
completely by setting it to `0`.
1547+
1548+
### <a name="debug-switches"></a> Debugging Switches
1549+
1550+
There are some switches which can be set to enable/disable features and assist
1551+
in isolating problems and at least make them "go away".
1552+
1553+
1554+
| Key | Default | Action |
1555+
|------|---------|----------|
1556+
| `fs.s3a.optimized.copy.from.local.enabled` | `true` | [HADOOP-18925](https://issues.apache.org/jira/browse/HADOOP-18925) enable/disable CopyFromLocalOperation. Also a path capability. |

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACopyFromLocalFile.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,76 @@
1919
package org.apache.hadoop.fs.s3a;
2020

2121
import java.io.File;
22+
import java.util.Arrays;
23+
import java.util.Collection;
2224

2325
import org.apache.hadoop.conf.Configuration;
2426
import org.apache.hadoop.fs.contract.AbstractContractCopyFromLocalTest;
2527
import org.apache.hadoop.fs.contract.AbstractFSContract;
2628
import org.apache.hadoop.fs.contract.s3a.S3AContract;
2729

2830
import org.apache.hadoop.fs.Path;
31+
32+
import org.assertj.core.api.Assertions;
2933
import org.junit.Test;
34+
import org.junit.runner.RunWith;
35+
import org.junit.runners.Parameterized;
3036

37+
import static org.apache.hadoop.fs.s3a.Constants.OPTIMIZED_COPY_FROM_LOCAL;
38+
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
39+
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
40+
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
3141
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
3242

43+
/**
44+
* Test copying files from the local filesystem to S3A.
45+
* Parameterized on whether or not the optimized
46+
* copyFromLocalFile is enabled.
47+
*/
48+
@RunWith(Parameterized.class)
3349
public class ITestS3ACopyFromLocalFile extends
3450
AbstractContractCopyFromLocalTest {
51+
/**
52+
* Parameterization.
53+
*/
54+
@Parameterized.Parameters(name = "enabled={0}")
55+
public static Collection<Object[]> params() {
56+
return Arrays.asList(new Object[][]{
57+
{true},
58+
{false},
59+
});
60+
}
61+
private final boolean enabled;
62+
63+
public ITestS3ACopyFromLocalFile(final boolean enabled) {
64+
this.enabled = enabled;
65+
}
66+
67+
@Override
68+
protected Configuration createConfiguration() {
69+
final Configuration conf = super.createConfiguration();
70+
71+
removeBaseAndBucketOverrides(getTestBucketName(conf), conf,
72+
OPTIMIZED_COPY_FROM_LOCAL);
73+
conf.setBoolean(OPTIMIZED_COPY_FROM_LOCAL, enabled);
74+
disableFilesystemCaching(conf);
75+
return conf;
76+
}
3577

3678
@Override
3779
protected AbstractFSContract createContract(Configuration conf) {
3880
return new S3AContract(conf);
3981
}
4082

83+
@Test
84+
public void testOptionPropagation() throws Throwable {
85+
Assertions.assertThat(getFileSystem().hasPathCapability(new Path("/"),
86+
OPTIMIZED_COPY_FROM_LOCAL))
87+
.describedAs("path capability of %s", OPTIMIZED_COPY_FROM_LOCAL)
88+
.isEqualTo(enabled);
89+
90+
}
91+
4192
@Test
4293
public void testLocalFilesOnly() throws Throwable {
4394
describe("Copying into other file systems must fail");

0 commit comments

Comments
 (0)