Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.WeakHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -76,6 +77,8 @@
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.apache.hadoop.fs.azurebfs.contracts.services.BlobListResultEntrySchema;
import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
import org.apache.hadoop.fs.azurebfs.services.ListResponseData;
import org.apache.hadoop.fs.azurebfs.enums.Trilean;
import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
Expand Down Expand Up @@ -1273,6 +1276,7 @@ public String listStatus(final Path path, final String startFrom,
}
}

TreeMap<String, VersionedFileStatus> nameToEntryMap = new TreeMap<>();
do {
try (AbfsPerfInfo perfInfo = startTracking("listStatus", "listPath")) {
ListResponseData listResponseData = listingClient.listPath(relativePath,
Expand All @@ -1281,9 +1285,17 @@ public String listStatus(final Path path, final String startFrom,
AbfsRestOperation op = listResponseData.getOp();
perfInfo.registerResult(op.getResult());
continuation = listResponseData.getContinuationToken();
List<FileStatus> fileStatusListInCurrItr = listResponseData.getFileStatusList();
List<VersionedFileStatus> fileStatusListInCurrItr = listResponseData.getFileStatusList();
if (fileStatusListInCurrItr != null && !fileStatusListInCurrItr.isEmpty()) {
fileStatuses.addAll(fileStatusListInCurrItr);
if (listingClient instanceof AbfsBlobClient) {
/* Blob Endpoint can return duplicate entries for non-empty explicit
* directories. Such entries can come across multiple iterations of
* list call and hence need to be filtered here.
*/
filterDuplicateEntriesForBlobClient(nameToEntryMap, fileStatusListInCurrItr, fileStatuses);
} else {
fileStatuses.addAll(fileStatusListInCurrItr);
}
}
perfInfo.registerSuccess(true);
countAggregate++;
Expand All @@ -1299,6 +1311,29 @@ public String listStatus(final Path path, final String startFrom,
return continuation;
}

private void filterDuplicateEntriesForBlobClient(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add Javadoc for this method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

TreeMap<String, VersionedFileStatus> nameToEntryMap,
List<VersionedFileStatus> fileStatusListInCurrItr,
List<FileStatus> fileStatuses) {
for (VersionedFileStatus fileStatus : fileStatusListInCurrItr) {
String entryName = fileStatus.getPath().getName();
if (StringUtils.isNotEmpty(fileStatus.getEtag())) {
// This is a blob entry. It is either a file or a marker blob.
// In both cases we will add this.
nameToEntryMap.put(entryName, fileStatus);
fileStatuses.add(fileStatus);
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be written as an `else if` instead of an `if` nested inside the `else` block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taken

// This is a BlobPrefix entry.
// It is a directory with file inside
// This might have already been added as a marker blob.
if (!nameToEntryMap.containsKey(entryName)) {
nameToEntryMap.put(entryName, fileStatus);
fileStatuses.add(fileStatus);
}
}
}
}

// generate continuation token for xns account
private String generateContinuationTokenForXns(final String firstEntryName) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(firstEntryName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ public ListResponseData listPath(final String relativePath, final boolean recurs
AbfsRestOperation pathStatus = this.getPathStatus(relativePath, tracingContext, null, false);
BlobListResultSchema listResultSchema = getListResultSchemaFromPathStatus(relativePath, pathStatus);
LOG.debug("ListBlob attempted on a file path. Returning file status.");
List<FileStatus> fileStatusList = new ArrayList<>();
List<VersionedFileStatus> fileStatusList = new ArrayList<>();
for (BlobListResultEntrySchema entry : listResultSchema.paths()) {
fileStatusList.add(getVersionedFileStatusFromEntry(entry, uri));
}
Expand Down Expand Up @@ -1617,7 +1617,7 @@ public ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri)
LOG.debug("ListBlobs listed {} blobs with {} as continuation token",
listResultSchema.paths().size(),
listResultSchema.getNextMarker());
return filterDuplicateEntriesAndRenamePendingFiles(listResultSchema, uri);
return filterRenamePendingFiles(listResultSchema, uri);
} catch (SAXException | IOException ex) {
throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, ex);
}
Expand Down Expand Up @@ -1927,29 +1927,16 @@ private List<AbfsHttpHeader> getMetadataHeadersList(final Hashtable<String, Stri
* @throws IOException if path conversion fails.
*/
@VisibleForTesting
public ListResponseData filterDuplicateEntriesAndRenamePendingFiles(
public ListResponseData filterRenamePendingFiles(
BlobListResultSchema listResultSchema, URI uri) throws IOException {
List<FileStatus> fileStatuses = new ArrayList<>();
List<VersionedFileStatus> fileStatuses = new ArrayList<>();
Map<Path, Integer> renamePendingJsonPaths = new HashMap<>();
TreeMap<String, BlobListResultEntrySchema> nameToEntryMap = new TreeMap<>();

for (BlobListResultEntrySchema entry : listResultSchema.paths()) {
if (StringUtils.isNotEmpty(entry.eTag())) {
// This is a blob entry. It is either a file or a marker blob.
// In both cases we will add this.
if (isRenamePendingJsonPathEntry(entry)) {
renamePendingJsonPaths.put(entry.path(), entry.contentLength().intValue());
} else {
nameToEntryMap.put(entry.name(), entry);
fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
}
if (isRenamePendingJsonPathEntry(entry)) {
renamePendingJsonPaths.put(entry.path(), entry.contentLength().intValue());
} else {
// This is a BlobPrefix entry. It is a directory with file inside
// This might have already been added as a marker blob.
if (!nameToEntryMap.containsKey(entry.name())) {
nameToEntryMap.put(entry.name(), entry);
fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
}
fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1353,7 +1353,7 @@ public ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri)
LOG.debug("ListPath listed {} paths with {} as continuation token",
listResultSchema.paths().size(),
getContinuationFromResponse(result));
List<FileStatus> fileStatuses = new ArrayList<>();
List<VersionedFileStatus> fileStatuses = new ArrayList<>();
for (DfsListResultEntrySchema entry : listResultSchema.paths()) {
fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
*/
public class ListResponseData {

private List<FileStatus> fileStatusList;
private List<VersionedFileStatus> fileStatusList;
private Map<Path, Integer> renamePendingJsonPaths;
private AbfsRestOperation executedRestOperation;
private String continuationToken;
Expand All @@ -40,15 +40,15 @@ public class ListResponseData {
* Returns the list of FileStatus objects.
* @return the list of FileStatus objects
*/
public List<FileStatus> getFileStatusList() {
public List<VersionedFileStatus> getFileStatusList() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Javadoc needs to be updated to match the new return type, e.g.:
"Returns the list of VersionedFileStatus objects."

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taken

return fileStatusList;
}

/**
* Sets the list of FileStatus objects.
* @param fileStatusList the list of FileStatus objects
*/
public void setFileStatusList(final List<FileStatus> fileStatusList) {
public void setFileStatusList(final List<VersionedFileStatus> fileStatusList) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above: the Javadoc needs to be updated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taken

this.fileStatusList = fileStatusList;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public void testListPathParsingFailure() throws Exception {
Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
Mockito.doReturn(spiedClient).when(spiedStore).getClient();

Mockito.doThrow(new SocketException(CONNECTION_RESET_MESSAGE)).when(spiedClient).filterDuplicateEntriesAndRenamePendingFiles(any(), any());
Mockito.doThrow(new SocketException(CONNECTION_RESET_MESSAGE)).when(spiedClient).filterRenamePendingFiles(any(), any());
List<FileStatus> fileStatuses = new ArrayList<>();
AbfsDriverException ex = intercept(AbfsDriverException.class,
() -> {
Expand Down Expand Up @@ -532,6 +532,28 @@ public void testEmptyContinuationToken() throws Exception {
.describedAs("Listing Size Not as expected").hasSize(1);
}

@Test
public void testDuplicateEntriesAcrossListBlobIterations() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add one more test in which the directory contains more than one file and the max list result is set to 1, so that the listing spans multiple list status calls, and verify that the combined file statuses contain no duplicate entries.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicates can only occur for non-empty explicit directories; the store won't allow me to create duplicate paths otherwise.
What I can do is modify this test to have more than one entry, with a mix of all entry types, and make sure we still don't get any duplicates.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified test as above

AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
store.getAbfsConfiguration().setListMaxResults(1);
AbfsClient client = Mockito.spy(store.getClient());

Mockito.doReturn(store).when(fs).getAbfsStore();
Mockito.doReturn(client).when(store).getClient();

// Create a non-empty explicit directory under root
Path dir = new Path("/a");
Path fileInsideDir = new Path("/a/file");
createAzCopyFolder(dir);
fs.create(fileInsideDir);

FileStatus[] fileStatuses = fs.listStatus(new Path(ROOT_PATH));
Assertions.assertThat(fileStatuses.length)
.describedAs("List size is not expected").isEqualTo(1);
assertExplicitDirectoryFileStatus(fileStatuses[0], fs.makeQualified(dir));
}

private void assertFilePathFileStatus(final FileStatus fileStatus,
final Path qualifiedPath) {
Assertions.assertThat(fileStatus.getPath())
Expand Down