Skip to content

HADOOP-18012. ABFS: Using Source Path eTags for Rename Idempotency checks #5488

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 29 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
e672ae3
Rename retry recovery changes
sreeb-msft Mar 17, 2023
f53b43c
Changes to mock behavior
sreeb-msft Mar 17, 2023
1abc380
Added comments
sreeb-msft Mar 17, 2023
e8ce29f
Changes to test methods
sreeb-msft Mar 17, 2023
4426ad7
Added config for rename resilience
sreeb-msft Mar 20, 2023
1dcd6ce
Namespace check + IOStats counters test
sreeb-msft Mar 20, 2023
d93ae01
Enabling tests at fs level
sreeb-msft Mar 20, 2023
a2cd3be
Changes for dir rename recovery skip
sreeb-msft Mar 20, 2023
e7b6b86
Change in integration tests
sreeb-msft Mar 21, 2023
ff9bd4e
Test Changes
sreeb-msft Mar 21, 2023
59db26d
Resolving conflicts
steveloughran Mar 20, 2023
13b6c24
Test + HNS check changes
sreeb-msft Mar 23, 2023
4801a9f
HNS check changes
sreeb-msft Mar 24, 2023
df59cfb
isDir variable changes
sreeb-msft Mar 24, 2023
749dc42
Review feedback changes
sreeb-msft Mar 24, 2023
82f4ce5
Import changes
sreeb-msft Mar 24, 2023
a009695
Changed setAbfsClient back to protected
sreeb-msft Mar 24, 2023
dc1e7c8
Spacing Changes
sreeb-msft Mar 24, 2023
aec848c
Spacing Changes
sreeb-msft Mar 24, 2023
7f9dfa4
Recovery to be attempted even for rename parent path not found
sreeb-msft Mar 24, 2023
63957ff
Nit changes
sreeb-msft Mar 24, 2023
ff0141b
Reverting HNS Check changes
sreeb-msft Mar 27, 2023
00e286a
Metadata incomplete clause change
sreeb-msft Mar 28, 2023
382bbf9
Nit changes
sreeb-msft Mar 28, 2023
6686fbe
Style changes
sreeb-msft Mar 31, 2023
9164102
Merge branch 'trunk' into HADOOP-18012
sreeb-msft Mar 31, 2023
447254f
Checkstyle changes
sreeb-msft Mar 31, 2023
585d101
Merge branch 'HADOOP-18012' of https://github.com/sreeb-msft/hadoop i…
sreeb-msft Mar 31, 2023
d202799
Removing redundant import
sreeb-msft Mar 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,10 @@ public class AbfsConfiguration{
FS_AZURE_ENABLE_ABFS_LIST_ITERATOR, DefaultValue = DEFAULT_ENABLE_ABFS_LIST_ITERATOR)
private boolean enableAbfsListIterator;

/**
 * Rename resilience flag. Bound by the validator annotation to the
 * FS_AZURE_ABFS_RENAME_RESILIENCE configuration key, falling back to
 * DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE when the key is not set.
 */
@BooleanConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_ABFS_RENAME_RESILIENCE, DefaultValue = DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE)
private boolean renameResilience;

