Skip to content

HADOOP-18637. S3A to support upload of files greater than 2 GB using DiskBlocks #5543

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions hadoop-tools/hadoop-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
<configuration>
<forkCount>${testsThreadCount}</forkCount>
<reuseForks>false</reuseForks>
<trimStackTrace>false</trimStackTrace>
<argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine>
<systemPropertyVariables>
<testsThreadCount>${testsThreadCount}</testsThreadCount>
Expand Down Expand Up @@ -272,6 +273,7 @@
<goal>verify</goal>
</goals>
<configuration>
<trimStackTrace>false</trimStackTrace>
<systemPropertyVariables>
<!-- Propagate scale parameters -->
<fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1255,4 +1255,25 @@ private Constants() {
*/
public static final String PREFETCH_BLOCK_COUNT_KEY = "fs.s3a.prefetch.block.count";
public static final int PREFETCH_BLOCK_DEFAULT_COUNT = 8;

/**
* Option to enable or disable the multipart uploads.
* Value: {@value}.
* <p>
* Default is {@link #DEFAULT_MULTIPART_UPLOAD_ENABLED}.
*/
public static final String MULTIPART_UPLOADS_ENABLED = "fs.s3a.multipart.uploads.enabled";

/**
* Default value for multipart uploads.
* {@value}
*/
public static final boolean DEFAULT_MULTIPART_UPLOAD_ENABLED = true;

/**
* Stream supports multipart uploads to the given path.
*/
public static final String STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED =
"fs.s3a.capability.multipart.uploads.enabled";

}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class S3ABlockOutputStream extends OutputStream implements
private final String key;

/** Size of all blocks. */
private final int blockSize;
private final long blockSize;

/** IO Statistics. */
private final IOStatistics iostatistics;
Expand Down Expand Up @@ -169,6 +169,9 @@ class S3ABlockOutputStream extends OutputStream implements
/** Thread level IOStatistics Aggregator. */
private final IOStatisticsAggregator threadIOStatisticsAggregator;

/** Is multipart upload enabled? */
private final boolean isMultipartUploadEnabled;

