Skip to content

Commit

Permalink
Merge pull request apache#67 from ABFSDriver/listStatusOnBlob
Browse files Browse the repository at this point in the history
Adding support for listStatus() File System Call on Blob Endpoint
  • Loading branch information
anujmodi2021 authored Jun 15, 2023
2 parents 9b5744f + 73d05ca commit 6216d4c
Show file tree
Hide file tree
Showing 13 changed files with 756 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
* @param tracingContext The tracingContext.
*/
private void validatePathOrSubPathDoesNotExist(final Path path, TracingContext tracingContext) throws IOException {
List<BlobProperty> blobList = abfsStore.getListBlobs(path, null,
List<BlobProperty> blobList = abfsStore.getListBlobs(path, null, null,
tracingContext, 2, true);
if (blobList.size() > 0 || abfsStore.checkIsDirectory(path, tracingContext)) {
throw new AbfsRestOperationException(HTTP_CONFLICT,
Expand Down Expand Up @@ -788,7 +788,7 @@ private PathInformation getPathInformation(final Path path,
if (getAbfsStore().getAbfsConfiguration().getPrefixMode()
== PrefixMode.BLOB) {
List<BlobProperty> blobProperties = getAbfsStore()
.getListBlobs(path, null, tracingContext, 2, true);
.getListBlobs(path, null, null, tracingContext, 2, true);
if (blobProperties.size() > 0) {
return new PathInformation(true, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.WeakHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -635,7 +636,7 @@ BlobCopyProgress handleCopyInProgress(final Path dstPath,
throw new AbfsRestOperationException(
COPY_BLOB_FAILED.getStatusCode(), COPY_BLOB_FAILED.getErrorCode(),
String.format("copy to path %s failed due to: %s",
dstPath.toUri().getPath(), blobProperty.getStatusDescription()),
dstPath.toUri().getPath(), blobProperty.getCopyStatusDescription()),
new Exception(COPY_BLOB_FAILED.getErrorCode()));
}
if (COPY_STATUS_ABORTED.equalsIgnoreCase(blobProperty.getCopyStatus())) {
Expand Down Expand Up @@ -668,7 +669,7 @@ BlobProperty getBlobProperty(Path blobPath,
blobProperty.setCopyId(opResult.getResponseHeader(X_MS_COPY_ID));
blobProperty.setPath(blobPath);
blobProperty.setCopySourceUrl(opResult.getResponseHeader(X_MS_COPY_SOURCE));
blobProperty.setStatusDescription(
blobProperty.setCopyStatusDescription(
opResult.getResponseHeader(X_MS_COPY_STATUS_DESCRIPTION));
blobProperty.setCopyStatus(opResult.getResponseHeader(X_MS_COPY_STATUS));
blobProperty.setContentLength(
Expand Down Expand Up @@ -725,7 +726,7 @@ public Hashtable<String, String> getBlobMetadata(final Path path,
// The path does not exist explicitly.
// Check here if the path is an implicit dir
if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND && !path.isRoot()) {
List<BlobProperty> blobProperties = getListBlobs(path, null,
List<BlobProperty> blobProperties = getListBlobs(path, null, null,
tracingContext, 2, true);
if (blobProperties.size() == 0) {
throw ex;
Expand Down Expand Up @@ -825,7 +826,7 @@ private List<AbfsHttpHeader> getRequestHeadersForMetadata(Hashtable<String, Stri
* @throws AbfsRestOperationException exception from server-calls / xml-parsing
*/
public List<BlobProperty> getListBlobs(Path sourceDirBlobPath,
String prefix, TracingContext tracingContext,
String prefix, String delimiter, TracingContext tracingContext,
final Integer maxResult, final Boolean isDefinitiveDirSearch)
throws AzureBlobFileSystemException {
List<BlobProperty> blobProperties = new ArrayList<>();
Expand All @@ -837,9 +838,12 @@ public List<BlobProperty> getListBlobs(Path sourceDirBlobPath,
? ROOT_PATH
: EMPTY_STRING);
}
if (delimiter == null) {
delimiter = "";
}
do {
AbfsRestOperation op = client.getListBlobs(
nextMarker, prefix, maxResult, tracingContext
nextMarker, prefix, delimiter, maxResult, tracingContext
);
BlobList blobList = op.getResult().getBlobList();
nextMarker = blobList.getNextMarker();
Expand Down Expand Up @@ -1248,7 +1252,7 @@ public AbfsInputStream openFileForRead(final Path path,
if (e.getStatusCode() != HTTP_NOT_FOUND) {
throw e;
}
List<BlobProperty> blobsList = getListBlobs(new Path(relativePath), null,
List<BlobProperty> blobsList = getListBlobs(new Path(relativePath), null, null,
tracingContext, 2, true);
if (blobsList.size() > 0) {
throw new AbfsRestOperationException(
Expand Down Expand Up @@ -1341,7 +1345,7 @@ public OutputStream openFileForWrite(final Path path,
// The path does not exist explicitly.
// Check here if the path is an implicit dir
if (getPrefixMode() == PrefixMode.BLOB && ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
List<BlobProperty> blobProperties = getListBlobs(path, null,
List<BlobProperty> blobProperties = getListBlobs(path, null, null,
tracingContext, 2, true);
if (blobProperties.size() != 0) {
throw new AbfsRestOperationException(
Expand Down Expand Up @@ -1440,7 +1444,7 @@ public void rename(final Path source, final Path destination,
/*
* Fetch the list of blobs in the given sourcePath.
*/
List<BlobProperty> srcBlobProperties = getListBlobs(source, null,
List<BlobProperty> srcBlobProperties = getListBlobs(source, null, null,
tracingContext, null, true);
BlobProperty blobPropOnSrc;
if (srcBlobProperties.size() > 0) {
Expand Down Expand Up @@ -1822,7 +1826,7 @@ public FileStatus getFileStatusOverBlob(final Path path,
// The path does not exist explicitly.
// Check here if the path is an implicit dir
if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND && !path.isRoot()) {
List<BlobProperty> blobProperties = getListBlobs(path,null, tracingContext, 2, true);
List<BlobProperty> blobProperties = getListBlobs(path,null, null, tracingContext, 2, true);
if (blobProperties.size() == 0) {
throw ex;
}
Expand Down Expand Up @@ -1891,16 +1895,66 @@ public String listStatus(final Path path, final String startFrom,
startFrom);

final String relativePath = getRelativePath(path);
boolean useBlobEndpointListing = getPrefixMode() == PrefixMode.BLOB;

if (continuation == null || continuation.isEmpty()) {
// generate continuation token if a valid startFrom is provided.
if (startFrom != null && !startFrom.isEmpty()) {
// In case startFrom is passed, fallback to DFS for now
// TODO: Support startFrom for List Blobs on Blob Endpoint
useBlobEndpointListing = false;
continuation = getIsNamespaceEnabled(tracingContext)
? generateContinuationTokenForXns(startFrom)
: generateContinuationTokenForNonXns(relativePath, startFrom);
}
}

if (useBlobEndpointListing) {
FileStatus status = getFileStatusOverBlob(path, tracingContext);
if (status.isFile()) {
fileStatuses.add(status);
return continuation;
}
// For blob endpoint continuation will be used as nextMarker.
String prefix = relativePath + ROOT_PATH;
String delimiter = ROOT_PATH;
if (path.isRoot()) {
prefix = null;
}

TreeMap<String, FileStatus> fileMetadata = new TreeMap<>();
do {
/*
* List Blob calls will be made with delimiter "/". This will ensure
* that all the children of a folder not listed out separately. Instead,
* a single entry corresponding to the directory name will be returned as BlobPrefix.
*/
try (AbfsPerfInfo perfInfo = startTracking("listStatus", "getListBlobs")) {
AbfsRestOperation op = client.getListBlobs(
continuation, prefix, delimiter, abfsConfiguration.getListMaxResults(),
tracingContext
);
perfInfo.registerResult(op.getResult());
BlobList blobList = op.getResult().getBlobList();
continuation = blobList.getNextMarker();

addBlobListAsFileStatus(blobList, fileMetadata);

perfInfo.registerSuccess(true);
countAggregate++;
shouldContinue =
fetchAll && continuation != null && !continuation.isEmpty();

if (!shouldContinue) {
perfInfo.registerAggregates(startAggregate, countAggregate);
}
}
} while (shouldContinue);

fileStatuses.addAll(fileMetadata.values());
return continuation;
}

do {
try (AbfsPerfInfo perfInfo = startTracking("listStatus", "listPath")) {
AbfsRestOperation op = client.listPath(relativePath, false,
Expand Down Expand Up @@ -1967,6 +2021,67 @@ public String listStatus(final Path path, final String startFrom,
return continuation;
}

private void addBlobListAsFileStatus(final BlobList blobList,
TreeMap<String, FileStatus> fileMetadata) throws IOException {

/*
* Here before adding the data we might have to remove the duplicates.
* List Blobs call over blob endpoint returns two types of entries: Blob
* and BlobPrefix. In the case where ABFS generated the data,
* there will be a marker blob for each "directory" created by driver,
* and we will receive them as a Blob. If there are also files within this
* "directory", we will also receive a BlobPrefix. To further
* complicate matters, the data may not be generated by ABFS Driver, in
* which case we may not have a marker blob for each "directory". In this
* the only way to know there is a directory is using BlobPrefix entry.
* So, sometimes we receive both a Blob and a BlobPrefix for directories,
* and sometimes we receive only BlobPrefix as directory. We remove duplicates
* but prefer Blob over BlobPrefix.
*/
List<BlobProperty> blobProperties = blobList.getBlobPropertyList();

for (BlobProperty entry: blobProperties) {
String blobKey = entry.getName();
final String owner = identityTransformer.transformIdentityForGetRequest(
entry.getOwner(), true, userName);
final String group = identityTransformer.transformIdentityForGetRequest(
entry.getGroup(), false, primaryUserGroup);
final FsPermission fsPermission = entry.getPermission() == null
? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
: AbfsPermission.valueOf(entry.getPermission());
final boolean hasAcl = entry.getAcl() != null;
long blockSize = abfsConfiguration.getAzureBlockSize();

Path entryPath = entry.getPath();
entryPath = entryPath.makeQualified(this.uri, entryPath);

FileStatus fileStatus = new VersionedFileStatus(
owner,
group,
fsPermission,
hasAcl,
entry.getContentLength(),
entry.getIsDirectory(),
1,
blockSize,
entry.getLastModifiedTime(),
entryPath,
entry.getETag());

if (entry.getETag() != null) {
// This is a blob entry. It is either a file or a marker blob.
// In both cases we will add this.
fileMetadata.put(blobKey, fileStatus);
} else {
// This is a BlobPrefix entry. It is a directory with file inside
// This might have already been added as a marker blob.
if (!fileMetadata.containsKey(blobKey)) {
fileMetadata.put(blobKey, fileStatus);
}
}
}
}

// generate continuation token for xns account
private String generateContinuationTokenForXns(final String firstEntryName) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(firstEntryName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,19 @@ public final class AbfsHttpConstants {
public static final String COPY_STATUS_ABORTED = "aborted";
public static final String COPY_STATUS_FAILED = "failed";
public static final String HDI_ISFOLDER = "hdi_isfolder";
public static final String ETAG = "Etag";
public static final String LAST_MODIFIED_TIME = "Last-Modified";
public static final String CREATION_TIME = "Creation-Time";
public static final String OWNER = "Owner";
public static final String GROUP = "Group";
public static final String PERMISSIONS = "Permissions";
public static final String ACL = "Acl";
public static final String COPY_ID = "CopyId";
public static final String COPY_STATUS = "CopyStatus";
public static final String COPY_SOURCE = "CopySource";
public static final String COPY_PROGRESS = "CopyProgress";
public static final String COPY_COMPLETION_TIME = "CopyCompletionTime";
public static final String COPY_STATUS_DESCRIPTION = "CopyStatusDescription";

private AbfsHttpConstants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public final class HttpQueryParams {
public static final String QUERY_PARAM_UPN = "upn";
public static final String QUERY_PARAM_BLOBTYPE = "blobtype";
public static final String QUERY_PARAM_BLOCKID = "blockid";
public static final String QUERY_PARAM_DELIMITER = "delimiter";

//query params for SAS
public static final String QUERY_PARAM_SAOID = "saoid";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1465,6 +1465,12 @@ public AbfsRestOperation setBlobMetadata(Path blobPath, List<AbfsHttpHeader> met
* @param marker optional value. To be sent in case this method call in a non-first
* iteration to the blobList API. Value has to be equal to the field NextMarker in the response
* of previous iteration for the same operation.
* @param prefix optional value. Filters the results to return only blobs
* with names that begin with the specified prefix
* @param delimiter Optional. When the request includes this parameter,
* the operation returns a BlobPrefix element in the response body.
* This element acts as a placeholder for all blobs with names that begin
* with the same substring, up to the appearance of the delimiter character.
* @param maxResult define how many blobs can client handle in server response.
* In case maxResult <= 5000, server sends number of blobs equal to the value. In
* case maxResult > 5000, server sends maximum 5000 blobs.
Expand All @@ -1477,12 +1483,14 @@ public AbfsRestOperation setBlobMetadata(Path blobPath, List<AbfsHttpHeader> met
*/
public AbfsRestOperation getListBlobs(String marker,
String prefix,
String delimiter,
Integer maxResult,
TracingContext tracingContext)
throws AzureBlobFileSystemException {
AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESTYPE, CONTAINER);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, QUERY_PARAM_COMP_VALUE_LIST);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_DELIMITER, delimiter);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_INCLUDE,
QUERY_PARAM_INCLUDE_VALUE_METADATA);
prefix = getDirectoryQueryParameter(prefix);
Expand Down
Loading

0 comments on commit 6216d4c

Please sign in to comment.