Skip to content

HADOOP-17092. ABFS: Making AzureADAuthenticator.getToken() throw HttpException if a… #2123

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jul 21, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.hadoop.fs.azurebfs.oauth2.UserPasswordTokenProvider;
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
import org.apache.hadoop.fs.azurebfs.services.KeyProvider;
import org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
Expand Down Expand Up @@ -120,6 +121,26 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT)
private int customTokenFetchRetryCount;

// Maximum number of retries for OAuth token fetch calls; validated as >= 0.
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT,
MinValue = 0,
DefaultValue = DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS)
private int oauthTokenFetchRetryCount;

// Minimum back-off interval for OAuth token fetch retries; validated as >= 0.
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF,
MinValue = 0,
DefaultValue = DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF_INTERVAL)
private int oauthTokenFetchRetryMinBackoff;

// Maximum back-off interval for OAuth token fetch retries; validated as >= 0.
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF,
MinValue = 0,
DefaultValue = DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF_INTERVAL)
private int oauthTokenFetchRetryMaxBackoff;

// Delta back-off used between OAuth token fetch retries; validated as >= 0.
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF,
MinValue = 0,
DefaultValue = DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF)
private int oauthTokenFetchRetryDeltaBackoff;

@LongConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BLOCK_SIZE_PROPERTY_NAME,
MinValue = 0,
MaxValue = MAX_AZURE_BLOCK_SIZE,
Expand Down Expand Up @@ -809,6 +830,12 @@ boolean validateBoolean(Field field) throws IllegalAccessException, InvalidConfi
validator.ThrowIfInvalid()).validate(value);
}

/**
 * Builds the retry policy to use for OAuth token fetch calls from the four
 * configured OAuth token fetch retry fields (retry count, min back-off,
 * max back-off and delta back-off).
 *
 * @return a new {@link ExponentialRetryPolicy} instance on every call.
 */
public ExponentialRetryPolicy getOauthTokenFetchRetryPolicy() {
  final ExponentialRetryPolicy oauthRetryPolicy = new ExponentialRetryPolicy(
      this.oauthTokenFetchRetryCount,
      this.oauthTokenFetchRetryMinBackoff,
      this.oauthTokenFetchRetryMaxBackoff,
      this.oauthTokenFetchRetryDeltaBackoff);
  return oauthRetryPolicy;
}

