Skip to content

HADOOP-14661. Add S3 requester pays bucket support to S3A #3962

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Mar 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
4abf17d
HADOOP-14661. Add S3 requester pays bucket support to S3A
dannycjones Feb 4, 2022
a401c36
HADOOP-14661. Fix checkstyle
dannycjones Feb 7, 2022
038956f
Merge branch 'apache:trunk' into HADOOP-14661
dannycjones Mar 10, 2022
8d205b5
Add close statements for FS and InputStream in Requester Pays S3A int…
dannycjones Feb 28, 2022
257fe5b
Move requester pays header value to constant
dannycjones Feb 28, 2022
4ed5959
Deprecate unused requester pays flag in S3A's RequestFactory
dannycjones Feb 28, 2022
6fcea06
Add JavaDoc to ALLOW_REQUESTER_PAYS and related variables
dannycjones Mar 9, 2022
41e8016
Replace . with - in requester pays config
dannycjones Mar 9, 2022
15ed59d
Fix imports on ITestS3ARequesterPays
dannycjones Mar 9, 2022
a3768a5
Remove unnecessary configuration disabling FS caching in ITestS3ARequ…
dannycjones Mar 9, 2022
059b8d6
Use IOStatisticAssertions
dannycjones Mar 9, 2022
152c567
Update documentation to split over multiple lines
dannycjones Mar 9, 2022
876103f
Add listFiles call to S3A requester pays integ test
dannycjones Mar 10, 2022
bbccc72
Add removal of base/bucket overrides for requester pays tests
dannycjones Mar 10, 2022
9dd0bae
Enable full bucket exists check in requester pays tests
dannycjones Mar 10, 2022
d58b0f9
Add note in testing.md on how to configure requester pays tests
dannycjones Mar 10, 2022
1fbbc48
Update ITestS3ARequesterPays to use static imports for constants
dannycjones Mar 10, 2022
ae8e812
Fix blanks in documentation
dannycjones Mar 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,15 @@ private Constants() {
"fs.s3a.connection.ssl.enabled";
public static final boolean DEFAULT_SECURE_CONNECTIONS = true;

/**
* Configuration option for S3 Requester Pays feature: {@value}.
* When set to true, the S3 client is created with the requester pays
* header added to its requests.
*/
public static final String ALLOW_REQUESTER_PAYS = "fs.s3a.requester.pays.enabled";
/**
* Default configuration for {@value #ALLOW_REQUESTER_PAYS}: {@value}.
*/
public static final boolean DEFAULT_ALLOW_REQUESTER_PAYS = false;

// use OpenSSL or JSEE for secure connections
public static final String SSL_CHANNEL_MODE = "fs.s3a.ssl.channel.mode";
public static final DelegatingSSLSocketFactory.SSLChannelMode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.hadoop.fs.s3a.statistics.impl.AwsStatisticsCollector;
import org.apache.hadoop.fs.store.LogExactlyOnce;

import static com.amazonaws.services.s3.Headers.REQUESTER_PAYS_HEADER;
import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CENTRAL_REGION;
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
Expand All @@ -75,6 +76,8 @@ public class DefaultS3ClientFactory extends Configured

private static final String S3_SERVICE_NAME = "s3";

/**
* Value sent with the {@code REQUESTER_PAYS_HEADER} header when
* requester pays is enabled in the client creation parameters: {@value}.
*/
private static final String REQUESTER_PAYS_HEADER_VALUE = "requester";

/**
* Subclasses refer to this.
*/
Expand Down Expand Up @@ -118,6 +121,11 @@ public AmazonS3 createS3Client(
parameters.getHeaders().forEach((h, v) ->
awsConf.addHeader(h, v));

if (parameters.isRequesterPays()) {
// All calls must acknowledge requester will pay via header.
awsConf.addHeader(REQUESTER_PAYS_HEADER, REQUESTER_PAYS_HEADER_VALUE);
}

// When EXPERIMENTAL_AWS_INTERNAL_THROTTLING is false
// throttling is explicitly disabled on the S3 client so that
// all failures are collected in S3A instrumentation, and its
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,7 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
.withMetrics(statisticsContext.newStatisticsFromAwsSdk())
.withPathStyleAccess(conf.getBoolean(PATH_STYLE_ACCESS, false))
.withUserAgentSuffix(uaSuffix)
.withRequesterPays(conf.getBoolean(ALLOW_REQUESTER_PAYS, DEFAULT_ALLOW_REQUESTER_PAYS))
.withRequestHandlers(auditManager.createRequestHandlers());

s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ final class S3ClientCreationParameters {
private boolean pathStyleAccess;

/**
* This is in the settings awaiting wiring up and testing.
* Permit requests to requester pays buckets.
*/
private boolean requesterPays;

Expand Down Expand Up @@ -168,7 +168,7 @@ public S3ClientCreationParameters withMetrics(
}

/**
* Requester pays option. Not yet wired up.
* Set requester pays option.
* @param value new value
* @return the builder
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,6 @@ public class RequestFactoryImpl implements RequestFactory {
*/
private final long multipartPartCountLimit;

/**
* Requester Pays.
* This is to be wired up in a PR with its
* own tests and docs.
*/
private final boolean requesterPays;

/**
* Callback to prepare requests.
*/
Expand All @@ -133,7 +126,6 @@ protected RequestFactoryImpl(
this.cannedACL = builder.cannedACL;
this.encryptionSecrets = builder.encryptionSecrets;
this.multipartPartCountLimit = builder.multipartPartCountLimit;
this.requesterPays = builder.requesterPays;
this.requestPreparer = builder.requestPreparer;
this.contentEncoding = builder.contentEncoding;
}
Expand Down Expand Up @@ -616,9 +608,6 @@ public static final class RequestFactoryBuilder {
*/
private CannedAccessControlList cannedACL = null;

/** Requester Pays flag. */
private boolean requesterPays = false;

/** Content Encoding. */
private String contentEncoding;

Expand Down Expand Up @@ -685,17 +674,6 @@ public RequestFactoryBuilder withCannedACL(
return this;
}

/**
 * Set the Requester Pays flag on this builder.
 * @param value new value of the flag
 * @return this builder, to allow call chaining
 */
public RequestFactoryBuilder withRequesterPays(final boolean value) {
  this.requesterPays = value;
  return this;
}

/**
* Multipart limit.
* @param value new value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1633,6 +1633,25 @@ Before using Access Points make sure you're not impacted by the following:
considering endpoints, if you have any custom signers that use the host endpoint property make
sure to update them if needed;

## <a name="requester_pays"></a>Requester Pays buckets

S3A supports buckets with
[Requester Pays](https://docs.aws.amazon.com/AmazonS3/latest/userguide/RequesterPaysBuckets.html)
enabled. When a bucket is configured with requester pays, the requester must cover
the per-request cost.

For requests to be successful, the S3 client must acknowledge that they will pay
for these requests by setting a request flag, usually a header, on each request.

To enable this feature within S3A, set the `fs.s3a.requester.pays.enabled` property to `true`.

```xml
<property>
<name>fs.s3a.requester.pays.enabled</name>
<value>true</value>
</property>
```

## <a name="upload"></a>How S3A writes data to S3

The original S3A client implemented file writes by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,31 @@ your `core-site.xml` file, so that trying to use S3 select fails fast with
a meaningful error ("S3 Select not supported") rather than a generic Bad Request
exception.

### Testing Requester Pays

By default, the requester pays tests will look for a bucket that exists on Amazon S3
in us-east-1.

If you are testing against a different endpoint which supports requester pays, you can specify an alternative object.
The test only requires an object of at least a few bytes in order
to check that lists and basic reads work.

```xml
<property>
<name>test.fs.s3a.requester.pays.file</name>
<value>s3a://my-req-pays-enabled-bucket/on-another-endpoint.json</value>
</property>
```

If the endpoint does not support requester pays, you can also disable the tests by configuring
the test URI as a single space.

```xml
<property>
<name>test.fs.s3a.requester.pays.file</name>
<value> </value>
</property>
```

### Testing Session Credentials

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,20 @@ When trying to write or read SEE-KMS-encrypted data, the client gets a
The caller does not have the permissions to access
the key with which the data was encrypted.

### <a name="access_denied_requester_pays"></a>`AccessDeniedException` when using a "Requester Pays" enabled bucket

When making cross-account requests to a requester pays enabled bucket, all calls must acknowledge via a header that the requester will be billed.

If you don't enable this acknowledgement within S3A, then you will see a message similar to this:

```
java.nio.file.AccessDeniedException: s3a://my-bucket/my-object: getFileStatus on s3a://my-bucket/my-object:
com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403;
Error Code: 403 Forbidden; Request ID: myshortreqid; S3 Extended Request ID: mylongreqid):403 Forbidden
```

To enable requester pays, set the `fs.s3a.requester.pays.enabled` property to `true`.

### <a name="no_region_session_credentials"></a> "Unable to find a region via the region provider chain." when using session credentials.

Region must be provided when requesting session credentials, or an exception will be thrown with the message:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a;

import java.nio.file.AccessDeniedException;

import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;

import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS;
import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.junit.Assert.assertTrue;

/**
 * Integration tests for the S3A Requester Pays feature.
 *
 * The tests work against an existing object in a requester pays enabled
 * bucket. Its location is read from the {@code KEY_REQUESTER_PAYS_FILE}
 * test property, falling back to {@code DEFAULT_REQUESTER_PAYS_FILE}.
 * Setting the property to a blank value disables the tests.
 */
public class ITestS3ARequesterPays extends AbstractS3ATestBase {

  /**
   * Create the test configuration, removing any base and per-bucket
   * overrides of the requester pays and bucket probe options so that the
   * values set by the individual test cases are the ones which apply.
   * @return the configuration for the tests
   */
  @Override
  protected Configuration createConfiguration() {
    Configuration conf = super.createConfiguration();
    S3ATestUtils.removeBaseAndBucketOverrides(conf,
        ALLOW_REQUESTER_PAYS,
        S3A_BUCKET_PROBE);
    return conf;
  }

  /**
   * With requester pays enabled, the bucket probe, reads which need more
   * than one GET request, and a listing of the parent directory must
   * all succeed.
   */
  @Test
  public void testRequesterPaysOptionSuccess() throws Throwable {
    describe("Test requester pays enabled case by reading last then first byte");

    Configuration conf = this.createConfiguration();
    conf.setBoolean(ALLOW_REQUESTER_PAYS, true);
    // Enable bucket exists check, the first failure point people may encounter
    conf.setInt(S3A_BUCKET_PROBE, 2);

    Path requesterPaysPath = getRequesterPaysPath(conf);

    try (FileSystem fs = requesterPaysPath.getFileSystem(conf);
         FSDataInputStream inputStream = fs.open(requesterPaysPath)) {
      long fileLength = fs.getFileStatus(requesterPaysPath).getLen();

      // An empty object would make the seek below target offset -1 and
      // fail with an error unrelated to requester pays: fail clearly instead.
      assertTrue("Requester pays test object is empty: " + requesterPaysPath,
          fileLength > 0);

      inputStream.seek(fileLength - 1);
      inputStream.readByte();

      // Jump back to the start, triggering a new GetObject request.
      inputStream.seek(0);
      inputStream.readByte();

      // Verify > 1 call was made, so we're sure it is correctly configured for each request
      IOStatisticAssertions
          .assertThatStatisticCounter(inputStream.getIOStatistics(),
              StreamStatisticNames.STREAM_READ_OPENED)
          .isGreaterThan(1);

      // Check list calls work without error. The returned iterator is
      // probed so that the check does not depend on the listing being
      // evaluated eagerly; the test object itself must be in the results.
      Path parent = requesterPaysPath.getParent();
      assertTrue("No files found when listing " + parent,
          fs.listFiles(parent, false).hasNext());
    }
  }

  /**
   * With requester pays explicitly disabled, opening the object must be
   * rejected with an {@link AccessDeniedException}.
   */
  @Test
  public void testRequesterPaysDisabledFails() throws Throwable {
    describe("Verify expected failure for requester pays buckets when client has it disabled");

    Configuration conf = this.createConfiguration();
    conf.setBoolean(ALLOW_REQUESTER_PAYS, false);
    Path requesterPaysPath = getRequesterPaysPath(conf);

    try (FileSystem fs = requesterPaysPath.getFileSystem(conf)) {
      intercept(
          AccessDeniedException.class,
          "403 Forbidden",
          "Expected requester pays bucket to fail without header set",
          () -> fs.open(requesterPaysPath).close()
      );
    }
  }

  /**
   * Resolve the path of the object used by the tests.
   * The test assumption fails if the configured value is empty after
   * trimming, which is how the tests are disabled.
   * @param conf configuration to read the test property from
   * @return path of an object in a requester pays enabled bucket
   */
  private Path getRequesterPaysPath(Configuration conf) {
    String requesterPaysFile =
        conf.getTrimmed(KEY_REQUESTER_PAYS_FILE, DEFAULT_REQUESTER_PAYS_FILE);
    S3ATestUtils.assume(
        "Empty test property: " + KEY_REQUESTER_PAYS_FILE,
        !requesterPaysFile.isEmpty()
    );
    return new Path(requesterPaysFile);
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,17 @@ public interface S3ATestConstants {
*/
String DEFAULT_CSVTEST_FILE = LANDSAT_BUCKET + "scene_list.gz";

/**
* Configuration key for an existing object in a requester pays bucket: {@value}.
* If not set, defaults to {@value #DEFAULT_REQUESTER_PAYS_FILE}.
*/
String KEY_REQUESTER_PAYS_FILE = TEST_FS_S3A + "requester.pays.file";

/**
* Default path for an S3 object inside a requester pays enabled bucket: {@value}.
*/
String DEFAULT_REQUESTER_PAYS_FILE = "s3a://usgs-landsat/collection02/catalog.json";

/**
* Name of the property to define the timeout for scale tests: {@value}.
* Measured in seconds.
Expand Down