/**
* An S3A output stream which uploads partitions in a separate pool of
* threads; different {@link S3ADataBlocks.BlockFactory}
Expand All @@ -181,7 +184,6 @@ class S3ABlockOutputStream extends OutputStream implements
this.builder = builder;
this.key = builder.key;
this.blockFactory = builder.blockFactory;
this.blockSize = (int) builder.blockSize;
this.statistics = builder.statistics;
// test instantiations may not provide statistics;
this.iostatistics = statistics.getIOStatistics();
Expand All @@ -195,17 +197,26 @@ class S3ABlockOutputStream extends OutputStream implements
(ProgressListener) progress
: new ProgressableListener(progress);
downgradeSyncableExceptions = builder.downgradeSyncableExceptions;
// create that first block. This guarantees that an open + close sequence
// writes a 0-byte entry.
createBlockIfNeeded();
LOG.debug("Initialized S3ABlockOutputStream for {}" +
" output to {}", key, activeBlock);

// look for multipart support.
this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled;
// block size is infinite if multipart is disabled, so ignore
// what was passed in from the builder.
this.blockSize = isMultipartUploadEnabled
? builder.blockSize
: -1;

if (putTracker.initialize()) {
LOG.debug("Put tracker requests multipart upload");
initMultipartUpload();
}
this.isCSEEnabled = builder.isCSEEnabled;
this.threadIOStatisticsAggregator = builder.ioStatisticsAggregator;
// create that first block. This guarantees that an open + close sequence
// writes a 0-byte entry.
createBlockIfNeeded();
LOG.debug("Initialized S3ABlockOutputStream for {}" +
" output to {}", key, activeBlock);
}

/**
Expand Down Expand Up @@ -318,7 +329,15 @@ public synchronized void write(byte[] source, int offset, int len)
statistics.writeBytes(len);
S3ADataBlocks.DataBlock block = createBlockIfNeeded();
int written = block.write(source, offset, len);
int remainingCapacity = block.remainingCapacity();
if (!isMultipartUploadEnabled) {
// no need to check for space as multipart uploads
// are not available...everything is saved to a single
// (disk) block.
return;
}
// look to see if another block is needed to complete
// the upload or exactly a block was written.
int remainingCapacity = (int) block.remainingCapacity();
if (written < len) {
// not everything was written —the block has run out
// of capacity
Expand Down Expand Up @@ -369,6 +388,8 @@ private synchronized void uploadCurrentBlock(boolean isLast)
*/
@Retries.RetryTranslated
private void initMultipartUpload() throws IOException {
Preconditions.checkState(isMultipartUploadEnabled,
"multipart upload is disabled");
if (multiPartUpload == null) {
LOG.debug("Initiating Multipart upload");
multiPartUpload = new MultiPartUpload(key);
Expand Down Expand Up @@ -558,19 +579,20 @@ public String toString() {
}

/**
* Upload the current block as a single PUT request; if the buffer
* is empty a 0-byte PUT will be invoked, as it is needed to create an
* entry at the far end.
* @throws IOException any problem.
* @return number of bytes uploaded. If thread was interrupted while
* waiting for upload to complete, returns zero with interrupted flag set
* on this thread.
* Upload the current block as a single PUT request; if the buffer is empty a
* 0-byte PUT will be invoked, as it is needed to create an entry at the far
* end.
* @return number of bytes uploaded. If thread was interrupted while waiting
* for upload to complete, returns zero with interrupted flag set on this
* thread.
* @throws IOException
* any problem.
*/
private int putObject() throws IOException {
private long putObject() throws IOException {
LOG.debug("Executing regular upload for {}", writeOperationHelper);

final S3ADataBlocks.DataBlock block = getActiveBlock();
int size = block.dataSize();
long size = block.dataSize();
final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
final PutObjectRequest putObjectRequest = uploadData.hasFile() ?
writeOperationHelper.createPutObjectRequest(
Expand Down Expand Up @@ -617,6 +639,7 @@ public String toString() {
"S3ABlockOutputStream{");
sb.append(writeOperationHelper.toString());
sb.append(", blockSize=").append(blockSize);
sb.append(", isMultipartUploadEnabled=").append(isMultipartUploadEnabled);
// unsynced access; risks consistency in exchange for no risk of deadlock.
S3ADataBlocks.DataBlock block = activeBlock;
if (block != null) {
Expand Down Expand Up @@ -835,7 +858,7 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block,
Preconditions.checkNotNull(uploadId, "Null uploadId");
maybeRethrowUploadFailure();
partsSubmitted++;
final int size = block.dataSize();
final long size = block.dataSize();
bytesSubmitted += size;
final int currentPartNumber = partETagsFutures.size() + 1;
final UploadPartRequest request;
Expand Down Expand Up @@ -1011,7 +1034,7 @@ public void progressChanged(ProgressEvent progressEvent) {
ProgressEventType eventType = progressEvent.getEventType();
long bytesTransferred = progressEvent.getBytesTransferred();

int size = block.dataSize();
long size = block.dataSize();
switch (eventType) {

case REQUEST_BYTE_TRANSFER_EVENT:
Expand Down Expand Up @@ -1126,6 +1149,11 @@ public static final class BlockOutputStreamBuilder {
*/
private IOStatisticsAggregator ioStatisticsAggregator;

/**
* Is Multipart Uploads enabled for the given upload.
*/
private boolean isMultipartUploadEnabled;

private BlockOutputStreamBuilder() {
}

Expand Down Expand Up @@ -1276,5 +1304,11 @@ public BlockOutputStreamBuilder withIOStatisticsAggregator(
ioStatisticsAggregator = value;
return this;
}

public BlockOutputStreamBuilder withMultipartEnabled(
final boolean value) {
isMultipartUploadEnabled = value;
return this;
}
}
}
Loading