-
Notifications
You must be signed in to change notification settings - Fork 9.1k
HADOOP-17198 Support S3 AccessPoint #3260
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.hadoop.fs.s3a; | ||
|
||
import javax.annotation.Nonnull; | ||
|
||
import com.amazonaws.arn.Arn; | ||
import com.amazonaws.regions.RegionUtils; | ||
|
||
/** | ||
* Represents an Arn Resource, this can be an accesspoint or bucket. | ||
*/ | ||
public final class ArnResource { | ||
|
||
/** | ||
* Resource name. | ||
*/ | ||
private final String name; | ||
|
||
/** | ||
* Resource owner account id. | ||
*/ | ||
private final String ownerAccountId; | ||
|
||
/** | ||
* Resource region. | ||
*/ | ||
private final String region; | ||
|
||
/** | ||
* Full Arn for the resource. | ||
*/ | ||
private final String fullArn; | ||
|
||
/** | ||
* Partition for the resource. Allowed partitions: aws, aws-cn, aws-us-gov | ||
*/ | ||
private final String partition; | ||
|
||
/** | ||
* Because of the different ways an endpoint can be constructed depending on partition we're | ||
* relying on the AWS SDK to produce the endpoint. In this case we need a region key of the form | ||
* {@code String.format("accesspoint-%s", awsRegion)} | ||
*/ | ||
private final String accessPointRegionKey; | ||
|
||
private ArnResource(String name, String owner, String region, String partition, String fullArn) { | ||
this.name = name; | ||
this.ownerAccountId = owner; | ||
this.region = region; | ||
this.partition = partition; | ||
this.fullArn = fullArn; | ||
this.accessPointRegionKey = String.format("accesspoint-%s", region); | ||
} | ||
|
||
/** | ||
* Resource name. | ||
* @return resource name. | ||
*/ | ||
public String getName() { | ||
return name; | ||
} | ||
|
||
/** | ||
* Return owner's account id. | ||
* @return owner account id | ||
*/ | ||
public String getOwnerAccountId() { | ||
return ownerAccountId; | ||
} | ||
|
||
/** | ||
* Resource region. | ||
* @return resource region. | ||
*/ | ||
public String getRegion() { | ||
return region; | ||
} | ||
|
||
/** | ||
* Full arn for resource. | ||
* @return arn for resource. | ||
*/ | ||
public String getFullArn() { | ||
return fullArn; | ||
} | ||
|
||
/** | ||
* Formatted endpoint for the resource. | ||
* @return resource endpoint. | ||
*/ | ||
public String getEndpoint() { | ||
return RegionUtils.getRegion(accessPointRegionKey) | ||
.getServiceEndpoint("s3"); | ||
} | ||
|
||
/** | ||
* Parses the passed `arn` string into a full ArnResource. | ||
* @param arn - string representing an Arn resource. | ||
* @return new ArnResource instance. | ||
* @throws IllegalArgumentException - if the Arn is malformed or any of the region, accountId and | ||
steveloughran marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* resource name properties are empty. | ||
*/ | ||
@Nonnull | ||
public static ArnResource accessPointFromArn(String arn) throws IllegalArgumentException { | ||
Arn parsed = Arn.fromString(arn); | ||
|
||
if (parsed.getRegion().isEmpty() || parsed.getAccountId().isEmpty() || | ||
parsed.getResourceAsString().isEmpty()) { | ||
throw new IllegalArgumentException( | ||
String.format("Access Point Arn %s has an invalid format or missing properties", arn)); | ||
} | ||
|
||
String resourceName = parsed.getResource().getResource(); | ||
return new ArnResource(resourceName, parsed.getAccountId(), parsed.getRegion(), | ||
parsed.getPartition(), arn); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -216,10 +216,14 @@ | |
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions; | ||
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound; | ||
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket; | ||
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AP_REQUIRED_EXCEPTION; | ||
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AP_S3GUARD_INCOMPATIBLE; | ||
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.ARN_BUCKET_OPTION; | ||
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH; | ||
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_S3GUARD_INCOMPATIBLE; | ||
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT; | ||
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT; | ||
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_403; | ||
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404; | ||
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT; | ||
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion; | ||
|
@@ -274,6 +278,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, | |
private Invoker s3guardInvoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL, | ||
Invoker.LOG_EVENT); | ||
private final Retried onRetry = this::operationRetried; | ||
|
||
/** | ||
* Represents bucket name for all S3 operations. If per bucket override for | ||
* {@link InternalConstants#ARN_BUCKET_OPTION} property is set, then the bucket is updated to | ||
* point to the configured Arn. | ||
*/ | ||
private String bucket; | ||
private int maxKeys; | ||
private Listing listing; | ||
|
@@ -367,6 +377,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, | |
*/ | ||
private boolean isCSEEnabled; | ||
|
||
/** | ||
* Bucket AccessPoint. | ||
*/ | ||
private ArnResource accessPoint; | ||
|
||
/** Add any deprecated keys. */ | ||
@SuppressWarnings("deprecation") | ||
private static void addDeprecatedKeys() { | ||
|
@@ -408,10 +423,20 @@ public void initialize(URI name, Configuration originalConf) | |
LOG.debug("Initializing S3AFileSystem for {}", bucket); | ||
// clone the configuration into one with propagated bucket options | ||
Configuration conf = propagateBucketOptions(originalConf, bucket); | ||
|
||
// HADOOP-17894. remove references to s3a stores in JCEKS credentials. | ||
conf = ProviderUtils.excludeIncompatibleCredentialProviders( | ||
conf, S3AFileSystem.class); | ||
String arn = String.format(ARN_BUCKET_OPTION, bucket); | ||
String configuredArn = conf.getTrimmed(arn, ""); | ||
if (!configuredArn.isEmpty()) { | ||
accessPoint = ArnResource.accessPointFromArn(configuredArn); | ||
LOG.info("Using AccessPoint ARN \"{}\" for bucket {}", configuredArn, bucket); | ||
bucket = accessPoint.getFullArn(); | ||
} else if (conf.getBoolean(AWS_S3_ACCESSPOINT_REQUIRED, false)) { | ||
LOG.warn("Access Point usage is required because \"{}\" is enabled," + | ||
" but not configured for the bucket: {}", AWS_S3_ACCESSPOINT_REQUIRED, bucket); | ||
throw new PathIOException(bucket, AP_REQUIRED_EXCEPTION); | ||
} | ||
|
||
// fix up the classloader of the configuration to be whatever | ||
// classloader loaded this filesystem. | ||
|
@@ -479,6 +504,11 @@ public void initialize(URI name, Configuration originalConf) | |
"version 2", listVersion); | ||
} | ||
useListV1 = (listVersion == 1); | ||
if (accessPoint != null && useListV1) { | ||
LOG.warn("V1 list configured in fs.s3a.list.version. This is not supported in by" + | ||
" access points. Upgrading to V2"); | ||
useListV1 = false; | ||
} | ||
|
||
signerManager = new SignerManager(bucket, this, conf, owner); | ||
signerManager.initCustomSigners(); | ||
|
@@ -556,6 +586,9 @@ public void initialize(URI name, Configuration originalConf) | |
if (isCSEEnabled) { | ||
throw new PathIOException(uri.toString(), CSE_S3GUARD_INCOMPATIBLE); | ||
} | ||
if (accessPoint != null) { | ||
throw new PathIOException(uri.toString(), AP_S3GUARD_INCOMPATIBLE); | ||
} | ||
} | ||
|
||
// LOG if S3Guard is disabled on the warn level set in config | ||
|
@@ -743,11 +776,24 @@ protected void verifyBucketExists() | |
*/ | ||
@Retries.RetryTranslated | ||
protected void verifyBucketExistsV2() | ||
throws UnknownStoreException, IOException { | ||
throws UnknownStoreException, IOException { | ||
if (!invoker.retry("doesBucketExistV2", bucket, true, | ||
trackDurationOfOperation(getDurationTrackerFactory(), | ||
STORE_EXISTS_PROBE.getSymbol(), | ||
() -> s3.doesBucketExistV2(bucket)))) { | ||
() -> { | ||
// Bug in SDK always returns `true` for AccessPoint ARNs with `doesBucketExistV2()` | ||
// expanding implementation to use ARNs and buckets correctly | ||
steveloughran marked this conversation as resolved.
Show resolved
Hide resolved
|
||
try { | ||
s3.getBucketAcl(bucket); | ||
} catch (AmazonServiceException ex) { | ||
int statusCode = ex.getStatusCode(); | ||
if (statusCode == SC_404 || (statusCode == SC_403 && accessPoint != null)) { | ||
return false; | ||
} | ||
} | ||
|
||
return true; | ||
}))) { | ||
throw new UnknownStoreException("s3a://" + bucket + "/", " Bucket does " | ||
+ "not exist"); | ||
} | ||
|
@@ -835,10 +881,14 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException { | |
S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL, | ||
S3ClientFactory.class); | ||
|
||
String endpoint = accessPoint == null | ||
? conf.getTrimmed(ENDPOINT, DEFAULT_ENDPOINT) | ||
: accessPoint.getEndpoint(); | ||
|
||
S3ClientFactory.S3ClientCreationParameters parameters = null; | ||
parameters = new S3ClientFactory.S3ClientCreationParameters() | ||
.withCredentialSet(credentials) | ||
.withEndpoint(conf.getTrimmed(ENDPOINT, DEFAULT_ENDPOINT)) | ||
.withEndpoint(endpoint) | ||
.withMetrics(statisticsContext.newStatisticsFromAwsSdk()) | ||
.withPathStyleAccess(conf.getBoolean(PATH_STYLE_ACCESS, false)) | ||
.withUserAgentSuffix(uaSuffix) | ||
|
@@ -1167,7 +1217,10 @@ public String getBucketLocation(String bucketName) throws IOException { | |
final String region = trackDurationAndSpan( | ||
STORE_EXISTS_PROBE, bucketName, null, () -> | ||
invoker.retry("getBucketLocation()", bucketName, true, () -> | ||
s3.getBucketLocation(bucketName))); | ||
// If accessPoint then region is known from Arn | ||
accessPoint != null | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should we pull this up to L1216, so we can skip the entire overhead of duration tracking, retry, etc.? Currently it's overkill to wrap, but it will add it to the iostats, so maybe it's best to leave as is. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I liked the iostats tracking so it doesn't look like an operation is missing / changed. |
||
? accessPoint.getRegion() | ||
: s3.getBucketLocation(bucketName))); | ||
return fixBucketRegion(region); | ||
} | ||
|
||
|
@@ -4550,6 +4603,10 @@ public String toString() { | |
.append("}"); | ||
} | ||
sb.append(", ClientSideEncryption=").append(isCSEEnabled); | ||
|
||
if (accessPoint != null) { | ||
sb.append(", arnForBucket=").append(accessPoint.getFullArn()); | ||
} | ||
sb.append('}'); | ||
return sb.toString(); | ||
} | ||
|
Uh oh!
There was an error while loading. Please reload this page.