Skip to content

HADOOP-18501: ABFS: Partial read should add to throttling data: DRAFT #5109

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 34 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
38e47fd
change for throttling on 206, if connection-rest happens with 206, ab…
saxenapranav Oct 28, 2022
ae92e58
byteRead to be taken from the abfsHttpOperation.getBytesReceived
saxenapranav Oct 28, 2022
bbf1597
mock classes for test
saxenapranav Oct 28, 2022
4b4b407
constructor of mockAbfsRestOperation in mockabfsclient
saxenapranav Oct 31, 2022
94b7b02
intercept functioning within the mock classes; need to add test-cases
saxenapranav Oct 31, 2022
7451d45
mock intercepts
saxenapranav Oct 31, 2022
6fd4871
added testPartialReadWithConnectionReset
saxenapranav Oct 31, 2022
be66257
make a call to actual server to check if the intermediate requests cr…
saxenapranav Nov 2, 2022
702e0d4
reading original file and asserting if bytes match.
saxenapranav Nov 3, 2022
3096159
contentLen - contentLenRecvd for metric push
saxenapranav Nov 4, 2022
8b3581e
general refactors
saxenapranav Nov 4, 2022
0a2cec1
asf license
saxenapranav Nov 4, 2022
4873711
removed Bool object.
saxenapranav Nov 4, 2022
5ee854d
Partial read parallel test fix
saxenapranav Nov 7, 2022
bd56ef4
checkstyle suggestions
saxenapranav Nov 7, 2022
e4a7684
AbfsClientThrottlingInterceptTestUtil to be final
saxenapranav Nov 7, 2022
8c5e578
CONNECTION_RESET constant usee
saxenapranav Nov 7, 2022
db860cb
review1 resolution
saxenapranav Nov 7, 2022
4874fa2
using org.assertj.core.api.Assertions
saxenapranav Nov 9, 2022
7ee87be
Merge branch 'trunk' into partialReadThrottle2
saxenapranav Nov 9, 2022
ea5193f
loop in case of 0B + readRemote fix.
saxenapranav Nov 10, 2022
7a38ad6
ITestPartialRead.testRecoverZeroBytePartialRead
saxenapranav Nov 10, 2022
59758f5
Added testZeroByteFileRead in ITestAbfsInputStream
saxenapranav Nov 14, 2022
59c3270
nit refactors
saxenapranav Nov 14, 2022
791bfc8
nit refactors
saxenapranav Nov 14, 2022
d29db88
boilerplate code for using spy instead of mock classes
saxenapranav Nov 15, 2022
0af5d22
test started to work with mockito
saxenapranav Nov 15, 2022
9e43cda
partial test with spy running
saxenapranav Nov 15, 2022
d48d9b4
tests fixed with new spy methodology
saxenapranav Nov 15, 2022
00aa655
extra-created mock classes are removed
saxenapranav Nov 15, 2022
2bdfbb4
general refactors
saxenapranav Nov 15, 2022
2f59672
documentation for mockClassInterceptor, mockClassUtils; javac issues
saxenapranav Nov 16, 2022
dcac6b6
refactor for uncheck and uncast issues
saxenapranav Nov 16, 2022
01d0680
suppressWarning annotation
saxenapranav Nov 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1106,4 +1106,8 @@ public void setEnableAbfsListIterator(boolean enableAbfsListIterator) {
this.enableAbfsListIterator = enableAbfsListIterator;
}