public AbfsConfiguration(final Configuration rawConfig, String accountName)
throws IllegalAccessException, InvalidConfigurationValueException, IOException {
this.rawConfig = ProviderUtils.excludeIncompatibleCredentialProviders(
Expand Down Expand Up @@ -1139,4 +1143,11 @@ public void setEnableAbfsListIterator(boolean enableAbfsListIterator) {
this.enableAbfsListIterator = enableAbfsListIterator;
}

/**
 * Get the rename resilience flag.
 * @return the current value of the {@code renameResilience} field.
 */
public boolean getRenameResilience() {
return renameResilience;
}

/**
 * Overwrite the rename resilience flag.
 * Package-private; NOTE(review): presumably intended for tests that need to
 * toggle the flag without rebuilding the configuration -- confirm.
 * @param actualResilience new value of the {@code renameResilience} field.
 */
void setRenameResilience(boolean actualResilience) {
renameResilience = actualResilience;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,9 @@ public void initialize(URI uri, Configuration configuration)
tracingHeaderFormat = abfsConfiguration.getTracingHeaderFormat();
this.setWorkingDirectory(this.getHomeDirectory());

TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, listener);
if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) {
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, listener);
if (this.tryGetFileStatus(new Path(AbfsHttpConstants.ROOT_PATH), tracingContext) == null) {
try {
this.createFileSystem(tracingContext);
Expand Down Expand Up @@ -442,7 +442,7 @@ public boolean rename(final Path src, final Path dst) throws IOException {
}

// Non-HNS account need to check dst status on driver side.
if (!abfsStore.getIsNamespaceEnabled(tracingContext) && dstFileStatus == null) {
if (!getIsNamespaceEnabled(tracingContext) && dstFileStatus == null) {
dstFileStatus = tryGetFileStatus(qualifiedDstPath, tracingContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -923,9 +923,11 @@ public boolean rename(final Path source,

do {
try (AbfsPerfInfo perfInfo = startTracking("rename", "renamePath")) {
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
final AbfsClientRenameResult abfsClientRenameResult =
client.renamePath(sourceRelativePath, destinationRelativePath,
continuation, tracingContext, sourceEtag, false);
continuation, tracingContext, sourceEtag, false,
isNamespaceEnabled);

AbfsRestOperation op = abfsClientRenameResult.getOp();
perfInfo.registerResult(op.getResult());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ public final class ConfigurationKeys {
/** Key for rate limit capacity, as used by IO operations which try to throttle themselves. */
public static final String FS_AZURE_ABFS_IO_RATE_LIMIT = "fs.azure.io.rate.limit";

/** Add extra resilience to rename failures, at the expense of performance. */
public static final String FS_AZURE_ABFS_RENAME_RESILIENCE = "fs.azure.enable.rename.resilience";

/**
 * Build an account-qualified property name by appending the account name
 * to the property, separated by a dot.
 * @param property base property name.
 * @param account account name to append.
 * @return {@code property + "." + account}; a null argument is rendered
 * as the text "null".
 */
public static String accountProperty(String property, String account) {
  // String.join renders null elements as "null", matching string concatenation.
  return String.join(".", property, account);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public final class FileSystemConfigurations {

public static final int STREAM_ID_LEN = 12;
public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true;
public static final boolean DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE = true;

/**
* Limit of queued block upload operations before writes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
Expand All @@ -68,6 +69,7 @@
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import org.apache.hadoop.util.concurrent.HadoopExecutors;

import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS;
import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
Expand All @@ -77,8 +79,8 @@
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND;

/**
* AbfsClient.
Expand Down Expand Up @@ -106,9 +108,12 @@ public class AbfsClient implements Closeable {

private final ListeningScheduledExecutorService executorService;

/** logging the rename failure if metadata is in an incomplete state. */
private static final LogExactlyOnce ABFS_METADATA_INCOMPLETE_RENAME_FAILURE =
new LogExactlyOnce(LOG);
private boolean renameResilience;

/**
* logging the rename failure if metadata is in an incomplete state.
*/
private static final LogExactlyOnce ABFS_METADATA_INCOMPLETE_RENAME_FAILURE = new LogExactlyOnce(LOG);

private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
Expand All @@ -123,6 +128,7 @@ private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCreden
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
this.authType = abfsConfiguration.getAuthType(accountName);
this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration);
this.renameResilience = abfsConfiguration.getRenameResilience();

String encryptionKey = this.abfsConfiguration
.getClientProvidedEncryptionKey();
Expand Down Expand Up @@ -504,27 +510,55 @@ public AbfsRestOperation breakLease(final String path,
* took place.
* As rename recovery is only attempted if the source etag is non-empty,
* in normal rename operations rename recovery will never happen.
* @param source path to source file
* @param destination destination of rename.
* @param continuation continuation.
* @param tracingContext trace context
* @param sourceEtag etag of source file. may be null or empty
*
* @param source path to source file
* @param destination destination of rename.
* @param continuation continuation.
* @param tracingContext trace context
* @param sourceEtag etag of source file. may be null or empty
* @param isMetadataIncompleteState was there a rename failure due to
* incomplete metadata state?
* @param isNamespaceEnabled whether namespace enabled account or not
* @return AbfsClientRenameResult result of rename operation indicating the
* AbfsRest operation, rename recovery and incomplete metadata state failure.
* @throws AzureBlobFileSystemException failure, excluding any recovery from overload failures.
*/
public AbfsClientRenameResult renamePath(
final String source,
final String destination,
final String continuation,
final TracingContext tracingContext,
final String sourceEtag,
boolean isMetadataIncompleteState)
final String source,
final String destination,
final String continuation,
final TracingContext tracingContext,
String sourceEtag,
boolean isMetadataIncompleteState,
boolean isNamespaceEnabled)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();

final boolean hasEtag = !isEmpty(sourceEtag);

boolean shouldAttemptRecovery = renameResilience && isNamespaceEnabled;
if (!hasEtag && shouldAttemptRecovery) {
// in case eTag is already not supplied to the API
// and rename resilience is expected and it is an HNS enabled account
// fetch the source etag to be used later in recovery
try {
final AbfsRestOperation srcStatusOp = getPathStatus(source,
false, tracingContext);
if (srcStatusOp.hasResult()) {
final AbfsHttpOperation result = srcStatusOp.getResult();
sourceEtag = extractEtagHeader(result);
// and update the directory status.
boolean isDir = checkIsDir(result);
shouldAttemptRecovery = !isDir;
LOG.debug("Retrieved etag of source for rename recovery: {}; isDir={}", sourceEtag, isDir);
}
} catch (AbfsRestOperationException e) {
throw new AbfsRestOperationException(e.getStatusCode(), SOURCE_PATH_NOT_FOUND.getErrorCode(),
e.getMessage(), e);
}

}

String encodedRenameSource = urlEncode(FORWARD_SLASH + this.getFileSystem() + source);
if (authType == AuthType.SAS) {
final AbfsUriQueryBuilder srcQueryBuilder = new AbfsUriQueryBuilder();
Expand All @@ -541,12 +575,7 @@ public AbfsClientRenameResult renamePath(
appendSASTokenToQuery(destination, SASTokenProvider.RENAME_DESTINATION_OPERATION, abfsUriQueryBuilder);

final URL url = createRequestUrl(destination, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.RenamePath,
this,
HTTP_METHOD_PUT,
url,
requestHeaders);
final AbfsRestOperation op = createRenameRestOperation(url, requestHeaders);
try {
incrementAbfsRenamePath();
op.execute(tracingContext);
Expand All @@ -557,48 +586,74 @@ public AbfsClientRenameResult renamePath(
// isMetadataIncompleteState is used for renameRecovery(as the 2nd param).
return new AbfsClientRenameResult(op, isMetadataIncompleteState, isMetadataIncompleteState);
} catch (AzureBlobFileSystemException e) {
// If we have no HTTP response, throw the original exception.
if (!op.hasResult()) {
throw e;
}

// ref: HADOOP-18242. Rename failure occurring due to a rare case of
// tracking metadata being in incomplete state.
if (op.getResult().getStorageErrorCode()
.equals(RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode())
&& !isMetadataIncompleteState) {
//Logging
ABFS_METADATA_INCOMPLETE_RENAME_FAILURE
.info("Rename Failure attempting to resolve tracking metadata state and retrying.");
// If we have no HTTP response, throw the original exception.
if (!op.hasResult()) {
throw e;
}

// ref: HADOOP-18242. Rename failure occurring due to a rare case of
// tracking metadata being in incomplete state.
if (op.getResult().getStorageErrorCode()
.equals(RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode())
&& !isMetadataIncompleteState) {
//Logging
ABFS_METADATA_INCOMPLETE_RENAME_FAILURE
.info("Rename Failure attempting to resolve tracking metadata state and retrying.");
// rename recovery should be attempted in this case also
shouldAttemptRecovery = true;
isMetadataIncompleteState = true;
String sourceEtagAfterFailure = sourceEtag;
if (isEmpty(sourceEtagAfterFailure)) {
// Doing a HEAD call resolves the incomplete metadata state and
// then we can retry the rename operation.
AbfsRestOperation sourceStatusOp = getPathStatus(source, false,
tracingContext);
isMetadataIncompleteState = true;
// Extract the sourceEtag, using the status Op, and set it
// for future rename recovery.
AbfsHttpOperation sourceStatusResult = sourceStatusOp.getResult();
String sourceEtagAfterFailure = extractEtagHeader(sourceStatusResult);
renamePath(source, destination, continuation, tracingContext,
sourceEtagAfterFailure, isMetadataIncompleteState);
}
// if we get out of the condition without a successful rename, then
// it isn't metadata incomplete state issue.
isMetadataIncompleteState = false;

boolean etagCheckSucceeded = renameIdempotencyCheckOp(
source,
sourceEtag, op, destination, tracingContext);
if (!etagCheckSucceeded) {
// idempotency did not return different result
// throw back the exception
throw e;
sourceEtagAfterFailure = extractEtagHeader(sourceStatusResult);
}
renamePath(source, destination, continuation, tracingContext,
sourceEtagAfterFailure, isMetadataIncompleteState, isNamespaceEnabled);
}
// if we get out of the condition without a successful rename, then
// it isn't metadata incomplete state issue.
isMetadataIncompleteState = false;

// setting default rename recovery success to false
boolean etagCheckSucceeded = false;
if (shouldAttemptRecovery) {
etagCheckSucceeded = renameIdempotencyCheckOp(
source,
sourceEtag, op, destination, tracingContext);
}
if (!etagCheckSucceeded) {
// idempotency did not return different result
// throw back the exception
throw e;
}
return new AbfsClientRenameResult(op, true, isMetadataIncompleteState);
}
}

/**
 * Check whether an HTTP result describes a directory.
 * Reads the {@code X_MS_RESOURCE_TYPE} response header and compares it,
 * ignoring case, with {@code AbfsHttpConstants.DIRECTORY}.
 * @param result HTTP operation whose response headers are inspected.
 * @return true if the header is present and names a directory; false if
 * the header is absent or holds any other value.
 */
private boolean checkIsDir(AbfsHttpOperation result) {
  final String headerValue =
      result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
  if (headerValue == null) {
    // no resource type in the response: cannot be classified as a directory
    return false;
  }
  return headerValue.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY);
}

/**
 * Create the REST operation used for a rename: an
 * {@code AbfsRestOperationType.RenamePath} operation issued with HTTP PUT
 * against the supplied URL.
 * Nothing is executed here; the caller runs the returned operation.
 * @param url request URL of the rename.
 * @param requestHeaders headers to send with the request.
 * @return a new, not yet executed, rename operation bound to this client.
 */
@VisibleForTesting
AbfsRestOperation createRenameRestOperation(URL url, List<AbfsHttpHeader> requestHeaders) {
  return new AbfsRestOperation(
      AbfsRestOperationType.RenamePath,
      this,
      HTTP_METHOD_PUT,
      url,
      requestHeaders);
}

/**
 * Record one rename attempt by incrementing the
 * {@code RENAME_PATH_ATTEMPTS} counter on {@code abfsCounters} by 1.
 */
private void incrementAbfsRenamePath() {
abfsCounters.incrementCounter(RENAME_PATH_ATTEMPTS, 1);
}
Expand Down Expand Up @@ -628,28 +683,44 @@ public boolean renameIdempotencyCheckOp(
TracingContext tracingContext) {
Preconditions.checkArgument(op.hasResult(), "Operations has null HTTP response");

if ((op.isARetriedRequest())
&& (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND)
&& isNotEmpty(sourceEtag)) {

// Server has returned HTTP 404, which means rename source no longer
// exists. Check on destination status and if its etag matches
// that of the source, consider it to be a success.
LOG.debug("rename {} to {} failed, checking etag of destination",
source, destination);
// removing isDir from debug logs as it can be misleading
LOG.debug("rename({}, {}) failure {}; retry={} etag {}",
source, destination, op.getResult().getStatusCode(), op.isARetriedRequest(), sourceEtag);
if (!(op.isARetriedRequest()
&& (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND))) {
// only attempt recovery if the failure was a 404 on a retried rename request.
return false;
}

if (isNotEmpty(sourceEtag)) {
// Server has returned HTTP 404, we have an etag, so see
// if the rename has actually taken place,
LOG.info("rename {} to {} failed, checking etag of destination",
source, destination);
try {
final AbfsRestOperation destStatusOp = getPathStatus(destination,
false, tracingContext);
final AbfsRestOperation destStatusOp = getPathStatus(destination, false, tracingContext);
final AbfsHttpOperation result = destStatusOp.getResult();

return result.getStatusCode() == HttpURLConnection.HTTP_OK
&& sourceEtag.equals(extractEtagHeader(result));
} catch (AzureBlobFileSystemException ignored) {
final boolean recovered = result.getStatusCode() == HttpURLConnection.HTTP_OK
&& sourceEtag.equals(extractEtagHeader(result));
LOG.info("File rename has taken place: recovery {}",
recovered ? "succeeded" : "failed");
return recovered;

} catch (AzureBlobFileSystemException ex) {
// GetFileStatus on the destination failed, the rename did not take place
// or some other failure. log and swallow.
LOG.debug("Failed to get status of path {}", destination, ex);
}
} else {
LOG.debug("No source etag; unable to probe for the operation's success");
}
return false;
return false;
}

/**
 * Compare a source etag with the etag carried by an HTTP result.
 * @param sourceEtag etag of the rename source; must not be null, as
 * {@code equals} is invoked on it.
 * @param result HTTP operation from which the etag header is extracted.
 * @return true if the extracted etag equals {@code sourceEtag}.
 */
@VisibleForTesting
boolean isSourceDestEtagEqual(String sourceEtag, AbfsHttpOperation result) {
  final String resultEtag = extractEtagHeader(result);
  return sourceEtag.equals(resultEtag);
}

public AbfsRestOperation append(final String path, final byte[] buffer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,16 @@ public boolean isRenameRecovered() {
public boolean isIncompleteMetadataState() {
return isIncompleteMetadataState;
}

/**
 * Render the result as
 * {@code AbfsClientRenameResult{op=..., renameRecovered=..., isIncompleteMetadataState=...}}.
 * @return string form listing the operation and both flags.
 */
@Override
public String toString() {
  final StringBuilder text = new StringBuilder("AbfsClientRenameResult{");
  text.append("op=").append(op);
  text.append(", renameRecovered=").append(renameRecovered);
  text.append(", isIncompleteMetadataState=").append(isIncompleteMetadataState);
  text.append('}');
  return text.toString();
}
}
Loading