Skip to content

Commit 525606e

Browse files
authored
Added support for file share reliable download (#22504)
1 parent fa07d23 commit 525606e

File tree

20 files changed

+1195
-53
lines changed

20 files changed

+1195
-53
lines changed

sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/APISpec.groovy

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -672,20 +672,6 @@ class APISpec extends StorageSpec {
672672
}
673673
}
674674

675-
class MockRetryRangeResponsePolicy implements HttpPipelinePolicy {
676-
@Override
677-
Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
678-
return next.process().flatMap { HttpResponse response ->
679-
if (response.getRequest().getHeaders().getValue("x-ms-range") != "bytes=2-6") {
680-
return Mono.<HttpResponse> error(new IllegalArgumentException("The range header was not set correctly on retry."))
681-
} else {
682-
// ETag can be a dummy value. It's not validated, but DownloadResponse requires one
683-
return Mono.<HttpResponse> just(new MockDownloadHttpResponse(response, 206, Flux.error(new IOException())))
684-
}
685-
}
686-
}
687-
}
688-
689675
/**
690676
* Injects one retry-able IOException failure per url.
691677
*/

sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobAPITest.groovy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import com.azure.storage.common.test.shared.extensions.LiveOnly
5050
import com.azure.storage.common.test.shared.extensions.PlaybackOnly
5151
import com.azure.storage.common.test.shared.extensions.RequiredServiceVersion
5252
import com.azure.storage.common.test.shared.policy.MockFailureResponsePolicy
53+
import com.azure.storage.common.test.shared.policy.MockRetryRangeResponsePolicy
5354
import reactor.core.Exceptions
5455
import reactor.core.publisher.Flux
5556
import reactor.core.publisher.Hooks
@@ -416,7 +417,7 @@ class BlobAPITest extends APISpec {
416417
constructed in BlobClient.download().
417418
*/
418419
setup:
419-
def bu2 = getBlobClient(env.primaryAccount.credential, bc.getBlobUrl(), new MockRetryRangeResponsePolicy())
420+
def bu2 = getBlobClient(env.primaryAccount.credential, bc.getBlobUrl(), new MockRetryRangeResponsePolicy("bytes=2-6"))
420421

421422
when:
422423
def range = new BlobRange(2, 5L)
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.storage.common.test.shared.policy;
5+
6+
import com.azure.core.http.HttpPipelineCallContext;
7+
import com.azure.core.http.HttpPipelineNextPolicy;
8+
import com.azure.core.http.HttpResponse;
9+
import com.azure.core.http.policy.HttpPipelinePolicy;
10+
import reactor.core.publisher.Flux;
11+
import reactor.core.publisher.Mono;
12+
13+
import java.io.IOException;
14+
15+
public class MockRetryRangeResponsePolicy implements HttpPipelinePolicy {
16+
17+
private final String rangeMatch;
18+
19+
public MockRetryRangeResponsePolicy(String rangeMatch) {
20+
this.rangeMatch = rangeMatch;
21+
}
22+
23+
@Override
24+
public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
25+
return next.process().flatMap(response -> {
26+
if (!response.getRequest().getHeaders().getValue("x-ms-range").equals(rangeMatch)) {
27+
return Mono.error(new IllegalArgumentException("The range header was not set correctly on retry."));
28+
} else {
29+
// ETag can be a dummy value. It's not validated, but DownloadResponse requires one
30+
return Mono.just(new MockDownloadHttpResponse(response, 206, Flux.error(new IOException())));
31+
}
32+
});
33+
}
34+
}

sdk/storage/azure-storage-file-datalake/src/test/java/com/azure/storage/file/datalake/APISpec.groovy

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -485,20 +485,6 @@ class APISpec extends StorageSpec {
485485
}
486486
}
487487

