HADOOP-19531. [ABFS][FnsOverBlob] Streaming List Path Result Should Happen Inside Retry Loop #7582
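For readers of the diff below: the core of this change is that the ListBlobs response body is now consumed inside the retried HTTP operation. The network stream is drained into an in-memory buffer while the request can still be retried, and the XML parsing afterwards reads only from that buffer. The following standalone sketch (hypothetical class and method names, not taken from the ABFS code) illustrates that pattern.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Minimal sketch of buffering a response stream while still inside the
// retried operation: a network failure during streaming surfaces as an
// IOException that the retry loop can handle, and later parsing works on
// an in-memory copy.
public final class BufferedListingSketch {

  private BufferedListingSketch() {
  }

  // Drain the network stream into memory; any IOException thrown here
  // happens inside the operation that the caller retries.
  static InputStream bufferResponse(InputStream network) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    byte[] chunk = new byte[8 * 1024];
    int read;
    while ((read = network.read(chunk)) != -1) {
      buffer.write(chunk, 0, read);
    }
    return new ByteArrayInputStream(buffer.toByteArray());
  }

  public static void main(String[] args) throws IOException {
    InputStream fakeNetworkStream = new ByteArrayInputStream(
        "<EnumerationResults/>".getBytes(StandardCharsets.UTF_8));
    try (InputStream inMemory = bufferResponse(fakeNetworkStream)) {
      // Parsing would read from the buffer here; a failure at this point is
      // a parsing problem, not a network problem, and is not retried.
      System.out.println("Buffered " + inMemory.available() + " bytes");
    }
  }
}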
@@ -1605,12 +1605,9 @@ public Hashtable<String, String> getXMSProperties(AbfsHttpOperation result)
@Override
public ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri)
throws AzureBlobFileSystemException {
BlobListResultSchema listResultSchema;
try (InputStream stream = result.getListResultStream()) {
if (stream == null) {
return null;
}
try {
BlobListResultSchema listResultSchema;
final SAXParser saxParser = saxParserThreadLocal.get();
saxParser.reset();
listResultSchema = new BlobListResultSchema();
@@ -1620,19 +1617,17 @@ public ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri)
LOG.debug("ListBlobs listed {} blobs with {} as continuation token",
listResultSchema.paths().size(),
listResultSchema.getNextMarker());
} catch (SAXException | IOException e) {
throw new AbfsDriverException(e);
return filterDuplicateEntriesAndRenamePendingFiles(listResultSchema, uri);
} catch (SAXException | IOException ex) {
throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, ex);
}
} catch (IOException e) {
LOG.error("Unable to deserialize list results for uri {}", uri.toString(), e);
throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, e);
}

try {
return filterDuplicateEntriesAndRenamePendingFiles(listResultSchema, uri);
} catch (IOException e) {
LOG.error("Unable to filter list results for uri {}", uri.toString(), e);
throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, e);
} catch (AbfsDriverException ex) {
// Throw as it is to avoid multiple wrapping.
LOG.error("Unable to deserialize list results for Uri {}", uri != null ? uri.toString(): "NULL", ex);
Review comment: uri should not be null if we are reaching this part of the code.
Reply: uri can be null in cases where this API is called by internal operations, such as the create flow.
throw ex;
} catch (Exception ex) {
LOG.error("Unable to get stream for list results for uri {}", uri != null ? uri.toString(): "NULL", ex);
throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, ex);
}
}
@@ -1929,8 +1924,10 @@ private List<AbfsHttpHeader> getMetadataHeadersList(final Hashtable<String, Stri
* @param listResultSchema List of entries returned by Blob Endpoint.
* @param uri URI to be used for path conversion.
* @return List of entries after removing duplicates.
* @throws IOException if path conversion fails.
*/
private ListResponseData filterDuplicateEntriesAndRenamePendingFiles(
@VisibleForTesting
public ListResponseData filterDuplicateEntriesAndRenamePendingFiles(
BlobListResultSchema listResultSchema, URI uri) throws IOException {
List<FileStatus> fileStatuses = new ArrayList<>();
Map<Path, Integer> renamePendingJsonPaths = new HashMap<>();
@@ -18,6 +18,8 @@
package org.apache.hadoop.fs.azurebfs.services;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
@@ -221,7 +223,7 @@ public ListResultSchema getListResultSchema() {
return listResultSchema;
}

public final InputStream getListResultStream() {
public InputStream getListResultStream() {
return listResultStream;
}
@@ -396,8 +398,7 @@ final void parseResponse(final byte[] buffer,
// consume the input stream to release resources
int totalBytesRead = 0;

try {
InputStream stream = getContentInputStream();
try (InputStream stream = getContentInputStream()) {
if (isNullInputStream(stream)) {
return;
}
@@ -409,7 +410,7 @@ final void parseResponse(final byte[] buffer,
if (url.toString().contains(QUERY_PARAM_COMP + EQUAL + BLOCKLIST)) {
parseBlockListResponse(stream);
} else {
listResultStream = stream;
parseListPathResponse(stream);
}
} else {
if (buffer != null) {
@@ -438,6 +439,11 @@ final void parseResponse(final byte[] buffer,
method, getMaskedUrl(), ex.getMessage());
log.debug("IO Error: ", ex);
throw ex;
} catch (Exception ex) {
log.warn("Unexpected error: {} {}: {}",
Review comment: Why do we need both log.warn and log.debug here?
Reply: We were doing the same for other types of exceptions.
method, getMaskedUrl(), ex.getMessage());
log.debug("Unexpected Error: ", ex);
throw new IOException(ex);
} finally {
this.recvResponseTimeMs += elapsedTimeMs(startTime);
this.bytesReceived = totalBytesRead;
@@ -500,6 +506,25 @@ private void parseBlockListResponse(final InputStream stream) throws IOException
blockIdList = client.parseBlockListResponse(stream);
}

/**
* Parse the list path response from the network stream and save response into a buffer.
* @param stream Network InputStream.
* @throws IOException if an error occurs while reading the stream.
*/
private void parseListPathResponse(final InputStream stream) throws IOException {
Review comment: Add javadocs.
Reply: Taken.
if (stream == null || listResultStream != null) {
return;
}
try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
byte[] tempBuffer = new byte[CLEAN_UP_BUFFER_SIZE];
int bytesRead;
while ((bytesRead = stream.read(tempBuffer, 0, CLEAN_UP_BUFFER_SIZE)) != -1) {
buffer.write(tempBuffer, 0, bytesRead);
}
listResultStream = new ByteArrayInputStream(buffer.toByteArray());
}
}

public List<String> getBlockIdList() {
return blockIdList;
}
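Since parseListPathResponse() above stores the payload in a ByteArrayInputStream, the later SAX parse in parseListPathResults() runs entirely against memory. A standalone sketch of parsing such a buffered listing is shown below; the XML content and class names are illustrative, not the real ListBlobs schema handling.

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.SAXParser;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.Attributes;
import org.xml.sax.helpers.DefaultHandler;

// Standalone sketch: SAX-parse a listing payload that has already been
// buffered in memory. A SAXException here indicates a malformed payload,
// not a dropped connection, which is why the patch wraps it in
// AbfsDriverException(ERR_BLOB_LIST_PARSING) instead of retrying.
public final class ParseBufferedListingSketch {

  private ParseBufferedListingSketch() {
  }

  public static void main(String[] args) throws Exception {
    byte[] buffered = ("<EnumerationResults><Blobs>"
        + "<Blob><Name>dir/a.txt</Name></Blob>"
        + "</Blobs></EnumerationResults>").getBytes(StandardCharsets.UTF_8);

    SAXParser parser = SAXParserFactory.newInstance().newSAXParser();
    try (InputStream in = new ByteArrayInputStream(buffered)) {
      parser.parse(in, new DefaultHandler() {
        @Override
        public void startElement(String uri, String localName,
            String qName, Attributes attributes) {
          // Collect entries here; the real client builds a
          // BlobListResultSchema instead.
          if ("Blob".equals(qName)) {
            System.out.println("Found a blob entry");
          }
        }
      });
    }
  }
}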
@@ -20,6 +20,7 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.util.ArrayList;
@@ -42,6 +43,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler;
@@ -63,7 +65,10 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_LIST_MAX_RESULTS;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_METADATA_PREFIX;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_BLOB_LIST_PARSING;
import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_MESSAGE;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_JDK_MESSAGE;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
@@ -130,7 +135,9 @@ public Void call() throws Exception {

/**
* Test to verify that each paginated call to ListBlobs uses a new tracing context.
* @throws Exception
* Test also verifies that the retry policy is called when a SocketTimeoutException is thrown.
* Test also verifies that an empty list with a valid continuation token is handled.
* @throws Exception if there is an error or test assertions fail.
*/
@Test
public void testListPathTracingContext() throws Exception {
@@ -160,6 +167,10 @@ public void testListPathTracingContext() throws Exception {
List<FileStatus> fileStatuses = new ArrayList<>();
spiedStore.listStatus(new Path("/"), "", fileStatuses, true, null, spiedTracingContext);

// Assert that there were retries due to SocketTimeoutException
Mockito.verify(spiedClient, Mockito.times(1))
.getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION);

// Assert that 2 paginated ListPath calls were made, 1 and 2.
// 1. Without continuation token
Mockito.verify(spiedClient, times(1)).listPath(
@@ -176,6 +187,31 @@ public void testListPathTracingContext() throws Exception {
Mockito.verify(spiedTracingContext, times(0)).constructHeader(any(), any(), any());
}
@Test
public void testListPathParsingFailure() throws Exception {
assumeBlobServiceType();
AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem());
AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore());
AbfsBlobClient spiedClient = Mockito.spy(spiedStore.getClientHandler()
.getBlobClient());
Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
Mockito.doReturn(spiedClient).when(spiedStore).getClient();

Mockito.doThrow(new SocketException(CONNECTION_RESET_MESSAGE)).when(spiedClient).filterDuplicateEntriesAndRenamePendingFiles(any(), any());
Review comment: Can we reduce the retry count and verify that the request was retried?
Reply: This particular test is to make sure retry does not happen, as the exception is injected while parsing the list result from the local stream. I have modified the test.
List<FileStatus> fileStatuses = new ArrayList<>();
AbfsDriverException ex = intercept(AbfsDriverException.class,
() -> {
spiedStore.listStatus(new Path("/"), "", fileStatuses,
true, null, getTestTracingContext(spiedFs, true));
});
Assertions.assertThat(ex.getStatusCode())
.describedAs("Expecting Network Error status code")
.isEqualTo(-1);
Assertions.assertThat(ex.getErrorMessage())
.describedAs("Expecting blob list parsing error message")
.contains(ERR_BLOB_LIST_PARSING);
}
/**
* Creates a file, verifies that listStatus returns it,
* even while the file is still open for writing.
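As the review exchange above explains, testListPathParsingFailure injects the failure after the response has already been buffered, so no retry is expected. The toy sketch below (hypothetical names, not ABFS code) illustrates that retry boundary.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;

// Toy illustration of the retry boundary: only the network-facing call
// (which now includes draining the response into a buffer) is retried;
// post-processing of the buffered result runs outside the loop, so an
// exception injected there is reported once instead of being retried.
public final class RetryBoundarySketch {

  private RetryBoundarySketch() {
  }

  // Retry the network call a bounded number of times on IOException.
  static byte[] withRetries(Callable<byte[]> networkCall, int maxRetries)
      throws Exception {
    IOException last = null;
    for (int attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        return networkCall.call(); // fetch and buffer the payload
      } catch (IOException e) {
        last = e; // transient network failure: try again
      }
    }
    throw last;
  }

  public static void main(String[] args) throws Exception {
    byte[] payload = withRetries(
        () -> "<EnumerationResults/>".getBytes(StandardCharsets.UTF_8), 3);

    // Processing of the buffered payload happens after the retry loop has
    // finished; a failure here (like the injected SocketException in the
    // test) surfaces immediately as a single driver-level error.
    if (payload.length == 0) {
      throw new IOException("parsing failed"); // would not be retried
    }
    System.out.println("Processed " + payload.length + " bytes exactly once");
  }
}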
@@ -45,6 +45,7 @@

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KEEP_ALIVE_CACHE_CLOSED;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_NETWORKING_LIBRARY;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
import static org.apache.hadoop.fs.azurebfs.constants.HttpOperationType.APACHE_HTTP_CLIENT;
@@ -67,6 +68,7 @@ public ITestApacheClientConnectionPool() throws Exception {
public void testKacIsClosed() throws Throwable {
Configuration configuration = new Configuration(getRawConfiguration());
configuration.set(FS_AZURE_NETWORKING_LIBRARY, APACHE_HTTP_CLIENT.name());
configuration.unset(FS_AZURE_METRIC_FORMAT);
Review comment: Why do we need to unset this config here?
Reply: This test was failing when this config was present in the test XML files.
try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
configuration)) {
KeepAliveCache kac = fs.getAbfsStore().getClientHandler().getIngressClient()
Review comment: NIT: We can add the URI in the error message for better visibility.
Reply: The URI is already in the log messages.