HADOOP-17770 WASB : Support disabling buffered reads in positional reads #3149

Merged (7 commits) on Jul 13, 2021
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -41,6 +41,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;

import org.apache.commons.lang3.StringUtils;
@@ -241,6 +242,16 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
*/
public static final String KEY_ENABLE_FLAT_LISTING = "fs.azure.flatlist.enable";

/**
* Optional config to enable a lock-free pread that bypasses the buffer in
* BlockBlobInputStream.
* This config cannot be set at cluster level; it can only be supplied as
* an option on FutureDataInputStreamBuilder.
* @see FileSystem#openFile(org.apache.hadoop.fs.Path)
*/
public static final String FS_AZURE_BLOCK_BLOB_BUFFERED_PREAD_DISABLE =
"fs.azure.block.blob.buffered.pread.disable";

/**
* The set of directories where we should apply atomic folder rename
* synchronized with createNonRecursive.
@@ -1591,18 +1602,22 @@ private OutputStream openOutputStream(final CloudBlobWrapper blob)
* Opens a new input stream for the given blob (page or block blob)
* to read its data.
*/
private InputStream openInputStream(CloudBlobWrapper blob)
throws StorageException, IOException {
private InputStream openInputStream(CloudBlobWrapper blob,
Optional<Configuration> options) throws StorageException, IOException {
if (blob instanceof CloudBlockBlobWrapper) {
LOG.debug("Using stream seek algorithm {}", inputStreamVersion);
switch(inputStreamVersion) {
case 1:
return blob.openInputStream(getDownloadOptions(),
getInstrumentedContext(isConcurrentOOBAppendAllowed()));
case 2:
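// The per-stream option arrives via openFile(); fall back to buffered
// reads (false) when no options or key are present.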
boolean bufferedPreadDisabled = options.map(c -> c
.getBoolean(FS_AZURE_BLOCK_BLOB_BUFFERED_PREAD_DISABLE, false))
.orElse(false);
return new BlockBlobInputStream((CloudBlockBlobWrapper) blob,
getDownloadOptions(),
getInstrumentedContext(isConcurrentOOBAppendAllowed()));
getInstrumentedContext(isConcurrentOOBAppendAllowed()),
bufferedPreadDisabled);
default:
throw new IOException("Unknown seek algorithm: " + inputStreamVersion);
}
@@ -2290,6 +2305,12 @@ public InputStream retrieve(String key) throws AzureException, IOException {
@Override
public InputStream retrieve(String key, long startByteOffset)
throws AzureException, IOException {
return retrieve(key, startByteOffset, Optional.empty());
}

@Override
public InputStream retrieve(String key, long startByteOffset,
Optional<Configuration> options) throws AzureException, IOException {
try {
// Check if a session exists, if not create a session with the
// Azure storage server.
@@ -2301,7 +2322,7 @@ public InputStream retrieve(String key, long startByteOffset)
}
checkContainer(ContainerAccessType.PureRead);

InputStream inputStream = openInputStream(getBlobReference(key));
InputStream inputStream = openInputStream(getBlobReference(key), options);
if (startByteOffset > 0) {
// Skip bytes and ignore return value. This is okay
// because if you try to skip too far you will be positioned
@@ -2852,7 +2873,7 @@ public void rename(String srcKey, String dstKey, boolean acquireLease,
OutputStream opStream = null;
try {
if (srcBlob.getProperties().getBlobType() == BlobType.PAGE_BLOB){
ipStream = openInputStream(srcBlob);
ipStream = openInputStream(srcBlob, Optional.empty());
opStream = openOutputStream(dstBlob);
byte[] buffer = new byte[PageBlobFormatHelpers.PAGE_SIZE];
int len;
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
@@ -28,18 +28,19 @@
import com.microsoft.azure.storage.blob.BlobRequestOptions;

import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;

/**
* Encapsulates the BlobInputStream used by block blobs and adds support for
* random access and seek. Random access performance is improved by several
* orders of magnitude.
*/
final class BlockBlobInputStream extends InputStream implements Seekable {
final class BlockBlobInputStream extends FSInputStream {
private final CloudBlockBlobWrapper blob;
private final BlobRequestOptions options;
private final OperationContext opContext;
private final boolean bufferedPreadDisabled;
private InputStream blobInputStream = null;
private int minimumReadSizeInBytes = 0;
private long streamPositionAfterLastRead = -1;
@@ -62,12 +63,13 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
* @param opContext the blob operation context.
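* @param bufferedPreadDisabled when true, positional reads bypass the stream buffer.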
* @throws IOException IO failure
*/
BlockBlobInputStream(CloudBlockBlobWrapper blob,
BlobRequestOptions options,
OperationContext opContext) throws IOException {
BlockBlobInputStream(CloudBlockBlobWrapper blob, BlobRequestOptions options,
OperationContext opContext, boolean bufferedPreadDisabled)
throws IOException {
this.blob = blob;
this.options = options;
this.opContext = opContext;
this.bufferedPreadDisabled = bufferedPreadDisabled;

this.minimumReadSizeInBytes = blob.getStreamMinimumReadSizeInBytes();

@@ -263,6 +265,39 @@ private int doNetworkRead(byte[] buffer, int offset, int len)
}
}

@Override
public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
synchronized (this) {
checkState();
}
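// Only the open/closed check needs this stream's lock: the buffered
// fallback below synchronizes inside FSInputStream, and the unbuffered
// path touches no shared stream state.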
if (!bufferedPreadDisabled) {
// This will do a seek + read in which the streamBuffer will get used.
return super.read(position, buffer, offset, length);
}
validatePositionedReadArgs(position, buffer, offset, length);
if (length == 0) {
return 0;
}
if (position >= streamLength) {
throw new EOFException("position is beyond stream capacity");
}
MemoryOutputStream os = new MemoryOutputStream(buffer, offset, length);
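// Cap the ranged download at the smallest of the blob's configured
// minimum read size, the caller's buffer capacity, and the bytes
// remaining in the stream.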
long bytesToRead = Math.min(minimumReadSizeInBytes,
Math.min(os.capacity(), streamLength - position));
try {
blob.downloadRange(position, bytesToRead, os, options, opContext);
} catch (StorageException e) {
throw new IOException(e);
}
int bytesRead = os.size();
if (bytesRead == 0) {
// This may happen if the blob was modified after the length was obtained.
throw new EOFException("End of stream reached unexpectedly.");
}
return bytesRead;
}

/**
* Reads up to <code>len</code> bytes of data from the input stream into an
* array of bytes.
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -33,11 +33,14 @@
import java.util.EnumSet;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Stack;
import java.util.HashMap;

@@ -61,6 +64,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
@@ -70,6 +74,8 @@
import org.apache.hadoop.fs.azure.security.Constants;
import org.apache.hadoop.fs.azure.security.RemoteWasbDelegationTokenManager;
import org.apache.hadoop.fs.azure.security.WasbDelegationTokenManager;
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -79,6 +85,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;

@@ -915,6 +922,43 @@ public synchronized int read(byte[] b, int off, int len) throws FileNotFoundException
}
}

@Override
public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
// SpotBugs reports bug type IS2_INCONSISTENT_SYNC here, but the report is
// not valid. 'this.in' is an instance of BlockBlobInputStream, and
// read(long, byte[], int, int) delegates to its superclass method when
// 'fs.azure.block.blob.buffered.pread.disable' is false; the superclass
// FSInputStream implementation is properly synchronized.
// When 'fs.azure.block.blob.buffered.pread.disable' is true, we want a
// lock-free blob read. That path uses none of the InputStream's shared
// resources (the buffer) and does not move the cursor position, so it is
// safe to read without synchronization.
if (in instanceof PositionedReadable) {
try {
int result = ((PositionedReadable) this.in).read(position, buffer,
offset, length);
if (null != statistics && result > 0) {
statistics.incrementBytesRead(result);
}
return result;
} catch (IOException e) {
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException) {
LOG.error("Encountered Storage Exception for read on Blob : {}"
+ " Exception details: {} Error Code : {}",
key, e, ((StorageException) innerException).getErrorCode());
if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
}
throw e;
}
}
return super.read(position, buffer, offset, length);
}

@Override
public synchronized void close() throws IOException {
if (!closed) {
@@ -3043,6 +3087,12 @@ public boolean mkdirs(Path f, FsPermission permission, boolean noUmask) throws IOException

@Override
public FSDataInputStream open(Path f, int bufferSize) throws FileNotFoundException, IOException {
return open(f, bufferSize, Optional.empty());
}

private FSDataInputStream open(Path f, int bufferSize,
Optional<Configuration> options)
throws FileNotFoundException, IOException {

LOG.debug("Opening file: {}", f.toString());

@@ -3077,7 +3127,7 @@ public FSDataInputStream open(Path f, int bufferSize) throws FileNotFoundException

InputStream inputStream;
try {
inputStream = store.retrieve(key);
inputStream = store.retrieve(key, 0, options);
} catch(Exception ex) {
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);

@@ -3094,6 +3144,18 @@ public FSDataInputStream open(Path f, int bufferSize) throws FileNotFoundException
new NativeAzureFsInputStream(inputStream, key, meta.getLen()), bufferSize));
}

@Override
protected CompletableFuture<FSDataInputStream> openFileWithOptions(Path path,
OpenFileParameters parameters) throws IOException {
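// No mandatory open() keys are supported here; optional keys such as the
// buffered-pread switch flow through to open() below.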
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
parameters.getMandatoryKeys(),
Collections.emptySet(),
"for " + path);
return LambdaUtils.eval(
new CompletableFuture<>(), () ->
open(path, parameters.getBufferSize(), Optional.of(parameters.getOptions())));
}

@Override
public boolean rename(Path src, Path dst) throws FileNotFoundException, IOException {

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
@@ -23,6 +23,7 @@
import java.io.InputStream;
import java.net.URI;
import java.util.Date;
import java.util.Optional;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -50,6 +51,9 @@ void storeEmptyFolder(String key, PermissionStatus permissionStatus)

InputStream retrieve(String key, long byteRangeStart) throws IOException;

InputStream retrieve(String key, long byteRangeStart,
Optional<Configuration> options) throws IOException;

DataOutputStream storefile(String keyEncoded,
PermissionStatus permissionStatus,
String key) throws AzureException;
11 changes: 11 additions & 0 deletions hadoop-tools/hadoop-azure/src/site/markdown/index.md
@@ -545,6 +545,17 @@ The maximum number of entries that that cache can hold can be customized using the
</property>
```

### Performance optimization configurations

`fs.azure.block.blob.buffered.pread.disable`: By default the positional read API performs
a seek and read on the input stream, and that read fills the buffer cache in
BlockBlobInputStream. If this configuration is set to true, the buffer is bypassed and a
lock-free call reads directly from the blob. This optimization is especially helpful for
HBase-style short random reads over a shared InputStream instance.
Note: this is not a config that can be set at cluster level. It can only be supplied as
an option on FutureDataInputStreamBuilder.
See FileSystem#openFile(Path path)
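
For example, a client can request unbuffered positional reads on a single stream through
the `openFile()` builder. A minimal sketch (assuming `fs` is a `FileSystem` bound to a
`wasb://` URI, `path` points at an existing blob, and the enclosing method declares
`throws Exception` for the future's `get()`):

```java
FutureDataInputStreamBuilder builder = fs.openFile(path);
builder.opt("fs.azure.block.blob.buffered.pread.disable", true);
try (FSDataInputStream in = builder.build().get()) {
  byte[] buf = new byte[4 * 1024];
  int bytesRead = in.read(2048L, buf, 0, buf.length); // lock-free pread
}
```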

## Further Reading

* [Testing the Azure WASB client](testing_azure.html).
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.integration.AbstractAzureScaleTest;
import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
@@ -306,6 +307,61 @@ private void verifyConsistentReads(FSDataInputStream inputStreamV1,
assertArrayEquals("Mismatch in read data", bufferV1, bufferV2);
}

@Test
public void test_202_PosReadTest() throws Exception {
assumeHugeFileExists();
FutureDataInputStreamBuilder builder = accountUsingInputStreamV2
.getFileSystem().openFile(TEST_FILE_PATH);
builder.opt(AzureNativeFileSystemStore.FS_AZURE_BLOCK_BLOB_BUFFERED_PREAD_DISABLE, true);
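// opt() (rather than must()) marks the key as optional, so file systems
// that do not understand it can still open the file.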
try (
FSDataInputStream inputStreamV1
= accountUsingInputStreamV1.getFileSystem().open(TEST_FILE_PATH);
FSDataInputStream inputStreamV2
= accountUsingInputStreamV2.getFileSystem().open(TEST_FILE_PATH);
FSDataInputStream inputStreamV2NoBuffer = builder.build().get();
) {
final int bufferSize = 4 * KILOBYTE;
byte[] bufferV1 = new byte[bufferSize];
byte[] bufferV2 = new byte[bufferSize];
byte[] bufferV2NoBuffer = new byte[bufferSize];

verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, 0,
bufferV1, bufferV2, bufferV2NoBuffer);

int pos = 2 * KILOBYTE;
verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, pos,
bufferV1, bufferV2, bufferV2NoBuffer);

pos = 10 * KILOBYTE;
verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, pos,
bufferV1, bufferV2, bufferV2NoBuffer);

pos = 4100 * KILOBYTE;
verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, pos,
bufferV1, bufferV2, bufferV2NoBuffer);
}
}

private void verifyConsistentReads(FSDataInputStream inputStreamV1,
FSDataInputStream inputStreamV2, FSDataInputStream inputStreamV2NoBuffer,
int pos, byte[] bufferV1, byte[] bufferV2, byte[] bufferV2NoBuffer)
throws IOException {
int size = bufferV1.length;
int numBytesReadV1 = inputStreamV1.read(pos, bufferV1, 0, size);
assertEquals("Bytes read from V1 stream", size, numBytesReadV1);

int numBytesReadV2 = inputStreamV2.read(pos, bufferV2, 0, size);
assertEquals("Bytes read from V2 stream", size, numBytesReadV2);

int numBytesReadV2NoBuffer = inputStreamV2NoBuffer.read(pos,
bufferV2NoBuffer, 0, size);
assertEquals("Bytes read from V2 stream (buffered pread disabled)", size,
numBytesReadV2NoBuffer);

assertArrayEquals("Mismatch in read data", bufferV1, bufferV2);
assertArrayEquals("Mismatch in read data", bufferV2, bufferV2NoBuffer);
}

/**
* Validates the implementation of InputStream.markSupported.
* @throws IOException