-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HADOOP-19543. [ABFS][FnsOverBlob] Remove Duplicates from Blob Endpoint Listing Across Iterations #7614
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HADOOP-19543. [ABFS][FnsOverBlob] Remove Duplicates from Blob Endpoint Listing Across Iterations #7614
Changes from 1 commit
bca367c
623a4ff
c067023
b276546
c05b8eb
c9c8fe2
842393d
e56a6e8
ecbf273
74d3403
d21f9f6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -45,6 +45,7 @@ | |
| import java.util.Map; | ||
| import java.util.Optional; | ||
| import java.util.Set; | ||
| import java.util.TreeMap; | ||
| import java.util.WeakHashMap; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.ExecutorService; | ||
|
|
@@ -76,6 +77,8 @@ | |
| import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; | ||
| import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException; | ||
| import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; | ||
| import org.apache.hadoop.fs.azurebfs.contracts.services.BlobListResultEntrySchema; | ||
| import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient; | ||
| import org.apache.hadoop.fs.azurebfs.services.ListResponseData; | ||
| import org.apache.hadoop.fs.azurebfs.enums.Trilean; | ||
| import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider; | ||
|
|
@@ -1273,6 +1276,7 @@ public String listStatus(final Path path, final String startFrom, | |
| } | ||
| } | ||
|
|
||
| TreeMap<String, VersionedFileStatus> nameToEntryMap = new TreeMap<>(); | ||
| do { | ||
| try (AbfsPerfInfo perfInfo = startTracking("listStatus", "listPath")) { | ||
| ListResponseData listResponseData = listingClient.listPath(relativePath, | ||
|
|
@@ -1281,9 +1285,17 @@ public String listStatus(final Path path, final String startFrom, | |
| AbfsRestOperation op = listResponseData.getOp(); | ||
| perfInfo.registerResult(op.getResult()); | ||
| continuation = listResponseData.getContinuationToken(); | ||
| List<FileStatus> fileStatusListInCurrItr = listResponseData.getFileStatusList(); | ||
| List<VersionedFileStatus> fileStatusListInCurrItr = listResponseData.getFileStatusList(); | ||
| if (fileStatusListInCurrItr != null && !fileStatusListInCurrItr.isEmpty()) { | ||
| fileStatuses.addAll(fileStatusListInCurrItr); | ||
| if (listingClient instanceof AbfsBlobClient) { | ||
| /* Blob Endpoint can return duplicate entries for non-empty explicit | ||
| * directories. Such entries can come across multiple iterations of | ||
| * list call and hence need to be filtered here. | ||
| */ | ||
| filterDuplicateEntriesForBlobClient(nameToEntryMap, fileStatusListInCurrItr, fileStatuses); | ||
| } else { | ||
| fileStatuses.addAll(fileStatusListInCurrItr); | ||
| } | ||
| } | ||
| perfInfo.registerSuccess(true); | ||
| countAggregate++; | ||
|
|
@@ -1299,6 +1311,29 @@ public String listStatus(final Path path, final String startFrom, | |
| return continuation; | ||
| } | ||
|
|
||
| private void filterDuplicateEntriesForBlobClient( | ||
| TreeMap<String, VersionedFileStatus> nameToEntryMap, | ||
| List<VersionedFileStatus> fileStatusListInCurrItr, | ||
| List<FileStatus> fileStatuses) { | ||
| for (VersionedFileStatus fileStatus : fileStatusListInCurrItr) { | ||
| String entryName = fileStatus.getPath().getName(); | ||
| if (StringUtils.isNotEmpty(fileStatus.getEtag())) { | ||
| // This is a blob entry. It is either a file or a marker blob. | ||
| // In both cases we will add this. | ||
| nameToEntryMap.put(entryName, fileStatus); | ||
| fileStatuses.add(fileStatus); | ||
| } else { | ||
|
||
| // This is a BlobPrefix entry. | ||
| // It is a directory with file inside | ||
| // This might have already been added as a marker blob. | ||
| if (!nameToEntryMap.containsKey(entryName)) { | ||
| nameToEntryMap.put(entryName, fileStatus); | ||
| fileStatuses.add(fileStatus); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // generate continuation token for xns account | ||
| private String generateContinuationTokenForXns(final String firstEntryName) { | ||
| Preconditions.checkArgument(!Strings.isNullOrEmpty(firstEntryName) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,7 +31,7 @@ | |
| */ | ||
| public class ListResponseData { | ||
|
|
||
| private List<FileStatus> fileStatusList; | ||
| private List<VersionedFileStatus> fileStatusList; | ||
| private Map<Path, Integer> renamePendingJsonPaths; | ||
| private AbfsRestOperation executedRestOperation; | ||
| private String continuationToken; | ||
|
|
@@ -40,15 +40,15 @@ public class ListResponseData { | |
| * Returns the list of FileStatus objects. | ||
| * @return the list of FileStatus objects | ||
| */ | ||
| public List<FileStatus> getFileStatusList() { | ||
| public List<VersionedFileStatus> getFileStatusList() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Javadoc update is needed.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Taken |
||
| return fileStatusList; | ||
| } | ||
|
|
||
| /** | ||
| * Sets the list of FileStatus objects. | ||
| * @param fileStatusList the list of FileStatus objects | ||
| */ | ||
| public void setFileStatusList(final List<FileStatus> fileStatusList) { | ||
| public void setFileStatusList(final List<VersionedFileStatus> fileStatusList) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above, javadoc update is needed
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Taken |
||
| this.fileStatusList = fileStatusList; | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -197,7 +197,7 @@ public void testListPathParsingFailure() throws Exception { | |
| Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore(); | ||
| Mockito.doReturn(spiedClient).when(spiedStore).getClient(); | ||
|
|
||
| Mockito.doThrow(new SocketException(CONNECTION_RESET_MESSAGE)).when(spiedClient).filterDuplicateEntriesAndRenamePendingFiles(any(), any()); | ||
| Mockito.doThrow(new SocketException(CONNECTION_RESET_MESSAGE)).when(spiedClient).filterRenamePendingFiles(any(), any()); | ||
| List<FileStatus> fileStatuses = new ArrayList<>(); | ||
| AbfsDriverException ex = intercept(AbfsDriverException.class, | ||
| () -> { | ||
|
|
@@ -532,6 +532,28 @@ public void testEmptyContinuationToken() throws Exception { | |
| .describedAs("Listing Size Not as expected").hasSize(1); | ||
| } | ||
|
|
||
| @Test | ||
| public void testDuplicateEntriesAcrossListBlobIterations() throws Exception { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can add one more test where there are more than one files in the directory and max list result is 1 and multiple list status calls and verify the overall file statuses don't have any duplicate entries
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The duplicate can only happen in case of non-empty explicit directories. The store won't allow me to create duplicate paths otherwise.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Modified test as above |
||
| AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); | ||
| AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); | ||
| store.getAbfsConfiguration().setListMaxResults(1); | ||
| AbfsClient client = Mockito.spy(store.getClient()); | ||
|
|
||
| Mockito.doReturn(store).when(fs).getAbfsStore(); | ||
| Mockito.doReturn(client).when(store).getClient(); | ||
|
|
||
| // Create a non-empty explicit directory under root | ||
| Path dir = new Path("/a"); | ||
| Path fileInsideDir = new Path("/a/file"); | ||
| createAzCopyFolder(dir); | ||
| fs.create(fileInsideDir); | ||
|
|
||
| FileStatus[] fileStatuses = fs.listStatus(new Path(ROOT_PATH)); | ||
| Assertions.assertThat(fileStatuses.length) | ||
| .describedAs("List size is not expected").isEqualTo(1); | ||
| assertExplicitDirectoryFileStatus(fileStatuses[0], fs.makeQualified(dir)); | ||
| } | ||
|
|
||
| private void assertFilePathFileStatus(final FileStatus fileStatus, | ||
| final Path qualifiedPath) { | ||
| Assertions.assertThat(fileStatus.getPath()) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add javadocs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added