Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;

import java.time.Duration;
import java.util.Locale;
import java.util.concurrent.TimeUnit;

import static org.apache.hadoop.io.Sizes.S_128K;
Expand Down Expand Up @@ -1339,6 +1340,37 @@ private Constants() {
public static final String AWS_SERVICE_IDENTIFIER_DDB = "DDB";
public static final String AWS_SERVICE_IDENTIFIER_STS = "STS";

/** Prefix for S3A client-specific properties.
* value: {@value}
*/
public static final String FS_S3A_CLIENT_PREFIX = "fs.s3a.client.";

/** Custom headers postfix.
* value: {@value}
*/
public static final String CUSTOM_HEADERS_POSTFIX = ".custom.headers";

/**
* List of custom headers to be set on the service client.
* Multiple parameters can be used to specify custom headers.
* <pre>
* Usage:
* fs.s3a.client.s3.custom.headers - Headers to add on all the S3 requests.
* fs.s3a.client.sts.custom.headers - Headers to add on all the STS requests.
*
* Examples:
* CustomHeader {@literal ->} 'Header1:Value1'
* CustomHeaders {@literal ->} 'Header1=Value1;Value2,Header2=Value1'
* </pre>
*/
public static final String CUSTOM_HEADERS_STS =
FS_S3A_CLIENT_PREFIX + AWS_SERVICE_IDENTIFIER_STS.toLowerCase(Locale.ROOT)
+ CUSTOM_HEADERS_POSTFIX;

public static final String CUSTOM_HEADERS_S3 =
FS_S3A_CLIENT_PREFIX + AWS_SERVICE_IDENTIFIER_S3.toLowerCase(Locale.ROOT)
+ CUSTOM_HEADERS_POSTFIX;

/**
* How long to wait for the thread pool to terminate when cleaning up.
* Value: {@value} seconds.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -76,6 +80,8 @@
import static org.apache.hadoop.fs.s3a.Constants.SIGNING_ALGORITHM_STS;
import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_S3;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_STS;
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.enforceMinimumDuration;
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.getDuration;
import static org.apache.hadoop.util.Preconditions.checkArgument;
Expand Down Expand Up @@ -120,6 +126,8 @@ public static ClientOverrideConfiguration.Builder createClientConfigBuilder(Conf

initUserAgent(conf, overrideConfigBuilder);

initRequestHeaders(conf, overrideConfigBuilder, awsServiceIdentifier);

String signer = conf.getTrimmed(SIGNING_ALGORITHM, "");
if (!signer.isEmpty()) {
LOG.debug("Signer override = {}", signer);
Expand Down Expand Up @@ -412,6 +420,44 @@ private static void initSigner(Configuration conf,
}
}

/**
* Initialize custom request headers for AWS clients.
* @param conf hadoop configuration
* @param clientConfig client configuration to update
* @param awsServiceIdentifier service name
*/
private static void initRequestHeaders(Configuration conf,
ClientOverrideConfiguration.Builder clientConfig, String awsServiceIdentifier) {
String configKey = null;
switch (awsServiceIdentifier) {
case AWS_SERVICE_IDENTIFIER_S3:
configKey = CUSTOM_HEADERS_S3;
break;
case AWS_SERVICE_IDENTIFIER_STS:
configKey = CUSTOM_HEADERS_STS;
break;
default:
// No known service.
}
if (configKey != null) {
Map<String, String> awsClientCustomHeadersMap =
S3AUtils.getTrimmedStringCollectionSplitByEquals(conf, configKey);
awsClientCustomHeadersMap.forEach((header, valueString) -> {
List<String> headerValues = Arrays.stream(valueString.split(";"))
.map(String::trim)
.filter(v -> !v.isEmpty())
.collect(Collectors.toList());
if (!headerValues.isEmpty()) {
clientConfig.putHeader(header, headerValues);
} else {
LOG.warn("Ignoring header '{}' for {} client because no values were provided",
header, awsServiceIdentifier);
}
});
LOG.debug("headers for {} client = {}", awsServiceIdentifier, clientConfig.headers());
}
}

/**
* Configures request timeout in the client configuration.
* This is independent of the timeouts set in the sync and async HTTP clients;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,31 @@ The switch to turn S3A auditing on or off.
</property>

```

### Configuring Custom Headers for AWS Service Clients

You can set custom headers for S3 and STS requests. These headers are set on client level, and will be sent for all requests made to these services.

**Configuration Properties:**
- `fs.s3a.client.s3.custom.headers`: Custom headers for S3 service requests.
- `fs.s3a.client.sts.custom.headers`: Sets custom headers for all requests to AWS STS.

**Header Format:**
Custom headers should be specified as key-value pairs, separated by `=`. Multiple values for a single header can be separated by `;`. Multiple headers can be separated by `,`.


```xml
<property>
<name>fs.s3a.client.s3.custom.headers</name>
<value>Header1=Value1</value>
</property>

<property>
<name>fs.s3a.client.sts.custom.headers</name>
<value>Header1=Value1;Value2,Header2=Value1</value>
</property>
```

## <a name="retry_and_recovery"></a>Retry and Recovery

The S3A client makes a best-effort attempt at recovering from network failures;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.fs.s3a.impl;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;

Expand All @@ -29,11 +30,16 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.AbstractHadoopTestBase;
import org.apache.hadoop.util.Lists;

import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_S3;
import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_STS;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_ACQUISITION_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_IDLE_TIME;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_KEEPALIVE;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_TTL;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_S3;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_STS;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_DURATION;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_IDLE_TIME_DURATION;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_KEEPALIVE;
Expand All @@ -48,6 +54,7 @@
import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.createApiConnectionSettings;
import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.createClientConfigBuilder;
import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.createConnectionSettings;
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.enforceMinimumDuration;

Expand Down Expand Up @@ -201,4 +208,122 @@ public void testCreateApiConnectionSettingsDefault() {
private void setOptionsToValue(String value, Configuration conf, String... keys) {
Arrays.stream(keys).forEach(key -> conf.set(key, value));
}

/**
* if {@link org.apache.hadoop.fs.s3a.Constants#CUSTOM_HEADERS_STS} is set,
* verify that returned client configuration has desired headers set.
*/
@Test
public void testInitRequestHeadersForSTS() throws IOException {
final Configuration conf = new Configuration();
conf.set(CUSTOM_HEADERS_STS, "header1=value1;value2,header2=value3");

Assertions.assertThat(conf.get(CUSTOM_HEADERS_S3))
.describedAs("Custom client headers for s3 %s", CUSTOM_HEADERS_S3)
.isNull();

Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3)
.headers().size())
.describedAs("Count of S3 client headers")
.isEqualTo(0);

Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_STS)
.headers().size())
.describedAs("Count of STS client headers")
.isEqualTo(2);

Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_STS)
.headers().get("header1"))
.describedAs("STS client 'header1' header value")
.isEqualTo(Lists.newArrayList("value1", "value2"));

Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_STS)
.headers().get("header2"))
.describedAs("STS client 'header2' header value")
.isEqualTo(Lists.newArrayList("value3"));
}

