
Commit 12d9633

Merge pull request #436 from Azure/dev
Dev
2 parents: c22bdf0 + e463b46

File tree

8 files changed: 654 additions & 19 deletions

ChangeLog.txt

Lines changed: 3 additions & 0 deletions
@@ -1,3 +1,6 @@
+2019.02.15 Version 10.5.0
+* Added uploadFromNonReplayableFlowable to support uploading arbitrary data sources (like network streams) to a block blob.
+
 2019.01.11 Version 10.4.0
 * Fixed a bug that caused errors when java.io.tempdir has no trailing separator.
 * Upgrade autorest-clientruntime dependency to include some bug fixes.

README.md

Lines changed: 1 addition & 1 deletion
@@ -37,7 +37,7 @@ To get the binaries of this library as distributed by Microsoft, ready for use w
 <dependency>
     <groupId>com.microsoft.azure</groupId>
     <artifactId>azure-storage-blob</artifactId>
-    <version>10.4.0</version>
+    <version>10.5.0</version>
 </dependency>
 ```

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@
 
     <groupId>com.microsoft.azure</groupId>
     <artifactId>azure-storage-blob</artifactId>
-    <version>10.4.0</version>
+    <version>10.5.0</version>
 
     <name>Azure Storage Blob</name>
     <description>The Azure Storage Java Blob library.</description>

src/main/java/com/microsoft/azure/storage/blob/Constants.java

Lines changed: 1 addition & 1 deletion
@@ -184,7 +184,7 @@ static final class HeaderConstants {
         /**
          * Specifies the value to use for UserAgent header.
          */
-        static final String USER_AGENT_VERSION = "10.4.0";
+        static final String USER_AGENT_VERSION = "10.5.0";
 
         private HeaderConstants() {
             // Private to prevent construction.

src/main/java/com/microsoft/azure/storage/blob/TransferManager.java

Lines changed: 132 additions & 0 deletions
@@ -16,6 +16,7 @@
 package com.microsoft.azure.storage.blob;
 
 import com.microsoft.azure.storage.blob.models.BlobDownloadHeaders;
+import com.microsoft.azure.storage.blob.models.BlockBlobCommitBlockListResponse;
 import com.microsoft.azure.storage.blob.models.ModifiedAccessConditions;
 import com.microsoft.rest.v2.util.FlowableUtil;
 import io.reactivex.Flowable;
@@ -286,4 +287,135 @@ private static Single<List<Object>> getSetupSingle(BlobURL blobURL, BlobRange r,
             return Single.just(Arrays.asList(r.count(), o.accessConditions()));
         }
     }
+
+    /**
+     * Uploads the contents of an arbitrary {@code Flowable} to a block blob. This Flowable need not be replayable and
+     * therefore it may have as its source a network stream or any other data for which the replay behavior is unknown
+     * (non-replayable meaning the Flowable may not return the exact same data on each subscription).
+     *
+     * To eliminate the need for replayability on the source, the client must perform some buffering in order to
+     * ensure the actual data passed to the network is replayable. This is important in order to support retries,
+     * which are crucial for reliable data transfer. Typically, the greater the number of buffers used, the greater
+     * the possible parallelism. Larger buffers mean we will have to stage fewer blocks. The tradeoffs between these
+     * values are context-dependent, so some experimentation may be required to optimize inputs for a given scenario.
+     *
+     * Note that buffering must be strictly sequential. Only the upload portion of this operation may be parallelized;
+     * the reads cannot be. Therefore, this method is not as optimal as
+     * {@link #uploadFileToBlockBlob(AsynchronousFileChannel, BlockBlobURL, int, TransferManagerUploadToBlockBlobOptions)},
+     * and if the source is known to be a file, that method should be preferred.
+     *
+     * @param source
+     *         Contains the data to upload. Unlike other upload methods in this library, this method does not require
+     *         that the Flowable be replayable.
+     * @param blockBlobURL
+     *         Points to the blob to which the data should be uploaded.
+     * @param blockSize
+     *         The size of each block that will be staged. This value also determines the size of each buffer used by
+     *         this method and the number of requests that need to be made. The amount of memory consumed by this
+     *         method may be up to blockSize * numBuffers. If blockSize is large, this method will make fewer network
+     *         calls, but each individual call will send more data and will therefore take longer.
+     * @param numBuffers
+     *         The maximum number of buffers this method should allocate. Must be at least two. Generally this value
+     *         should have some relationship to the value for parallelism passed via the options. If the number of
+     *         available buffers is smaller than the level of parallelism, then this method will not be able to make
+     *         full use of the available parallelism. It is unlikely that the value need be more than two times the
+     *         level of parallelism, as such a value means that (assuming buffering is fast enough) there are enough
+     *         available buffers to have both one occupied for each worker and one ready for all workers should they
+     *         all complete the current request at approximately the same time. The amount of memory consumed by this
+     *         method may be up to blockSize * numBuffers.
+     * @param options
+     *         {@link TransferManagerUploadToBlockBlobOptions}
+     * @return Emits the successful response.
+     *
+     * @apiNote ## Sample Code \n
+     * [!code-java[Sample_Code](../azure-storage-java/src/test/java/com/microsoft/azure/storage/Samples.java?name=tm_nrf "Sample code for TransferManager.uploadFromNonReplayableFlowable")] \n
+     * For more samples, please see the [Samples file](%https://github.com/Azure/azure-storage-java/blob/master/src/test/java/com/microsoft/azure/storage/Samples.java)
+     */
+    public static Single<BlockBlobCommitBlockListResponse> uploadFromNonReplayableFlowable(
+            final Flowable<ByteBuffer> source, final BlockBlobURL blockBlobURL, final int blockSize,
+            final int numBuffers, final TransferManagerUploadToBlockBlobOptions options) {
+        Utility.assertNotNull("source", source);
+        Utility.assertNotNull("blockBlobURL", blockBlobURL);
+
+        TransferManagerUploadToBlockBlobOptions optionsReal = options == null
+                ? TransferManagerUploadToBlockBlobOptions.DEFAULT : options;
+
+        // See ProgressReporter for an explanation of why this lock is necessary and why we use AtomicLong.
+        AtomicLong totalProgress = new AtomicLong(0);
+        Lock progressLock = new ReentrantLock();
+
+        // Validation is done in the constructor.
+        UploadFromNRFBufferPool pool = new UploadFromNRFBufferPool(numBuffers, blockSize);
+
+        /*
+        Break the source Flowable into chunks that are <= block size. This makes filling the pooled buffers much
+        easier, as we can guarantee we only need at most two buffers for any call to write (two in the case of one
+        pool buffer filling up with more data left to write).
+        */
+        Flowable<ByteBuffer> chunkedSource = source.flatMap(buffer -> {
+            if (buffer.remaining() <= blockSize) {
+                return Flowable.just(buffer);
+            }
+            List<ByteBuffer> smallerChunks = new ArrayList<>();
+            for (int i = 0; i < Math.ceil(buffer.remaining() / (double) blockSize); i++) {
+                // Note that duplicate does not duplicate data. It simply creates a duplicate view of the data.
+                ByteBuffer duplicate = buffer.duplicate();
+                duplicate.position(i * blockSize);
+                duplicate.limit(Math.min(duplicate.limit(), (i + 1) * blockSize));
+                smallerChunks.add(duplicate);
+            }
+            return Flowable.fromIterable(smallerChunks);
+        }, false, 1);
+
+        // Write each buffer from the chunkedSource to the pool and call flush at the end to get the last bits.
+        return chunkedSource.flatMap(pool::write, false, 1)
+                .concatWith(Flowable.defer(pool::flush))
+                .concatMapEager(buffer -> {
+                    // Report progress as necessary.
+                    Flowable<ByteBuffer> data = ProgressReporter.addParallelProgressReporting(Flowable.just(buffer),
+                            optionsReal.progressReceiver(), progressLock, totalProgress);
+
+                    final String blockId = Base64.getEncoder().encodeToString(
+                            UUID.randomUUID().toString().getBytes());
+
+                    /*
+                    Make a call to stageBlock. Instead of emitting the response, which we don't care about other
+                    than that it was successful, emit the blockId for this request. These will be collected below.
+                    Turn that into an Observable which emits one item to comply with the signature of
+                    concatMapEager.
+                    */
+                    return blockBlobURL.stageBlock(blockId, data,
+                            buffer.remaining(), optionsReal.accessConditions().leaseAccessConditions(), null)
+                            .map(x -> {
+                                pool.returnBuffer(buffer);
+                                return blockId;
+                            }).toFlowable();
+
+                /*
+                Specify the number of concurrent subscribers to this map. This determines how many concurrent
+                REST calls are made. This is so because maxConcurrency is the number of internal subscribers
+                available to subscribe to the Observables emitted by the source. A subscriber is not released
+                for a new subscription until its Observable calls onComplete, which here means that the call to
+                stageBlock is finished. Prefetch is a hint that each of the Observables emitted by the source
+                will emit only one value, which is true here because we have converted from a Single.
+                */
+                }, optionsReal.parallelism(), 1)
+                /*
+                collectInto will gather each of the emitted blockIds into a list. Because we used concatMap, the Ids
+                will be emitted according to their block number, which means the list generated here will be
+                properly ordered. This also converts into a Single.
+                */
+                .collectInto(new ArrayList<String>(), ArrayList::add)
+                /*
+                collectInto will not emit the list until its source calls onComplete. This means that by the time we
+                call commitBlockList, all of the stageBlock calls will have finished. By flatMapping the list, we
+                can "map" it into a call to commitBlockList.
+                */
+                .flatMap(ids ->
+                        blockBlobURL.commitBlockList(ids, optionsReal.httpHeaders(), optionsReal.metadata(),
+                                optionsReal.accessConditions(), null));
+    }
 }
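
As a rough orientation for callers, the sketch below shows one way to invoke the new method; it is illustrative only, not part of the commit. The pipeline setup (SharedKeyCredentials, StorageURL.createPipeline, ServiceURL) follows the usual v10 SDK pattern, the account, container, and blob names are placeholders, and passing null for options falls back to TransferManagerUploadToBlockBlobOptions.DEFAULT, as the implementation above shows.

import com.microsoft.azure.storage.blob.*;
import io.reactivex.Flowable;

import java.io.InputStream;
import java.net.URL;
import java.nio.ByteBuffer;

public final class NonReplayableUploadSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder credentials and endpoint; substitute real values.
        SharedKeyCredentials creds = new SharedKeyCredentials("myaccount", "<account-key>");
        BlockBlobURL blobURL = new ServiceURL(
                new URL("https://myaccount.blob.core.windows.net"),
                StorageURL.createPipeline(creds, new PipelineOptions()))
                .createContainerURL("mycontainer")
                .createBlockBlobURL("myblob");

        // A non-replayable source: each read from the stream is emitted exactly once,
        // so re-subscribing would NOT reproduce the same data.
        InputStream stream = System.in; // stands in for a network stream
        Flowable<ByteBuffer> source = Flowable.generate(emitter -> {
            byte[] chunk = new byte[8 * 1024];
            int read = stream.read(chunk);
            if (read == -1) {
                emitter.onComplete();
            } else {
                emitter.onNext(ByteBuffer.wrap(chunk, 0, read));
            }
        });

        // 4 MB blocks and 4 buffers bound memory use at roughly 16 MB (blockSize * numBuffers);
        // null options falls back to TransferManagerUploadToBlockBlobOptions.DEFAULT.
        TransferManager.uploadFromNonReplayableFlowable(source, blobURL, 4 * 1024 * 1024, 4, null)
                .blockingGet();
    }
}

Note that the Flowable built with Flowable.generate emits each chunk exactly once, which is precisely the non-replayable case this method exists to handle.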
src/main/java/com/microsoft/azure/storage/blob/UploadFromNRFBufferPool.java

Lines changed: 174 additions & 0 deletions

@@ -0,0 +1,174 @@
+/*
+ * Copyright Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.azure.storage.blob;
+
+import io.reactivex.Flowable;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This type exists only to support the implementation of uploadFromNonReplayableFlowable. It is mandatory that the
+ * caller has broken the source into ByteBuffers that are no greater than the size of a chunk, and therefore of a
+ * buffer in the pool. This is necessary because it upper bounds the number of buffers we need for a given call to
+ * write() to 2. If the size of the ByteBuffer passed into write() were unbounded, the pool could stall as it would
+ * run out of buffers before it is able to return a result, and if it is unable to return, no data can be uploaded
+ * and therefore no buffers returned.
+ *
+ * It is incumbent upon the caller to return the buffers after an upload is completed. It is also the caller's
+ * responsibility to signal to the pool when the stream is empty and call flush to return any data still sitting in
+ * the pool.
+ *
+ * Broadly, the workflow of this operation is to chunk the source into reasonably sized pieces. On each piece, one
+ * thread will call write on the pool. The pool will grab a buffer from the queue to write to, possibly waiting for
+ * one to be available, and either store the incomplete buffer to be filled on the next write or return the buffer to
+ * be sent. Filled buffers can be uploaded in parallel and should return buffers to the pool after the upload
+ * completes. Once the source terminates, it should call flush.
+ */
+final class UploadFromNRFBufferPool {
+
+    private final BlockingQueue<ByteBuffer> buffers;
+
+    private final int maxBuffs;
+
+    private int numBuffs = 0;
+
+    private final int buffSize;
+
+    private ByteBuffer currentBuf;
+
+    UploadFromNRFBufferPool(final int numBuffs, final int buffSize) {
+        /*
+        We require at least two buffers because it is possible that a given write will spill over into a second
+        buffer. We only need one overflow buffer because the max size of a ByteBuffer is assumed to be the same as
+        the size of a buffer in the pool.
+        */
+        Utility.assertInBounds("numBuffs", numBuffs, 2, Integer.MAX_VALUE);
+        this.maxBuffs = numBuffs;
+        buffers = new LinkedBlockingQueue<>(numBuffs);
+
+        // These buffers will be used in calls to stageBlock, so they must be no greater than block size.
+        Utility.assertInBounds("buffSize", buffSize, 1, BlockBlobURL.MAX_STAGE_BLOCK_BYTES);
+        this.buffSize = buffSize;
+
+        // We prep the queue with two buffers in case there is overflow.
+        buffers.add(ByteBuffer.allocate(this.buffSize));
+        buffers.add(ByteBuffer.allocate(this.buffSize));
+        this.numBuffs = 2;
+    }
+
+    public Flowable<ByteBuffer> write(ByteBuffer buf) {
+        // Check if there's a buffer holding any data from a previous call to write. If not, get a new one.
+        if (this.currentBuf == null) {
+            this.currentBuf = this.getBuffer();
+        }
+
+        Flowable<ByteBuffer> result;
+        // We can fit this whole write in the buffer we currently have.
+        if (this.currentBuf.remaining() >= buf.remaining()) {
+            this.currentBuf.put(buf);
+            if (this.currentBuf.remaining() == 0) {
+                // Reset the position so that we can read the whole thing, then return this buffer.
+                this.currentBuf.position(0);
+                result = Flowable.just(this.currentBuf);
+                // This will force us to get a new buffer next time we try to write.
+                this.currentBuf = null;
+            } else {
+                /*
+                We are still filling the current buffer, so we have no data to return. We will return the buffer
+                once it is filled.
+                */
+                result = Flowable.empty();
+            }
+        }
+        // We will overflow the current buffer and require another one.
+        else {
+            // Adjust the window of buf so that we fill up currentBuf without going out of bounds.
+            int oldLimit = buf.limit();
+            buf.limit(buf.position() + this.currentBuf.remaining());
+            this.currentBuf.put(buf);
+            // Restore the old limit so we can read to the end in the next buffer.
+            buf.limit(oldLimit);
+
+            // Reset the position so we can read the buffer.
+            this.currentBuf.position(0);
+            result = Flowable.just(this.currentBuf);
+
+            /*
+            Get a new buffer and fill it with whatever is left from buf. Note that this relies on the assumption
+            that the source Flowable has been split up into buffers that are no bigger than chunk size. This
+            assumption means we'll only have to overflow once, and the buffer we overflow into will not be filled.
+            This is the buffer we will write to on the next call to write().
+            */
+            this.currentBuf = this.getBuffer();
+            this.currentBuf.put(buf);
+        }
+        return result;
+    }
+
+    private ByteBuffer getBuffer() {
+        ByteBuffer result;
+        // There are no buffers in the queue and we have space to allocate one.
+        if (this.buffers.isEmpty() && this.numBuffs < this.maxBuffs) {
+            result = ByteBuffer.allocate(this.buffSize);
+            this.numBuffs++;
+        } else {
+            try {
+                // If empty, this will wait for an upload to finish and return a buffer.
+                result = this.buffers.take();
+            } catch (InterruptedException e) {
+                throw new IllegalStateException("UploadFromStream thread interrupted. Thread: "
+                        + Thread.currentThread().getId());
+            }
+        }
+        return result;
+    }
+
+    Flowable<ByteBuffer> flush() {
+        /*
+        Prep and return any data left in the pool. It is important to set the limit so that we don't read beyond
+        the actual data, as this buffer may have been used before and therefore may have some garbage at the end.
+        */
+        if (this.currentBuf != null) {
+            this.currentBuf.flip();
+            ByteBuffer last = this.currentBuf;
+            // If there is an accidental duplicate call to flush, this prevents sending the last buffer twice.
+            this.currentBuf = null;
+            return Flowable.just(last);
+        }
+        return Flowable.empty();
+    }
+
+    void returnBuffer(ByteBuffer b) {
+        // Reset the buffer.
+        b.position(0);
+        b.limit(b.capacity());
+
+        try {
+            this.buffers.put(b);
+        } catch (InterruptedException e) {
+            throw new IllegalStateException("UploadFromStream thread interrupted.");
+        }
+    }
+}
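
To make the pool's contract concrete, here is a small hypothetical driver, not part of the commit, that chunks 18 bytes through a two-buffer pool of 8-byte buffers and drives write, flush, and returnBuffer by hand, mirroring what uploadFromNonReplayableFlowable does. Because the class is package-private, such a driver would only compile inside com.microsoft.azure.storage.blob, and the sizing is purely illustrative.

package com.microsoft.azure.storage.blob;

import io.reactivex.Flowable;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class UploadBufferPoolDemo {
    public static void main(String[] args) {
        // Two 8-byte buffers; every chunk written must be <= 8 bytes, per the class contract.
        UploadFromNRFBufferPool pool = new UploadFromNRFBufferPool(2, 8);

        Flowable<ByteBuffer> filled = Flowable
                .just("hello, ", "world!!!", "bye")    // 7 + 8 + 3 = 18 bytes total
                .map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.US_ASCII)))
                .flatMap(pool::write, false, 1)        // emits a buffer only once it is full
                .concatWith(Flowable.defer(pool::flush)); // emits the final, partly filled buffer

        // Each emitted buffer would normally be staged as a block and then returned to the pool:
        filled.blockingSubscribe(buf -> {
            System.out.println(buf.remaining() + " bytes ready to stage");
            pool.returnBuffer(buf);
        });
        // Prints "8 bytes ready to stage" twice, then "2 bytes ready to stage".
    }
}

The three emissions (8, 8, and 2 bytes) exercise both behaviors described above: a write that overflows into a second buffer, and a flush that flips and returns the final partial buffer.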
