Skip to content

HADOOP-19522 [branch-3.4]: ABFS: [FnsOverBlob] Rename Recovery Should Succeed When Marker File Exists with Destination Directory #7633

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableMetric;

import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.ATOMIC_RENAME_PATH_ATTEMPTS;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_RECEIVED;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_SENT;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_APPEND;
Expand Down Expand Up @@ -134,7 +135,8 @@ public class AbfsCountersImpl implements AbfsCounters {
SERVER_UNAVAILABLE,
RENAME_RECOVERY,
METADATA_INCOMPLETE_RENAME_FAILURES,
RENAME_PATH_ATTEMPTS
RENAME_PATH_ATTEMPTS,
ATOMIC_RENAME_PATH_ATTEMPTS
};

private static final AbfsStatistic[] DURATION_TRACKER_LIST = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ public enum AbfsStatistic {
"Number of times rename operation failed due to metadata being "
+ "incomplete"),
RENAME_PATH_ATTEMPTS("rename_path_attempts",
"Number of times we attempt to rename a path internally");
"Number of times we attempt to rename a path internally"),
ATOMIC_RENAME_PATH_ATTEMPTS("atomic_rename_path_attempts",
"Number of times atomic rename attempted");

private String statName;
private String statDescription;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,20 @@ public ListResponseData listPath(final String relativePath, final boolean recurs
if (tracingContext.getOpType() == FSOperationType.LISTSTATUS
&& op.getResult() != null
&& op.getResult().getStatusCode() == HTTP_OK) {
retryRenameOnAtomicEntriesInListResults(tracingContext,
boolean isRenameRecovered = retryRenameOnAtomicEntriesInListResults(tracingContext,
listResponseData.getRenamePendingJsonPaths());
if (isRenameRecovered) {
LOG.debug("Retrying list operation after rename recovery.");
// Retry the list operation to get the updated list of paths after rename recovery.
AbfsRestOperation retryListOp = getAbfsRestOperation(
AbfsRestOperationType.ListBlobs,
HTTP_METHOD_GET,
url,
requestHeaders);
retryListOp.execute(tracingContext);
listResponseData = parseListPathResults(retryListOp.getResult(), uri);
listResponseData.setOp(retryListOp);
}
}

if (isEmptyListResults(listResponseData) && is404CheckRequired) {
Expand Down Expand Up @@ -425,15 +437,16 @@ public ListResponseData listPath(final String relativePath, final boolean recurs
* @param tracingContext tracing context
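* @return true if rename recovery was attempted on at least one rename-pending JSON path; false if the given map was null or empty
* @throws AzureBlobFileSystemException if rest operation or response parsing fails.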
*/
private void retryRenameOnAtomicEntriesInListResults(TracingContext tracingContext,
private boolean retryRenameOnAtomicEntriesInListResults(TracingContext tracingContext,
Map<Path, Integer> renamePendingJsonPaths) throws AzureBlobFileSystemException {
if (renamePendingJsonPaths == null || renamePendingJsonPaths.isEmpty()) {
return;
return false;
}

for (Map.Entry<Path, Integer> entry : renamePendingJsonPaths.entrySet()) {
retryRenameOnAtomicKeyPath(entry.getKey(), entry.getValue(), tracingContext);
}
return true;
}

/**{@inheritDoc}*/
Expand Down Expand Up @@ -813,7 +826,7 @@ public AbfsClientRenameResult renamePath(final String source,
destination, sourceEtag, isAtomicRenameKey(source), tracingContext
);
try {
if (blobRenameHandler.execute()) {
if (blobRenameHandler.execute(false)) {
final AbfsUriQueryBuilder abfsUriQueryBuilder
= createDefaultUriQueryBuilder();
final URL url = createRequestUrl(destination,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.store.LogExactlyOnce;
Expand Down Expand Up @@ -1540,8 +1541,11 @@ public void getMetricCall(TracingContext tracingContext) throws IOException {
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);

final URL url = createRequestUrl(new URL(abfsMetricUrl), EMPTY_STRING, abfsUriQueryBuilder.toString());

// Construct the URL for the metric call
// In case of blob storage, the URL is changed to DFS URL
final URL url = UriUtils.changeUrlFromBlobToDfs(
createRequestUrl(new URL(abfsMetricUrl),
EMPTY_STRING, abfsUriQueryBuilder.toString()));
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.GetFileSystemProperties,
HTTP_METHOD_HEAD,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
Expand Down Expand Up @@ -115,14 +116,15 @@ int getMaxConsumptionParallelism() {

/**
* Orchestrates the rename operation.
* @param isRenameRecovery true if the rename operation is a recovery of a previous failed atomic rename operation
*
* @return boolean result of the rename operation
* @throws AzureBlobFileSystemException if server call fails
*/
public boolean execute() throws AzureBlobFileSystemException {
public boolean execute(final boolean isRenameRecovery) throws AzureBlobFileSystemException {
PathInformation pathInformation = getPathInformation(src, tracingContext);
boolean result = false;
if (preCheck(src, dst, pathInformation)) {
if (preCheck(src, dst, pathInformation, isRenameRecovery)) {
RenameAtomicity renameAtomicity = null;
if (pathInformation.getIsDirectory()
&& pathInformation.getIsImplicit()) {
Expand All @@ -147,6 +149,8 @@ public boolean execute() throws AzureBlobFileSystemException {
* recovers the lease on a log file, to gain exclusive access to it, before
* it splits it.
*/
getAbfsClient().getAbfsCounters()
.incrementCounter(AbfsStatistic.ATOMIC_RENAME_PATH_ATTEMPTS, 1);
if (srcAbfsLease == null) {
srcAbfsLease = takeLease(src, srcEtag);
}
Expand Down Expand Up @@ -192,6 +196,13 @@ private boolean finalSrcRename() throws AzureBlobFileSystemException {
tracingContext.setOperatedBlobCount(operatedBlobCount.get() + 1);
try {
return renameInternal(src, dst);
} catch(AbfsRestOperationException e) {
if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
// If the destination path already exists, then delete the source path.
getAbfsClient().deleteBlobPath(src, null, tracingContext);
return true;
}
throw e;
} finally {
tracingContext.setOperatedBlobCount(null);
}
Expand Down Expand Up @@ -249,16 +260,17 @@ private boolean containsColon(Path p) {
* @param src source path
* @param dst destination path
* @param pathInformation object containing the path information of the source path
* @param isRenameRecovery true if the rename operation is a recovery of a previous failed atomic rename operation
*
* @return true if the pre-checks pass
* @throws AzureBlobFileSystemException if server call fails or given paths are invalid.
*/
private boolean preCheck(final Path src, final Path dst,
final PathInformation pathInformation)
final PathInformation pathInformation, final boolean isRenameRecovery)
throws AzureBlobFileSystemException {
validateDestinationIsNotSubDir(src, dst);
validateSourcePath(pathInformation);
validateDestinationPathNotExist(src, dst, pathInformation);
validateDestinationPathNotExist(src, dst, pathInformation, isRenameRecovery);
validateDestinationParentExist(src, dst, pathInformation);

return true;
Expand Down Expand Up @@ -319,22 +331,22 @@ private void validateSourcePath(final PathInformation pathInformation)
* @param src source path
* @param dst destination path
* @param pathInformation object containing the path information of the source path
* @param isRenameRecovery true if the rename operation is a recovery of a previous failed atomic rename operation
*
* @throws AbfsRestOperationException if the destination path already exists
*/
private void validateDestinationPathNotExist(final Path src,
final Path dst,
final PathInformation pathInformation)
throws AzureBlobFileSystemException {
final Path dst, final PathInformation pathInformation,
final boolean isRenameRecovery) throws AzureBlobFileSystemException {
/*
* The destination path name can be the same as the source path name only in
* the case of a directory rename.
*
* In case the directory is being renamed to some other name, the destination
* check happens in the AzureBlobFileSystem#rename method.
*/
if (pathInformation.getIsDirectory() && dst.getName()
.equals(src.getName())) {
if (!isRenameRecovery && pathInformation.getIsDirectory()
&& dst.getName().equals(src.getName())) {
PathInformation dstPathInformation = getPathInformation(
dst,
tracingContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void redo() throws AzureBlobFileSystemException {
abfsClient, srcEtag, true,
true, tracingContext);

blobRenameHandler.execute();
blobRenameHandler.execute(true);
}
} finally {
deleteRenamePendingJson();
Expand Down
Loading