@VisibleForTesting
void setReadAheadQueueDepth(int depth) {
readAheadQueueDepth = depth;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,5 +112,7 @@ public final class AbfsHttpConstants {
public static final char CHAR_STAR = '*';
public static final char CHAR_PLUS = '+';

public static final String CONNECTION_RESET = "Connection reset";
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

jdk code doesn't expose any enum or constant, even they are using a string: https://github.com/openjdk-mirror/jdk/blob/jdk8u/jdk8u/master/src/share/classes/java/net/SocketInputStream.java#L209


private AbfsHttpConstants() {}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;

Expand Down Expand Up @@ -70,7 +71,7 @@ static void updateMetrics(AbfsRestOperationType operationType,
return;
}

int status = abfsHttpOperation.getStatusCode();
final int status = abfsHttpOperation.getStatusCode();
long contentLength = 0;
// If the socket is terminated prior to receiving a response, the HTTP
// status may be 0 or -1. A status less than 200 or greater than or equal
Expand All @@ -88,9 +89,24 @@ static void updateMetrics(AbfsRestOperationType operationType,
break;
case ReadFile:
String range = abfsHttpOperation.getConnection().getRequestProperty(HttpHeaderConfigurations.RANGE);
contentLength = getContentLengthIfKnown(range);
if (contentLength > 0) {
singleton.readThrottler.addBytesTransferred(contentLength,
long contentLengthRequested = getContentLengthIfKnown(range);
long bytesToBeAddedInMetric = contentLengthRequested;

long contentLengthReceived = abfsHttpOperation.getBytesReceived();
if (status == HttpURLConnection.HTTP_PARTIAL
&& contentLengthRequested > contentLengthReceived) {
/*
* In case of server response status == 206 (partial content) and the
* contentLength received is lesser than the requested length, we have
* to take it as a throttling case and hence we need to add the remaining
* bytes (contentLengthReceived - contentLength) in failure data-point.
* */
bytesToBeAddedInMetric = contentLengthRequested - contentLengthReceived;
isFailedOperation = true;
}
if (bytesToBeAddedInMetric > 0) {
singleton.readThrottler.addBytesTransferred(
bytesToBeAddedInMetric,
isFailedOperation);
}
break;
Expand Down Expand Up @@ -140,4 +156,19 @@ private static long getContentLengthIfKnown(String range) {
}
return contentLength;
}

@VisibleForTesting
void setReadThrottler(final AbfsClientThrottlingAnalyzer readThrottler) {
this.readThrottler = readThrottler;
}

@VisibleForTesting
AbfsClientThrottlingAnalyzer getReadThrottler() {
return readThrottler;
}

@VisibleForTesting
static AbfsClientThrottlingIntercept getSingleton() {
return singleton;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;

Expand Down Expand Up @@ -563,4 +564,14 @@ public String getResponseHeader(final String httpHeader) {
return "";
}
}

@VisibleForTesting
protected void setStatusCode(int statusCode) {
this.statusCode = statusCode;
}

@VisibleForTesting
protected void setBytesReceived(long bytesReceived) {
this.bytesReceived = bytesReceived;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
if (currentLen <= 0 || currentLen > b.length - currentOff) {
break;
}
} while (lastReadBytes > 0);
} while (lastReadBytes >= 0); //backend server can return 0B with 206 httpStatus.
return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
}

Expand Down Expand Up @@ -505,14 +505,14 @@ private int readInternal(final long position, final byte[] b, final int offset,
}
return receivedBytes;
}

// got nothing from read-ahead, do our own read now
receivedBytes = readRemote(position, b, offset, length, new TracingContext(tracingContext));
return receivedBytes;
} else {
LOG.debug("read ahead disabled, reading remote");
return readRemote(position, b, offset, length, new TracingContext(tracingContext));
LOG.debug("read ahead disabled");
}
LOG.debug("reading remote");
final long requiredLen = Math.min(length, contentLength - position);
return readRemote(position, b, offset, (int) requiredLen,
new TracingContext(tracingContext));

}

int readRemote(long position, byte[] b, int offset, int length, TracingContext tracingContext) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.SocketException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
Expand All @@ -37,6 +39,8 @@
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CONNECTION_RESET;

/**
* The AbfsRestOperation for Rest AbfsClient.
*/
Expand Down Expand Up @@ -92,6 +96,10 @@ public URL getUrl() {
return url;
}

public String getMethod() {
return method;
}

public List<AbfsHttpHeader> getRequestHeaders() {
return requestHeaders;
}
Expand Down Expand Up @@ -161,6 +169,7 @@ String getSasToken() {
* @param bufferLength The length of the data in the buffer.
* @param sasToken A sasToken for optional re-use by AbfsInputStream/AbfsOutputStream.
*/
@VisibleForTesting
AbfsRestOperation(AbfsRestOperationType operationType,
AbfsClient client,
String method,
Expand Down Expand Up @@ -244,7 +253,7 @@ private boolean executeHttpOperation(final int retryCount,
AbfsHttpOperation httpOperation = null;
try {
// initialize the HTTP request and open the connection
httpOperation = new AbfsHttpOperation(url, method, requestHeaders);
httpOperation = getHttpOperation();
incrementCounter(AbfsStatistic.CONNECTIONS_MADE, 1);
tracingContext.constructHeader(httpOperation);

Expand Down Expand Up @@ -311,6 +320,19 @@ private boolean executeHttpOperation(final int retryCount,
LOG.debug("HttpRequestFailure: {}, {}", httpOperation, ex);
}

/*
* In case of Connection_reset with status == 206 (partial-content)from
* server for read operation, the partial-result has to be sent back to
* AbfsInputStream which would retry for the remaining bytes.
* */
if (ex instanceof SocketException && CONNECTION_RESET.equals(
ex.getMessage())
&& httpOperation.getStatusCode() == HttpURLConnection.HTTP_PARTIAL
&& operationType == AbfsRestOperationType.ReadFile) {
result = httpOperation;
return true;
}

if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) {
throw new InvalidAbfsRestOperationException(ex);
}
Expand All @@ -331,6 +353,11 @@ private boolean executeHttpOperation(final int retryCount,
return true;
}

@VisibleForTesting
AbfsHttpOperation getHttpOperation() throws IOException {
return new AbfsHttpOperation(url, method, requestHeaders);
}

/**
* Incrementing Abfs counters with a long value.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,10 @@ public AzureBlobFileSystem getFileSystem() throws IOException {
return abfs;
}

public void setClient(final AbfsClient abfsClient) throws IOException {
abfs.getAbfsStore().setClient(abfsClient);
}

public AzureBlobFileSystem getFileSystem(Configuration configuration) throws Exception{
final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(configuration);
return fs;
Expand Down
Loading