-
Notifications
You must be signed in to change notification settings - Fork 8.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HADOOP-19295. S3A: large uploads can timeout over slow links #7089
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
|
||
package org.apache.hadoop.fs.s3a.impl; | ||
|
||
import java.time.Duration; | ||
import java.util.Base64; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
|
@@ -60,6 +61,7 @@ | |
|
||
import static org.apache.commons.lang3.StringUtils.isNotEmpty; | ||
import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM; | ||
import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.setRequestTimeout; | ||
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT; | ||
import static org.apache.hadoop.util.Preconditions.checkArgument; | ||
import static org.apache.hadoop.util.Preconditions.checkNotNull; | ||
|
@@ -128,6 +130,12 @@ public class RequestFactoryImpl implements RequestFactory { | |
*/ | ||
private final boolean isMultipartUploadEnabled; | ||
|
||
/** | ||
* Timeout for uploading objects/parts. | ||
* This will be set on data put/post operations only. | ||
*/ | ||
private final Duration partUploadTimeout; | ||
|
||
/** | ||
* Constructor. | ||
* @param builder builder with all the configuration. | ||
|
@@ -142,6 +150,7 @@ protected RequestFactoryImpl( | |
this.contentEncoding = builder.contentEncoding; | ||
this.storageClass = builder.storageClass; | ||
this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled; | ||
this.partUploadTimeout = builder.partUploadTimeout; | ||
} | ||
|
||
/** | ||
|
@@ -344,6 +353,11 @@ public PutObjectRequest.Builder newPutObjectRequestBuilder(String key, | |
putObjectRequestBuilder.storageClass(storageClass); | ||
} | ||
|
||
// Set the timeout for object uploads but not directory markers. | ||
if (!isDirectoryMarker) { | ||
setRequestTimeout(putObjectRequestBuilder, partUploadTimeout); | ||
} | ||
|
||
return prepareRequest(putObjectRequestBuilder); | ||
} | ||
|
||
|
@@ -595,6 +609,9 @@ public UploadPartRequest.Builder newUploadPartRequestBuilder( | |
.partNumber(partNumber) | ||
.contentLength(size); | ||
uploadPartEncryptionParameters(builder); | ||
|
||
// Set the request timeout for the part upload | ||
setRequestTimeout(builder, partUploadTimeout); | ||
return prepareRequest(builder); | ||
} | ||
|
||
|
@@ -702,6 +719,13 @@ public static final class RequestFactoryBuilder { | |
*/ | ||
private boolean isMultipartUploadEnabled = true; | ||
|
||
/** | ||
* Timeout for uploading objects/parts. | ||
* This will be set on data put/post operations only. | ||
* A zero value means "no custom timeout" | ||
*/ | ||
private Duration partUploadTimeout = Duration.ZERO; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe you have tested this explicitly. Hopefully zero doesn't mean infinite. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, I will set to the default...even though its set up properly in production, there are some test cases which didn't |
||
|
||
private RequestFactoryBuilder() { | ||
} | ||
|
||
|
@@ -799,6 +823,18 @@ public RequestFactoryBuilder withMultipartUploadEnabled( | |
this.isMultipartUploadEnabled = value; | ||
return this; | ||
} | ||
|
||
/** | ||
* Timeout for uploading objects/parts. | ||
* This will be set on data put/post operations only. | ||
* A zero value means "no custom timeout" | ||
* @param value new value | ||
* @return the builder | ||
*/ | ||
public RequestFactoryBuilder withPartUploadTimeout(final Duration value) { | ||
partUploadTimeout = value; | ||
return this; | ||
} | ||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ | |
import java.io.InputStream; | ||
import java.io.UncheckedIOException; | ||
import java.nio.ByteBuffer; | ||
import java.time.LocalDateTime; | ||
import java.util.function.Supplier; | ||
import javax.annotation.Nullable; | ||
|
||
|
@@ -224,6 +225,12 @@ public static abstract class BaseContentProvider<T extends InputStream> | |
*/ | ||
private T currentStream; | ||
|
||
/** | ||
* When did this upload start? | ||
* Use in error messages. | ||
*/ | ||
private final LocalDateTime startTime; | ||
|
||
/** | ||
* Constructor. | ||
* @param size size of the data. Must be non-negative. | ||
|
@@ -241,6 +248,7 @@ protected BaseContentProvider(int size, @Nullable Supplier<Boolean> isOpen) { | |
checkArgument(size >= 0, "size is negative: %s", size); | ||
this.size = size; | ||
this.isOpen = isOpen; | ||
this.startTime = LocalDateTime.now(); | ||
} | ||
|
||
/** | ||
|
@@ -275,7 +283,10 @@ public final InputStream newStream() { | |
checkOpen(); | ||
streamCreationCount++; | ||
if (streamCreationCount > 1) { | ||
LOG.info("Stream created more than once: {}", this); | ||
LOG.info("Stream recreated: {}", this); | ||
if (LOG.isDebugEnabled()) { | ||
LOG.debug("Stream creation stack", new Exception("here")); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is for some testing, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good q. should we delete it? it's actually pretty handy for troubleshooting networking issues |
||
} | ||
} | ||
return setCurrentStream(createNewStream()); | ||
} | ||
|
@@ -302,6 +313,14 @@ public int getSize() { | |
return size; | ||
} | ||
|
||
/** | ||
* When did this upload start? | ||
* @return start time | ||
*/ | ||
public LocalDateTime getStartTime() { | ||
return startTime; | ||
} | ||
|
||
/** | ||
* Current stream. | ||
* When {@link #newStream()} is called, this is set to the new value, | ||
|
@@ -330,6 +349,7 @@ protected T setCurrentStream(T stream) { | |
public String toString() { | ||
return "BaseContentProvider{" + | ||
"size=" + size + | ||
", initiated at " + startTime + | ||
", streamCreationCount=" + streamCreationCount + | ||
", currentStream=" + currentStream + | ||
'}'; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is this part upload timeout different that multipart upload timeout?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
its my new option, same value for simple PUT as multipart; we patch the individual requests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
anyway