Skip to content

Hadoop-17015. ABFS: Handling Rename and Delete idempotency #2021

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
May 19, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,11 @@ public void setMaxIoRetries(int maxIoRetries) {
this.maxIoRetries = maxIoRetries;
}

@VisibleForTesting
void setMaxBackoffIntervalMilliseconds(int maxBackoffInterval) {
this.maxBackoffInterval = maxBackoffInterval;
}

@VisibleForTesting
void setIsNamespaceEnabledAccount(String isNamespaceEnabledAccount) {
this.isNamespaceEnabledAccount = isNamespaceEnabledAccount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.ArrayList;
Expand Down Expand Up @@ -96,6 +95,7 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfInfo;
import org.apache.hadoop.fs.azurebfs.utils.Base64;
import org.apache.hadoop.fs.azurebfs.utils.CRC64;
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
Expand Down Expand Up @@ -128,7 +128,6 @@ public class AzureBlobFileSystemStore implements Closeable {
private URI uri;
private String userName;
private String primaryUserGroup;
private static final String DATE_TIME_PATTERN = "E, dd MMM yyyy HH:mm:ss z";
private static final String TOKEN_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'";
private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1";
private static final int GET_SET_AGGREGATE_COUNT = 2;
Expand Down Expand Up @@ -672,7 +671,7 @@ public FileStatus getFileStatus(final Path path) throws IOException {
resourceIsDir,
1,
blockSize,
parseLastModifiedTime(lastModified),
DateTimeUtils.parseLastModifiedTime(lastModified),
path,
eTag);
}
Expand Down Expand Up @@ -748,7 +747,8 @@ public FileStatus[] listStatus(final Path path, final String startFrom) throws I
long contentLength = entry.contentLength() == null ? 0 : entry.contentLength();
boolean isDirectory = entry.isDirectory() == null ? false : entry.isDirectory();
if (entry.lastModified() != null && !entry.lastModified().isEmpty()) {
lastModifiedMillis = parseLastModifiedTime(entry.lastModified());
lastModifiedMillis = DateTimeUtils.parseLastModifiedTime(
entry.lastModified());
}

Path entryPath = new Path(File.separator + entry.name());
Expand Down Expand Up @@ -1235,18 +1235,6 @@ private boolean parseIsDirectory(final String resourceType) {
&& resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY);
}

private long parseLastModifiedTime(final String lastModifiedTime) {
long parsedTime = 0;
try {
Date utcDate = new SimpleDateFormat(DATE_TIME_PATTERN, Locale.US).parse(lastModifiedTime);
parsedTime = utcDate.getTime();
} catch (ParseException e) {
LOG.error("Failed to parse the date {}", lastModifiedTime);
} finally {
return parsedTime;
}
}