@VisibleForTesting
void setReadBufferSize(int bufferSize) {
this.readBufferSize = bufferSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator;
import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer;
import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformerInterface;
import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
Expand Down Expand Up @@ -1234,6 +1235,10 @@ private void initializeClient(URI uri, String fileSystemName,
AccessTokenProvider tokenProvider = null;
SASTokenProvider sasTokenProvider = null;

if (authType == AuthType.OAuth) {
AzureADAuthenticator.init(abfsConfiguration);
}

if (authType == AuthType.SharedKey) {
LOG.trace("Fetching SharedKey credentials");
int dotIndex = accountName.indexOf(AbfsHttpConstants.DOT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ public final class ConfigurationKeys {
public static final String AZURE_MAX_IO_RETRIES = "fs.azure.io.retry.max.retries";
public static final String AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT = "fs.azure.custom.token.fetch.retry.count";

// Retry strategy for OAuth getToken calls: maximum retries plus the
// min/max/delta back-off settings of the exponential retry policy.
public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT = "fs.azure.oauth.token.fetch.retry.max.retries";
public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF = "fs.azure.oauth.token.fetch.retry.min.backoff.interval";
public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF = "fs.azure.oauth.token.fetch.retry.max.backoff.interval";
public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF = "fs.azure.oauth.token.fetch.retry.delta.backoff";

// Read and write buffer sizes defined by the user
public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,21 @@ public final class FileSystemConfigurations {

public static final String USER_HOME_DIRECTORY_PREFIX = "/user";

private static final int SIXTY_SECONDS = 60 * 1000;

// Retry parameter defaults.
public static final int DEFAULT_MIN_BACKOFF_INTERVAL = 3 * 1000; // 3s
public static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30 * 1000; // 30s
public static final int DEFAULT_BACKOFF_INTERVAL = 3 * 1000; // 3s
public static final int DEFAULT_MAX_RETRY_ATTEMPTS = 30;
public static final int DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT = 3;

// Retry parameter defaults for OAuth token fetch
// (the fs.azure.oauth.token.fetch.retry.* settings).
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS = 5;
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF_INTERVAL = 0;
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF_INTERVAL = SIXTY_SECONDS;
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF = 2;

private static final int ONE_KB = 1024;
private static final int ONE_MB = ONE_KB * ONE_KB;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

package org.apache.hadoop.fs.azurebfs.oauth2;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Date;
Expand All @@ -34,6 +36,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.azurebfs.services.AbfsIoUtils;
Expand All @@ -56,10 +59,16 @@ public final class AzureADAuthenticator {
private static final int CONNECT_TIMEOUT = 30 * 1000;
private static final int READ_TIMEOUT = 30 * 1000;

// Retry policy consulted by getTokenCall. It has no initializer, so it stays
// null until init(AbfsConfiguration) has been called.
private static ExponentialRetryPolicy tokenFetchRetryPolicy;

private AzureADAuthenticator() {
// no operation
}

/**
 * Installs the token fetch retry policy taken from the given configuration.
 * The policy is stored in a static field and is dereferenced when deciding
 * whether to retry a failed token fetch, so this must run before any token
 * is requested.
 *
 * @param abfsConfiguration configuration supplying the OAuth token fetch
 *                          retry policy.
 */
public static void init(AbfsConfiguration abfsConfiguration) {
  final ExponentialRetryPolicy configuredPolicy =
      abfsConfiguration.getOauthTokenFetchRetryPolicy();
  tokenFetchRetryPolicy = configuredPolicy;
}

/**
* gets Azure Active Directory token using the user ID and password of
* a service principal (that is, Web App in Azure Active Directory).
Expand All @@ -81,8 +90,7 @@ private AzureADAuthenticator() {
* @throws IOException throws IOException if there is a failure in connecting to Azure AD
*/
public static AzureADToken getTokenUsingClientCreds(String authEndpoint,
String clientId, String clientSecret)
throws IOException {
String clientId, String clientSecret) throws IOException {
Preconditions.checkNotNull(authEndpoint, "authEndpoint");
Preconditions.checkNotNull(clientId, "clientId");
Preconditions.checkNotNull(clientSecret, "clientSecret");
Expand Down Expand Up @@ -283,13 +291,14 @@ private static AzureADToken getTokenCall(String authEndpoint, String body,
Hashtable<String, String> headers, String httpMethod, boolean isMsi)
throws IOException {
AzureADToken token = null;
ExponentialRetryPolicy retryPolicy
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As every OAuth token provider eventually calls this method to get token, you will just need to create an instance of exponential retry right here with the configured values ?
Is there any benefit creating and passing down exponential retry instance from each of the token provider ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

= new ExponentialRetryPolicy(3, 0, 1000, 2);

int httperror = 0;
IOException ex = null;
boolean succeeded = false;
boolean isRecoverableFailure = true;
int retryCount = 0;
boolean shouldRetry;
LOG.trace("First execution of REST operation getTokenSingleCall");
do {
httperror = 0;
ex = null;
Expand All @@ -299,17 +308,38 @@ private static AzureADToken getTokenCall(String authEndpoint, String body,
httperror = e.httpErrorCode;
ex = e;
} catch (IOException e) {
ex = e;
httperror = -1;
isRecoverableFailure = isRecoverableFailure(e);
ex = new HttpException(httperror, "", String
.format("AzureADAuthenticator.getTokenCall threw %s : %s",
e.getClass().getTypeName(), e.getMessage()), authEndpoint, "",
"");
}
succeeded = ((httperror == 0) && (ex == null));
shouldRetry = !succeeded && isRecoverableFailure
&& tokenFetchRetryPolicy.shouldRetry(retryCount, httperror);
retryCount++;
} while (!succeeded && retryPolicy.shouldRetry(retryCount, httperror));
if (shouldRetry) {
LOG.debug("Retrying getTokenSingleCall. RetryCount = {}", retryCount);
try {
Thread.sleep(tokenFetchRetryPolicy.getRetryInterval(retryCount));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

} while (shouldRetry);
if (!succeeded) {
throw ex;
}
return token;
}

/**
 * Tells whether a failed token fetch is worth retrying.
 *
 * @param e the I/O failure raised by a single token fetch attempt.
 * @return {@code false} for {@link MalformedURLException} and
 *         {@link FileNotFoundException}; {@code true} for any other
 *         {@link IOException}.
 */
private static boolean isRecoverableFailure(IOException e) {
  if (e instanceof MalformedURLException) {
    return false;
  }
  if (e instanceof FileNotFoundException) {
    return false;
  }
  return true;
}

private static AzureADToken getTokenSingleCall(String authEndpoint,
String payload, Hashtable<String, String> headers, String httpMethod,
boolean isMsi)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Random;
import java.net.HttpURLConnection;

import com.google.common.annotations.VisibleForTesting;

/**
* Retry policy used by AbfsClient.
* */
Expand Down Expand Up @@ -138,4 +140,25 @@ public long getRetryInterval(final int retryCount) {

return retryInterval;
}

/**
 * Test-only accessor.
 *
 * @return the value of this policy's {@code retryCount} field.
 */
@VisibleForTesting
int getRetryCount() {
  return retryCount;
}

/**
 * Test-only accessor.
 *
 * @return the value of this policy's {@code minBackoff} field.
 */
@VisibleForTesting
int getMinBackoff() {
  return minBackoff;
}

/**
 * Test-only accessor.
 *
 * @return the value of this policy's {@code maxBackoff} field.
 */
@VisibleForTesting
int getMaxBackoff() {
  return this.maxBackoff;
}

/**
 * Test-only accessor.
 *
 * @return the value of this policy's {@code deltaBackoff} field.
 */
@VisibleForTesting
int getDeltaBackoff() {
  return deltaBackoff;
}

}
20 changes: 19 additions & 1 deletion hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,22 @@ All secrets can be stored in JCEKS files. These are encrypted and password
protected —use them or a compatible Hadoop Key Management Store wherever
possible

### <a name="aad-token-fetch-retry-logic"></a> AAD Token fetch retries

The exponential retry policy used for the AAD token fetch retries can be tuned
with the following configurations.
* `fs.azure.oauth.token.fetch.retry.max.retries`: Sets the maximum number of
retries. Default value is 5.
* `fs.azure.oauth.token.fetch.retry.min.backoff.interval`: Minimum back-off
interval. Added to the retry interval computed from delta backoff. By
default this is set as 0. Set the interval in milliseconds.
* `fs.azure.oauth.token.fetch.retry.max.backoff.interval`: Maximum back-off
interval. Default value is 60000 (sixty seconds). Set the interval in
milliseconds.
* `fs.azure.oauth.token.fetch.retry.delta.backoff`: Back-off interval between
retries. Multiples of this timespan are used for subsequent retry attempts.
The default value is 2.

### <a name="shared-key-auth"></a> Default: Shared Key

This is the simplest authentication mechanism of account + password.
Expand Down Expand Up @@ -776,7 +792,9 @@ bytes. The value should be between 16384 to 104857600 both inclusive (16 KB to
`fs.azure.readaheadqueue.depth`: Sets the readahead queue depth in
AbfsInputStream. In case the set value is negative the read ahead queue depth
will be set as Runtime.getRuntime().availableProcessors(). By default the value
will be -1.
will be -1. To disable readaheads, set this value to 0. If your workload is
doing only random reads (non-sequential) or you are seeing throttling, you
may try setting this value to 0.

### <a name="securityconfigoptions"></a> Security Options
`fs.azure.always.use.https`: Enforces to use HTTPS instead of HTTP when the flag
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;

import org.assertj.core.api.Assertions;
import org.junit.Test;

import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF_INTERVAL;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF_INTERVAL;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ACCOUNT_NAME;

/**
 * Verifies that {@link AbfsConfiguration#getOauthTokenFetchRetryPolicy()}
 * builds an {@link ExponentialRetryPolicy} from the OAuth token fetch retry
 * settings, both when they are left unset (defaults apply) and when they are
 * explicitly configured.
 */
public class TestAzureADAuthenticator extends AbstractAbfsIntegrationTest {

  private static final int TEST_RETRY_COUNT = 10;
  private static final int TEST_MIN_BACKOFF = 20;
  private static final int TEST_MAX_BACKOFF = 30;
  private static final int TEST_DELTA_BACKOFF = 40;

  public TestAzureADAuthenticator() throws Exception {
    super();
  }

  /**
   * With all four retry settings unset, the policy must carry the defaults.
   */
  @Test
  public void testDefaultOAuthTokenFetchRetryPolicy() throws Exception {
    getConfiguration().unset(AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT);
    getConfiguration().unset(AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF);
    getConfiguration().unset(AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF);
    getConfiguration().unset(AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF);

    String accountName = getConfiguration().get(FS_AZURE_ACCOUNT_NAME);
    AbfsConfiguration abfsConfig = new AbfsConfiguration(getRawConfiguration(),
        accountName);

    ExponentialRetryPolicy retryPolicy = abfsConfig
        .getOauthTokenFetchRetryPolicy();

    // AssertJ formats descriptions with String.format, so the placeholders
    // must be %d; SLF4J-style "{}" would be printed literally.
    Assertions.assertThat(retryPolicy.getRetryCount()).describedAs(
        "retryCount should be the default value %d as the same "
            + "is not configured",
        DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS)
        .isEqualTo(DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS);
    Assertions.assertThat(retryPolicy.getMinBackoff()).describedAs(
        "minBackOff should be the default value %d as the same is "
            + "not configured",
        DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF_INTERVAL)
        .isEqualTo(DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF_INTERVAL);
    Assertions.assertThat(retryPolicy.getMaxBackoff()).describedAs(
        "maxBackOff should be the default value %d as the same is "
            + "not configured",
        DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF_INTERVAL)
        .isEqualTo(DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF_INTERVAL);
    Assertions.assertThat(retryPolicy.getDeltaBackoff()).describedAs(
        "deltaBackOff should be the default value %d as the same is "
            + "not configured",
        DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF)
        .isEqualTo(DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF);
  }

  /**
   * With all four retry settings configured, the policy must carry exactly
   * the configured values.
   */
  @Test
  public void testOAuthTokenFetchRetryPolicy()
      throws IOException, IllegalAccessException {

    getConfiguration().set(AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT,
        String.valueOf(TEST_RETRY_COUNT));
    getConfiguration().set(AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF,
        String.valueOf(TEST_MIN_BACKOFF));
    getConfiguration().set(AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF,
        String.valueOf(TEST_MAX_BACKOFF));
    getConfiguration().set(AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF,
        String.valueOf(TEST_DELTA_BACKOFF));

    String accountName = getConfiguration().get(FS_AZURE_ACCOUNT_NAME);
    AbfsConfiguration abfsConfig = new AbfsConfiguration(getRawConfiguration(),
        accountName);

    ExponentialRetryPolicy retryPolicy = abfsConfig
        .getOauthTokenFetchRetryPolicy();

    // String.format-style placeholders, as required by AssertJ describedAs.
    Assertions.assertThat(retryPolicy.getRetryCount())
        .describedAs("retryCount should be %d", TEST_RETRY_COUNT)
        .isEqualTo(TEST_RETRY_COUNT);
    Assertions.assertThat(retryPolicy.getMinBackoff())
        .describedAs("minBackOff should be %d", TEST_MIN_BACKOFF)
        .isEqualTo(TEST_MIN_BACKOFF);
    Assertions.assertThat(retryPolicy.getMaxBackoff())
        .describedAs("maxBackOff should be %d", TEST_MAX_BACKOFF)
        .isEqualTo(TEST_MAX_BACKOFF);
    Assertions.assertThat(retryPolicy.getDeltaBackoff())
        .describedAs("deltaBackOff should be %d", TEST_DELTA_BACKOFF)
        .isEqualTo(TEST_DELTA_BACKOFF);
  }

}