
HADOOP-17250 ABFS short reads can be merged with readahead. #2307


Open: wants to merge 2 commits into base: trunk
@@ -106,6 +106,12 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_READ_BUFFER_SIZE)
private int readBufferSize;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_AHEAD_RANGE,
MinValue = MIN_BUFFER_SIZE,
MaxValue = MAX_BUFFER_SIZE,
DefaultValue = DEFAULT_READ_AHEAD_RANGE)
private int readAheadRange;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MIN_BACKOFF_INTERVAL,
DefaultValue = DEFAULT_MIN_BACKOFF_INTERVAL)
private int minBackoffInterval;
@@ -765,6 +771,10 @@ public SASTokenProvider getSASTokenProvider() throws AzureBlobFileSystemExceptio
}
}

public int getReadAheadRange() {
return this.readAheadRange;
}

int validateInt(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
IntegerConfigurationValidatorAnnotation validator = field.getAnnotation(IntegerConfigurationValidatorAnnotation.class);
String value = get(validator.ConfigurationKey());
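For readers unfamiliar with the pattern: annotated int fields such as the new readAheadRange are validated reflectively when AbfsConfiguration is constructed, via validateInt() above. A simplified, hedged sketch of that flow (illustrative, not the exact Hadoop source):

```java
// Illustrative sketch: walk the annotated fields and validate each one.
// Requires java.lang.reflect.Field; assumed to run inside AbfsConfiguration.
for (Field field : AbfsConfiguration.class.getDeclaredFields()) {
  field.setAccessible(true);
  if (field.isAnnotationPresent(IntegerConfigurationValidatorAnnotation.class)) {
    // validateInt() reads ConfigurationKey, applies MinValue/MaxValue,
    // falls back to DefaultValue, and throws
    // InvalidConfigurationValueException for out-of-range values.
    field.set(this, validateInt(field));
  }
}
```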
@@ -552,6 +552,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext() {
.withReadBufferSize(abfsConfiguration.getReadBufferSize())
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
.withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
.withReadAheadRange(abfsConfiguration.getReadAheadRange())
.withStreamStatistics(new AbfsInputStreamStatisticsImpl())
.build();
}
@@ -56,6 +56,14 @@ public final class ConfigurationKeys {
public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue";
public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";

/**
* Read ahead range parameter which can be set by the user.
* Default value is {@code FileSystemConfigurations#DEFAULT_READ_AHEAD_RANGE}.
* This might reduce the number of remote calls, as the next
* requested data may already be present in the buffer.
*/
public static final String AZURE_READ_AHEAD_RANGE = "fs.azure.readahead.range";
public static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
public static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME = "fs.azure.block.location.impersonatedhost";
public static final String AZURE_CONCURRENT_CONNECTION_VALUE_OUT = "fs.azure.concurrentRequestCount.out";
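For illustration, a minimal sketch of how a client could opt into a different read-ahead range through this key (the account URI is a placeholder, not part of the PR):

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

Configuration conf = new Configuration();
// The value must lie within [MIN_BUFFER_SIZE, MAX_BUFFER_SIZE], i.e.
// [16 KB, 100 MB]; the default is DEFAULT_READ_AHEAD_RANGE (64 KB).
conf.setInt("fs.azure.readahead.range", 256 * 1024); // 256 KB
FileSystem fs = FileSystem.get(
    new URI("abfs://container@myaccount.dfs.core.windows.net/"), conf);
```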
@@ -57,6 +57,8 @@ public final class FileSystemConfigurations {
public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB; // 8 MB
public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
// Default value of read ahead range.
public static final int DEFAULT_READ_AHEAD_RANGE = 64 * ONE_KB; // 64 KB
public static final int MIN_BUFFER_SIZE = 16 * ONE_KB; // 16 KB
public static final int MAX_BUFFER_SIZE = 100 * ONE_MB; // 100 MB
public static final long MAX_AZURE_BLOCK_SIZE = 256 * 1024 * 1024L; // changing default abfs blocksize to 256MB
@@ -56,6 +56,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
private final String eTag; // eTag of the path when InputStream are created
private final boolean tolerateOobAppends; // whether tolerate Oob Appends
private final boolean readAheadEnabled; // whether enable readAhead;
// User configured size of read ahead.
private final int readAheadRange;

// SAS tokens can be re-used until they expire
private CachedSASToken cachedSasToken;
@@ -73,6 +75,12 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
private long bytesFromReadAhead; // bytes read from readAhead; for testing
private long bytesFromRemoteRead; // bytes read remotely; for testing

/**
* This is the actual position within the object, used by
* lazy seek to decide whether to seek on the next read or not.
*/
private long nextReadPos;

public AbfsInputStream(
final AbfsClient client,
final Statistics statistics,
@@ -88,6 +96,7 @@ public AbfsInputStream(
this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
this.eTag = eTag;
this.readAheadRange = abfsInputStreamContext.getReadAheadRange();
this.readAheadEnabled = true;
this.cachedSasToken = new CachedSASToken(
abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
@@ -128,6 +137,22 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
}
incrementReadOps();
do {
if (nextReadPos >= fCursor - limit && nextReadPos <= fCursor) {
// data can be read from buffer.
bCursor = (int) (nextReadPos - (fCursor - limit));

// When bCursor == limit, buffer will be filled again.
// So in this case we are not actually reading from buffer.
if (bCursor != limit) {
streamStatistics.seekInBuffer();
}
} else {
// Clearing the buffer and setting the file pointer
// based on previous seek() call.
fCursor = nextReadPos;
limit = 0;
bCursor = 0;
}
lastReadBytes = readOneBlock(b, currentOff, currentLen);
if (lastReadBytes > 0) {
currentOff += lastReadBytes;
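The in-buffer check above relies on the invariant that the internal buffer holds bytes [fCursor - limit, fCursor) of the file. A worked example with assumed values (hypothetical, for illustration):

```java
// Hypothetical state: a 4 MB block starting at file offset 0 is buffered.
long fCursor = 4 * 1024 * 1024;  // file offset one past the buffered data
int limit = 4 * 1024 * 1024;     // valid bytes in buffer: offsets [0, 4 MB)
long nextReadPos = 1024;         // set by an earlier seek(1024)

// nextReadPos lies within [fCursor - limit, fCursor], so no remote call:
int bCursor = (int) (nextReadPos - (fCursor - limit)); // = 1024
// bCursor != limit here, so this read is counted as a seekInBuffer.
```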
@@ -180,9 +205,13 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO

// Enable readAhead when reading sequentially
if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
LOG.debug("Sequential read with read ahead size of {}", bufferSize);
bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
} else {
bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
// Enabling read ahead for random reads as well to reduce number of remote calls.
int lengthWithReadAhead = Math.min(b.length + readAheadRange, bufferSize);
LOG.debug("Random read with read ahead size of {}", lengthWithReadAhead);
bytesRead = readInternal(fCursor, buffer, 0, lengthWithReadAhead, true);
Contributor
As with Parquet and ORC, we have seen read patterns move from sequential to random and vice versa. That being the case, would it not be better to always read ahead to bufferSize? Providing options to read smaller amounts, like 64 KB, can actually lead to more IOPs. From our meeting yesterday too, one thing we all agreed on was that lower IOPs are better, and that it is better to read more rather than a smaller size.
So let's remove the config for readAheadRange and instead always read ahead to what's configured for bufferSize.

Contributor
@steveloughran steveloughran Sep 17, 2020
Based on the S3A experience (which didn't always read into a buffer, BTW), the "penalty" of having a large readahead range is that there is more data to drain when you want to cancel the read (i.e. a seek out of range).
That code does the draining in the active thread. If that were done in a background thread, the penalty of a larger readahead would be smaller, as you would only see a delay from the draining if there were no free HTTPS connections in the pool. Setting up a new HTTPS connection is expensive, though. If there were no free HTTPS connections in the pool, you would be better off draining the stream in the active thread. Maybe.

(Disclaimer: all my claims about the cost of HTTPS are based on S3 + Java 7/8, and S3 is very slow to set up a connection. If the ADLS Gen2 store is faster to negotiate, it becomes a lot more justifiable to drain in a separate thread.)
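To make the request sizing concrete, a worked example under assumed defaults (the values are illustrative):

```java
int requested = 8 * 1024;           // caller asked for 8 KB at a random offset
int readAheadRange = 64 * 1024;     // DEFAULT_READ_AHEAD_RANGE
int bufferSize = 4 * 1024 * 1024;   // DEFAULT_READ_BUFFER_SIZE

// The random read is widened by the read-ahead range, capped at bufferSize:
int lengthWithReadAhead = Math.min(requested + readAheadRange, bufferSize);
// = 72 KB fetched in one remote call; a subsequent sequential read of the
// next 64 KB can then be served from the buffer instead of a new GET.
```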

}

if (bytesRead == -1) {
@@ -200,6 +229,7 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
int bytesToRead = Math.min(len, bytesRemaining);
System.arraycopy(buffer, bCursor, b, off, bytesToRead);
bCursor += bytesToRead;
nextReadPos += bytesToRead;
if (statistics != null) {
statistics.incrementBytesRead(bytesToRead);
}
@@ -278,12 +308,12 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
final AbfsRestOperation op;
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length);
op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get());
cachedSasToken.update(op.getSasToken());
if (streamStatistics != null) {
streamStatistics.remoteReadOperation();
}
LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length);
op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get());
cachedSasToken.update(op.getSasToken());
LOG.debug("issuing HTTP GET request params position = {} b.length = {} "
+ "offset = {} length = {}", position, b.length, offset, length);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
@@ -341,21 +371,9 @@ public synchronized void seek(long n) throws IOException {
streamStatistics.seek(n, fCursor);
}

if (n>=fCursor-limit && n<=fCursor) { // within buffer
bCursor = (int) (n-(fCursor-limit));
if (streamStatistics != null) {
streamStatistics.seekInBuffer();
}
return;
}

// next read will read from here
fCursor = n;
LOG.debug("set fCursor to {}", fCursor);

//invalidate buffer
limit = 0;
bCursor = 0;
nextReadPos = n;
LOG.debug("set nextReadPos to {}", nextReadPos);
}
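With this change seek() becomes lazy: it only records the target position, and the next read() reconciles that position against the buffer. A hedged sketch of the observable behavior (the stream and offsets are hypothetical):

```java
// in is an open AbfsInputStream; the offsets are illustrative.
byte[] buf = new byte[10];
in.seek(1024);               // only records nextReadPos = 1024; buffer untouched
long pos = in.getPos();      // 1024, now derived from nextReadPos (see below)
int n = in.read(buf, 0, 10); // read() serves from buffer or refetches as needed
```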

@Override
@@ -426,7 +444,7 @@ public synchronized long getPos() throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
return fCursor - limit + bCursor;
return nextReadPos < 0 ? 0 : nextReadPos;
}

/**
@@ -492,6 +510,11 @@ byte[] getBuffer() {
return buffer;
}

@VisibleForTesting
public int getReadAheadRange() {
return readAheadRange;
}

@VisibleForTesting
protected void setCachedSasToken(final CachedSASToken cachedSasToken) {
this.cachedSasToken = cachedSasToken;
@@ -18,6 +18,8 @@

package org.apache.hadoop.fs.azurebfs.services;

import com.google.common.base.Preconditions;

/**
* Class to hold extra input stream configs.
*/
@@ -29,6 +31,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {

private boolean tolerateOobAppends;

private int readAheadRange;

private AbfsInputStreamStatistics streamStatistics;

public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
@@ -54,6 +58,12 @@ public AbfsInputStreamContext withTolerateOobAppends(
return this;
}

public AbfsInputStreamContext withReadAheadRange(
final int readAheadRange) {
this.readAheadRange = readAheadRange;
return this;
}

public AbfsInputStreamContext withStreamStatistics(
final AbfsInputStreamStatistics streamStatistics) {
this.streamStatistics = streamStatistics;
@@ -62,6 +72,8 @@ public AbfsInputStreamContext withStreamStatistics(

public AbfsInputStreamContext build() {
// Validation of parameters to be done here.
Preconditions.checkArgument(readAheadRange > 0,
"Read ahead range should be greater than 0");
return this;
}

@@ -77,6 +89,10 @@ public boolean isTolerateOobAppends() {
return tolerateOobAppends;
}

public int getReadAheadRange() {
return readAheadRange;
}

public AbfsInputStreamStatistics getStreamStatistics() {
return streamStatistics;
}
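A hedged sketch of exercising the builder end to end, mirroring populateAbfsInputStreamContext() earlier in this diff (the renewal period and sizes are illustrative):

```java
AbfsInputStreamContext context =
    new AbfsInputStreamContext(120) // sasTokenRenewPeriodForStreamsInSeconds
        .withReadBufferSize(4 * 1024 * 1024)
        .withReadAheadQueueDepth(2)
        .withTolerateOobAppends(false)
        .withReadAheadRange(64 * 1024) // build() rejects values <= 0
        .withStreamStatistics(new AbfsInputStreamStatisticsImpl())
        .build();
```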
@@ -121,6 +121,7 @@ public void testSeekStatistics() throws IOException {
*/
for (int i = 0; i < OPERATIONS; i++) {
in.seek(0);
in.read();
in.seek(ONE_MB);
}

@@ -150,7 +151,7 @@ public void testSeekStatistics() throws IOException {
* are in buffer.
*
* seekInBuffer - Since all seeks were in buffer, the seekInBuffer
* would be equal to 2 * OPERATIONS.
* would be equal to OPERATIONS.
*
*/
assertEquals("Mismatch in seekOps value", 2 * OPERATIONS,
Expand All @@ -163,7 +164,7 @@ public void testSeekStatistics() throws IOException {
-1 * OPERATIONS * ONE_MB, stats.getBytesBackwardsOnSeek());
assertEquals("Mismatch in bytesSkippedOnSeek value",
0, stats.getBytesSkippedOnSeek());
assertEquals("Mismatch in seekInBuffer value", 2 * OPERATIONS,
assertEquals("Mismatch in seekInBuffer value", OPERATIONS,
stats.getSeekInBuffer());

in.close();
@@ -255,6 +256,7 @@ public void testWithNullStreamStatistics() throws IOException {
.withReadBufferSize(getConfiguration().getReadBufferSize())
.withReadAheadQueueDepth(getConfiguration().getReadAheadQueueDepth())
.withStreamStatistics(null)
.withReadAheadRange(getConfiguration().getReadAheadRange())
.build();

AbfsOutputStream out = null;
@@ -34,6 +34,7 @@
import org.apache.hadoop.fs.azurebfs.utils.Base64;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SSL_CHANNEL_MODE_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_AHEAD_RANGE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_MAX_RETRY_ATTEMPTS;
@@ -141,6 +142,7 @@ public void testConfigServiceImplAnnotatedFieldsInitialized() throws Exception {
assertEquals(DEFAULT_MAX_RETRY_ATTEMPTS, abfsConfiguration.getMaxIoRetries());
assertEquals(MAX_AZURE_BLOCK_SIZE, abfsConfiguration.getAzureBlockSize());
assertEquals(AZURE_BLOCK_LOCATION_HOST_DEFAULT, abfsConfiguration.getAzureBlockLocationHost());
assertEquals(DEFAULT_READ_AHEAD_RANGE, abfsConfiguration.getReadAheadRange());
}

@Test