-
Notifications
You must be signed in to change notification settings - Fork 8.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HADOOP-13887. Support S3 client side encryption (S3-CSE) using AWS-SDK #2706
Merged
Merged
Changes from 8 commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
b830364
HADOOP-13887. Support S3 client side encryption (S3-CSE) using AWS-SDK
1bf1b22
HADOOP-13887. Adding new tests(Dir. listing) + using unencrypted cont…
ba79e75
HADOOP-13887. Stripping padded lengths, test fixes, dir markers fix
58ae264
HADOOP-13887. Multipart + test fixes
992188e
HADOOP-17774. steve's review comments
9d3f520
HADOOP-13887. Documentation + Merging SSE-CSE props + review comments
7f81092
HADOOP-13887. ITestSessionDelegationInFileystem fix(SSE-KMS by defaul…
e3d5922
HADOOP-13887. review comments, docs + test fix
274a30e
HADOOP-13887. review comments
1a05322
HADOOP-13887. yetus fixes
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,13 +25,23 @@ | |
import com.amazonaws.SdkClientException; | ||
import com.amazonaws.client.builder.AwsClientBuilder; | ||
import com.amazonaws.handlers.RequestHandler2; | ||
import com.amazonaws.regions.RegionUtils; | ||
import com.amazonaws.services.s3.AmazonS3; | ||
import com.amazonaws.services.s3.AmazonS3Builder; | ||
import com.amazonaws.services.s3.AmazonS3Client; | ||
import com.amazonaws.services.s3.AmazonS3ClientBuilder; | ||
import com.amazonaws.services.s3.AmazonS3EncryptionClientV2Builder; | ||
import com.amazonaws.services.s3.AmazonS3EncryptionV2; | ||
import com.amazonaws.services.s3.S3ClientOptions; | ||
import com.amazonaws.services.s3.internal.ServiceUtils; | ||
import com.amazonaws.services.s3.model.CryptoConfigurationV2; | ||
import com.amazonaws.services.s3.model.CryptoMode; | ||
import com.amazonaws.services.s3.model.CryptoRangeGetMode; | ||
import com.amazonaws.services.s3.model.EncryptionMaterialsProvider; | ||
import com.amazonaws.services.s3.model.KMSEncryptionMaterialsProvider; | ||
import com.amazonaws.util.AwsHostNameUtils; | ||
import com.amazonaws.util.RuntimeHttpUtils; | ||
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; | ||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
@@ -48,6 +58,8 @@ | |
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CENTRAL_REGION; | ||
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING; | ||
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT; | ||
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM; | ||
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY; | ||
import static org.apache.hadoop.fs.s3a.S3AUtils.translateException; | ||
|
||
/** | ||
|
@@ -112,15 +124,76 @@ public AmazonS3 createS3Client( | |
} | ||
|
||
try { | ||
return buildAmazonS3Client( | ||
awsConf, | ||
parameters); | ||
if (S3AEncryptionMethods.getMethod(S3AUtils. | ||
lookupPassword(conf, S3_ENCRYPTION_ALGORITHM, null)) | ||
.equals(S3AEncryptionMethods.CSE_KMS)) { | ||
return buildAmazonS3EncryptionClient( | ||
awsConf, | ||
parameters); | ||
} else { | ||
return buildAmazonS3Client( | ||
awsConf, | ||
parameters); | ||
} | ||
} catch (SdkClientException e) { | ||
// SDK refused to build. | ||
throw translateException("creating AWS S3 client", uri.toString(), e); | ||
} | ||
} | ||
|
||
/** | ||
* Create an {@link AmazonS3} client of type | ||
* {@link AmazonS3EncryptionV2} if CSE is enabled. | ||
* | ||
* @param awsConf AWS configuration. | ||
* @param parameters parameters. | ||
* | ||
* @return new AmazonS3 client. | ||
*/ | ||
protected AmazonS3 buildAmazonS3EncryptionClient( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. InconsistentS3Client doesn't do this; I can see tests skip it. Should we have that client throw some Unsupported Exception here |
||
final ClientConfiguration awsConf, | ||
final S3ClientCreationParameters parameters) throws IOException { | ||
|
||
AmazonS3 client; | ||
AmazonS3EncryptionClientV2Builder builder = | ||
mehakmeet marked this conversation as resolved.
Show resolved
Hide resolved
|
||
new AmazonS3EncryptionClientV2Builder(); | ||
Configuration conf = getConf(); | ||
|
||
//CSE-KMS Method | ||
String kmsKeyId = S3AUtils.lookupPassword(conf, | ||
S3_ENCRYPTION_KEY, null); | ||
// Check if kmsKeyID is not null | ||
Preconditions.checkArgument(kmsKeyId != null, "CSE-KMS method " | ||
+ "requires KMS key ID. Use " + S3_ENCRYPTION_KEY | ||
+ " property to set it. "); | ||
|
||
EncryptionMaterialsProvider materialsProvider = | ||
new KMSEncryptionMaterialsProvider(kmsKeyId); | ||
builder.withEncryptionMaterialsProvider(materialsProvider); | ||
//Configure basic params of a S3 builder. | ||
configureBasicParams(builder, awsConf, parameters); | ||
|
||
// Configuring endpoint. | ||
AmazonS3EncryptionClientV2Builder.EndpointConfiguration epr | ||
= createEndpointConfiguration(parameters.getEndpoint(), | ||
awsConf, getConf().getTrimmed(AWS_REGION)); | ||
configureEndpoint(builder, epr); | ||
|
||
// Create cryptoConfig. | ||
CryptoConfigurationV2 cryptoConfigurationV2 = | ||
new CryptoConfigurationV2(CryptoMode.AuthenticatedEncryption) | ||
.withRangeGetMode(CryptoRangeGetMode.ALL); | ||
if (epr != null) { | ||
cryptoConfigurationV2 | ||
.withAwsKmsRegion(RegionUtils.getRegion(epr.getSigningRegion())); | ||
LOG.debug("KMS region used: {}", cryptoConfigurationV2.getAwsKmsRegion()); | ||
} | ||
builder.withCryptoConfiguration(cryptoConfigurationV2); | ||
client = builder.build(); | ||
|
||
return client; | ||
} | ||
|
||
/** | ||
* Use the Builder API to create an AWS S3 client. | ||
* <p> | ||
|
@@ -137,41 +210,68 @@ protected AmazonS3 buildAmazonS3Client( | |
final ClientConfiguration awsConf, | ||
final S3ClientCreationParameters parameters) { | ||
AmazonS3ClientBuilder b = AmazonS3Client.builder(); | ||
b.withCredentials(parameters.getCredentialSet()); | ||
b.withClientConfiguration(awsConf); | ||
b.withPathStyleAccessEnabled(parameters.isPathStyleAccess()); | ||
configureBasicParams(b, awsConf, parameters); | ||
|
||
// endpoint set up is a PITA | ||
AwsClientBuilder.EndpointConfiguration epr | ||
= createEndpointConfiguration(parameters.getEndpoint(), | ||
awsConf, getConf().getTrimmed(AWS_REGION)); | ||
configureEndpoint(b, epr); | ||
final AmazonS3 client = b.build(); | ||
return client; | ||
} | ||
|
||
/** | ||
* A method to configure basic AmazonS3Builder parameters. | ||
* | ||
* @param builder Instance of AmazonS3Builder used. | ||
* @param awsConf ClientConfiguration used. | ||
* @param parameters Parameters used to set in the builder. | ||
*/ | ||
private void configureBasicParams(AmazonS3Builder builder, | ||
ClientConfiguration awsConf, S3ClientCreationParameters parameters) { | ||
builder.withCredentials(parameters.getCredentialSet()); | ||
builder.withClientConfiguration(awsConf); | ||
builder.withPathStyleAccessEnabled(parameters.isPathStyleAccess()); | ||
|
||
if (parameters.getMetrics() != null) { | ||
b.withMetricsCollector( | ||
builder.withMetricsCollector( | ||
new AwsStatisticsCollector(parameters.getMetrics())); | ||
} | ||
if (parameters.getRequestHandlers() != null) { | ||
b.withRequestHandlers( | ||
builder.withRequestHandlers( | ||
parameters.getRequestHandlers().toArray(new RequestHandler2[0])); | ||
} | ||
if (parameters.getMonitoringListener() != null) { | ||
b.withMonitoringListener(parameters.getMonitoringListener()); | ||
builder.withMonitoringListener(parameters.getMonitoringListener()); | ||
} | ||
|
||
// endpoint set up is a PITA | ||
AwsClientBuilder.EndpointConfiguration epr | ||
= createEndpointConfiguration(parameters.getEndpoint(), | ||
awsConf, getConf().getTrimmed(AWS_REGION)); | ||
} | ||
|
||
/** | ||
* A method to configure endpoint and Region for an AmazonS3Builder. | ||
* | ||
* @param builder Instance of AmazonS3Builder used. | ||
* @param epr EndpointConfiguration used to set in builder. | ||
*/ | ||
private void configureEndpoint( | ||
AmazonS3Builder builder, | ||
AmazonS3Builder.EndpointConfiguration epr) { | ||
if (epr != null) { | ||
// an endpoint binding was constructed: use it. | ||
b.withEndpointConfiguration(epr); | ||
builder.withEndpointConfiguration(epr); | ||
} else { | ||
// no idea what the endpoint is, so tell the SDK | ||
// to work it out at the cost of an extra HEAD request | ||
b.withForceGlobalBucketAccessEnabled(true); | ||
builder.withForceGlobalBucketAccessEnabled(true); | ||
// HADOOP-17771 force set the region so the build process doesn't halt. | ||
String region = getConf().getTrimmed(AWS_REGION, AWS_S3_CENTRAL_REGION); | ||
LOG.debug("fs.s3a.endpoint.region=\"{}\"", region); | ||
if (!region.isEmpty()) { | ||
// there's either an explicit region or we have fallen back | ||
// to the central one. | ||
LOG.debug("Using default endpoint; setting region to {}", region); | ||
b.setRegion(region); | ||
builder.setRegion(region); | ||
} else { | ||
// no region. | ||
// allow this if people really want it; it is OK to rely on this | ||
|
@@ -180,8 +280,6 @@ protected AmazonS3 buildAmazonS3Client( | |
LOG.debug(SDK_REGION_CHAIN_IN_USE); | ||
} | ||
} | ||
final AmazonS3 client = b.build(); | ||
return client; | ||
} | ||
|
||
/** | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -155,6 +155,9 @@ class S3ABlockOutputStream extends OutputStream implements | |
private static final LogExactlyOnce WARN_ON_SYNCABLE = | ||
new LogExactlyOnce(LOG); | ||
|
||
/** is client side encryption enabled? */ | ||
private final boolean isCSEEnabled; | ||
|
||
/** | ||
* An S3A output stream which uploads partitions in a separate pool of | ||
* threads; different {@link S3ADataBlocks.BlockFactory} | ||
|
@@ -189,6 +192,7 @@ class S3ABlockOutputStream extends OutputStream implements | |
LOG.debug("Put tracker requests multipart upload"); | ||
initMultipartUpload(); | ||
} | ||
this.isCSEEnabled = builder.isCSEEnabled; | ||
} | ||
|
||
/** | ||
|
@@ -307,29 +311,33 @@ public synchronized void write(byte[] source, int offset, int len) | |
// of capacity | ||
// Trigger an upload then process the remainder. | ||
LOG.debug("writing more data than block has capacity -triggering upload"); | ||
uploadCurrentBlock(); | ||
uploadCurrentBlock(false); | ||
// tail recursion is mildly expensive, but given buffer sizes must be MB. | ||
// it's unlikely to recurse very deeply. | ||
this.write(source, offset + written, len - written); | ||
} else { | ||
if (remainingCapacity == 0) { | ||
if (remainingCapacity == 0 && !isCSEEnabled) { | ||
// the whole buffer is done, trigger an upload | ||
uploadCurrentBlock(); | ||
uploadCurrentBlock(false); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Start an asynchronous upload of the current block. | ||
* | ||
* @param isLast true, if part being uploaded is last and client side | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. and or 'or'?? |
||
* encryption is enabled. | ||
* @throws IOException Problems opening the destination for upload, | ||
* initializing the upload, or if a previous operation has failed. | ||
* initializing the upload, or if a previous operation has failed. | ||
*/ | ||
private synchronized void uploadCurrentBlock() throws IOException { | ||
private synchronized void uploadCurrentBlock(boolean isLast) | ||
throws IOException { | ||
Preconditions.checkState(hasActiveBlock(), "No active block"); | ||
LOG.debug("Writing block # {}", blockCount); | ||
initMultipartUpload(); | ||
try { | ||
multiPartUpload.uploadBlockAsync(getActiveBlock()); | ||
multiPartUpload.uploadBlockAsync(getActiveBlock(), isLast); | ||
bytesSubmitted += getActiveBlock().dataSize(); | ||
} finally { | ||
// set the block to null, so the next write will create a new block. | ||
|
@@ -389,8 +397,9 @@ public void close() throws IOException { | |
// PUT the final block | ||
if (hasBlock && | ||
(block.hasData() || multiPartUpload.getPartsSubmitted() == 0)) { | ||
//send last part | ||
uploadCurrentBlock(); | ||
// send last part and set the value of isLastPart to true. | ||
// Necessary to set this "true" in case of client side encryption. | ||
uploadCurrentBlock(true); | ||
} | ||
// wait for the partial uploads to finish | ||
final List<PartETag> partETags = | ||
|
@@ -760,7 +769,8 @@ public void maybeRethrowUploadFailure() throws IOException { | |
* @throws IOException upload failure | ||
* @throws PathIOException if too many blocks were written | ||
*/ | ||
private void uploadBlockAsync(final S3ADataBlocks.DataBlock block) | ||
private void uploadBlockAsync(final S3ADataBlocks.DataBlock block, | ||
Boolean isLast) | ||
mehakmeet marked this conversation as resolved.
Show resolved
Hide resolved
|
||
throws IOException { | ||
LOG.debug("Queueing upload of {} for upload {}", block, uploadId); | ||
Preconditions.checkNotNull(uploadId, "Null uploadId"); | ||
|
@@ -781,6 +791,7 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block) | |
uploadData.getUploadStream(), | ||
uploadData.getFile(), | ||
0L); | ||
request.setLastPart(isLast); | ||
} catch (SdkBaseException aws) { | ||
// catch and translate | ||
IOException e = translateException("upload", key, aws); | ||
|
@@ -1042,6 +1053,9 @@ public static final class BlockOutputStreamBuilder { | |
/** Should Syncable calls be downgraded? */ | ||
private boolean downgradeSyncableExceptions; | ||
|
||
/** is Client side Encryption enabled? */ | ||
private boolean isCSEEnabled; | ||
|
||
private BlockOutputStreamBuilder() { | ||
} | ||
|
||
|
@@ -1157,5 +1171,15 @@ public BlockOutputStreamBuilder withDowngradeSyncableExceptions( | |
downgradeSyncableExceptions = value; | ||
return this; | ||
} | ||
|
||
/** | ||
* Set builder value. | ||
* @param value new value | ||
* @return the builder | ||
*/ | ||
public BlockOutputStreamBuilder withCSEEnabled(boolean value) { | ||
isCSEEnabled = value; | ||
return this; | ||
} | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, apologies if this had been discussed before but wouldn't be better to define the client-side encryption in a property called
fs.s3a.client-side-encryption-algorithm
? Or perhaps deprecate the current property and move to justfs.s3a.encryption-algorithm
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
having two was too complex. as for a single name and deprecating the other -yes that's a great idea! we haven't shipped yet, so if you can add a patch there it'd be welcome.
(FWIW, in my local test setups i will stay on the old setting so tests on older branches still work. changing a config name is tricky, even with the deprecation mechanism)