-
Notifications
You must be signed in to change notification settings - Fork 9.1k
HADOOP-13600. S3a rename() to copy files in a directory in parallel #157
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
break; | ||
case TRANSFER_COMPLETED_EVENT: | ||
try { | ||
innerDelete(srcStatus, false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we add some threads for low priority delete/create calls, then this can be done in the background. It'd also change how failures are handed: delete won't fail; no need to catch IOEs or AmazonClientExceptions. However, I'd pool the delete operations to avoid throttling problems
9d2b4e7
to
e616400
Compare
"s3a-upload-shared"); | ||
|
||
copyThreadPoolExecutor = BlockingThreadPoolExecutorService.newInstance( | ||
maxThreads, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
COPY is server-side (no data transfer) and is thus generally much less resource-intensive and much quicker than PUT (the smaller your bandwidth to S3, the bigger the difference becomes). So I think the maxThreads
for the copyThreadpool could be (much) higher than for uploadThreadpool and should thus be configurable separately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about the core pool size? I've updated both for now. Wasn't sure what reasonable defaults would be, so I just put down by best guess.
e616400
to
7fbfef3
Compare
public static final int UPLOAD_DEFAULT_MAX_THREADS = 10; | ||
|
||
// the maximum number of threads to allow in the pool used by TransferManager for copies | ||
public static final String COPY_MAX_THREADS = "fs.s3a.threads.max"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fs.s3a.copy.threads.max?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whoops, fixed. Leaving the parameters for the upload TransferManager
the same since changing the name would be a backwards incompatible change.
7fbfef3
to
343b31f
Compare
343b31f
to
5d116d7
Compare
public static final int DEFAULT_MAX_THREADS = 10; | ||
// the maximum number of threads to allow in the pool used by TransferManager for uploads | ||
public static final String UPLOAD_MAX_THREADS = "fs.s3a.threads.max"; | ||
public static final int DEFAULT_UPLOAD_MAX_THREADS = 10; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not going to rename constants because it will break anyone using them. Class is tagged {@InterfaceAudience.Public/evolving}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
@@ -269,7 +273,8 @@ public StorageStatistics provide() { | |||
} | |||
useListV1 = (listVersion == 1); | |||
|
|||
initTransferManager(); | |||
this.uploads = createUploadTransferManager(conf); | |||
this.copies = createCopyTransferManager(conf); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd actually like this to be on demand. That way, if not needed, no overhead of creating it. init times of s3a are already high
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a separate class called LazyTransferManager
that lazily initializes a TransferManager
.
return uploads; | ||
} | ||
|
||
private TransferManager createCopyTransferManager(Configuration conf) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is almost the same as the previous one apart from the conf fields and names; should be a common method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Re-factored. Most of this was moved into LazyTransferManager
. I abstracted the determination of # of threads into a common method.
|
||
try { | ||
// Launch a thread that will read from the deleteQueue and batch delete any files that have already been copied | ||
deleteFuture = new FutureTask<>(new Callable<Void>() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we target java 8 only, this can be a proper lambda-expression :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to lambda expression.
@@ -143,9 +150,11 @@ | |||
private Listing listing; | |||
private long partSize; | |||
private boolean enableMultiObjectsDelete; | |||
private TransferManager transfers; | |||
private TransferManager uploads; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
leave the name of this alone unless really, really needed because (a) we may do other things with it in future, (b) reduces the size of the diff hence compatibility with other patches.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
@@ -2167,7 +2280,34 @@ public String getCanonicalServiceName() { | |||
* @throws IOException Other IO problems | |||
*/ | |||
private void copyFile(String srcKey, String dstKey, long size) | |||
throws IOException, InterruptedIOException, AmazonClientException { | |||
throws IOException, AmazonClientException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
leave as before so as to make clear it may be interrupted, unless its mentioned in the javadocs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
5d116d7
to
3f6075e
Compare
@@ -0,0 +1,34 @@ | |||
package org.apache.hadoop.fs.s3a; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will need apache license text at top.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whoops, always forget those. Fixed.
@@ -0,0 +1,63 @@ | |||
package org.apache.hadoop.fs.s3a; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto here.. new files need the apache license text at top
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
Optional<DeleteObjectsRequest.KeyVersion> key = deleteQueue.take(); | ||
|
||
// The thread runs until is is given an EOF message (an Optional#empty()) | ||
if (key.isPresent()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use of Optional will make this harder to backport to Java 7 branches, no? Looks like you are using it for an EOF marker only? You could also create a static final KeyVersion instance and use a reference equality comparison to detect that it is the special EOF version?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm making the leap to Java 8 with the committer and retry logic. Doesn't mean we should jump to using Optional just because we can. Put differently: if you are going to just use it as a fancy null, it's overkill. Key benefit is that you and use map & foreach, but there we are crippled by java's checked exceptions stopping us throwing IOEs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed the usage of Optional
. Using a private static final DeleteObjectsRequest.KeyVersion END_OF_KEYS_TO_DELETE = new DeleteObjectsRequest.KeyVersion(null, null);
as the EOF instead.
if (!deleteFutureComplete) { | ||
if (deleteFuture != null && !deleteFuture.isDone() && !deleteFuture.isCancelled()) { | ||
deleteFuture.cancel(true); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering if we can move parallel rename logic to a new class.. S3AFileSystem is getting too big.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mmm. I mostly concur, handing in either the S3aFS or the (expanded) WriteOperationsHelper
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved all the parallel rename logic into a dedicated calls called ParallelDirectoryRenamer
return srcKey; | ||
} | ||
|
||
String getDstKey() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, go on, use "destKey". We can afford the letter e's
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
int totalTasks = intOption(conf, | ||
MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1); | ||
long keepAliveTime = longOption(conf, KEEPALIVE_TIME, | ||
DEFAULT_KEEPALIVE_TIME, 0); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ideally, keep superfluous edits to a min. I know, we all abuse that...but its good to try
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
@@ -303,7 +296,37 @@ public StorageStatistics provide() { | |||
} catch (AmazonClientException e) { | |||
throw translateException("initializing ", new Path(name), e); | |||
} | |||
} | |||
|
|||
private int getMaxThreads(Configuration conf, String maxThreadsKey, int defaultMaxThreads) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move to S3AUtils
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
return maxThreads; | ||
} | ||
|
||
private LazyTransferManager createLazyUploadTransferManager(Configuration conf) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this logic could be moved into LazyUploadTransferManager itself, maybe just as a static method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
In HADOOP-13786 I'm wrapping every single s3 client call with retry policy, then expanding the inconsistent client to generate more faults (initially throttle, later connection setup/response parsing). I'd really like this work to actually await that, as without it this code isn't going to be resilient to large copies where you are much more likely to hit parallel IO. And we need to make sure there's a good failure policy set up there |
3f6075e
to
6597735
Compare
Updates:
@steveloughran your last comment on HADOOP-13786 suggested you may move the retry logic out into a separate patch? Are you planning to do that? If not, do you think this patch requires waiting for all the work in HADOOP-13786 to be completed? If there are concerns with retry behavior, we could also set the default value of the copy thread pool to be 1, that way this feature is essentially off by default. Also what do you mean by "isn't going to be resilient to large copies where you are much more likely to hit parallel IO"? What parallel IO are you referring to? |
Parallel execution: we've got another transfer manager issuing COPY requests, so more HTTPS requests to an S3 bucket/shard. The more requests to a single shard, the likelier you are to hit failures. Now, I think the xfer manager does handle the 503 replies which come back, but the rest of the FS client doesn't |
@steveloughran ok that makes sense. Thanks for the explanation. Let me know if you need any help with pulling the retry logic out. |
This is reincarnation of PR apache#157 It was not possible to merge/rebase that branch, so I had to create another one and reapply the changes. Author: Boris Shkolnik <boryas@apache.org> Reviewers: Xinyu Liu <xinyu@apache.org> Closes apache#178 from sborya/unitTestHappyPath4
TransferManager
s - one for file uploads and one for file copies; this way they can be both configured separately; users may want to set the # of parallel uploads to a lower value than the # of parallel copies because uploads require actual I/O while copies do notinnerRename
methodTranferManager
to upload them in parallelTransferManager
and then tracked using the returnedCopy
objectBlockingQueue
- once each copy is complete it adds a key to the queue, onceMAX_ENTRIES_TO_DELETE
keys need to be deleted, then theremoveKeys
method is invokedProgressListener
is used to track when a copy has been completed, once a copy finishes, the key to delete is added to the delete queue