private String convertXmsPropertiesToCommaSeparatedString(final Hashtable<String, String> properties) throws
CharacterCodingException {
StringBuilder commaSeparatedProperties = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,8 @@ public final class FileSystemConfigurations {
public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";

public static final boolean DEFAULT_DELETE_CONSIDERED_IDEMPOTENT = true;
public static final int DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS = 5 * 60 * 1000; // 5 mins

private FileSystemConfigurations() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLEncoder;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
Expand All @@ -44,9 +46,11 @@
import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
import org.apache.hadoop.io.IOUtils;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
Expand Down Expand Up @@ -320,7 +324,51 @@ public AbfsRestOperation renamePath(String source, final String destination, fin
HTTP_METHOD_PUT,
url,
requestHeaders);
Instant renameRequestStartTime = Instant.now();
op.execute();

if (op.getResult().getStatusCode() != HttpURLConnection.HTTP_OK) {
return renameIdempotencyCheckOp(renameRequestStartTime, op, destination);
}

return op;
}

/**
Copy link
Contributor

@bilaharith bilaharith May 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to move the idempotency related methods to a separate Utility/Helper class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes are not utility related and are insync with the handling of the ABFS response. The reason they were included as separate methods was to enable mock testing which was otherwise not possible. Retaining the change as such.

Copy link
Contributor

@bilaharith bilaharith May 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One option is restrict the method accessibility tp package level and use @VisibleForTesting annotation. Keeping a method public solely for testing doesn't look good practice.

Also the idea is, If you have methods 'assisting', chances are the class is actually doing too much. Moving these methods into separate classes with public interfaces keeps the class with the assisting methods responsible for one thing and one thing only (see Single Responsibility Principle). This move into separte classes automatically makes for a testable structure as your methods must be made public

https://softwareengineering.stackexchange.com/questions/274937/is-it-bad-practice-to-make-methods-public-solely-for-the-sake-of-unit-testing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AbfsClient class handles triggering of requests to Store backend and returning the AbfsRestOperation back. For Rename and Delete, the response to return is not determined if request was re-tried by the idempotent logic. It will be not right to consider these methods as "assisting" or providing a utility service and are part of the actual flow.

* Check if the rename request failure is post a retry and if earlier rename
* request might have succeeded at back-end.
*
* If there is a parallel rename activity happening from any other store
* interface, the logic here will detect the rename to have happened due to
* the one initiated from this ABFS filesytem instance as it was retried. This
* should be a corner case hence going ahead with LMT check.
* @param renameRequestStartTime startTime for the rename request
* @param op Rename request REST operation response
* @param destination rename destination path
* @return REST operation response post idempotency check
* @throws AzureBlobFileSystemException if GetFileStatus hits any exception
*/
public AbfsRestOperation renameIdempotencyCheckOp(
final Instant renameRequestStartTime,
final AbfsRestOperation op,
final String destination) throws AzureBlobFileSystemException {
if ((op.isARetriedRequest())
&& (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND)) {
// Server has returned HTTP 404, which means rename source no longer
// exists. Check on destination status and if it has a recent LMT timestamp.
// If yes, return success, else fall back to original rename request failure response.

final AbfsRestOperation destStatusOp = getPathStatus(destination, false);
if (destStatusOp.getResult().getStatusCode() == HttpURLConnection.HTTP_OK) {
String lmt = destStatusOp.getResult().getResponseHeader(
HttpHeaderConfigurations.LAST_MODIFIED);

if (DateTimeUtils.isRecentlyModified(lmt, renameRequestStartTime)) {
return destStatusOp;
}
}
}

return op;
}

Expand Down Expand Up @@ -476,6 +524,45 @@ public AbfsRestOperation deletePath(final String path, final boolean recursive,
url,
requestHeaders);
op.execute();

if (op.getResult().getStatusCode() != HttpURLConnection.HTTP_OK) {
return deleteIdempotencyCheckOp(op);
}

return op;
}

