Skip to content

Commit a13f530

Browse files
authored
HADOOP-19543. [ABFS][FnsOverBlob] Remove Duplicates from Blob Endpoint Listing Across Iterations (#7614) (#7632)
Contributed by Anuj Modi. Reviewed by Anmol Asrani, Manish Bhatt, Manika Joshi. Signed off by Anuj Modi <anujmodi@apache.org>
1 parent bd7659e commit a13f530

File tree

7 files changed

+295
-41
lines changed

7 files changed

+295
-41
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
7777
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException;
7878
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
79+
import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
7980
import org.apache.hadoop.fs.azurebfs.services.ListResponseData;
8081
import org.apache.hadoop.fs.azurebfs.enums.Trilean;
8182
import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
@@ -117,6 +118,7 @@
117118
import org.apache.hadoop.fs.azurebfs.utils.CRC64;
118119
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
119120
import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
121+
import org.apache.hadoop.fs.azurebfs.utils.ListUtils;
120122
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
121123
import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
122124
import org.apache.hadoop.fs.impl.BackReference;
@@ -1272,7 +1274,7 @@ public String listStatus(final Path path, final String startFrom,
12721274
: generateContinuationTokenForNonXns(relativePath, startFrom);
12731275
}
12741276
}
1275-
1277+
List<FileStatus> fileStatusList = new ArrayList<>();
12761278
do {
12771279
try (AbfsPerfInfo perfInfo = startTracking("listStatus", "listPath")) {
12781280
ListResponseData listResponseData = listingClient.listPath(relativePath,
@@ -1281,9 +1283,9 @@ public String listStatus(final Path path, final String startFrom,
12811283
AbfsRestOperation op = listResponseData.getOp();
12821284
perfInfo.registerResult(op.getResult());
12831285
continuation = listResponseData.getContinuationToken();
1284-
List<FileStatus> fileStatusListInCurrItr = listResponseData.getFileStatusList();
1286+
List<VersionedFileStatus> fileStatusListInCurrItr = listResponseData.getFileStatusList();
12851287
if (fileStatusListInCurrItr != null && !fileStatusListInCurrItr.isEmpty()) {
1286-
fileStatuses.addAll(fileStatusListInCurrItr);
1288+
fileStatusList.addAll(fileStatusListInCurrItr);
12871289
}
12881290
perfInfo.registerSuccess(true);
12891291
countAggregate++;
@@ -1296,6 +1298,14 @@ public String listStatus(final Path path, final String startFrom,
12961298
}
12971299
} while (shouldContinue);
12981300

1301+
if (listingClient instanceof AbfsBlobClient) {
1302+
fileStatuses.addAll(ListUtils.getUniqueListResult(fileStatusList));
1303+
LOG.debug("ListBlob API returned a total of {} elements including duplicates."
1304+
+ "Number of unique Elements are {}", fileStatusList.size(), fileStatuses.size());
1305+
} else {
1306+
fileStatuses.addAll(fileStatusList);
1307+
}
1308+
12991309
return continuation;
13001310
}
13011311

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java

Lines changed: 8 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import java.util.Hashtable;
4343
import java.util.List;
4444
import java.util.Map;
45-
import java.util.TreeMap;
4645
import java.util.UUID;
4746

4847
import org.w3c.dom.Document;
@@ -52,7 +51,6 @@
5251

5352
import org.apache.commons.io.IOUtils;
5453
import org.apache.commons.lang3.StringUtils;
55-
import org.apache.hadoop.fs.FileStatus;
5654
import org.apache.hadoop.fs.FileSystem;
5755
import org.apache.hadoop.classification.VisibleForTesting;
5856
import org.apache.hadoop.fs.Path;
@@ -402,7 +400,7 @@ public ListResponseData listPath(final String relativePath, final boolean recurs
402400
AbfsRestOperation pathStatus = this.getPathStatus(relativePath, tracingContext, null, false);
403401
BlobListResultSchema listResultSchema = getListResultSchemaFromPathStatus(relativePath, pathStatus);
404402
LOG.debug("ListBlob attempted on a file path. Returning file status.");
405-
List<FileStatus> fileStatusList = new ArrayList<>();
403+
List<VersionedFileStatus> fileStatusList = new ArrayList<>();
406404
for (BlobListResultEntrySchema entry : listResultSchema.paths()) {
407405
fileStatusList.add(getVersionedFileStatusFromEntry(entry, uri));
408406
}
@@ -1617,7 +1615,7 @@ public ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri)
16171615
LOG.debug("ListBlobs listed {} blobs with {} as continuation token",
16181616
listResultSchema.paths().size(),
16191617
listResultSchema.getNextMarker());
1620-
return filterDuplicateEntriesAndRenamePendingFiles(listResultSchema, uri);
1618+
return filterRenamePendingFiles(listResultSchema, uri);
16211619
} catch (SAXException | IOException ex) {
16221620
throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, ex);
16231621
}
@@ -1917,39 +1915,23 @@ private List<AbfsHttpHeader> getMetadataHeadersList(final Hashtable<String, Stri
19171915
});
19181916

