Commit aa4b43c

HADOOP-17770 WASB : Support disabling buffered reads in positional reads

1 parent 0934e78 commit aa4b43c

File tree

4 files changed: +123 -11 lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java

Lines changed: 26 additions & 5 deletions

@@ -41,6 +41,7 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.commons.lang3.StringUtils;
@@ -241,6 +242,16 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
    */
  public static final String KEY_ENABLE_FLAT_LISTING = "fs.azure.flatlist.enable";
 
+  /**
+   * Optional config to enable a lock free pread which will bypass buffer in
+   * BlockBlobInputStream.
+   * This is not a config which can be set at cluster level. It can be used as
+   * an option on FutureDataInputStreamBuilder.
+   * @see FileSystem#openFile(org.apache.hadoop.fs.Path)
+   */
+  public static final String FS_AZURE_BLOCK_BLOB_BUFFERED_PREAD_DISABLE =
+      "fs.azure.block.blob.buffered.pread.disable";
+
  /**
   * The set of directories where we should apply atomic folder rename
   * synchronized with createNonRecursive.
@@ -1591,18 +1602,22 @@ private OutputStream openOutputStream(final CloudBlobWrapper blob)
   * Opens a new input stream for the given blob (page or block blob)
   * to read its data.
   */
-  private InputStream openInputStream(CloudBlobWrapper blob)
-      throws StorageException, IOException {
+  private InputStream openInputStream(CloudBlobWrapper blob,
+      Optional<Configuration> options) throws StorageException, IOException {
    if (blob instanceof CloudBlockBlobWrapper) {
      LOG.debug("Using stream seek algorithm {}", inputStreamVersion);
      switch(inputStreamVersion) {
      case 1:
        return blob.openInputStream(getDownloadOptions(),
            getInstrumentedContext(isConcurrentOOBAppendAllowed()));
      case 2:
+        boolean bufferedPreadDisabled = options.map(c -> c
+            .getBoolean(FS_AZURE_BLOCK_BLOB_BUFFERED_PREAD_DISABLE, false))
+            .orElse(false);
        return new BlockBlobInputStream((CloudBlockBlobWrapper) blob,
            getDownloadOptions(),
-            getInstrumentedContext(isConcurrentOOBAppendAllowed()));
+            getInstrumentedContext(isConcurrentOOBAppendAllowed()),
+            bufferedPreadDisabled);
      default:
        throw new IOException("Unknown seek algorithm: " + inputStreamVersion);
      }
@@ -2290,6 +2305,12 @@ public InputStream retrieve(String key) throws AzureException, IOException {
  @Override
  public InputStream retrieve(String key, long startByteOffset)
      throws AzureException, IOException {
+    return retrieve(key, startByteOffset, Optional.empty());
+  }
+
+  @Override
+  public InputStream retrieve(String key, long startByteOffset,
+      Optional<Configuration> options) throws AzureException, IOException {
    try {
      // Check if a session exists, if not create a session with the
      // Azure storage server.
@@ -2301,7 +2322,7 @@ public InputStream retrieve(String key, long startByteOffset)
      }
      checkContainer(ContainerAccessType.PureRead);
 
-      InputStream inputStream = openInputStream(getBlobReference(key));
+      InputStream inputStream = openInputStream(getBlobReference(key), options);
      if (startByteOffset > 0) {
        // Skip bytes and ignore return value. This is okay
        // because if you try to skip too far you will be positioned
@@ -2852,7 +2873,7 @@ public void rename(String srcKey, String dstKey, boolean acquireLease,
      OutputStream opStream = null;
      try {
        if (srcBlob.getProperties().getBlobType() == BlobType.PAGE_BLOB){
-          ipStream = openInputStream(srcBlob);
+          ipStream = openInputStream(srcBlob, Optional.empty());
          opStream = openOutputStream(dstBlob);
          byte[] buffer = new byte[PageBlobFormatHelpers.PAGE_SIZE];
          int len;
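The new constant is documented as a per-stream option rather than a cluster-level setting, so callers pass it through FileSystem#openFile. A minimal usage sketch, assuming fs.defaultFS points at a wasb:// store; the path, offset, and buffer size are hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UnbufferedPreadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumes the default filesystem is a WASB store; the path is made up.
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("/data/sample.bin");

    // Pass the new key as an optional hint: openFile() returns a builder
    // whose build() yields a CompletableFuture<FSDataInputStream>.
    try (FSDataInputStream in = fs.openFile(path)
        .opt("fs.azure.block.blob.buffered.pread.disable", true)
        .build().get()) {
      byte[] buf = new byte[64 * 1024];
      // Positional read at offset 1 MB; with the flag set this maps to a
      // direct downloadRange call instead of seek plus buffered read.
      int n = in.read(1024 * 1024, buf, 0, buf.length);
      System.out.println("read " + n + " bytes");
    }
  }
}

Because the option travels with the builder, two streams opened over the same file can mix buffered and unbuffered positional reads.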

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java

Lines changed: 40 additions & 5 deletions

@@ -28,18 +28,19 @@
 import com.microsoft.azure.storage.blob.BlobRequestOptions;
 
 import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
 
 /**
  * Encapsulates the BlobInputStream used by block blobs and adds support for
  * random access and seek. Random access performance is improved by several
  * orders of magnitude.
  */
-final class BlockBlobInputStream extends InputStream implements Seekable {
+final class BlockBlobInputStream extends FSInputStream {
   private final CloudBlockBlobWrapper blob;
   private final BlobRequestOptions options;
   private final OperationContext opContext;
+  private final boolean bufferedPreadDisabled;
   private InputStream blobInputStream = null;
   private int minimumReadSizeInBytes = 0;
   private long streamPositionAfterLastRead = -1;
@@ -62,12 +63,13 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
    * @param opContext the blob operation context.
    * @throws IOException IO failure
    */
-  BlockBlobInputStream(CloudBlockBlobWrapper blob,
-      BlobRequestOptions options,
-      OperationContext opContext) throws IOException {
+  BlockBlobInputStream(CloudBlockBlobWrapper blob, BlobRequestOptions options,
+      OperationContext opContext, boolean bufferedPreadDisabled)
+      throws IOException {
     this.blob = blob;
     this.options = options;
     this.opContext = opContext;
+    this.bufferedPreadDisabled = bufferedPreadDisabled;
 
     this.minimumReadSizeInBytes = blob.getStreamMinimumReadSizeInBytes();
 
@@ -263,6 +265,39 @@ private int doNetworkRead(byte[] buffer, int offset, int len)
     }
   }
 
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length)
+      throws IOException {
+    synchronized (this) {
+      checkState();
+    }
+    if (!bufferedPreadDisabled) {
+      // This will do a seek + read in which the streamBuffer will get used.
+      return super.read(position, buffer, offset, length);
+    }
+    validatePositionedReadArgs(position, buffer, offset, length);
+    if (length == 0) {
+      return 0;
+    }
+    if (position >= streamLength) {
+      throw new EOFException("position is beyond stream capacity");
+    }
+    MemoryOutputStream os = new MemoryOutputStream(buffer, offset, length);
+    long bytesToRead = Math.min(minimumReadSizeInBytes,
+        Math.min(os.capacity(), streamLength - position));
+    try {
+      blob.downloadRange(position, bytesToRead, os, options, opContext);
+    } catch (StorageException e) {
+      throw new IOException(e);
+    }
+    int bytesRead = os.size();
+    if (bytesRead == 0) {
+      // This may happen if the blob was modified after the length was obtained.
+      throw new EOFException("End of stream reached unexpectedly.");
+    }
+    return bytesRead;
+  }
+
   /**
    * Reads up to <code>len</code> bytes of data from the input stream into an
   * array of bytes.
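Reparenting the class from InputStream + Seekable to FSInputStream is what makes the super.read(position, ...) call in the buffered branch available, and it also supplies validatePositionedReadArgs. For context, the inherited positional read behaves roughly as sketched below (a paraphrase of the documented seek/read/restore contract, not Hadoop's exact source): it holds the stream lock for the whole sequence, which is exactly the serialization and buffer reuse the new flag avoids.

  // Paraphrased sketch of FSInputStream's default positional read, the path
  // taken when bufferedPreadDisabled is false.
  @Override
  public int read(long position, byte[] buffer, int offset, int length)
      throws IOException {
    validatePositionedReadArgs(position, buffer, offset, length);
    if (length == 0) {
      return 0;
    }
    synchronized (this) {
      long oldPos = getPos();      // remember the sequential-read position
      int nread = -1;
      try {
        seek(position);            // move the buffered stream to `position`
        nread = read(buffer, offset, length);  // read through the buffer
      } finally {
        seek(oldPos);              // restore so sequential readers are unaffected
      }
      return nread;
    }
  }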

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java

Lines changed: 53 additions & 1 deletion

@@ -33,11 +33,14 @@
 import java.util.EnumSet;
 import java.util.TimeZone;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Stack;
 import java.util.HashMap;
 
@@ -61,6 +64,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.Syncable;
@@ -70,6 +74,8 @@
 import org.apache.hadoop.fs.azure.security.Constants;
 import org.apache.hadoop.fs.azure.security.RemoteWasbDelegationTokenManager;
 import org.apache.hadoop.fs.azure.security.WasbDelegationTokenManager;
+import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
 import org.apache.hadoop.fs.impl.StoreImplementationUtils;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -79,6 +85,7 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
+import org.apache.hadoop.util.LambdaUtils;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
 
@@ -915,6 +922,33 @@ public synchronized int read(byte[] b, int off, int len) throws FileNotFoundExce
      }
    }
 
+    @Override
+    public int read(long position, byte[] buffer, int offset, int length)
+        throws IOException {
+      if (in instanceof PositionedReadable) {
+        try {
+          int result = ((PositionedReadable) this.in).read(position, buffer,
+              offset, length);
+          if (null != statistics && result > 0) {
+            statistics.incrementBytesRead(result);
+          }
+          return result;
+        } catch (IOException e) {
+          Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
+          if (innerException instanceof StorageException) {
+            LOG.error("Encountered Storage Exception for read on Blob : {}"
+                + " Exception details: {} Error Code : {}",
+                key, e, ((StorageException) innerException).getErrorCode());
+            if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+              throw new FileNotFoundException(String.format("%s is not found", key));
+            }
+          }
+          throw e;
+        }
+      }
+      return super.read(position, buffer, offset, length);
+    }
+
    @Override
    public synchronized void close() throws IOException {
      if (!closed) {
@@ -3043,6 +3077,12 @@ public boolean mkdirs(Path f, FsPermission permission, boolean noUmask) throws I
 
  @Override
  public FSDataInputStream open(Path f, int bufferSize) throws FileNotFoundException, IOException {
+    return open(f, bufferSize, Optional.empty());
+  }
+
+  private FSDataInputStream open(Path f, int bufferSize,
+      Optional<Configuration> options)
+      throws FileNotFoundException, IOException {
 
    LOG.debug("Opening file: {}", f.toString());
 
@@ -3077,7 +3117,7 @@ public FSDataInputStream open(Path f, int bufferSize) throws FileNotFoundExcepti
 
    InputStream inputStream;
    try {
-      inputStream = store.retrieve(key);
+      inputStream = store.retrieve(key, 0, options);
    } catch(Exception ex) {
      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
 
@@ -3094,6 +3134,18 @@ public FSDataInputStream open(Path f, int bufferSize) throws FileNotFoundExcepti
        new NativeAzureFsInputStream(inputStream, key, meta.getLen()), bufferSize));
  }
 
+  @Override
+  protected CompletableFuture<FSDataInputStream> openFileWithOptions(Path path,
+      OpenFileParameters parameters) throws IOException {
+    AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
+        parameters.getMandatoryKeys(),
+        Collections.emptySet(),
+        "for " + path);
+    return LambdaUtils.eval(
+        new CompletableFuture<>(), () ->
+        open(path, parameters.getBufferSize(), Optional.of(parameters.getOptions())));
+  }
+
  @Override
  public boolean rename(Path src, Path dst) throws FileNotFoundException, IOException {
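One consequence of passing Collections.emptySet() as the known-key set to rejectUnknownMandatoryKeys above: a key marked mandatory via must() should be rejected with an IllegalArgumentException when the future is built, so the pread flag is evidently meant as a best-effort opt() hint. A hedged sketch of the distinction; the helper class, fs, and path are hypothetical:

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;

final class OptVersusMust {
  // Contrasts opt() and must() against the openFileWithOptions wiring above.
  static FSDataInputStream openUnbuffered(FileSystem fs, Path path)
      throws Exception {
    FutureDataInputStreamBuilder builder = fs.openFile(path);
    // opt(): a hint. Filesystems that do not understand the key ignore it;
    // the WASB block-blob stream reads it and disables buffered preads.
    builder.opt("fs.azure.block.blob.buffered.pread.disable", true);
    // must() with the same key would instead be treated as an unknown
    // mandatory key here and should fail fast at build() time.
    return builder.build().get();
  }
}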

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java

Lines changed: 4 additions & 0 deletions

@@ -23,6 +23,7 @@
 import java.io.InputStream;
 import java.net.URI;
 import java.util.Date;
+import java.util.Optional;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -50,6 +51,9 @@ void storeEmptyFolder(String key, PermissionStatus permissionStatus)
 
  InputStream retrieve(String key, long byteRangeStart) throws IOException;
 
+  InputStream retrieve(String key, long byteRangeStart,
+      Optional<Configuration> options) throws IOException;
+
  DataOutputStream storefile(String keyEncoded,
      PermissionStatus permissionStatus,
      String key) throws AzureException;