/**
* Check if the delete request failure is post a retry and if delete failure
* qualifies to be a success response assuming idempotency.
*
* There are below scenarios where delete could be incorrectly deducted as
* success post request retry:
* 1. Target was originally not existing and initial delete request had to be
* re-tried.
* 2. Parallel delete issued from any other store interface rather than
* delete issued from this filesystem instance.
* These are few corner cases and usually returning a success at this stage
* should help the job to continue.
* @param op Delete request REST operation response
* @return REST operation response post idempotency check
*/
public AbfsRestOperation deleteIdempotencyCheckOp(final AbfsRestOperation op) {
if ((op.isARetriedRequest())
&& (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND)
&& DEFAULT_DELETE_CONSIDERED_IDEMPOTENT) {
// Server has returned HTTP 404, which means path no longer
// exists. Assuming delete result to be idempotent, return success.
final AbfsRestOperation successOp = new AbfsRestOperation(
AbfsRestOperationType.DeletePath,
this,
HTTP_METHOD_DELETE,
op.getUrl(),
op.getRequestHeaders());
successOp.hardSetResult(HttpURLConnection.HTTP_OK);
return successOp;
}

return op;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,19 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
private long sendRequestTimeMs;
private long recvResponseTimeMs;

public static AbfsHttpOperation getAbfsHttpOperationWithFixedResult(final URL url,
final String method, final int httpStatus) {
return new AbfsHttpOperation(url, method, httpStatus);
}

private AbfsHttpOperation(final URL url, final String method,
final int httpStatus) {
this.isTraceEnabled = LOG.isTraceEnabled();
this.url = url;
this.method = method;
this.statusCode = httpStatus;
}

protected HttpURLConnection getConnection() {
return connection;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,31 @@ public class AbfsRestOperation {
private byte[] buffer;
private int bufferOffset;
private int bufferLength;
private int retryCount = 0;

private AbfsHttpOperation result;

public AbfsHttpOperation getResult() {
return result;
}

public void hardSetResult(int httpStatus) {
result = AbfsHttpOperation.getAbfsHttpOperationWithFixedResult(this.url,
this.method, httpStatus);
}

public URL getUrl() {
return url;
}

public List<AbfsHttpHeader> getRequestHeaders() {
return requestHeaders;
}

public boolean isARetriedRequest() {
return (retryCount > 0);
}

String getSasToken() {
return sasToken;
}
Expand Down Expand Up @@ -157,7 +175,7 @@ void execute() throws AzureBlobFileSystemException {
requestHeaders.add(httpHeader);
}

int retryCount = 0;
retryCount = 0;
LOG.debug("First execution of REST operation - {}", operationType);
while (!executeHttpOperation(retryCount++)) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.utils;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Date;
import java.util.Locale;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS;

public final class DateTimeUtils {
private static final Logger LOG = LoggerFactory.getLogger(DateTimeUtils.class);
private static final String DATE_TIME_PATTERN = "E, dd MMM yyyy HH:mm:ss z";

public static long parseLastModifiedTime(final String lastModifiedTime) {
long parsedTime = 0;
try {
Date utcDate = new SimpleDateFormat(DATE_TIME_PATTERN, Locale.US)
.parse(lastModifiedTime);
parsedTime = utcDate.getTime();
} catch (ParseException e) {
LOG.error("Failed to parse the date {}", lastModifiedTime);
} finally {
return parsedTime;
}
}

/**
* Tries to identify if an operation was recently executed based on the LMT of
* a file or folder. LMT needs to be more recent that the original request
* start time. To include any clock skew with server, LMT within
* DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS from the request start time is going
* to be considered to qualify for recent operation.
* @param lastModifiedTime File/Folder LMT
* @param expectedLMTUpdateTime original request timestamp which should
* have updated the LMT on target
* @return true if the LMT is within timespan for recent operation, else false
*/
public static boolean isRecentlyModified(final String lastModifiedTime,
final Instant expectedLMTUpdateTime) {
long lmtEpochTime = DateTimeUtils.parseLastModifiedTime(lastModifiedTime);
long currentEpochTime = expectedLMTUpdateTime.toEpochMilli();

return ((lmtEpochTime > currentEpochTime)
|| ((currentEpochTime - lmtEpochTime) <= DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS));
}

private DateTimeUtils() {
}
}
12 changes: 12 additions & 0 deletions hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,18 @@ Config `fs.azure.account.hns.enabled` provides an option to specify whether
Config `fs.azure.enable.check.access` needs to be set true to enable
the AzureBlobFileSystem.access().

### <a name="idempotency"></a> Operation Idempotency

Requests failing due to server timeouts and network failures will be retried.
PUT/POST operations are idempotent and need no specific handling
except for Rename and Delete operations.

Rename idempotency checks are made by ensuring the LastModifiedTime on destination
is recent if source path is found to be non-existent on retry.

Delete is considered to be idempotent by default if the target does not exist on
retry.

### <a name="featureconfigoptions"></a> Primary User Group Options
The group name which is part of FileStatus and AclStatus will be set the same as
the username if the following config is set to true
Expand Down
Loading