19191917
/**
1920-
* This is to handle duplicate listing entries returned by Blob Endpoint for
1921-
* implicit paths that also has a marker file created for them.
1922-
* This will retain entry corresponding to marker file and remove the BlobPrefix entry.
1923-
* This will also filter out all the rename pending json files in listing output.
1918+
* This will filter out all the rename pending json files in listing output.
19241919
* @param listResultSchema List of entries returned by Blob Endpoint.
19251920
* @param uri URI to be used for path conversion.
19261921
* @return List response data with rename pending json files filtered out of the listing.
19271922
* @throws IOException if path conversion fails.
19281923
*/
19291924
@VisibleForTesting
1930-
public ListResponseData filterDuplicateEntriesAndRenamePendingFiles(
1925+
public ListResponseData filterRenamePendingFiles(
19311926
BlobListResultSchema listResultSchema, URI uri) throws IOException {
1932-
List<FileStatus> fileStatuses = new ArrayList<>();
1927+
List<VersionedFileStatus> fileStatuses = new ArrayList<>();
19331928
Map<Path, Integer> renamePendingJsonPaths = new HashMap<>();
1934-
TreeMap<String, BlobListResultEntrySchema> nameToEntryMap = new TreeMap<>();
19351929

19361930
for (BlobListResultEntrySchema entry : listResultSchema.paths()) {
1937-
if (StringUtils.isNotEmpty(entry.eTag())) {
1938-
// This is a blob entry. It is either a file or a marker blob.
1939-
// In both cases we will add this.
1940-
if (isRenamePendingJsonPathEntry(entry)) {
1941-
renamePendingJsonPaths.put(entry.path(), entry.contentLength().intValue());
1942-
} else {
1943-
nameToEntryMap.put(entry.name(), entry);
1944-
fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
1945-
}
1931+
if (isRenamePendingJsonPathEntry(entry)) {
1932+
renamePendingJsonPaths.put(entry.path(), entry.contentLength().intValue());
19461933
} else {
1947-
// This is a BlobPrefix entry. It is a directory with file inside
1948-
// This might have already been added as a marker blob.
1949-
if (!nameToEntryMap.containsKey(entry.name())) {
1950-
nameToEntryMap.put(entry.name(), entry);
1951-
fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
1952-
}
1934+
fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
19531935
}
19541936
}
19551937

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import com.fasterxml.jackson.databind.ObjectMapper;
4646

4747
import org.apache.hadoop.classification.VisibleForTesting;
48-
import org.apache.hadoop.fs.FileStatus;
4948
import org.apache.hadoop.fs.FileSystem;
5049
import org.apache.hadoop.fs.Path;
5150
import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -1353,7 +1352,7 @@ public ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri)
13531352
LOG.debug("ListPath listed {} paths with {} as continuation token",
13541353
listResultSchema.paths().size(),
13551354
getContinuationFromResponse(result));
1356-
List<FileStatus> fileStatuses = new ArrayList<>();
1355+
List<VersionedFileStatus> fileStatuses = new ArrayList<>();
13571356
for (DfsListResultEntrySchema entry : listResultSchema.paths()) {
13581357
fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
13591358
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListResponseData.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,34 +21,33 @@
2121
import java.util.List;
2222
import java.util.Map;
2323

24-
import org.apache.hadoop.fs.FileStatus;
2524
import org.apache.hadoop.fs.Path;
2625

2726
/**
2827
* This class is used to hold the response data for list operations.
29-
* It contains a list of FileStatus objects, a map of rename pending JSON paths,
28+
* It contains a list of VersionedFileStatus objects, a map of rename pending JSON paths,
3029
* continuation token and the executed REST operation.
3130
*/
3231
public class ListResponseData {
3332

34-
private List<FileStatus> fileStatusList;
33+
private List<VersionedFileStatus> fileStatusList;
3534
private Map<Path, Integer> renamePendingJsonPaths;
3635
private AbfsRestOperation executedRestOperation;
3736
private String continuationToken;
3837

3938
/**
40-
* Returns the list of FileStatus objects.
41-
* @return the list of FileStatus objects
39+
* Returns the list of VersionedFileStatus objects.
40+
* @return the list of VersionedFileStatus objects
4241
*/
43-
public List<FileStatus> getFileStatusList() {
42+
public List<VersionedFileStatus> getFileStatusList() {
4443
return fileStatusList;
4544
}
4645

4746
/**
48-
* Sets the list of FileStatus objects.
49-
* @param fileStatusList the list of FileStatus objects
47+
* Sets the list of VersionedFileStatus objects.
48+
* @param fileStatusList the list of VersionedFileStatus objects
5049
*/
51-
public void setFileStatusList(final List<FileStatus> fileStatusList) {
50+
public void setFileStatusList(final List<VersionedFileStatus> fileStatusList) {
5251
this.fileStatusList = fileStatusList;
5352
}
5453

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.azurebfs.utils;
20+
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
import java.util.TreeMap;
24+
25+
import org.apache.hadoop.fs.FileStatus;
26+
27+
/**
28+
* Utility class for List operations.
29+
*/
30+
public final class ListUtils {
31+
32+
private ListUtils() {
33+
// Private constructor to prevent instantiation
34+
}
35+
36+
/**
37+
* Removes duplicate entries from a list of FileStatus, keeping the first entry seen for each file name (last path component).
38+
* ListBlob API of blob endpoint can return duplicate entries. Names are only compared within a run of consecutive entries whose names start with the first name of that run, so this presumably relies on the input being in the endpoint's sorted order (TODO confirm).
39+
* @param originalList list that may contain duplicates; may be null or empty.
40+
* @return originalList itself when it is null or empty, otherwise a new list without the detected duplicates, in the original order.
41+
*/
42+
public static List<FileStatus> getUniqueListResult(List<FileStatus> originalList) {
43+
if (originalList == null || originalList.isEmpty()) {
44+
return originalList;
45+
}
46+
47+
TreeMap<String, FileStatus> nameToEntryMap = new TreeMap<>();
48+
String prefix = null;
49+
List<FileStatus> rectifiedFileStatusList = new ArrayList<>();
50+
51+
for (FileStatus current : originalList) {
52+
String fileName = current.getPath().getName();
53+
54+
if (prefix == null || !fileName.startsWith(prefix)) {
55+
// Current name does not extend the tracked prefix: start a new run with this name and forget every name seen so far.
56+
prefix = fileName;
57+
nameToEntryMap.clear();
58+
}
59+
60+
// Keep only the first entry seen for each name within the current run; later entries with the same name are dropped.
61+
if (!nameToEntryMap.containsKey(fileName)) {
62+
nameToEntryMap.put(fileName, current);
63+
rectifiedFileStatusList.add(current);
64+
}
65+
}
66+
67+
return rectifiedFileStatusList;
68+
}
69+
}

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import java.net.SocketTimeoutException;
2525
import java.net.URL;
2626
import java.util.ArrayList;
27+
import java.util.HashSet;
2728
import java.util.List;
29+
import java.util.Set;
2830
import java.util.concurrent.Callable;
2931
import java.util.concurrent.ExecutorService;
3032
import java.util.concurrent.Executors;
@@ -90,6 +92,8 @@ public class ITestAzureBlobFileSystemListStatus extends
9092
AbstractAbfsIntegrationTest {
9193
private static final int TEST_FILES_NUMBER = 6000;
9294
public static final String TEST_CONTINUATION_TOKEN = "continuation";
95+
private static final int TOTAL_NUMBER_OF_PATHS = 11;
96+
private static final int NUMBER_OF_UNIQUE_PATHS = 7;
9397

9498
public ITestAzureBlobFileSystemListStatus() throws Exception {
9599
super();
@@ -197,7 +201,7 @@ public void testListPathParsingFailure() throws Exception {
197201
Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
198202
Mockito.doReturn(spiedClient).when(spiedStore).getClient();
199203

200-
Mockito.doThrow(new SocketException(CONNECTION_RESET_MESSAGE)).when(spiedClient).filterDuplicateEntriesAndRenamePendingFiles(any(), any());
204+
Mockito.doThrow(new SocketException(CONNECTION_RESET_MESSAGE)).when(spiedClient).filterRenamePendingFiles(any(), any());
201205
List<FileStatus> fileStatuses = new ArrayList<>();
202206
AbfsDriverException ex = intercept(AbfsDriverException.class,
203207
() -> {
@@ -532,6 +536,87 @@ public void testEmptyContinuationToken() throws Exception {
532536
.describedAs("Listing Size Not as expected").hasSize(1);
533537
}
534538

539+
/**
540+
* Test to verify that listStatus returns the correct file status
541+
* after removing duplicates across multiple iterations of list blobs.
542+
* Also verifies that in case of non-empty explicit dir,
543+
* entry corresponding to marker blob is returned.
544+
* @throws Exception if test fails.
545+
*/
546+
@Test
547+
public void testDuplicateEntriesAcrossListBlobIterations() throws Exception {
548+
AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
549+
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
550+
store.getAbfsConfiguration().setListMaxResults(1);
551+
AbfsClient client = Mockito.spy(store.getClient());
552+
553+
Mockito.doReturn(store).when(fs).getAbfsStore();
554+
Mockito.doReturn(client).when(store).getClient();
555+
556+
/*
557+
* Following entries will be created inside the root path.
558+
* 0. /A - implicit directory without any marker blob
559+
* 1. /a - marker file for explicit directory
560+
* 2. /a/file1 - normal file inside explicit directory
561+
* 3. /b - normal file inside root
562+
* 4. /c - marker file for explicit directory
563+
* 5. /c.bak - marker file for explicit directory
564+
* 6. /c.bak/file2 - normal file inside explicit directory
565+
* 7. /c/file3 - normal file inside explicit directory
566+
* 8. /d - implicit directory
567+
* 9. /e - marker file for explicit directory
568+
* 10. /e/file4 - normal file inside explicit directory
569+
*/
570+
// Create Path 0
571+
createAzCopyFolder(new Path("/A"));
572+
573+
// Create Path 1 and 2.
574+
fs.create(new Path("/a/file1"));
575+
576+
// Create Path 3
577+
fs.create(new Path("/b"));
578+
579+
// Create Path 4 and 7
580+
fs.create(new Path("/c/file3"));
581+
582+
// Create Path 5 and 6
583+
fs.create(new Path("/c.bak/file2"));
584+
585+
// Create Path 8
586+
createAzCopyFolder(new Path("/d"));
587+
588+
// Create Path 9 and 10
589+
fs.create(new Path("/e/file4"));
590+
591+
FileStatus[] fileStatuses = fs.listStatus(new Path(ROOT_PATH));
592+
593+
// Assert that client.listPath was called 11 times.
594+
// List max results is set to 1 above, so each call is expected to return one entry; 11 calls therefore mean the server returned 11 entries in total, duplicates included.
595+
Mockito.verify(client, Mockito.times(TOTAL_NUMBER_OF_PATHS))
596+
.listPath(eq(ROOT_PATH), eq(false), eq(1), any(), any(), any());
597+
598+
// Assert that after duplicate removal, only 7 unique entries are returned.
599+
Assertions.assertThat(fileStatuses.length)
600+
.describedAs("List size is not expected").isEqualTo(NUMBER_OF_UNIQUE_PATHS);
601+
602+
// Assert the expected entry at each index; for duplicates, entry corresponding to marker blob is returned.
603+
assertImplicitDirectoryFileStatus(fileStatuses[0], fs.makeQualified(new Path("/A")));
604+
assertExplicitDirectoryFileStatus(fileStatuses[1], fs.makeQualified(new Path("/a")));
605+
assertFilePathFileStatus(fileStatuses[2], fs.makeQualified(new Path("/b")));
606+
assertExplicitDirectoryFileStatus(fileStatuses[3], fs.makeQualified(new Path("/c")));
607+
assertExplicitDirectoryFileStatus(fileStatuses[4], fs.makeQualified(new Path("/c.bak")));
608+
assertImplicitDirectoryFileStatus(fileStatuses[5], fs.makeQualified(new Path("/d")));
609+
assertExplicitDirectoryFileStatus(fileStatuses[6], fs.makeQualified(new Path("/e")));
610+
611+
// Assert that there are no duplicates in the returned file statuses.
612+
Set<Path> uniquePaths = new HashSet<>();
613+
for (FileStatus fileStatus : fileStatuses) {
614+
Assertions.assertThat(uniquePaths.add(fileStatus.getPath()))
615+
.describedAs("Duplicate Entries found")
616+
.isTrue();
617+
}
618+
}
619+
535620
private void assertFilePathFileStatus(final FileStatus fileStatus,
536621
final Path qualifiedPath) {
537622
Assertions.assertThat(fileStatus.getPath())

0 commit comments

Comments
 (0)