488-
class MockRetryRangeResponsePolicy implements HttpPipelinePolicy {
489-
@Override
490-
Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
491-
return next.process().flatMap { HttpResponse response ->
492-
if (response.getRequest().getHeaders().getValue("x-ms-range") != "bytes=2-6") {
493-
return Mono.<HttpResponse> error(new IllegalArgumentException("The range header was not set correctly on retry."))
494-
} else {
495-
// ETag can be a dummy value. It's not validated, but DownloadResponse requires one
496-
return Mono.<HttpResponse> just(new MockDownloadHttpResponse(response, 206, Flux.error(new IOException())))
497-
}
498-
}
499-
}
500-
}
501-
502488
def getMockRequest() {
503489
HttpHeaders headers = new HttpHeaders()
504490
headers.put(Constants.HeaderConstants.CONTENT_ENCODING, "en-US")

sdk/storage/azure-storage-file-datalake/src/test/java/com/azure/storage/file/datalake/FileAPITest.groovy

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import com.azure.storage.common.implementation.Constants
1414
import com.azure.storage.common.test.shared.extensions.LiveOnly
1515
import com.azure.storage.common.test.shared.extensions.RequiredServiceVersion
1616
import com.azure.storage.common.test.shared.policy.MockFailureResponsePolicy
17+
import com.azure.storage.common.test.shared.policy.MockRetryRangeResponsePolicy
18+
import com.azure.storage.file.datalake.models.DownloadRetryOptions
1719
import com.azure.storage.file.datalake.models.AccessTier
1820
import com.azure.storage.file.datalake.models.DataLakeRequestConditions
1921
import com.azure.storage.file.datalake.models.DataLakeStorageException
@@ -977,7 +979,7 @@ class FileAPITest extends APISpec {
977979
constructed in BlobClient.download().
978980
*/
979981
setup:
980-
def fileClient = getFileClient(env.dataLakeAccount.credential, fc.getPathUrl(), new MockRetryRangeResponsePolicy())
982+
def fileClient = getFileClient(env.dataLakeAccount.credential, fc.getPathUrl(), new MockRetryRangeResponsePolicy("bytes=2-6"))
981983

982984
fc.append(new ByteArrayInputStream(data.defaultBytes), 0, data.defaultDataSize)
983985
fc.flush(data.defaultDataSize)

sdk/storage/azure-storage-file-share/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Release History
22

33
## 12.11.0-beta.1 (Unreleased)
4+
- Added support to reliably download a file.
45
- Added support for the 2020-10-02 service version.
56

67
## 12.10.0 (2021-06-09)

sdk/storage/azure-storage-file-share/src/main/java/com/azure/storage/file/share/ShareFileAsyncClient.java

Lines changed: 105 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import com.azure.core.http.rest.PagedResponseBase;
1515
import com.azure.core.http.rest.Response;
1616
import com.azure.core.http.rest.SimpleResponse;
17+
import com.azure.core.http.rest.StreamResponse;
1718
import com.azure.core.util.Context;
1819
import com.azure.core.util.CoreUtils;
1920
import com.azure.core.util.FluxUtil;
@@ -48,6 +49,7 @@
4849
import com.azure.storage.file.share.implementation.util.ShareSasImplUtil;
4950
import com.azure.storage.file.share.models.CloseHandlesInfo;
5051
import com.azure.storage.file.share.models.CopyStatusType;
52+
import com.azure.storage.file.share.models.DownloadRetryOptions;
5153
import com.azure.storage.file.share.models.HandleItem;
5254
import com.azure.storage.file.share.models.LeaseDurationType;
5355
import com.azure.storage.file.share.models.LeaseStateType;
@@ -58,6 +60,7 @@
5860
import com.azure.storage.file.share.models.ShareErrorCode;
5961
import com.azure.storage.file.share.models.ShareFileCopyInfo;
6062
import com.azure.storage.file.share.models.ShareFileDownloadAsyncResponse;
63+
import com.azure.storage.file.share.models.ShareFileDownloadHeaders;
6164
import com.azure.storage.file.share.models.ShareFileHttpHeaders;
6265
import com.azure.storage.file.share.models.ShareFileInfo;
6366
import com.azure.storage.file.share.models.ShareFileMetadataInfo;
@@ -70,6 +73,7 @@
7073
import com.azure.storage.file.share.models.ShareFileUploadRangeFromUrlInfo;
7174
import com.azure.storage.file.share.models.ShareRequestConditions;
7275
import com.azure.storage.file.share.models.ShareStorageException;
76+
import com.azure.storage.file.share.options.ShareFileDownloadOptions;
7377
import com.azure.storage.file.share.options.ShareFileListRangesDiffOptions;
7478
import com.azure.storage.file.share.options.ShareFileUploadRangeFromUrlOptions;
7579
import com.azure.storage.file.share.sas.ShareServiceSasSignatureValues;
@@ -94,6 +98,7 @@
9498
import java.time.OffsetDateTime;
9599
import java.util.ArrayList;
96100
import java.util.Collections;
101+
import java.util.ConcurrentModificationException;
97102
import java.util.List;
98103
import java.util.Map;
99104
import java.util.Objects;
@@ -135,6 +140,7 @@ public class ShareFileAsyncClient {
135140
static final long FILE_DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024L;
136141
static final long FILE_MAX_PUT_RANGE_SIZE = 4 * Constants.MB;
137142
private static final long DOWNLOAD_UPLOAD_CHUNK_TIMEOUT = 300;
143+
private static final Duration TIMEOUT_VALUE = Duration.ofSeconds(60);
138144

139145
private final AzureFileStorageImpl azureFileStorageClient;
140146
private final String shareName;
@@ -735,7 +741,8 @@ private Mono<Response<ShareFileProperties>> downloadResponseInChunk(Response<Sha
735741
}
736742
return chunks;
737743
}).flatMapMany(Flux::fromIterable).flatMap(chunk ->
738-
downloadWithResponse(chunk, false, requestConditions, context)
744+
downloadWithResponse(new ShareFileDownloadOptions().setRange(chunk).setRangeContentMd5(false)
745+
.setRequestConditions(requestConditions), context)
739746
.map(ShareFileDownloadAsyncResponse::getValue)
740747
.subscribeOn(Schedulers.elastic())
741748
.flatMap(fbb -> FluxUtil
@@ -779,8 +786,7 @@ private void channelCleanUp(AsynchronousFileChannel channel) {
779786
*/
780787
public Flux<ByteBuffer> download() {
781788
try {
782-
return downloadWithResponse(null, null).flatMapMany(
783-
ShareFileDownloadAsyncResponse::getValue);
789+
return downloadWithResponse(null).flatMapMany(ShareFileDownloadAsyncResponse::getValue);
784790
} catch (RuntimeException ex) {
785791
return fluxError(logger, ex);
786792
}
@@ -827,25 +833,110 @@ public Mono<ShareFileDownloadAsyncResponse> downloadWithResponse(ShareFileRange
827833
*/
828834
public Mono<ShareFileDownloadAsyncResponse> downloadWithResponse(ShareFileRange range, Boolean rangeGetContentMD5,
829835
ShareRequestConditions requestConditions) {
836+
return downloadWithResponse(new ShareFileDownloadOptions().setRange(range)
837+
.setRangeContentMd5(rangeGetContentMD5).setRequestConditions(requestConditions));
838+
}
839+
840+
/**
841+
* Downloads a file from the system, including its metadata and properties
842+
*
843+
* <p><strong>Code Samples</strong></p>
844+
*
845+
* <p>Download the file from 1024 to 2048 bytes with its metadata and properties and without the contentMD5. </p>
846+
*
847+
* {@codesnippet com.azure.storage.file.share.ShareFileAsyncClient.downloadWithResponse#ShareFileDownloadOptions}
848+
*
849+
* <p>For more information, see the
850+
* <a href="https://docs.microsoft.com/rest/api/storageservices/get-file">Azure Docs</a>.</p>
851+
*
852+
* @param options {@link ShareFileDownloadOptions}
853+
* true, as long as the range is less than or equal to 4 MB in size.
854+
* @return A reactive response containing response data and the file data.
855+
*/
856+
public Mono<ShareFileDownloadAsyncResponse> downloadWithResponse(ShareFileDownloadOptions options) {
830857
try {
831-
return withContext(context -> downloadWithResponse(range, rangeGetContentMD5,
832-
requestConditions, context));
858+
return withContext(context -> downloadWithResponse(options, context));
833859
} catch (RuntimeException ex) {
834860
return monoError(logger, ex);
835861
}
836862
}
837863

838-
Mono<ShareFileDownloadAsyncResponse> downloadWithResponse(ShareFileRange range, Boolean rangeGetContentMD5,
864+
Mono<ShareFileDownloadAsyncResponse> downloadWithResponse(ShareFileDownloadOptions options, Context context) {
865+
options = options == null ? new ShareFileDownloadOptions() : options;
866+
ShareFileRange range = options.getRange() == null ? new ShareFileRange(0) : options.getRange();
867+
ShareRequestConditions requestConditions = options.getRequestConditions() == null
868+
? new ShareRequestConditions() : options.getRequestConditions();
869+
DownloadRetryOptions retryOptions = options.getRetryOptions() == null ? new DownloadRetryOptions()
870+
: options.getRetryOptions();
871+
Boolean getRangeContentMd5 = options.getRangeContentMd5();
872+
873+
return downloadRange(range, getRangeContentMd5, requestConditions, context)
874+
.map(response -> {
875+
String eTag = ModelHelper.getETag(response.getHeaders());
876+
ShareFileDownloadHeaders headers = ModelHelper.transformFileDownloadHeaders(response.getHeaders());
877+
878+
long finalEnd;
879+
if (range.getEnd() == null) {
880+
finalEnd = headers.getContentRange() == null ? headers.getContentLength()
881+
: Long.parseLong(headers.getContentRange().split("/")[1]);
882+
} else {
883+
finalEnd = range.getEnd();
884+
}
885+
886+
Flux<ByteBuffer> bufferFlux = FluxUtil.createRetriableDownloadFlux(
887+
() -> response.getValue().timeout(TIMEOUT_VALUE),
888+
(throwable, offset) -> {
889+
if (!(throwable instanceof IOException || throwable instanceof TimeoutException)) {
890+
return Flux.error(throwable);
891+
}
892+
893+
long newCount = finalEnd - (offset - range.getStart());
894+
895+
/*
896+
It is possible that the network stream will throw an error after emitting all data but before
897+
completing. Issuing a retry at this stage would leave the download in a bad state with incorrect count
898+
and offset values. Because we have read the intended amount of data, we can ignore the error at the end
899+
of the stream.
900+
*/
901+
if (newCount == 0) {
902+
logger.warning("Exception encountered in ReliableDownload after all data read from the network but "
903+
+ "but before stream signaled completion. Returning success as all data was downloaded. "
904+
+ "Exception message: " + throwable.getMessage());
905+
return Flux.empty();
906+
}
907+
908+
try {
909+
return downloadRange(
910+
new ShareFileRange(offset, range.getEnd()), getRangeContentMd5,
911+
requestConditions, context).flatMapMany(r -> {
912+
String receivedETag = ModelHelper.getETag(r.getHeaders());
913+
if (eTag != null && eTag.equals(receivedETag)) {
914+
return r.getValue().timeout(TIMEOUT_VALUE);
915+
} else {
916+
return Flux.<ByteBuffer>error(
917+
new ConcurrentModificationException(String.format("File has been modified "
918+
+ "concurrently. Expected eTag: %s, Received eTag: %s", eTag,
919+
receivedETag)));
920+
}
921+
});
922+
} catch (Exception e) {
923+
return Flux.error(e);
924+
}
925+
},
926+
retryOptions.getMaxRetryRequests(),
927+
range.getStart()
928+
).switchIfEmpty(Flux.just(ByteBuffer.wrap(new byte[0])));
929+
930+
return new ShareFileDownloadAsyncResponse(response.getRequest(), response.getStatusCode(),
931+
response.getHeaders(), bufferFlux, headers);
932+
});
933+
}
934+
935+
private Mono<StreamResponse> downloadRange(ShareFileRange range, Boolean rangeGetContentMD5,
839936
ShareRequestConditions requestConditions, Context context) {
840-
requestConditions = requestConditions == null ? new ShareRequestConditions() : requestConditions;
841937
String rangeString = range == null ? null : range.toString();
842-
843-
return azureFileStorageClient.getFiles()
844-
.downloadWithResponseAsync(shareName, filePath, null, rangeString, rangeGetContentMD5,
845-
requestConditions.getLeaseId(), context)
846-
.map(response -> new ShareFileDownloadAsyncResponse(response.getRequest(), response.getStatusCode(),
847-
response.getHeaders(), response.getValue(),
848-
ModelHelper.transformFileDownloadHeaders(response.getHeaders())));
938+
return azureFileStorageClient.getFiles().downloadWithResponseAsync(shareName, filePath, null,
939+
rangeString, rangeGetContentMD5, requestConditions.getLeaseId(), context);
849940
}
850941

851942
/**

sdk/storage/azure-storage-file-share/src/main/java/com/azure/storage/file/share/ShareFileClient.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.azure.storage.file.share.models.ShareFileUploadRangeOptions;
3434
import com.azure.storage.file.share.models.ShareRequestConditions;
3535
import com.azure.storage.file.share.models.ShareStorageException;
36+
import com.azure.storage.file.share.options.ShareFileDownloadOptions;
3637
import com.azure.storage.file.share.options.ShareFileListRangesDiffOptions;
3738
import com.azure.storage.file.share.options.ShareFileUploadRangeFromUrlOptions;
3839
import com.azure.storage.file.share.sas.ShareServiceSasSignatureValues;
@@ -562,10 +563,36 @@ public ShareFileDownloadResponse downloadWithResponse(OutputStream stream, Share
562563
*/
563564
public ShareFileDownloadResponse downloadWithResponse(OutputStream stream, ShareFileRange range,
564565
Boolean rangeGetContentMD5, ShareRequestConditions requestConditions, Duration timeout, Context context) {
566+
return downloadWithResponse(stream, new ShareFileDownloadOptions().setRange(range)
567+
.setRangeContentMd5(rangeGetContentMD5).setRequestConditions(requestConditions), timeout, context);
568+
}
569+
570+
/**
571+
* Downloads a file from the system, including its metadata and properties
572+
*
573+
* <p><strong>Code Samples</strong></p>
574+
*
575+
* <p>Download the file from 1024 to 2048 bytes with its metadata and properties and without the contentMD5. </p>
576+
*
577+
* {@codesnippet com.azure.storage.file.share.ShareFileClient.downloadWithResponse#OutputStream-ShareFileDownloadOptions-Duration-Context}
578+
*
579+
* <p>For more information, see the
580+
* <a href="https://docs.microsoft.com/rest/api/storageservices/get-file">Azure Docs</a>.</p>
581+
*
582+
* @param stream A non-null {@link OutputStream} where the downloaded data will be written.
583+
* @param options {@link ShareFileDownloadOptions}
584+
* @param timeout An optional timeout applied to the operation. If a response is not returned before the timeout
585+
* concludes a {@link RuntimeException} will be thrown.
586+
* @param context Additional context that is passed through the Http pipeline during the service call.
587+
* @return A response containing the headers and response status code
588+
* @throws NullPointerException If {@code stream} is {@code null}.
589+
* @throws RuntimeException if the operation doesn't complete before the timeout concludes.
590+
*/
591+
public ShareFileDownloadResponse downloadWithResponse(OutputStream stream, ShareFileDownloadOptions options,
592+
Duration timeout, Context context) {
565593
Objects.requireNonNull(stream, "'stream' cannot be null.");
566594

567-
Mono<ShareFileDownloadResponse> download = shareFileAsyncClient.downloadWithResponse(range, rangeGetContentMD5,
568-
requestConditions, context)
595+
Mono<ShareFileDownloadResponse> download = shareFileAsyncClient.downloadWithResponse(options, context)
569596
.flatMap(response -> response.getValue().reduce(stream, (outputStream, buffer) -> {
570597
try {
571598
outputStream.write(FluxUtil.byteBufferToArray(buffer));

sdk/storage/azure-storage-file-share/src/main/java/com/azure/storage/file/share/implementation/util/ModelHelper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,10 @@ public static ShareFileDownloadHeaders transformFileDownloadHeaders(HttpHeaders
192192
}
193193
}
194194

195+
public static String getETag(HttpHeaders headers) {
196+
return headers.getValue("ETag");
197+
}
198+
195199
public static ShareFileItemProperties transformFileProperty(FileProperty property) {
196200
if (property == null) {
197201
return null;

0 commit comments

Comments
 (0)