-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HADOOP-19543. [ABFS][FnsOverBlob] Remove Duplicates from Blob Endpoint Listing Across Iterations #7614
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HADOOP-19543. [ABFS][FnsOverBlob] Remove Duplicates from Blob Endpoint Listing Across Iterations #7614
Changes from all commits
bca367c
623a4ff
c067023
b276546
c05b8eb
c9c8fe2
842393d
e56a6e8
ecbf273
74d3403
d21f9f6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,34 +21,33 @@ | |
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
||
| import org.apache.hadoop.fs.FileStatus; | ||
| import org.apache.hadoop.fs.Path; | ||
|
|
||
| /** | ||
| * This class is used to hold the response data for list operations. | ||
| * It contains a list of FileStatus objects, a map of rename pending JSON paths, | ||
| * It contains a list of VersionedFileStatus objects, a map of rename pending JSON paths, | ||
| * continuation token and the executed REST operation. | ||
| */ | ||
| public class ListResponseData { | ||
|
|
||
| private List<FileStatus> fileStatusList; | ||
| private List<VersionedFileStatus> fileStatusList; | ||
| private Map<Path, Integer> renamePendingJsonPaths; | ||
| private AbfsRestOperation executedRestOperation; | ||
| private String continuationToken; | ||
|
|
||
| /** | ||
| * Returns the list of FileStatus objects. | ||
| * @return the list of FileStatus objects | ||
| * Returns the list of VersionedFileStatus objects. | ||
| * @return the list of VersionedFileStatus objects | ||
| */ | ||
| public List<FileStatus> getFileStatusList() { | ||
| public List<VersionedFileStatus> getFileStatusList() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Javadoc update is needed.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Taken |
||
| return fileStatusList; | ||
| } | ||
|
|
||
| /** | ||
| * Sets the list of FileStatus objects. | ||
| * @param fileStatusList the list of FileStatus objects | ||
| * Sets the list of VersionedFileStatus objects. | ||
| * @param fileStatusList the list of VersionedFileStatus objects | ||
| */ | ||
| public void setFileStatusList(final List<FileStatus> fileStatusList) { | ||
| public void setFileStatusList(final List<VersionedFileStatus> fileStatusList) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above, javadoc update is needed
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Taken |
||
| this.fileStatusList = fileStatusList; | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,69 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.hadoop.fs.azurebfs.utils; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.TreeMap; | ||
|
|
||
| import org.apache.hadoop.fs.FileStatus; | ||
|
|
||
| /** | ||
| * Utility class for List operations. | ||
| */ | ||
| public final class ListUtils { | ||
|
|
||
| private ListUtils() { | ||
| // Private constructor to prevent instantiation | ||
| } | ||
|
|
||
| /** | ||
| * Utility method to remove duplicates from a list of FileStatus. | ||
| * ListBlob API of blob endpoint can return duplicate entries. | ||
| * @param originalList prone to have duplicates | ||
| * @return rectified list with no duplicates. | ||
| */ | ||
| public static List<FileStatus> getUniqueListResult(List<FileStatus> originalList) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Logic looks a bit complex to read, can be refactored. One possible solution could be public static List getUniqueListResult(List originalList) { List uniqueList = new ArrayList<>(); String currentPrefix = null; for (FileStatus fileStatus : originalList) { } return uniqueList;
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice Suggestion. |
||
| if (originalList == null || originalList.isEmpty()) { | ||
| return originalList; | ||
| } | ||
|
|
||
| TreeMap<String, FileStatus> nameToEntryMap = new TreeMap<>(); | ||
| String prefix = null; | ||
| List<FileStatus> rectifiedFileStatusList = new ArrayList<>(); | ||
|
|
||
| for (FileStatus current : originalList) { | ||
| String fileName = current.getPath().getName(); | ||
|
|
||
| if (prefix == null || !fileName.startsWith(prefix)) { | ||
| // Prefix pattern breaks here. Reset Map and prefix. | ||
| prefix = fileName; | ||
| nameToEntryMap.clear(); | ||
| } | ||
|
|
||
| // Add the current entry if it is not already added. | ||
| if (!nameToEntryMap.containsKey(fileName)) { | ||
| nameToEntryMap.put(fileName, current); | ||
| rectifiedFileStatusList.add(current); | ||
| } | ||
| } | ||
|
|
||
| return rectifiedFileStatusList; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,7 +24,9 @@ | |
| import java.net.SocketTimeoutException; | ||
| import java.net.URL; | ||
| import java.util.ArrayList; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Set; | ||
| import java.util.concurrent.Callable; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
|
|
@@ -90,6 +92,8 @@ public class ITestAzureBlobFileSystemListStatus extends | |
| AbstractAbfsIntegrationTest { | ||
| private static final int TEST_FILES_NUMBER = 6000; | ||
| public static final String TEST_CONTINUATION_TOKEN = "continuation"; | ||
| private static final int TOTAL_NUMBER_OF_PATHS = 11; | ||
| private static final int NUMBER_OF_UNIQUE_PATHS = 7; | ||
|
|
||
| public ITestAzureBlobFileSystemListStatus() throws Exception { | ||
| super(); | ||
|
|
@@ -197,7 +201,7 @@ public void testListPathParsingFailure() throws Exception { | |
| Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore(); | ||
| Mockito.doReturn(spiedClient).when(spiedStore).getClient(); | ||
|
|
||
| Mockito.doThrow(new SocketException(CONNECTION_RESET_MESSAGE)).when(spiedClient).filterDuplicateEntriesAndRenamePendingFiles(any(), any()); | ||
| Mockito.doThrow(new SocketException(CONNECTION_RESET_MESSAGE)).when(spiedClient).filterRenamePendingFiles(any(), any()); | ||
| List<FileStatus> fileStatuses = new ArrayList<>(); | ||
| AbfsDriverException ex = intercept(AbfsDriverException.class, | ||
| () -> { | ||
|
|
@@ -532,6 +536,87 @@ public void testEmptyContinuationToken() throws Exception { | |
| .describedAs("Listing Size Not as expected").hasSize(1); | ||
| } | ||
|
|
||
| /** | ||
| * Test to verify that listStatus returns the correct file status | ||
| * after removing duplicates across multiple iterations of list blobs. | ||
| * Also verifies that in case of non-empty explicit dir, | ||
| * entry corresponding to marker blob is returned. | ||
| * @throws Exception if test fails. | ||
| */ | ||
| @Test | ||
| public void testDuplicateEntriesAcrossListBlobIterations() throws Exception { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can add one more test where there are more than one files in the directory and max list result is 1 and multiple list status calls and verify the overall file statuses don't have any duplicate entries
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The duplicate can only happen in case of non-empty explicit directories. The store won't allow me to create duplicate paths otherwise.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Modified test as above |
||
| AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); | ||
| AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); | ||
| store.getAbfsConfiguration().setListMaxResults(1); | ||
| AbfsClient client = Mockito.spy(store.getClient()); | ||
|
|
||
| Mockito.doReturn(store).when(fs).getAbfsStore(); | ||
| Mockito.doReturn(client).when(store).getClient(); | ||
|
|
||
| /* | ||
| * Following entries will be created inside the root path. | ||
| * 0. /A - implicit directory without any marker blob | ||
| * 1. /a - marker file for explicit directory | ||
| * 2. /a/file1 - normal file inside explicit directory | ||
| * 3. /b - normal file inside root | ||
| * 4. /c - marker file for explicit directory | ||
| * 5. /c.bak - marker file for explicit directory | ||
| * 6. /c.bak/file2 - normal file inside explicit directory | ||
| * 7. /c/file3 - normal file inside explicit directory | ||
| * 8. /d - implicit directory | ||
| * 9. /e - marker file for explicit directory | ||
| * 10. /e/file4 - normal file inside explicit directory | ||
| */ | ||
| // Create Path 0 | ||
| createAzCopyFolder(new Path("/A")); | ||
|
|
||
| // Create Path 1 and 2. | ||
| fs.create(new Path("/a/file1")); | ||
|
|
||
| // Create Path 3 | ||
| fs.create(new Path("/b")); | ||
|
|
||
| // Create Path 4 and 7 | ||
| fs.create(new Path("/c/file3")); | ||
|
|
||
| // Create Path 5 and 6 | ||
| fs.create(new Path("/c.bak/file2")); | ||
|
|
||
| // Create Path 8 | ||
| createAzCopyFolder(new Path("/d")); | ||
|
|
||
| // Create Path 9 and 10 | ||
| fs.create(new Path("/e/file4")); | ||
|
|
||
| FileStatus[] fileStatuses = fs.listStatus(new Path(ROOT_PATH)); | ||
|
|
||
| // Assert that client.listPath was called 11 times. | ||
| // This will assert server returned 11 entries in total. | ||
| Mockito.verify(client, Mockito.times(TOTAL_NUMBER_OF_PATHS)) | ||
| .listPath(eq(ROOT_PATH), eq(false), eq(1), any(), any(), any()); | ||
|
|
||
| // Assert that after duplicate removal, only 7 unique entries are returned. | ||
| Assertions.assertThat(fileStatuses.length) | ||
| .describedAs("List size is not expected").isEqualTo(NUMBER_OF_UNIQUE_PATHS); | ||
|
|
||
| // Assert that for duplicates, entry corresponding to marker blob is returned. | ||
| assertImplicitDirectoryFileStatus(fileStatuses[0], fs.makeQualified(new Path("/A"))); | ||
| assertExplicitDirectoryFileStatus(fileStatuses[1], fs.makeQualified(new Path("/a"))); | ||
| assertFilePathFileStatus(fileStatuses[2], fs.makeQualified(new Path("/b"))); | ||
| assertExplicitDirectoryFileStatus(fileStatuses[3], fs.makeQualified(new Path("/c"))); | ||
| assertExplicitDirectoryFileStatus(fileStatuses[4], fs.makeQualified(new Path("/c.bak"))); | ||
| assertImplicitDirectoryFileStatus(fileStatuses[5], fs.makeQualified(new Path("/d"))); | ||
| assertExplicitDirectoryFileStatus(fileStatuses[6], fs.makeQualified(new Path("/e"))); | ||
|
|
||
| // Assert that there are no duplicates in the returned file statuses. | ||
| Set<Path> uniquePaths = new HashSet<>(); | ||
| for (FileStatus fileStatus : fileStatuses) { | ||
| Assertions.assertThat(uniquePaths.add(fileStatus.getPath())) | ||
| .describedAs("Duplicate Entries found") | ||
| .isTrue(); | ||
| } | ||
| } | ||
|
|
||
| private void assertFilePathFileStatus(final FileStatus fileStatus, | ||
| final Path qualifiedPath) { | ||
| Assertions.assertThat(fileStatus.getPath()) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here we are iterating over a list of VersionedFileStatus and maintaing a list of FileStatus, shouldn't the new list also be of VersionedFileStatus elements ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Elements are always of type VersionedFileStatus only. Just the reference type is getting changed because store.listStatus() works with FileStatus only. The final list to be returned has to be of type FileStatus. Although the objects hold by it are always VersionedFileStatus and can be type casted wherever needed.