
HADOOP-19531. [ABFS][FnsOverBlob] Streaming List Path Result Should Happen Inside Retry Loop #7582


Merged · 11 commits · Apr 8, 2025
Changes from all commits
@@ -38,6 +38,7 @@
 import org.apache.http.Header;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpDelete;
 import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
 import org.apache.http.client.methods.HttpGet;
@@ -47,6 +48,7 @@
 import org.apache.http.client.methods.HttpPut;
 import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.util.EntityUtils;

 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
@@ -192,14 +194,26 @@ String getConnResponseMessage() throws IOException {
   public void processResponse(final byte[] buffer,
       final int offset,
       final int length) throws IOException {
-    if (!isPayloadRequest) {
-      prepareRequest();
-      LOG.debug("Sending request: {}", httpRequestBase);
-      httpResponse = executeRequest();
-      LOG.debug("Request sent: {}; response {}", httpRequestBase,
-          httpResponse);
+    try {
+      if (!isPayloadRequest) {
+        prepareRequest();
+        LOG.debug("Sending request: {}", httpRequestBase);
+        httpResponse = executeRequest();
+        LOG.debug("Request sent: {}; response {}", httpRequestBase,
+            httpResponse);
+      }
+      parseResponseHeaderAndBody(buffer, offset, length);
+    } finally {
+      if (httpResponse != null) {
+        try {
+          EntityUtils.consume(httpResponse.getEntity());
+        } finally {
+          if (httpResponse instanceof CloseableHttpResponse) {
+            ((CloseableHttpResponse) httpResponse).close();
+          }
+        }
+      }
     }
-    parseResponseHeaderAndBody(buffer, offset, length);
   }
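
The new finally block exists to return the pooled connection: with Apache HttpClient 4.x, a response entity must be fully consumed before the response is closed, otherwise the underlying connection cannot be reused. A minimal standalone sketch of the same consume-then-close pattern (the URL and client setup here are illustrative, not part of this patch):

import java.io.IOException;

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

public class ConsumeThenClose {
  public static void main(String[] args) throws IOException {
    try (CloseableHttpClient client = HttpClients.createDefault()) {
      CloseableHttpResponse response = client.execute(new HttpGet("https://example.com/"));
      try {
        // ... read whatever is needed from response.getEntity() ...
      } finally {
        try {
          // Drain the entity so the connection can go back to the pool.
          EntityUtils.consume(response.getEntity());
        } finally {
          response.close(); // release the connection even if consume() throws
        }
      }
    }
  }
}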

@@ -1605,12 +1605,9 @@ public Hashtable<String, String> getXMSProperties(AbfsHttpOperation result)
   @Override
   public ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri)
       throws AzureBlobFileSystemException {
-    BlobListResultSchema listResultSchema;
     try (InputStream stream = result.getListResultStream()) {
-      if (stream == null) {
-        return null;
-      }
       try {
+        BlobListResultSchema listResultSchema;
         final SAXParser saxParser = saxParserThreadLocal.get();
         saxParser.reset();
         listResultSchema = new BlobListResultSchema();
@@ -1620,19 +1617,17 @@ public ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri)
         LOG.debug("ListBlobs listed {} blobs with {} as continuation token",
             listResultSchema.paths().size(),
             listResultSchema.getNextMarker());
-      } catch (SAXException | IOException e) {
-        throw new AbfsDriverException(e);
+        return filterDuplicateEntriesAndRenamePendingFiles(listResultSchema, uri);
+      } catch (SAXException | IOException ex) {
+        throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, ex);
       }
-    } catch (IOException e) {
-      LOG.error("Unable to deserialize list results for uri {}", uri.toString(), e);
-      throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, e);
-    }
-
-    try {
-      return filterDuplicateEntriesAndRenamePendingFiles(listResultSchema, uri);
-    } catch (IOException e) {
-      LOG.error("Unable to filter list results for uri {}", uri.toString(), e);
-      throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, e);
+    } catch (AbfsDriverException ex) {
+      // Throw as it is to avoid multiple wrapping.
+      LOG.error("Unable to deserialize list results for Uri {}", uri != null ? uri.toString() : "NULL", ex);
+      throw ex;
+    } catch (Exception ex) {
+      LOG.error("Unable to get stream for list results for uri {}", uri != null ? uri.toString() : "NULL", ex);
+      throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, ex);
     }
   }

Reviewer comment (on the new SAXException | IOException catch): NIT: We can add URI in the error message for better visibility.
Author reply: URI is already in log messages.

Reviewer comment (on the uri null check): uri should not be null if we are reaching this part of the code.
Author reply: uri can be null in cases where this API is called by internal operations, like the create flow. If the list call comes from the user, uri won't be null.
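
The catch ordering above follows a rethrow-first pattern: an exception that is already an AbfsDriverException is propagated untouched, and only foreign exceptions get wrapped, so callers never see a driver exception nested inside another one. A self-contained sketch of the same idea (the exception type and parse method here are invented for illustration):

public final class WrapOnce {
  static class DriverException extends Exception {
    DriverException(String msg, Throwable cause) { super(msg, cause); }
  }

  static String parse(String raw) throws DriverException {
    if (raw.isEmpty()) {
      throw new DriverException("empty payload", null);
    }
    return raw.trim();
  }

  static String parseWithUniformErrors(String raw) throws DriverException {
    try {
      return parse(raw);
    } catch (DriverException ex) {
      throw ex; // already the exported type: rethrow as-is to avoid double wrapping
    } catch (Exception ex) {
      throw new DriverException("parsing failed", ex); // wrap everything else exactly once
    }
  }

  public static void main(String[] args) throws DriverException {
    System.out.println(parseWithUniformErrors(" hello "));
  }
}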

@@ -1929,8 +1924,10 @@ private List<AbfsHttpHeader> getMetadataHeadersList(final Hashtable<String, Stri
    * @param listResultSchema List of entries returned by Blob Endpoint.
    * @param uri URI to be used for path conversion.
    * @return List of entries after removing duplicates.
+   * @throws IOException if path conversion fails.
    */
-  private ListResponseData filterDuplicateEntriesAndRenamePendingFiles(
+  @VisibleForTesting
+  public ListResponseData filterDuplicateEntriesAndRenamePendingFiles(
       BlobListResultSchema listResultSchema, URI uri) throws IOException {
     List<FileStatus> fileStatuses = new ArrayList<>();
     Map<Path, Integer> renamePendingJsonPaths = new HashMap<>();
@@ -38,8 +38,10 @@

 import java.util.UUID;

 import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;

 import org.apache.hadoop.classification.VisibleForTesting;
@@ -1476,33 +1478,36 @@ public Hashtable<String, String> getXMSProperties(AbfsHttpOperation result)
    * @throws AzureBlobFileSystemException if parsing fails.
    */
   @Override
-  public ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri) throws AzureBlobFileSystemException {
-    try (InputStream listResultInputStream = result.getListResultStream()) {
-      DfsListResultSchema listResultSchema;
+  public ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri)
+      throws AzureBlobFileSystemException {
+    try (InputStream stream = result.getListResultStream()) {
       try {
+        DfsListResultSchema listResultSchema;
         final ObjectMapper objectMapper = new ObjectMapper();
-        listResultSchema = objectMapper.readValue(listResultInputStream,
-            DfsListResultSchema.class);
+        listResultSchema = objectMapper.readValue(stream, DfsListResultSchema.class);
         result.setListResultSchema(listResultSchema);
         LOG.debug("ListPath listed {} paths with {} as continuation token",
             listResultSchema.paths().size(),
             getContinuationFromResponse(result));
-      } catch (IOException ex) {
-        throw new AbfsDriverException(ex);
-      }
-
-      List<FileStatus> fileStatuses = new ArrayList<>();
-      for (DfsListResultEntrySchema entry : listResultSchema.paths()) {
-        fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
+        List<FileStatus> fileStatuses = new ArrayList<>();
+        for (DfsListResultEntrySchema entry : listResultSchema.paths()) {
+          fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
+        }
+        ListResponseData listResponseData = new ListResponseData();
+        listResponseData.setFileStatusList(fileStatuses);
+        listResponseData.setRenamePendingJsonPaths(null);
+        listResponseData.setContinuationToken(
+            getContinuationFromResponse(result));
+        return listResponseData;
+      } catch (JsonParseException | JsonMappingException ex) {
+        throw new AbfsDriverException(ERR_DFS_LIST_PARSING, ex);
       }
-      ListResponseData listResponseData = new ListResponseData();
-      listResponseData.setFileStatusList(fileStatuses);
-      listResponseData.setRenamePendingJsonPaths(null);
-      listResponseData.setContinuationToken(
-          getContinuationFromResponse(result));
-      return listResponseData;
-    } catch (IOException ex) {
-      LOG.error("Unable to deserialize list results for Uri {}", uri.toString(), ex);
+    } catch (AbfsDriverException ex) {
+      // Throw as it is to avoid multiple wrapping.
+      LOG.error("Unable to deserialize list results for Uri {}", uri != null ? uri.toString() : "NULL", ex);
+      throw ex;
+    } catch (Exception ex) {
+      LOG.error("Unable to deserialize list results for Uri {}", uri != null ? uri.toString() : "NULL", ex);
       throw new AbfsDriverException(ERR_DFS_LIST_PARSING, ex);
     }
   }
@@ -18,6 +18,8 @@

 package org.apache.hadoop.fs.azurebfs.services;

+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
@@ -221,7 +223,7 @@ public ListResultSchema getListResultSchema() {
     return listResultSchema;
   }

-  public final InputStream getListResultStream() {
+  public InputStream getListResultStream() {
     return listResultStream;
   }
@@ -396,8 +398,7 @@ final void parseResponse(final byte[] buffer,
     // consume the input stream to release resources
     int totalBytesRead = 0;

-    try {
-      InputStream stream = getContentInputStream();
+    try (InputStream stream = getContentInputStream()) {
       if (isNullInputStream(stream)) {
         return;
       }
@@ -409,7 +410,7 @@
       if (url.toString().contains(QUERY_PARAM_COMP + EQUAL + BLOCKLIST)) {
         parseBlockListResponse(stream);
       } else {
-        listResultStream = stream;
+        parseListPathResponse(stream);
       }
     } else {
       if (buffer != null) {
@@ -438,6 +439,11 @@ final void parseResponse(final byte[] buffer,
           method, getMaskedUrl(), ex.getMessage());
       log.debug("IO Error: ", ex);
       throw ex;
+    } catch (Exception ex) {
+      log.warn("Unexpected error: {} {}: {}",
+          method, getMaskedUrl(), ex.getMessage());
+      log.debug("Unexpected Error: ", ex);
+      throw new IOException(ex);
     } finally {
       this.recvResponseTimeMs += elapsedTimeMs(startTime);
       this.bytesReceived = totalBytesRead;

Reviewer comment (on the new catch block): Why do we need both log.warn and log.debug here?
Author reply: We were doing the same for other types of exceptions.
@@ -500,6 +506,25 @@ private void parseBlockListResponse(final InputStream stream) throws IOException
     blockIdList = client.parseBlockListResponse(stream);
   }

+  /**
+   * Parse the list path response from the network stream and save response into a buffer.
+   * @param stream Network InputStream.
+   * @throws IOException if an error occurs while reading the stream.
+   */
+  private void parseListPathResponse(final InputStream stream) throws IOException {
+    if (stream == null || listResultStream != null) {
+      return;
+    }
+    try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
+      byte[] tempBuffer = new byte[CLEAN_UP_BUFFER_SIZE];
+      int bytesRead;
+      while ((bytesRead = stream.read(tempBuffer, 0, CLEAN_UP_BUFFER_SIZE)) != -1) {
+        buffer.write(tempBuffer, 0, bytesRead);
+      }
+      listResultStream = new ByteArrayInputStream(buffer.toByteArray());
+    }
+  }

Reviewer comment (on parseListPathResponse): add javadocs
Author reply: Taken.

   public List<String> getBlockIdList() {
     return blockIdList;
   }
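
parseListPathResponse drains the network stream into an in-memory buffer while the request is still inside the retry loop; later parsing then reads only the ByteArrayInputStream and can never hit a socket error. A standalone sketch of that copy step (the class name and buffer size are illustrative stand-ins, e.g. for CLEAN_UP_BUFFER_SIZE):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

final class StreamSnapshot {
  private static final int BUFFER_SIZE = 16 * 1024; // stand-in for CLEAN_UP_BUFFER_SIZE

  // Copy the (possibly remote) stream fully into memory; the caller can then
  // parse the returned stream without any further network I/O.
  static InputStream snapshot(InputStream network) throws IOException {
    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
      byte[] chunk = new byte[BUFFER_SIZE];
      int read;
      while ((read = network.read(chunk, 0, BUFFER_SIZE)) != -1) {
        out.write(chunk, 0, read);
      }
      return new ByteArrayInputStream(out.toByteArray());
    }
  }
}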

@@ -20,6 +20,7 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.util.ArrayList;
@@ -42,6 +43,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler;
@@ -63,7 +65,10 @@
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_LIST_MAX_RESULTS;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_METADATA_PREFIX;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_BLOB_LIST_PARSING;
 import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX;
+import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_MESSAGE;
+import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;
 import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_JDK_MESSAGE;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
@@ -130,7 +135,9 @@ public Void call() throws Exception {

   /**
    * Test to verify that each paginated call to ListBlobs uses a new tracing context.
-   * @throws Exception
+   * Also verifies that the retry policy is invoked when a SocketTimeoutException occurs,
+   * and that an empty list with a valid continuation token is handled.
+   * @throws Exception if there is an error or a test assertion fails.
    */
   @Test
   public void testListPathTracingContext() throws Exception {
@@ -160,6 +167,10 @@ public void testListPathTracingContext() throws Exception {
     List<FileStatus> fileStatuses = new ArrayList<>();
     spiedStore.listStatus(new Path("/"), "", fileStatuses, true, null, spiedTracingContext);

+    // Assert that there were retries due to SocketTimeoutException.
+    Mockito.verify(spiedClient, Mockito.times(1))
+        .getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION);
+
     // Assert that 2 paginated ListPath calls were made:
     // 1. Without continuation token
     Mockito.verify(spiedClient, times(1)).listPath(
@@ -176,6 +187,31 @@
     Mockito.verify(spiedTracingContext, times(0)).constructHeader(any(), any(), any());
   }

+  @Test
+  public void testListPathParsingFailure() throws Exception {
+    assumeBlobServiceType();
+    AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem());
+    AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore());
+    AbfsBlobClient spiedClient = Mockito.spy(spiedStore.getClientHandler()
+        .getBlobClient());
+    Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
+    Mockito.doReturn(spiedClient).when(spiedStore).getClient();
+
+    Mockito.doThrow(new SocketException(CONNECTION_RESET_MESSAGE)).when(spiedClient)
+        .filterDuplicateEntriesAndRenamePendingFiles(any(), any());
+
+    List<FileStatus> fileStatuses = new ArrayList<>();
+    AbfsDriverException ex = intercept(AbfsDriverException.class,
+        () -> {
+          spiedStore.listStatus(new Path("/"), "", fileStatuses,
+              true, null, getTestTracingContext(spiedFs, true));
+        });
+    Assertions.assertThat(ex.getStatusCode())
+        .describedAs("Expecting Network Error status code")
+        .isEqualTo(-1);
+    Assertions.assertThat(ex.getErrorMessage())
+        .describedAs("Expecting BLOB_LIST_PARSING error message")
+        .contains(ERR_BLOB_LIST_PARSING);
+  }

Reviewer comment (on the injected SocketException): Can we reduce the retry count and verify that the request was retried?
Author reply: This particular test is to make sure retry does not happen, as here the exception is injected while parsing the list result from the local stream. I have modified testListPathTracingContext to assert that an exception thrown within httpOperation.processResponse() is retried.
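
The exchange above pins down the contract of the fix: IOExceptions raised while the body is still being read off the wire (inside processResponse) are visible to the retry policy, while failures after the result has been buffered, such as the injected SocketException in filterDuplicateEntriesAndRenamePendingFiles, surface immediately as a parsing error. A schematic, hedged sketch of that control flow (all names here are invented; the actual driver wires this through its REST operation layer):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

final class RetryPlacementSketch {
  interface Transport { byte[] fetchBody() throws IOException; } // network I/O

  static InputStream fetchWithRetries(Transport transport, int maxRetries)
      throws IOException {
    for (int attempt = 0; ; attempt++) {
      try {
        // Reading the body happens INSIDE the loop, so a connection reset
        // mid-stream is retried like any other transient network failure.
        return new ByteArrayInputStream(transport.fetchBody());
      } catch (IOException e) {
        if (attempt >= maxRetries) {
          throw e; // retries exhausted
        }
      }
    }
  }

  static void listPath(Transport transport) throws IOException {
    InputStream buffered = fetchWithRetries(transport, 3);
    // Parsing works on the in-memory copy OUTSIDE the loop: a failure here is
    // a real parsing problem and must not be mistaken for a network blip.
    parse(buffered);
  }

  private static void parse(InputStream in) { /* deserialize entries */ }
}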

  /**
   * Creates a file, verifies that listStatus returns it,
   * even while the file is still open for writing.

@@ -45,6 +45,7 @@

 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KEEP_ALIVE_CACHE_CLOSED;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_NETWORKING_LIBRARY;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpOperationType.APACHE_HTTP_CLIENT;
@@ -67,6 +68,7 @@ public ITestApacheClientConnectionPool() throws Exception {
   public void testKacIsClosed() throws Throwable {
     Configuration configuration = new Configuration(getRawConfiguration());
     configuration.set(FS_AZURE_NETWORKING_LIBRARY, APACHE_HTTP_CLIENT.name());
+    configuration.unset(FS_AZURE_METRIC_FORMAT);
     try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
         configuration)) {
       KeepAliveCache kac = fs.getAbfsStore().getClientHandler().getIngressClient()

Reviewer comment (on the unset call): why do we need to unset this config here?
Author reply: This test was failing when this config was present in the test xml files.

@@ -57,6 +57,7 @@ public class TestAbfsClient {
   public void testTimerInitializationWithoutMetricCollection() throws Exception {
     final Configuration configuration = new Configuration();
     AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration, ACCOUNT_NAME);
+    abfsConfiguration.unset(FS_AZURE_METRIC_FORMAT);

     AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
     AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().withAbfsCounters(abfsCounters).build();