/**
* if {@link org.apache.hadoop.fs.s3a.Constants#CUSTOM_HEADERS_S3} is set,
* verify that returned client configuration has desired headers set.
*/
@Test
public void testInitRequestHeadersForS3() throws IOException {
final Configuration conf = new Configuration();
conf.set(CUSTOM_HEADERS_S3, "header1=value1;value2,header2=value3");

Assertions.assertThat(conf.get(CUSTOM_HEADERS_STS))
.describedAs("Custom client headers for STS %s", CUSTOM_HEADERS_STS)
.isNull();

Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_STS)
.headers().size())
.describedAs("Count of STS client headers")
.isEqualTo(0);

Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3)
.headers().size())
.describedAs("Count of S3 client headers")
.isEqualTo(2);

Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3)
.headers().get("header1"))
.describedAs("S3 client 'header1' header value")
.isEqualTo(Lists.newArrayList("value1", "value2"));

Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3)
.headers().get("header2"))
.describedAs("S3 client 'header2' header value")
.isEqualTo(Lists.newArrayList("value3"));
}

/**
* if {@link org.apache.hadoop.fs.s3a.Constants#CUSTOM_HEADERS_S3} is set,
* verify that returned client configuration has desired headers set with
* whitespaces trimmed for headers and values.
*/
@Test
public void testInitRequestHeadersForS3WithWhitespace() throws IOException {
final Configuration conf = new Configuration();
conf.set(CUSTOM_HEADERS_S3, " header1 = value1 ; value2 , header2= value3 ");

Assertions.assertThat(conf.get(CUSTOM_HEADERS_STS))
.describedAs("Custom client headers for STS %s", CUSTOM_HEADERS_STS)
.isNull();

Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_STS)
.headers().size())
.describedAs("Count of STS client headers")
.isEqualTo(0);

Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3)
.headers().size())
.describedAs("Count of S3 client headers")
.isEqualTo(2);

Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3)
.headers().get("header1"))
.describedAs("S3 client 'header1' header value")
.isEqualTo(Lists.newArrayList("value1", "value2"));

Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3)
.headers().get("header2"))
.describedAs("S3 client 'header2' header value")
.isEqualTo(Lists.newArrayList("value3"));
}

/**
* if {@link org.apache.hadoop.fs.s3a.Constants#CUSTOM_HEADERS_S3} is set with duplicate values,
* verify that returned client configuration has desired headers with both values.
*/
@Test
public void testInitRequestHeadersForS3WithDuplicateValues() throws IOException {
Configuration conf = new Configuration();
conf.set(CUSTOM_HEADERS_S3, "header1=duplicate;duplicate");

Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3)
.headers().get("header1"))
.describedAs("S3 client 'header1' header value")
.isEqualTo(Lists.newArrayList("duplicate", "duplicate"));
}
}