File: AzureBlobFileSystemStore.java
@@ -76,6 +76,7 @@
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
import org.apache.hadoop.fs.azurebfs.services.ListResponseData;
import org.apache.hadoop.fs.azurebfs.enums.Trilean;
import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
@@ -117,6 +118,7 @@
import org.apache.hadoop.fs.azurebfs.utils.CRC64;
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
+import org.apache.hadoop.fs.azurebfs.utils.ListUtils;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
import org.apache.hadoop.fs.impl.BackReference;
@@ -1272,7 +1274,7 @@ public String listStatus(final Path path, final String startFrom,
: generateContinuationTokenForNonXns(relativePath, startFrom);
}
}

+List<FileStatus> fileStatusList = new ArrayList<>();
do {
try (AbfsPerfInfo perfInfo = startTracking("listStatus", "listPath")) {
ListResponseData listResponseData = listingClient.listPath(relativePath,
@@ -1281,9 +1283,9 @@ public String listStatus(final Path path, final String startFrom,
AbfsRestOperation op = listResponseData.getOp();
perfInfo.registerResult(op.getResult());
continuation = listResponseData.getContinuationToken();
-List<FileStatus> fileStatusListInCurrItr = listResponseData.getFileStatusList();
+List<VersionedFileStatus> fileStatusListInCurrItr = listResponseData.getFileStatusList();
if (fileStatusListInCurrItr != null && !fileStatusListInCurrItr.isEmpty()) {
-fileStatuses.addAll(fileStatusListInCurrItr);
+fileStatusList.addAll(fileStatusListInCurrItr);
Contributor: Here we are iterating over a list of VersionedFileStatus and maintaining a list of FileStatus; shouldn't the new list also be of VersionedFileStatus elements?

Contributor (Author): Elements are always of type VersionedFileStatus; only the reference type changes, because store.listStatus() works with FileStatus. The final list to be returned has to be of type FileStatus, but the objects it holds are always VersionedFileStatus and can be type-cast wherever needed.

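A minimal, self-contained sketch of the point above, with a hypothetical VersionedStatus subtype standing in for the real VersionedFileStatus: the list's reference type stays FileStatus while every element is the subtype, so a downcast recovers the extra state wherever it is needed.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FileStatus;

public class ReferenceTypeSketch {

  // Hypothetical stand-in for VersionedFileStatus: a FileStatus subtype
  // that carries one extra piece of state.
  static class VersionedStatus extends FileStatus {
    private final String version;

    VersionedStatus(String version) {
      this.version = version;
    }

    String getVersion() {
      return version;
    }
  }

  public static void main(String[] args) {
    List<FileStatus> statuses = new ArrayList<>(); // reference type: FileStatus
    statuses.add(new VersionedStatus("0x1"));      // element type: the subtype
    // Downcast wherever the subtype's state is needed.
    String version = ((VersionedStatus) statuses.get(0)).getVersion();
    System.out.println(version);
  }
}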
}
perfInfo.registerSuccess(true);
countAggregate++;
@@ -1296,6 +1298,14 @@ public String listStatus(final Path path, final String startFrom,
}
} while (shouldContinue);

+if (listingClient instanceof AbfsBlobClient) {
+  fileStatuses.addAll(ListUtils.getUniqueListResult(fileStatusList));
+  LOG.debug("ListBlob API returned a total of {} elements including duplicates. "
+      + "Number of unique elements is {}", fileStatusList.size(), fileStatuses.size());
+} else {
+  fileStatuses.addAll(fileStatusList);
+}

return continuation;
}

File: AbfsBlobClient.java
@@ -42,7 +42,6 @@
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
-import java.util.TreeMap;
import java.util.UUID;

import org.w3c.dom.Document;
@@ -52,7 +51,6 @@

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.Path;
@@ -402,7 +400,7 @@ public ListResponseData listPath(final String relativePath, final boolean recurs
AbfsRestOperation pathStatus = this.getPathStatus(relativePath, tracingContext, null, false);
BlobListResultSchema listResultSchema = getListResultSchemaFromPathStatus(relativePath, pathStatus);
LOG.debug("ListBlob attempted on a file path. Returning file status.");
-List<FileStatus> fileStatusList = new ArrayList<>();
+List<VersionedFileStatus> fileStatusList = new ArrayList<>();
for (BlobListResultEntrySchema entry : listResultSchema.paths()) {
fileStatusList.add(getVersionedFileStatusFromEntry(entry, uri));
}
@@ -1617,7 +1615,7 @@ public ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri)
LOG.debug("ListBlobs listed {} blobs with {} as continuation token",
listResultSchema.paths().size(),
listResultSchema.getNextMarker());
-return filterDuplicateEntriesAndRenamePendingFiles(listResultSchema, uri);
+return filterRenamePendingFiles(listResultSchema, uri);
} catch (SAXException | IOException ex) {
throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, ex);
}
@@ -1917,39 +1915,23 @@ private List<AbfsHttpHeader> getMetadataHeadersList(final Hashtable<String, Stri
});

/**
-* This is to handle duplicate listing entries returned by Blob Endpoint for
-* implicit paths that also has a marker file created for them.
-* This will retain entry corresponding to marker file and remove the BlobPrefix entry.
-* This will also filter out all the rename pending json files in listing output.
+* This will filter out all the rename pending json files in listing output.
* @param listResultSchema List of entries returned by Blob Endpoint.
* @param uri URI to be used for path conversion.
* @return List of entries after removing duplicates.
* @throws IOException if path conversion fails.
*/
@VisibleForTesting
-public ListResponseData filterDuplicateEntriesAndRenamePendingFiles(
+public ListResponseData filterRenamePendingFiles(
BlobListResultSchema listResultSchema, URI uri) throws IOException {
-List<FileStatus> fileStatuses = new ArrayList<>();
+List<VersionedFileStatus> fileStatuses = new ArrayList<>();
Map<Path, Integer> renamePendingJsonPaths = new HashMap<>();
-TreeMap<String, BlobListResultEntrySchema> nameToEntryMap = new TreeMap<>();

for (BlobListResultEntrySchema entry : listResultSchema.paths()) {
-  if (StringUtils.isNotEmpty(entry.eTag())) {
-    // This is a blob entry. It is either a file or a marker blob.
-    // In both cases we will add this.
-    if (isRenamePendingJsonPathEntry(entry)) {
-      renamePendingJsonPaths.put(entry.path(), entry.contentLength().intValue());
-    } else {
-      nameToEntryMap.put(entry.name(), entry);
-      fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
-    }
+  if (isRenamePendingJsonPathEntry(entry)) {
+    renamePendingJsonPaths.put(entry.path(), entry.contentLength().intValue());
   } else {
-    // This is a BlobPrefix entry. It is a directory with file inside
-    // This might have already been added as a marker blob.
-    if (!nameToEntryMap.containsKey(entry.name())) {
-      nameToEntryMap.put(entry.name(), entry);
-      fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
-    }
+    fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
   }
}
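A toy sketch (plain strings, hypothetical names) of what the simplified loop now does: entries whose names end in the rename-pending suffix are diverted into their own collection, and everything else, duplicates included, is passed through; de-duplication now happens later, in ListUtils, on the AzureBlobFileSystemStore side. The "-RenamePending.json" suffix check below is a stand-in for isRenamePendingJsonPathEntry().

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FilterSketch {
  public static void main(String[] args) {
    // "a" appears twice: once as a marker blob, once as a BlobPrefix entry.
    List<String> entries = Arrays.asList("a", "a", "b-RenamePending.json", "c");
    List<String> statuses = new ArrayList<>();
    List<String> renamePending = new ArrayList<>();
    for (String entry : entries) {
      if (entry.endsWith("-RenamePending.json")) { // stand-in for isRenamePendingJsonPathEntry()
        renamePending.add(entry);
      } else {
        statuses.add(entry); // duplicates are deliberately retained here
      }
    }
    System.out.println(statuses);      // [a, a, c]
    System.out.println(renamePending); // [b-RenamePending.json]
  }
}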

File: AbfsDfsClient.java
@@ -45,7 +45,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.hadoop.classification.VisibleForTesting;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -1353,7 +1352,7 @@ public ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri)
LOG.debug("ListPath listed {} paths with {} as continuation token",
listResultSchema.paths().size(),
getContinuationFromResponse(result));
-List<FileStatus> fileStatuses = new ArrayList<>();
+List<VersionedFileStatus> fileStatuses = new ArrayList<>();
for (DfsListResultEntrySchema entry : listResultSchema.paths()) {
fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
}
File: ListResponseData.java
@@ -21,34 +21,33 @@
import java.util.List;
import java.util.Map;

-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

/**
* This class is used to hold the response data for list operations.
* It contains a list of FileStatus objects, a map of rename pending JSON paths,
-* It contains a list of FileStatus objects, a map of rename pending JSON paths,
+* It contains a list of VersionedFileStatus objects, a map of rename pending JSON paths,
* continuation token and the executed REST operation.
*/
public class ListResponseData {

-private List<FileStatus> fileStatusList;
+private List<VersionedFileStatus> fileStatusList;
private Map<Path, Integer> renamePendingJsonPaths;
private AbfsRestOperation executedRestOperation;
private String continuationToken;

/**
-* Returns the list of FileStatus objects.
-* @return the list of FileStatus objects
+* Returns the list of VersionedFileStatus objects.
+* @return the list of VersionedFileStatus objects
*/
-public List<FileStatus> getFileStatusList() {
+public List<VersionedFileStatus> getFileStatusList() {
Contributor: Javadoc update is needed: "Returns list of VersionedFileStatus objects".

Contributor (Author): Taken.
return fileStatusList;
}

/**
-* Sets the list of FileStatus objects.
-* @param fileStatusList the list of FileStatus objects
+* Sets the list of VersionedFileStatus objects.
+* @param fileStatusList the list of VersionedFileStatus objects
*/
-public void setFileStatusList(final List<FileStatus> fileStatusList) {
+public void setFileStatusList(final List<VersionedFileStatus> fileStatusList) {
Contributor: Same as above, javadoc update is needed.

Contributor (Author): Taken.
this.fileStatusList = fileStatusList;
}

File: ListUtils.java (new file)
@@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.utils;

import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

import org.apache.hadoop.fs.FileStatus;

/**
* Utility class for List operations.
*/
public final class ListUtils {

private ListUtils() {
// Private constructor to prevent instantiation
}

/**
* Utility method to remove duplicates from a list of FileStatus.
* ListBlob API of blob endpoint can return duplicate entries.
* @param originalList list that may contain duplicates
* @return rectified list with no duplicates.
*/
public static List<FileStatus> getUniqueListResult(List<FileStatus> originalList) {
Contributor: The logic looks a bit complex to read and can be refactored. One possible solution could be:

public static List<FileStatus> getUniqueListResult(List<FileStatus> originalList) {
  if (originalList == null || originalList.isEmpty()) {
    return originalList;
  }

  List<FileStatus> uniqueList = new ArrayList<>();
  TreeMap<String, FileStatus> currentGroupMap = new TreeMap<>();
  String currentPrefix = null;

  for (FileStatus fileStatus : originalList) {
    String fileName = fileStatus.getPath().getName();

    if (currentPrefix == null || !fileName.startsWith(currentPrefix)) {
      // Start of a new group
      currentPrefix = fileName;
      currentGroupMap.clear();
    }

    if (!currentGroupMap.containsKey(fileName)) {
      currentGroupMap.put(fileName, fileStatus);
      uniqueList.add(fileStatus);
    }
  }

  return uniqueList;
}

Contributor (Author): Nice suggestion. Taken.
if (originalList == null || originalList.isEmpty()) {
return originalList;
}

TreeMap<String, FileStatus> nameToEntryMap = new TreeMap<>();
String prefix = null;
List<FileStatus> rectifiedFileStatusList = new ArrayList<>();

for (FileStatus current : originalList) {
String fileName = current.getPath().getName();

if (prefix == null || !fileName.startsWith(prefix)) {
// Prefix pattern breaks here. Reset Map and prefix.
prefix = fileName;
nameToEntryMap.clear();
}

// Add the current entry if it is not already added.
if (!nameToEntryMap.containsKey(fileName)) {
nameToEntryMap.put(fileName, current);
rectifiedFileStatusList.add(current);
}
}

return rectifiedFileStatusList;
}
}
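A quick usage sketch of the new utility (illustrative values; assumes hadoop-common on the classpath). Listings arrive in lexicographic order, so duplicates of a name fall inside the same prefix group and only the first occurrence, which for a non-empty explicit directory is typically the marker-blob entry, is retained; "/c.bak" shares the "c" prefix but is still kept because its name differs.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.utils.ListUtils;

public class ListUtilsSketch {
  public static void main(String[] args) {
    List<FileStatus> raw = new ArrayList<>();
    // "/c" surfaces twice: once for its marker blob, once as a BlobPrefix entry.
    raw.add(new FileStatus(0, true, 0, 0, 0, new Path("/c")));
    raw.add(new FileStatus(0, true, 0, 0, 0, new Path("/c")));
    raw.add(new FileStatus(4, false, 1, 256, 0, new Path("/c.bak")));

    List<FileStatus> unique = ListUtils.getUniqueListResult(raw);
    System.out.println(unique.size()); // 2: "/c" (first occurrence kept) and "/c.bak"
  }
}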
File: ITestAzureBlobFileSystemListStatus.java
@@ -24,7 +24,9 @@
import java.net.SocketTimeoutException;
import java.net.URL;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -90,6 +92,8 @@ public class ITestAzureBlobFileSystemListStatus extends
AbstractAbfsIntegrationTest {
private static final int TEST_FILES_NUMBER = 6000;
public static final String TEST_CONTINUATION_TOKEN = "continuation";
+private static final int TOTAL_NUMBER_OF_PATHS = 11;
+private static final int NUMBER_OF_UNIQUE_PATHS = 7;

public ITestAzureBlobFileSystemListStatus() throws Exception {
super();
@@ -197,7 +201,7 @@ public void testListPathParsingFailure() throws Exception {
Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
Mockito.doReturn(spiedClient).when(spiedStore).getClient();

-Mockito.doThrow(new SocketException(CONNECTION_RESET_MESSAGE)).when(spiedClient).filterDuplicateEntriesAndRenamePendingFiles(any(), any());
+Mockito.doThrow(new SocketException(CONNECTION_RESET_MESSAGE)).when(spiedClient).filterRenamePendingFiles(any(), any());
List<FileStatus> fileStatuses = new ArrayList<>();
AbfsDriverException ex = intercept(AbfsDriverException.class,
() -> {
@@ -532,6 +536,87 @@ public void testEmptyContinuationToken() throws Exception {
.describedAs("Listing Size Not as expected").hasSize(1);
}

/**
* Test to verify that listStatus returns the correct file status
* after removing duplicates across multiple iterations of list blobs.
* Also verifies that in case of non-empty explicit dir,
* entry corresponding to marker blob is returned.
* @throws Exception if test fails.
*/
@Test
public void testDuplicateEntriesAcrossListBlobIterations() throws Exception {
Contributor: You can add one more test where there is more than one file in the directory, the max list result is 1, and multiple listStatus calls are made, and verify that the overall file statuses don't have any duplicate entries.

Contributor (Author): Duplicates can only happen for non-empty explicit directories; the store won't allow me to create duplicate paths otherwise. What I can do is modify this test to have more than one entry, a mix of all types of entries, and make sure we still don't have any duplicates.

Contributor (Author): Modified the test as above.

AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
store.getAbfsConfiguration().setListMaxResults(1);
AbfsClient client = Mockito.spy(store.getClient());

Mockito.doReturn(store).when(fs).getAbfsStore();
Mockito.doReturn(client).when(store).getClient();

/*
* Following entries will be created inside the root path.
* 0. /A - implicit directory without any marker blob
* 1. /a - marker file for explicit directory
* 2. /a/file1 - normal file inside explicit directory
* 3. /b - normal file inside root
* 4. /c - marker file for explicit directory
* 5. /c.bak - marker file for explicit directory
* 6. /c.bak/file2 - normal file inside explicit directory
* 7. /c/file3 - normal file inside explicit directory
* 8. /d - implicit directory
* 9. /e - marker file for explicit directory
* 10. /e/file4 - normal file inside explicit directory
*/
// Create Path 0
createAzCopyFolder(new Path("/A"));

// Create Path 1 and 2.
fs.create(new Path("/a/file1"));

// Create Path 3
fs.create(new Path("/b"));

// Create Path 4 and 7
fs.create(new Path("/c/file3"));

// Create Path 5 and 6
fs.create(new Path("/c.bak/file2"));

// Create Path 8
createAzCopyFolder(new Path("/d"));

// Create Path 9 and 10
fs.create(new Path("/e/file4"));

FileStatus[] fileStatuses = fs.listStatus(new Path(ROOT_PATH));

// Assert that client.listPath was called 11 times (list max results is 1,
// so one call per returned entry). This asserts the server returned 11
// entries in total: the four non-empty explicit directories (/a, /c,
// /c.bak, /e) each surface twice, as a marker blob and as a BlobPrefix.
Mockito.verify(client, Mockito.times(TOTAL_NUMBER_OF_PATHS))
.listPath(eq(ROOT_PATH), eq(false), eq(1), any(), any(), any());

// Assert that after duplicate removal, only 7 unique entries are returned.
Assertions.assertThat(fileStatuses.length)
.describedAs("List size is not expected").isEqualTo(NUMBER_OF_UNIQUE_PATHS);

// Assert that for duplicates, entry corresponding to marker blob is returned.
assertImplicitDirectoryFileStatus(fileStatuses[0], fs.makeQualified(new Path("/A")));
assertExplicitDirectoryFileStatus(fileStatuses[1], fs.makeQualified(new Path("/a")));
assertFilePathFileStatus(fileStatuses[2], fs.makeQualified(new Path("/b")));
assertExplicitDirectoryFileStatus(fileStatuses[3], fs.makeQualified(new Path("/c")));
assertExplicitDirectoryFileStatus(fileStatuses[4], fs.makeQualified(new Path("/c.bak")));
assertImplicitDirectoryFileStatus(fileStatuses[5], fs.makeQualified(new Path("/d")));
assertExplicitDirectoryFileStatus(fileStatuses[6], fs.makeQualified(new Path("/e")));

// Assert that there are no duplicates in the returned file statuses.
Set<Path> uniquePaths = new HashSet<>();
for (FileStatus fileStatus : fileStatuses) {
Assertions.assertThat(uniquePaths.add(fileStatus.getPath()))
.describedAs("Duplicate Entries found")
.isTrue();
}
}

private void assertFilePathFileStatus(final FileStatus fileStatus,
final Path qualifiedPath) {
Assertions.assertThat(fileStatus.getPath())