Skip to content

HADOOP-13551. AWS metrics wire-up (#2778) #2815

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,33 @@ private Constants() {
DEFAULT_SSL_CHANNEL_MODE =
DelegatingSSLSocketFactory.SSLChannelMode.Default_JSSE;

//use a custom endpoint?
/**
* Endpoint. For v4 signing and/or better performance,
* this should be the specific endpoint of the region
* in which the bucket is hosted.
*/
public static final String ENDPOINT = "fs.s3a.endpoint";

/**
* Default value of s3 endpoint. If not set explicitly using
* {@code AmazonS3#setEndpoint()}, this is used.
* Default value of s3 endpoint: {@value}.
* It tells the AWS client to work it out by asking the central
* endpoint where the bucket lives; caching that
* value in the client for the life of the process.
* <p>
* Note: previously this constant was defined as
* {@link #CENTRAL_ENDPOINT}, however the actual
* S3A client code used "" as the default when
* {@link #ENDPOINT} was unset.
* As core-default.xml also set the endpoint to "",
* the empty string has long been the <i>real</i>
* default value.
*/
public static final String DEFAULT_ENDPOINT = "";

/**
* The central endpoint :{@value}.
*/
public static final String DEFAULT_ENDPOINT = "s3.amazonaws.com";
public static final String CENTRAL_ENDPOINT = "s3.amazonaws.com";

//Enable path style access? Overrides default virtual hosting
public static final String PATH_STYLE_ACCESS = "fs.s3a.path.style.access";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@
import java.net.URI;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.metrics.RequestMetricCollector;
import com.amazonaws.handlers.RequestHandler2;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
Expand All @@ -41,27 +40,22 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.s3a.statistics.StatisticsFromAwsSdk;
import org.apache.hadoop.fs.s3a.statistics.impl.AwsStatisticsCollector;

import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS;

/**
* The default {@link S3ClientFactory} implementation.
* This calls the AWS SDK to configure and create an
* {@link AmazonS3Client} that communicates with the S3 service.
* {@code AmazonS3Client} that communicates with the S3 service.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class DefaultS3ClientFactory extends Configured
implements S3ClientFactory {

private static final String S3_SERVICE_NAME = "s3";
private static final String S3_SIGNER = "S3SignerType";
private static final String S3_V4_SIGNER = "AWSS3V4SignerType";

/**
* Subclasses refer to this.
Expand All @@ -70,22 +64,21 @@ public class DefaultS3ClientFactory extends Configured
LoggerFactory.getLogger(DefaultS3ClientFactory.class);

/**
* Create the client.
* <p>
* If the AWS stats are not null then a {@link AwsStatisticsCollector}.
* is created to bind to the two.
* <i>Important: until this binding works properly across regions,
* this should be null.</i>
* Create the client by preparing the AwsConf configuration
* and then invoking {@code buildAmazonS3Client()}.
*/
@Override
public AmazonS3 createS3Client(URI name,
final String bucket,
final AWSCredentialsProvider credentials,
final String userAgentSuffix,
final StatisticsFromAwsSdk statisticsFromAwsSdk) throws IOException {
public AmazonS3 createS3Client(
final URI uri,
final S3ClientCreationParameters parameters) throws IOException {
Configuration conf = getConf();
final ClientConfiguration awsConf = S3AUtils
.createAwsConf(conf, bucket, Constants.AWS_SERVICE_IDENTIFIER_S3);
.createAwsConf(conf,
uri.getHost(),
Constants.AWS_SERVICE_IDENTIFIER_S3);
// add any headers
parameters.getHeaders().forEach((h, v) ->
awsConf.addHeader(h, v));

// When EXPERIMENTAL_AWS_INTERNAL_THROTTLING is false
// throttling is explicitly disabled on the S3 client so that
Expand All @@ -96,111 +89,62 @@ public AmazonS3 createS3Client(URI name,
conf.getBoolean(EXPERIMENTAL_AWS_INTERNAL_THROTTLING,
EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT));

if (!StringUtils.isEmpty(userAgentSuffix)) {
awsConf.setUserAgentSuffix(userAgentSuffix);
if (!StringUtils.isEmpty(parameters.getUserAgentSuffix())) {
awsConf.setUserAgentSuffix(parameters.getUserAgentSuffix());
}
// optional metrics
RequestMetricCollector metrics = statisticsFromAwsSdk != null
? new AwsStatisticsCollector(statisticsFromAwsSdk)
: null;

return newAmazonS3Client(
credentials,
return buildAmazonS3Client(
awsConf,
metrics,
conf.getTrimmed(ENDPOINT, ""),
conf.getBoolean(PATH_STYLE_ACCESS, false));
}

/**
* Create an {@link AmazonS3} client.
* Override this to provide an extended version of the client
* @param credentials credentials to use
* @param awsConf AWS configuration
* @param metrics metrics collector or null
* @param endpoint endpoint string; may be ""
* @param pathStyleAccess enable path style access?
* @return new AmazonS3 client
*/
protected AmazonS3 newAmazonS3Client(
final AWSCredentialsProvider credentials,
final ClientConfiguration awsConf,
final RequestMetricCollector metrics,
final String endpoint,
final boolean pathStyleAccess) {
if (metrics != null) {
LOG.debug("Building S3 client using the SDK builder API");
return buildAmazonS3Client(credentials, awsConf, metrics, endpoint,
pathStyleAccess);
} else {
LOG.debug("Building S3 client using the SDK builder API");
return classicAmazonS3Client(credentials, awsConf, endpoint,
pathStyleAccess);
}
parameters);
}

/**
* Use the (newer) Builder SDK to create a an AWS S3 client.
* Use the Builder API to create an AWS S3 client.
* <p>
* This has a more complex endpoint configuration in a
* way which does not yet work in this code in a way
* which doesn't trigger regressions. So it is only used
* when SDK metrics are supplied.
* @param credentials credentials to use
* This has a more complex endpoint configuration mechanism
* which initially caused problems; the
* {@code withForceGlobalBucketAccessEnabled(true)}
* command is critical here.
* @param awsConf AWS configuration
* @param metrics metrics collector or null
* @param endpoint endpoint string; may be ""
* @param pathStyleAccess enable path style access?
* @param parameters parameters
* @return new AmazonS3 client
*/
private AmazonS3 buildAmazonS3Client(
final AWSCredentialsProvider credentials,
protected AmazonS3 buildAmazonS3Client(
final ClientConfiguration awsConf,
final RequestMetricCollector metrics,
final String endpoint,
final boolean pathStyleAccess) {
final S3ClientCreationParameters parameters) {
AmazonS3ClientBuilder b = AmazonS3Client.builder();
b.withCredentials(credentials);
b.withCredentials(parameters.getCredentialSet());
b.withClientConfiguration(awsConf);
b.withPathStyleAccessEnabled(pathStyleAccess);
if (metrics != null) {
b.withMetricsCollector(metrics);
b.withPathStyleAccessEnabled(parameters.isPathStyleAccess());

if (parameters.getMetrics() != null) {
b.withMetricsCollector(
new AwsStatisticsCollector(parameters.getMetrics()));
}
if (parameters.getRequestHandlers() != null) {
b.withRequestHandlers(
parameters.getRequestHandlers().toArray(new RequestHandler2[0]));
}
if (parameters.getMonitoringListener() != null) {
b.withMonitoringListener(parameters.getMonitoringListener());
}

// endpoint set up is a PITA
// client.setEndpoint("") is no longer available
AwsClientBuilder.EndpointConfiguration epr
= createEndpointConfiguration(endpoint, awsConf);
= createEndpointConfiguration(parameters.getEndpoint(),
awsConf);
if (epr != null) {
// an endpoint binding was constructed: use it.
b.withEndpointConfiguration(epr);
} else {
// no idea what the endpoint is, so tell the SDK
// to work it out at the cost of an extra HEAD request
b.withForceGlobalBucketAccessEnabled(true);
}
final AmazonS3 client = b.build();
return client;
}

/**
* Wrapper around constructor for {@link AmazonS3} client.
* Override this to provide an extended version of the client.
* <p>
* This uses a deprecated constructor -it is currently
* the only one which works for us.
* @param credentials credentials to use
* @param awsConf AWS configuration
* @param endpoint endpoint string; may be ""
* @param pathStyleAccess enable path style access?
* @return new AmazonS3 client
*/
@SuppressWarnings("deprecation")
private AmazonS3 classicAmazonS3Client(
AWSCredentialsProvider credentials,
ClientConfiguration awsConf,
final String endpoint,
final boolean pathStyleAccess) {
final AmazonS3 client = new AmazonS3Client(credentials, awsConf);
return configureAmazonS3Client(client, endpoint, pathStyleAccess);
}

/**
* Configure classic S3 client.
* <p>
Expand All @@ -226,31 +170,6 @@ protected static AmazonS3 configureAmazonS3Client(AmazonS3 s3,
throw new IllegalArgumentException(msg, e);
}
}
return applyS3ClientOptions(s3, pathStyleAccess);
}

/**
* Perform any tuning of the {@code S3ClientOptions} settings based on
* the Hadoop configuration.
* This is different from the general AWS configuration creation as
* it is unique to S3 connections.
* <p>
* The {@link Constants#PATH_STYLE_ACCESS} option enables path-style access
* to S3 buckets if configured. By default, the
* behavior is to use virtual hosted-style access with URIs of the form
* {@code http://bucketname.s3.amazonaws.com}
* <p>
* Enabling path-style access and a
* region-specific endpoint switches the behavior to use URIs of the form
* {@code http://s3-eu-west-1.amazonaws.com/bucketname}.
* It is common to use this when connecting to private S3 servers, as it
* avoids the need to play with DNS entries.
* @param s3 S3 client
* @param pathStyleAccess enable path style access?
* @return the S3 client
*/
protected static AmazonS3 applyS3ClientOptions(AmazonS3 s3,
final boolean pathStyleAccess) {
if (pathStyleAccess) {
LOG.debug("Enabling path style access!");
s3.setS3ClientOptions(S3ClientOptions.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
package org.apache.hadoop.fs.s3a;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.metrics.RequestMetricCollector;
import com.amazonaws.services.s3.AmazonS3;

import org.apache.hadoop.classification.InterfaceAudience;
Expand All @@ -31,31 +29,25 @@
* This client is for testing <i>only</i>; it is in the production
* {@code hadoop-aws} module to enable integration tests to use this
* just by editing the Hadoop configuration used to bring up the client.
*
* The factory uses the older constructor-based instantiation/configuration
* of the client, so does not wire up metrics, handlers etc.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class InconsistentS3ClientFactory extends DefaultS3ClientFactory {

/**
* Create the inconsistent client.
* Logs a warning that this is being done.
* @param credentials credentials to use
* @param awsConf AWS configuration
* @param metrics metric collector
* @param endpoint AWS endpoint
* @param pathStyleAccess should path style access be supported?
* @return an inconsistent client.
*/
@Override
protected AmazonS3 newAmazonS3Client(AWSCredentialsProvider credentials,
ClientConfiguration awsConf,
final RequestMetricCollector metrics,
final String endpoint,
final boolean pathStyleAccess) {
protected AmazonS3 buildAmazonS3Client(
final ClientConfiguration awsConf,
final S3ClientCreationParameters parameters) {
LOG.warn("** FAILURE INJECTION ENABLED. Do not run in production! **");
InconsistentAmazonS3Client s3
= new InconsistentAmazonS3Client(credentials, awsConf, getConf());
configureAmazonS3Client(s3, endpoint, pathStyleAccess);
= new InconsistentAmazonS3Client(
parameters.getCredentialSet(), awsConf, getConf());
configureAmazonS3Client(s3,
parameters.getEndpoint(),
parameters.isPathStyleAccess());
return s3;
}
}
Loading