File Shares Sync Stack Upload APIs #41664

Draft · wants to merge 14 commits into base: main
@@ -3,6 +3,7 @@

package com.azure.storage.common.implementation;

import com.azure.core.exception.UnexpectedLengthException;
import com.azure.core.http.HttpMethod;
import com.azure.core.http.HttpResponse;
import com.azure.core.util.Context;
@@ -497,7 +498,9 @@ public static <T> T sendRequest(Callable<T> operation, Duration timeout,
} else if (cause instanceof RuntimeException) {
// Throw as is if it's already a RuntimeException
throw (RuntimeException) cause;
} else if (cause instanceof Error) {
} else if (e instanceof UnexpectedLengthException) {
throw (UnexpectedLengthException) e;
} else if (cause instanceof Error) {
// Propagate if it's an Error
throw (Error) cause;
} else {
2 changes: 1 addition & 1 deletion sdk/storage/azure-storage-file-share/perf-tests.yml
@@ -8,7 +8,7 @@ PackageVersions:
- 'com.azure:azure-storage-common': 12.26.1 # {x-version-update;com.azure:azure-storage-common;dependency}
'com.azure:azure-storage-blob': 12.27.1 # {x-version-update;com.azure:azure-storage-blob;dependency}
'com.azure:azure-storage-blob-cryptography': 12.26.1 # {x-version-update;com.azure:azure-storage-blob-cryptography;dependency}
'com.azure:azure-storage-file-share': 12.23.1 # {x-version-update;com.azure:azure-storage-file-share;dependency}
'com.azure:azure-storage-file-share': 12.24.0 # {x-version-update;com.azure:azure-storage-file-share;dependency}
'com.azure:azure-storage-file-datalake': 12.20.1 # {x-version-update;com.azure:azure-storage-file-datalake;dependency}
'com.azure:azure-core': 1.51.0 # {x-version-update;com.azure:azure-core;dependency}
'com.azure:azure-core-http-netty': 1.15.3 # {x-version-update;com.azure:azure-core-http-netty;dependency}
@@ -8,6 +8,7 @@
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.credential.AzureSasCredential;
import com.azure.core.exception.HttpResponseException;
import com.azure.core.exception.UnexpectedLengthException;
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.rest.PagedIterable;
@@ -16,6 +17,7 @@
import com.azure.core.http.rest.Response;
import com.azure.core.http.rest.ResponseBase;
import com.azure.core.http.rest.SimpleResponse;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.FluxUtil;
@@ -83,16 +85,30 @@
import com.azure.storage.file.share.sas.ShareServiceSasSignatureValues;
import reactor.core.publisher.Mono;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@@ -238,7 +254,7 @@ public final StorageFileOutputStream getFileOutputStream() {
* @throws ShareStorageException If a storage service error occurred.
*/
public final StorageFileOutputStream getFileOutputStream(long offset) {
return new StorageFileOutputStream(shareFileAsyncClient, offset);
return new StorageFileOutputStream(this, offset);
}

/**
@@ -2058,11 +2074,99 @@ public ShareFileUploadInfo upload(InputStream data, long length, ParallelTransfe
* @return The {@link ShareFileUploadInfo file upload info}
*/
public Response<ShareFileUploadInfo> uploadWithResponse(ShareFileUploadOptions options,
Duration timeout, Context context) {
return StorageImplUtils.blockWithOptionalTimeout(
shareFileAsyncClient.uploadWithResponse(options, context), timeout);
Duration timeout, Context context) {
StorageImplUtils.assertNotNull("options", options);
ShareRequestConditions validatedRequestConditions = options.getRequestConditions() == null
? new ShareRequestConditions()
: options.getRequestConditions();
final ParallelTransferOptions validatedParallelTransferOptions =
ModelHelper.populateAndApplyDefaults(options.getParallelTransferOptions());
long validatedOffset = options.getOffset() == null ? 0 : options.getOffset();

final long blockSize = Math.min(validatedParallelTransferOptions.getBlockSizeLong(), 4L * 1024L * 1024L); // 4 MB
final int maxConcurrency = validatedParallelTransferOptions.getMaxConcurrency();

ExecutorService executor = Executors.newFixedThreadPool(maxConcurrency);
Review comment (Member):
We should look into designing this differently: this creates a new thread pool per API call, which will result in many threads being created and torn down. I'd want to look into leveraging SharedExecutorService in azure-core as the thread pool that manages parallelization here, but that will generally have many more threads available than the parallelization setting, so we'll need to implement some level of rate limiting.

I think we should initially look into limiting the number of parallel calls by using a Semaphore with a number of permits equal to the parallelization (and remembering to release a permit when each upload chunk completes), as sketched below.
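
A minimal sketch of that Semaphore idea, under a few assumptions: the shared executor is passed in from outside (for example azure-core's SharedExecutorService, rather than a pool created per call), and the `ChunkSink`/`uploadChunk` names are hypothetical stand-ins for the per-range upload in this PR. It only illustrates the permit gating, not the PR's actual implementation.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

final class RateLimitedChunkUploader {
    private final ExecutorService executor; // shared pool reused across API calls (assumption)
    private final Semaphore permits;        // one permit per allowed in-flight chunk

    RateLimitedChunkUploader(ExecutorService executor, int maxConcurrency) {
        this.executor = executor;
        this.permits = new Semaphore(maxConcurrency);
    }

    /** Reads the stream in blockSize chunks, keeping at most maxConcurrency chunks in flight. */
    List<Future<Void>> upload(InputStream data, int blockSize, ChunkSink sink)
            throws IOException, InterruptedException {
        List<Future<Void>> futures = new ArrayList<>();
        byte[] buffer = new byte[blockSize];
        long offset = 0;
        int read;
        while ((read = data.read(buffer)) != -1) {
            permits.acquire(); // block the reading loop until a previously submitted chunk completes
            byte[] chunk = Arrays.copyOf(buffer, read);
            long chunkOffset = offset;
            Callable<Void> task = () -> {
                try {
                    sink.uploadChunk(chunk, chunkOffset); // e.g. delegate to uploadRangeWithResponse(...)
                    return null;
                } finally {
                    permits.release(); // free the slot once this chunk is done
                }
            };
            futures.add(executor.submit(task));
            offset += read;
        }
        return futures;
    }

    /** Hypothetical stand-in for the per-range upload call in this PR. */
    interface ChunkSink {
        void uploadChunk(byte[] chunk, long offset) throws IOException;
    }
}
```

Acquiring before submitting means the reading loop itself backs off once maxConcurrency chunks are in flight, so a shared executor never sees more than that many tasks from a single upload call.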

List<Future<Response<ShareFileUploadInfo>>> futures = new ArrayList<>();

Callable<Response<ShareFileUploadInfo>> uploadOperation = () -> {
long totalBytesUploaded = 0; // Track the total bytes uploaded

try (InputStream dataStream = options.getDataStream()) {
byte[] buffer = new byte[(int) blockSize];
int readBytes;
long currentOffset = validatedOffset;

// Read and upload data in chunks
while ((readBytes = dataStream.read(buffer)) != -1) {
totalBytesUploaded += readBytes; // Track bytes as we read

// Validate byte count immediately against the expected length
if (options.getLength() != null) {
if (totalBytesUploaded > options.getLength()) {
throw LOGGER.logExceptionAsError(
new UnexpectedLengthException(String.format(
"Request body emitted %d bytes, more than the expected %d bytes.",
totalBytesUploaded, options.getLength()), totalBytesUploaded, options.getLength()));
}
}

final long chunkOffset = currentOffset;
final byte[] chunkData = Arrays.copyOf(buffer, readBytes);
final int finalReadBytes = readBytes;

// Submit each chunk to be uploaded in parallel
futures.add(executor.submit(() -> {
InputStream chunkStream = new ByteArrayInputStream(chunkData);
ShareFileUploadRangeOptions rangeOptions = new ShareFileUploadRangeOptions(chunkStream, finalReadBytes)
.setOffset(chunkOffset)
.setRequestConditions(validatedRequestConditions);
return uploadRangeWithResponse(rangeOptions, timeout, context);
}));

currentOffset += readBytes;
}

// Validate total bytes at the end of the upload
if (options.getLength() != null && totalBytesUploaded < options.getLength()) {
throw LOGGER.logExceptionAsError(
new UnexpectedLengthException(String.format(
"Request body emitted %d bytes, less than the expected %d bytes.",
totalBytesUploaded, options.getLength()), totalBytesUploaded, options.getLength()));
}

Response<ShareFileUploadInfo> lastResponse = null;
for (Future<Response<ShareFileUploadInfo>> future : futures) {
lastResponse = future.get();
}

return lastResponse;

} catch (IOException e) {
throw LOGGER.logExceptionAsError(new UncheckedIOException(e));
} catch (InterruptedException | ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof ShareStorageException) {
throw LOGGER.logExceptionAsError((ShareStorageException) cause); // Re-throw ShareStorageException
}
throw LOGGER.logExceptionAsError(new RuntimeException(e));
} finally {
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException ie) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
};

return sendRequest(uploadOperation, timeout, ShareStorageException.class);
}


/**
* Uploads a range of bytes to the specified offset of a file in storage file service. Upload operations perform an
* in-place write on the specified file.
@@ -2124,10 +2228,24 @@ public ShareFileUploadInfo uploadRange(InputStream data, long length) {
*/
public Response<ShareFileUploadInfo> uploadRangeWithResponse(ShareFileUploadRangeOptions options,
Duration timeout, Context context) {
return StorageImplUtils.blockWithOptionalTimeout(
shareFileAsyncClient.uploadRangeWithResponse(options, context), timeout);
ShareRequestConditions requestConditions = options.getRequestConditions() == null
? new ShareRequestConditions() : options.getRequestConditions();
long rangeOffset = (options.getOffset() == null) ? 0L : options.getOffset();
ShareFileRange range = new ShareFileRange(rangeOffset, rangeOffset + options.getLength() - 1);
Context finalContext = context == null ? Context.NONE : context;

BinaryData binaryData = BinaryData.fromStream(options.getDataStream());

Callable<ResponseBase<FilesUploadRangeHeaders, Void>> operation = () ->
azureFileStorageClient.getFiles().uploadRangeWithResponse(shareName, filePath, range.toString(),
ShareFileRangeWriteType.UPDATE, options.getLength(), null, null, requestConditions.getLeaseId(),
options.getLastWrittenMode(), binaryData, finalContext);

return ModelHelper.uploadRangeHeadersToShareFileInfo(sendRequest(operation, timeout,
ShareStorageException.class));
}


/**
* Uploads a range of bytes from one file to another file.
*
@@ -2437,7 +2555,76 @@ public void uploadFromFile(String uploadFilePath) {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public void uploadFromFile(String uploadFilePath, ShareRequestConditions requestConditions) {
shareFileAsyncClient.uploadFromFile(uploadFilePath, requestConditions).block();
StorageImplUtils.assertNotNull("uploadFilePath", uploadFilePath);

// Use ShareRequestConditions or default to a new instance if null
ShareRequestConditions validatedRequestConditions = requestConditions == null
? new ShareRequestConditions()
: requestConditions;

// Open a FileChannel to read the file synchronously
try (FileChannel fileChannel = FileChannel.open(Paths.get(uploadFilePath), StandardOpenOption.READ)) {
long fileSize = fileChannel.size();

// Calculate ranges for chunked upload
List<ShareFileRange> fileRanges = sliceFile(fileSize);

// Upload each chunk sequentially
for (ShareFileRange range : fileRanges) {
uploadFileRange(fileChannel, range, validatedRequestConditions);
}

} catch (IOException ex) {
throw LOGGER.logExceptionAsError(new UncheckedIOException(ex));
}
}

/**
* Splits the file into chunks based on the default block size.
*
* @param fileSize The size of the file.
* @return A list of {@link ShareFileRange} representing each chunk.
*/
private static List<ShareFileRange> sliceFile(long fileSize) {
List<ShareFileRange> ranges = new ArrayList<>();
for (long pos = 0; pos < fileSize; pos += ModelHelper.FILE_DEFAULT_BLOCK_SIZE) {
long count = ModelHelper.FILE_DEFAULT_BLOCK_SIZE;
if (pos + count > fileSize) {
count = fileSize - pos;
}
ranges.add(new ShareFileRange(pos, pos + count - 1));
}
return ranges;
}

/**
* Uploads a specific file range synchronously.
*
* @param fileChannel The FileChannel to read from.
* @param range The specific range to upload.
* @param requestConditions {@link ShareRequestConditions} for the upload.
*/
private void uploadFileRange(FileChannel fileChannel, ShareFileRange range, ShareRequestConditions requestConditions) {
long rangeSize = range.getEnd() - range.getStart() + 1;

// Read the data from the file into a ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate((int) rangeSize);
try {
fileChannel.read(buffer, range.getStart());
buffer.flip(); // Prepare buffer for reading

// Use ShareFileUploadRangeOptions to upload the range synchronously
ShareFileUploadRangeOptions uploadRangeOptions = new ShareFileUploadRangeOptions(
new ByteArrayInputStream(buffer.array()), rangeSize)
.setOffset(range.getStart())
.setRequestConditions(requestConditions);

// Perform the upload
uploadRangeWithResponse(uploadRangeOptions, null, null);

} catch (IOException ex) {
throw LOGGER.logExceptionAsError(new UncheckedIOException(ex));
}
}

/**
@@ -5,47 +5,62 @@

import com.azure.storage.common.StorageOutputStream;
import com.azure.storage.common.implementation.Constants;
import com.azure.storage.file.share.models.ShareFileUploadRangeOptions;
import com.azure.storage.file.share.models.ShareStorageException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.io.UncheckedIOException;
import java.util.Arrays;

/**
* Provides an output stream to write a given storage file resource.
*/
public class StorageFileOutputStream extends StorageOutputStream {
private long offsetPos;

private final ShareFileAsyncClient client;
private final ShareFileClient client;

StorageFileOutputStream(final ShareFileAsyncClient client, long offsetPos) {
StorageFileOutputStream(final ShareFileClient client, long offsetPos) {
super(4 * Constants.MB);
this.client = client;
this.offsetPos = offsetPos;
}

private Mono<Void> uploadData(Flux<ByteBuffer> inputData, long writeLength, long offset) {
return client.uploadWithResponse(inputData, writeLength, offset)
.then()
.onErrorResume(t -> t instanceof IOException || t instanceof ShareStorageException, e -> {
this.lastError = new IOException(e);
return null;
});
private void uploadData(byte[] inputData, int writeLength, long offset) throws IOException {
try {
ByteArrayInputStream inputStream = new ByteArrayInputStream(inputData, 0, writeLength);
ShareFileUploadRangeOptions options = new ShareFileUploadRangeOptions(inputStream, writeLength).setOffset(offset);
client.uploadRangeWithResponse(options, null, null);
} catch (ShareStorageException e) {
this.lastError = new IOException(e);
throw this.lastError; // Ensure the exception is propagated
}
}

@Override
protected Mono<Void> dispatchWrite(byte[] data, int writeLength, long offset) {
if (writeLength == 0) {
return Mono.empty();
}
return Mono.fromRunnable(() -> {
try {
// Calculate the correct file offset before uploading data
long fileOffset = this.offsetPos;
this.offsetPos += writeLength; // Update the global offset after writing

Flux<ByteBuffer> fbb = Mono.fromCallable(() -> ByteBuffer.wrap(data, (int) offset, writeLength)).flux();

long fileOffset = this.offsetPos;
this.offsetPos = this.offsetPos + writeLength;

return this.uploadData(fbb, writeLength, fileOffset);
this.uploadData(Arrays.copyOfRange(data, (int) offset, (int) (offset + writeLength)), writeLength, fileOffset); // Perform the upload
} catch (IOException e) {
this.lastError = e; // Capture the IOException
throw new RuntimeException(e); // Wrap it in an unchecked exception to allow propagation
}
}).onErrorMap(RuntimeException.class, ex -> {
// Handle the IOException and other exceptions as needed
if (ex.getCause() instanceof IOException) {
throw new UncheckedIOException((IOException) ex.getCause()); // Propagate the IOException if that was the cause
}
throw ex; // Propagate any other exception
}).then();
}
}