Skip to content

HADOOP-18908. Improve S3A region handling. #6106

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -407,10 +407,6 @@ public final class StoreStatisticNames {
public static final String MULTIPART_UPLOAD_LIST
= "multipart_upload_list";

/** Probe for store region: {@value}. */
public static final String STORE_REGION_PROBE
= "store_region_probe";

private StoreStatisticNames() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,12 @@ private Constants() {
*/
public static final String AWS_S3_CENTRAL_REGION = "us-east-1";

/**
* The default S3 region when using cross region client.
* Value {@value}.
*/
public static final String AWS_S3_DEFAULT_REGION = "us-east-2";

/**
* Require that all S3 access is made through Access Points.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import software.amazon.awssdk.awscore.util.AwsHostNameUtils;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
Expand All @@ -48,6 +49,8 @@
import org.apache.hadoop.fs.s3a.statistics.impl.AwsStatisticsCollector;
import org.apache.hadoop.fs.store.LogExactlyOnce;

import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_DEFAULT_REGION;
import static org.apache.hadoop.fs.s3a.Constants.CENTRAL_ENDPOINT;
import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.REQUESTER_PAYS_HEADER;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_SECURE_CONNECTIONS;
import static org.apache.hadoop.fs.s3a.Constants.SECURE_CONNECTIONS;
Expand All @@ -66,12 +69,27 @@ public class DefaultS3ClientFactory extends Configured

private static final String REQUESTER_PAYS_HEADER_VALUE = "requester";

private static final String S3_SERVICE_NAME = "s3";

/**
* Subclasses refer to this.
*/
protected static final Logger LOG =
LoggerFactory.getLogger(DefaultS3ClientFactory.class);

/**
* A one-off warning of default region chains in use.
*/
private static final LogExactlyOnce WARN_OF_DEFAULT_REGION_CHAIN =
new LogExactlyOnce(LOG);

/**
* Warning message printed when the SDK Region chain is in use.
*/
private static final String SDK_REGION_CHAIN_IN_USE =
"S3A filesystem client is using"
+ " the SDK region resolution chain.";


/** Exactly once log to inform about ignoring the AWS-SDK Warnings for CSE. */
private static final LogExactlyOnce IGNORE_CSE_WARN = new LogExactlyOnce(LOG);
Expand Down Expand Up @@ -138,15 +156,7 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> Build
BuilderT builder, S3ClientCreationParameters parameters, Configuration conf, String bucket)
throws IOException {

Region region = parameters.getRegion();
LOG.debug("Using region {}", region);

URI endpoint = getS3Endpoint(parameters.getEndpoint(), conf);

if (endpoint != null) {
builder.endpointOverride(endpoint);
LOG.debug("Using endpoint {}", endpoint);
}
configureEndpointAndRegion(builder, parameters, conf);

S3Configuration serviceConfiguration = S3Configuration.builder()
.pathStyleAccessEnabled(parameters.isPathStyleAccess())
Expand All @@ -155,7 +165,6 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> Build
return builder
.overrideConfiguration(createClientOverrideConfiguration(parameters, conf))
.credentialsProvider(parameters.getCredentialSet())
.region(region)
.serviceConfiguration(serviceConfiguration);
}

Expand Down Expand Up @@ -201,6 +210,65 @@ protected ClientOverrideConfiguration createClientOverrideConfiguration(
return clientOverrideConfigBuilder.build();
}

/**
* This method configures the endpoint and region for a S3 client.
* The order of configuration is:
*
* <ol>
* <li>If region is configured via fs.s3a.endpoint.region, use it.</li>
* <li>If endpoint is configured via via fs.s3a.endpoint, set it.
* If no region is configured, try to parse region from endpoint. </li>
* <li> If no region is configured, and it could not be parsed from the endpoint,
* set the default region as US_EAST_2 and enable cross region access. </li>
* <li> If configured region is empty, fallback to SDK resolution chain. </li>
* </ol>
*
* @param builder S3 client builder.
* @param parameters parameter object
* @param conf conf configuration object
* @param <BuilderT> S3 client builder type
* @param <ClientT> S3 client type
*/
private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> void configureEndpointAndRegion(
BuilderT builder, S3ClientCreationParameters parameters, Configuration conf) {
URI endpoint = getS3Endpoint(parameters.getEndpoint(), conf);

String configuredRegion = parameters.getRegion();
Region region = null;

// If the region was configured, set it.
if (configuredRegion != null && !configuredRegion.isEmpty()) {
region = Region.of(configuredRegion);
}

if (endpoint != null) {
builder.endpointOverride(endpoint);
// No region was configured, try to determine it from the endpoint.
if (region == null) {
region = getS3RegionFromEndpoint(parameters.getEndpoint());
}
LOG.debug("Setting endpoint to {}", endpoint);
}

if (region != null) {
builder.region(region);
} else if (configuredRegion == null) {
// no region is configured, and none could be determined from the endpoint.
// Use US_EAST_2 as default.
region = Region.of(AWS_S3_DEFAULT_REGION);
builder.crossRegionAccessEnabled(true);
builder.region(region);
} else if (configuredRegion.isEmpty()) {
// region configuration was set to empty string.
// allow this if people really want it; it is OK to rely on this
// when deployed in EC2.
WARN_OF_DEFAULT_REGION_CHAIN.warn(SDK_REGION_CHAIN_IN_USE);
LOG.debug(SDK_REGION_CHAIN_IN_USE);
}

LOG.debug("Setting region to {}", region);
}

/**
* Given a endpoint string, create the endpoint URI.
*
Expand Down Expand Up @@ -229,4 +297,23 @@ private static URI getS3Endpoint(String endpoint, final Configuration conf) {
throw new IllegalArgumentException(e);
}
}

/**
* Parses the endpoint to get the region.
* If endpoint is the central one, use US_EAST_1.
*
* @param endpoint the configure endpoint.
* @return the S3 region, null if unable to resolve from endpoint.
*/
private static Region getS3RegionFromEndpoint(String endpoint) {

if(!endpoint.endsWith(CENTRAL_ENDPOINT)) {
LOG.debug("Endpoint {} is not the default; parsing", endpoint);
return AwsHostNameUtils.parseSigningRegion(endpoint, S3_SERVICE_NAME).orElse(null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

weve had so much trouble in the past here with vpce, govcloud, cn that we need a set of unit tests to verify this code does what we actually want -and therea are no regressions

Copy link
Contributor Author

@ahmarsuhail ahmarsuhail Oct 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

turns out this AwsHostNameUtils.parseSigningRegion is not meant to be used for non standard endpoints .. which why a vpce endpoint gets resolved to region "vpce". Can we still merge this PR and create a follow up PR to handle non standard endpoints? I working with the SDK team to understand how best to do this - via the SDK or in S3A.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. getting pressure to merge this as without it then its a major regression. also, we need a "what is a region" section, or maybe "all about signing, endpoints and regions"

}

// endpoint is for US_EAST_1;
return Region.US_EAST_1;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
Expand All @@ -54,7 +53,6 @@

import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
Expand Down Expand Up @@ -83,7 +81,6 @@
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Error;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest;
import software.amazon.awssdk.services.s3.model.SelectObjectContentResponseHandler;
Expand All @@ -98,7 +95,6 @@
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -246,7 +242,6 @@
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_301_MOVED_PERMANENTLY;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_403_FORBIDDEN;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
Expand Down Expand Up @@ -332,8 +327,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private int executorCapacity;
private long multiPartThreshold;
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
/** Exactly once log to warn about setting the region in config to avoid probe. */
private static final LogExactlyOnce SET_REGION_WARNING = new LogExactlyOnce(LOG);

/** Log to warn of storage class configuration problems. */
private static final LogExactlyOnce STORAGE_CLASS_WARNING = new LogExactlyOnce(LOG);
Expand Down Expand Up @@ -461,8 +454,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private String scheme = FS_S3A;

private final static Map<String, Region> BUCKET_REGIONS = new HashMap<>();

/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
Expand Down Expand Up @@ -870,9 +861,6 @@ protected void verifyBucketExists() throws UnknownStoreException, IOException {
STORE_EXISTS_PROBE, bucket, null, () ->
invoker.retry("doesBucketExist", bucket, true, () -> {
try {
if (BUCKET_REGIONS.containsKey(bucket)) {
return true;
}
s3Client.headBucket(HeadBucketRequest.builder().bucket(bucket).build());
return true;
} catch (AwsServiceException ex) {
Expand Down Expand Up @@ -982,8 +970,6 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
? conf.getTrimmed(AWS_REGION)
: accessPoint.getRegion();

Region region = getS3Region(configuredRegion);

S3ClientFactory.S3ClientCreationParameters parameters =
new S3ClientFactory.S3ClientCreationParameters()
.withCredentialSet(credentials)
Expand All @@ -998,7 +984,7 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
.withMultipartCopyEnabled(isMultipartCopyEnabled)
.withMultipartThreshold(multiPartThreshold)
.withTransferManagerExecutor(unboundedThreadPool)
.withRegion(region);
.withRegion(configuredRegion);

S3ClientFactory clientFactory = ReflectionUtils.newInstance(s3ClientFactoryClass, conf);
s3Client = clientFactory.createS3Client(getUri(), parameters);
Expand All @@ -1019,75 +1005,6 @@ private void createS3AsyncClient(S3ClientFactory clientFactory,
s3AsyncClient = clientFactory.createS3AsyncClient(getUri(), parameters);
}

/**
* Get the bucket region.
*
* @param region AWS S3 Region set in the config. This property may not be set, in which case
* ask S3 for the region.
* @return region of the bucket.
*/
private Region getS3Region(String region) throws IOException {

if (!StringUtils.isBlank(region)) {
return Region.of(region);
}

Region cachedRegion = BUCKET_REGIONS.get(bucket);

if (cachedRegion != null) {
LOG.debug("Got region {} for bucket {} from cache", cachedRegion, bucket);
return cachedRegion;
}

Region s3Region = trackDurationAndSpan(STORE_REGION_PROBE, bucket, null,
() -> invoker.retry("getS3Region", bucket, true, () -> {
try {

SET_REGION_WARNING.warn(
"Getting region for bucket {} from S3, this will slow down FS initialisation. "
+ "To avoid this, set the region using property {}", bucket,
FS_S3A_BUCKET_PREFIX + bucket + ".endpoint.region");

// build a s3 client with region eu-west-1 that can be used to get the region of the
// bucket. Using eu-west-1, as headBucket() doesn't work with us-east-1. This is because
// us-east-1 uses the endpoint s3.amazonaws.com, which resolves bucket.s3.amazonaws.com
// to the actual region the bucket is in. As the request is signed with us-east-1 and
// not the bucket's region, it fails.
S3Client getRegionS3Client =
S3Client.builder().region(Region.EU_WEST_1).credentialsProvider(credentials)
.build();

HeadBucketResponse headBucketResponse =
getRegionS3Client.headBucket(HeadBucketRequest.builder().bucket(bucket).build());

Region bucketRegion = Region.of(
headBucketResponse.sdkHttpResponse().headers().get(BUCKET_REGION_HEADER).get(0));
BUCKET_REGIONS.put(bucket, bucketRegion);

return bucketRegion;
} catch (S3Exception exception) {
if (exception.statusCode() == SC_301_MOVED_PERMANENTLY) {
Region bucketRegion = Region.of(
exception.awsErrorDetails().sdkHttpResponse().headers().get(BUCKET_REGION_HEADER)
.get(0));
BUCKET_REGIONS.put(bucket, bucketRegion);

return bucketRegion;
}

if (exception.statusCode() == SC_404_NOT_FOUND) {
throw new UnknownStoreException("s3a://" + bucket + "/",
" Bucket does not exist: " + exception,
exception);
}

throw exception;
}
}));

return s3Region;
}

/**
* Initialize and launch the audit manager and service.
* As this takes the FS IOStatistics store, it must be invoked
Expand Down
Loading