HADOOP-13600. S3a rename() to copy files in a directory in parallel #157


Closed
sahilTakiar wants to merge 1 commit

Conversation

Contributor
@sahilTakiar commented Nov 10, 2016

  • Creates two separate TransferManagers - one for file uploads and one for file copies; this way they can both be configured separately; users may want to set the # of parallel uploads to a lower value than the # of parallel copies because uploads require actual I/O while copies do not
  • Main modifications are to the innerRename method
  • Instead of renaming a directory file by file, use the copy TransferManager to copy the files in parallel
  • Copies are submitted to the TransferManager and then tracked using the returned Copy object
  • Deletes are handled via a BlockingQueue - once each copy is complete, its key is added to the queue; once MAX_ENTRIES_TO_DELETE keys have accumulated, the removeKeys method is invoked
  • A separate thread is spawned to read from the delete queue and issue the delete requests
  • A ProgressListener is used to track when a copy has completed; once a copy finishes, the key to delete is added to the delete queue (see the sketch after this list)
  • Some other refactoring to make the above possible
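
A minimal sketch of the listener-to-queue handoff described above, assuming the AWS SDK v1 ProgressListener API; CopyCompletionListener and its fields are illustrative names, not the patch's actual code:

import java.util.concurrent.BlockingQueue;

import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressEventType;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;

// Hypothetical listener: when a copy completes, hand its source key to the
// delete thread via the shared queue.
class CopyCompletionListener implements ProgressListener {
  private final BlockingQueue<KeyVersion> deleteQueue;
  private final String srcKey;

  CopyCompletionListener(BlockingQueue<KeyVersion> deleteQueue, String srcKey) {
    this.deleteQueue = deleteQueue;
    this.srcKey = srcKey;
  }

  @Override
  public void progressChanged(ProgressEvent progressEvent) {
    if (progressEvent.getEventType() == ProgressEventType.TRANSFER_COMPLETED_EVENT) {
      deleteQueue.add(new KeyVersion(srcKey));
    }
  }
}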

break;
case TRANSFER_COMPLETED_EVENT:
try {
innerDelete(srcStatus, false);
Contributor

if we add some threads for low priority delete/create calls, then this can be done in the background. It'd also change how failures are handled: delete won't fail; no need to catch IOEs or AmazonClientExceptions. However, I'd pool the delete operations to avoid throttling problems
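
A rough sketch of that background-delete idea; the pool size, class, and method names here are all illustrative, with a deliberately tiny pool so the delete request rate stays low enough to avoid throttling:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical: a small fixed pool for low-priority background deletes.
// Failures are logged, not thrown, so rename() never sees them.
class BackgroundDeletePool {
  private final ExecutorService deletePool = Executors.newFixedThreadPool(2);

  void deleteAsync(Runnable deleteCall) {
    deletePool.submit(() -> {
      try {
        deleteCall.run();
      } catch (RuntimeException e) {
        System.err.println("Background delete failed: " + e); // log and move on
      }
    });
  }
}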

@sahilTakiar force-pushed the HADOOP-13600 branch 2 times, most recently from 9d2b4e7 to e616400 on November 30, 2016
"s3a-upload-shared");

copyThreadPoolExecutor = BlockingThreadPoolExecutorService.newInstance(
maxThreads,

Contributor

COPY is server-side (no data transfer) and is thus generally much less resource-intensive and much quicker than PUT (the smaller your bandwidth to S3, the bigger the difference becomes). So I think the maxThreads for the copyThreadpool could be (much) higher than for uploadThreadpool and should thus be configurable separately.
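
For illustration only, per-pool sizing might then look like this; fs.s3a.threads.max is the existing upload-pool key, while the copy-specific key and both values are hypothetical at this point in the thread:

import org.apache.hadoop.conf.Configuration;

class CopyPoolConfigExample {
  public static void main(String[] args) {
    // Hypothetical per-pool sizing: COPY is server-side, so its pool can be
    // much larger than the upload pool, which does real I/O.
    Configuration conf = new Configuration();
    conf.setInt("fs.s3a.threads.max", 10);      // uploads: bounded by bandwidth
    conf.setInt("fs.s3a.copy.threads.max", 40); // copies: cheap server-side COPYs
  }
}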

Contributor Author

How about the core pool size? I've updated both for now. Wasn't sure what reasonable defaults would be, so I just put down my best guess.

public static final int UPLOAD_DEFAULT_MAX_THREADS = 10;

// the maximum number of threads to allow in the pool used by TransferManager for copies
public static final String COPY_MAX_THREADS = "fs.s3a.threads.max";

Contributor

fs.s3a.copy.threads.max?

Contributor Author

Whoops, fixed. Leaving the parameters for the upload TransferManager the same since changing the name would be a backwards incompatible change.

public static final int DEFAULT_MAX_THREADS = 10;
// the maximum number of threads to allow in the pool used by TransferManager for uploads
public static final String UPLOAD_MAX_THREADS = "fs.s3a.threads.max";
public static final int DEFAULT_UPLOAD_MAX_THREADS = 10;
Contributor

not going to rename constants because it will break anyone using them. Class is tagged {@InterfaceAudience.Public/evolving}

Contributor Author

Fixed

@@ -269,7 +273,8 @@ public StorageStatistics provide() {
}
useListV1 = (listVersion == 1);

initTransferManager();
this.uploads = createUploadTransferManager(conf);
this.copies = createCopyTransferManager(conf);
Contributor

I'd actually like this to be on demand. That way, if not needed, no overhead of creating it. init times of s3a are already high

Contributor Author

Added a separate class called LazyTransferManager that lazily initializes a TransferManager.
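
Roughly, such a wrapper might look like the sketch below (illustrative, not the patch's actual class): construction is deferred to first use, so an FS instance that never needs the copy pool never pays for it.

import java.util.function.Supplier;

import com.amazonaws.services.s3.transfer.TransferManager;

// Hypothetical sketch of a lazily-initializing holder for a TransferManager.
class LazyTransferManager {
  private final Supplier<TransferManager> factory;
  private volatile TransferManager instance;

  LazyTransferManager(Supplier<TransferManager> factory) {
    this.factory = factory;
  }

  // Double-checked locking: at most one TransferManager is ever built.
  TransferManager get() {
    if (instance == null) {
      synchronized (this) {
        if (instance == null) {
          instance = factory.get();
        }
      }
    }
    return instance;
  }
}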

return uploads;
}

private TransferManager createCopyTransferManager(Configuration conf) {
Contributor

this is almost the same as the previous one apart from the conf fields and names; should be a common method

Contributor Author

Refactored. Most of this was moved into LazyTransferManager. I abstracted the determination of the # of threads into a common method.
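
The shared helper might reduce to something like this sketch; the signature mirrors the one visible later in the diff, while the floor of 2 threads is an assumption:

import org.apache.hadoop.conf.Configuration;

final class ThreadPoolSizing {
  // Sketch of the common helper: one code path for both pools, differing
  // only in the config key and default. The floor of 2 is an assumption.
  static int getMaxThreads(Configuration conf, String maxThreadsKey,
      int defaultMaxThreads) {
    int maxThreads = conf.getInt(maxThreadsKey, defaultMaxThreads);
    if (maxThreads < 2) {
      maxThreads = 2; // TransferManager needs at least a couple of workers
    }
    return maxThreads;
  }
}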


try {
// Launch a thread that will read from the deleteQueue and batch delete any files that have already been copied
deleteFuture = new FutureTask<>(new Callable<Void>() {
Contributor

If we target java 8 only, this can be a proper lambda-expression :)

Contributor Author

Changed to lambda expression.
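
For reference, the change is roughly the before/after below; drainDeleteQueue is a hypothetical stand-in for the patch's batch-delete loop:

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

class DeleteTaskExample {
  // Hypothetical stand-in for the patch's batch-delete loop.
  static void drainDeleteQueue() { /* take keys, batch, removeKeys(...) */ }

  // Java 7 style: anonymous Callable, as in the diff above.
  FutureTask<Void> before = new FutureTask<>(new Callable<Void>() {
    @Override
    public Void call() {
      drainDeleteQueue();
      return null;
    }
  });

  // Java 8 style: the same task as a lambda.
  FutureTask<Void> after = new FutureTask<>(() -> {
    drainDeleteQueue();
    return null;
  });
}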

@@ -143,9 +150,11 @@
private Listing listing;
private long partSize;
private boolean enableMultiObjectsDelete;
private TransferManager transfers;
private TransferManager uploads;
Contributor

leave the name of this alone unless really, really needed because (a) we may do other things with it in future, (b) reduces the size of the diff hence compatibility with other patches.

Contributor Author

Fixed

@@ -2167,7 +2280,34 @@ public String getCanonicalServiceName() {
* @throws IOException Other IO problems
*/
private void copyFile(String srcKey, String dstKey, long size)
throws IOException, InterruptedIOException, AmazonClientException {
throws IOException, AmazonClientException {
Contributor

leave as before so as to make clear it may be interrupted, unless it's mentioned in the javadocs

Contributor Author

Fixed

@@ -0,0 +1,34 @@
package org.apache.hadoop.fs.s3a;
Contributor

Will need apache license text at top.

Contributor Author

Whoops, always forget those. Fixed.

@@ -0,0 +1,63 @@
package org.apache.hadoop.fs.s3a;
Contributor

Ditto here... new files need the Apache license text at top

Contributor Author

Fixed

Optional<DeleteObjectsRequest.KeyVersion> key = deleteQueue.take();

// The thread runs until it is given an EOF message (an Optional#empty())
if (key.isPresent()) {
Contributor

Use of Optional will make this harder to backport to Java 7 branches, no? Looks like you are using it for an EOF marker only? You could also create a static final KeyVersion instance and use a reference equality comparison to detect that it is the special EOF version?

Contributor

I'm making the leap to Java 8 with the committer and retry logic. Doesn't mean we should jump to using Optional just because we can. Put differently: if you are going to just use it as a fancy null, it's overkill. Key benefit is that you can use map & foreach, but there we are crippled by Java's checked exceptions stopping us throwing IOEs

Contributor Author

I removed the usage of Optional. Using a private static final DeleteObjectsRequest.KeyVersion END_OF_KEYS_TO_DELETE = new DeleteObjectsRequest.KeyVersion(null, null); as the EOF instead.
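
A sketch of the resulting consumer loop, using reference equality against the sentinel; removeKeys and the batch-size constant stand in for the patch's actual machinery:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;

class DeleteQueueConsumer {
  // Unique instance used only as an end-of-stream marker; never sent to S3.
  private static final KeyVersion END_OF_KEYS_TO_DELETE =
      new KeyVersion(null, null);
  private static final int MAX_ENTRIES_TO_DELETE = 1000; // illustrative

  void drain(BlockingQueue<KeyVersion> deleteQueue) throws InterruptedException {
    List<KeyVersion> batch = new ArrayList<>();
    while (true) {
      KeyVersion key = deleteQueue.take();
      if (key == END_OF_KEYS_TO_DELETE) { // reference equality on the sentinel
        break;
      }
      batch.add(key);
      if (batch.size() >= MAX_ENTRIES_TO_DELETE) {
        removeKeys(batch);
        batch.clear();
      }
    }
    if (!batch.isEmpty()) {
      removeKeys(batch); // flush whatever is left at EOF
    }
  }

  private void removeKeys(List<KeyVersion> keys) { /* batched delete call */ }
}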

if (!deleteFutureComplete) {
if (deleteFuture != null && !deleteFuture.isDone() && !deleteFuture.isCancelled()) {
deleteFuture.cancel(true);
}
Contributor

Wondering if we can move parallel rename logic to a new class... S3AFileSystem is getting too big.

Contributor

mmm. I mostly concur, handing in either the S3aFS or the (expanded) WriteOperationsHelper

Contributor Author

Moved all the parallel rename logic into a dedicated class called ParallelDirectoryRenamer

return srcKey;
}

String getDstKey() {
Contributor

no, go on, use "destKey". We can afford the letter e's

Contributor Author

Fixed

int totalTasks = intOption(conf,
MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1);
long keepAliveTime = longOption(conf, KEEPALIVE_TIME,
DEFAULT_KEEPALIVE_TIME, 0);

Contributor

ideally, keep superfluous edits to a min. I know, we all abuse that...but it's good to try

Contributor Author

Fixed

@@ -303,7 +296,37 @@ public StorageStatistics provide() {
} catch (AmazonClientException e) {
throw translateException("initializing ", new Path(name), e);
}
}

private int getMaxThreads(Configuration conf, String maxThreadsKey, int defaultMaxThreads) {
Contributor

move to S3AUtils

Contributor Author

Done

return maxThreads;
}

private LazyTransferManager createLazyUploadTransferManager(Configuration conf) {
Contributor

this logic could be moved into LazyUploadTransferManager itself, maybe just as a static method

Contributor Author

Done

@steveloughran
Contributor

In HADOOP-13786 I'm wrapping every single s3 client call with retry policy, then expanding the inconsistent client to generate more faults (initially throttle, later connection setup/response parsing). I'd really like this work to actually await that, as without it this code isn't going to be resilient to large copies where you are much more likely to hit parallel IO. And we need to make sure there's a good failure policy set up there

@sahilTakiar
Contributor Author

Updates:

  • Moved the parallel rename logic into a dedicated class called ParallelDirectoryRenamer
  • A few other bug fixes, the core logic remains the same

@steveloughran your last comment on HADOOP-13786 suggested you may move the retry logic out into a separate patch? Are you planning to do that? If not, do you think this patch requires waiting for all the work in HADOOP-13786 to be completed?

If there are concerns with retry behavior, we could also set the default value of the copy thread pool to be 1, that way this feature is essentially off by default.

Also what do you mean by "isn't going to be resilient to large copies where you are much more likely to hit parallel IO"? What parallel IO are you referring to?

@steveloughran
Contributor

steveloughran commented Sep 14, 2017

  1. I don't see why this needs to be blocked waiting for all of the '13786 patch to go in
  2. but the rename/throttling stuff: yes.
  3. And yes, I do think I can pull out the retry logic once we've got it stable (the committer code is stressing it). But it will be java 8...whoever wants it in branch-2 gets to convert the closure around every s3 call to an anonymous Callable

Parallel execution: we've got another transfer manager issuing COPY requests, so more HTTPS requests to an S3 bucket/shard. The more requests to a single shard, the likelier you are to hit failures. Now, I think the xfer manager does handle the 503 replies which come back, but the rest of the FS client doesn't

@sahilTakiar
Contributor Author

@steveloughran ok that makes sense. Thanks for the explanation. Let me know if you need any help with pulling the retry logic out.

@aajisaka closed this Jul 26, 2019
shanthoosh pushed a commit to shanthoosh/hadoop that referenced this pull request Oct 15, 2019
This is reincarnation of PR apache#157
It was not possible to merge/rebase that branch, so I had to create another one and reapply the changes.

Author: Boris Shkolnik <boryas@apache.org>

Reviewers: Xinyu Liu <xinyu@apache.org>

Closes apache#178 from sborya/unitTestHappyPath4