HADOOP-13887. Support S3 client side encryption (S3-CSE) using AWS-SDK #2706

Merged: 10 commits, Jul 27, 2021
@@ -89,6 +89,9 @@ public void setup() throws Exception {

final FileSystem fs = getFileSystem();
Path testPath = getContract().getTestPath();
Assume.assumeTrue("Multipart uploader is not supported",
fs.hasPathCapability(testPath,
CommonPathCapabilities.FS_MULTIPART_UPLOADER));
uploader0 = fs.createMultipartUploader(testPath).build();
uploader1 = fs.createMultipartUploader(testPath).build();
}
@@ -420,7 +420,12 @@ private Constants() {
"fs.s3a.multipart.purge.age";
public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 86400;

// s3 server-side encryption, see S3AEncryptionMethods for valid options
/**
 * s3 server-side encryption or s3 client side encryption method, see
 * {@link S3AEncryptionMethods} for valid options.
 *
 * {@value}
 */

Review comment (Contributor):
Hi, apologies if this has been discussed before, but wouldn't it be better to define the client-side encryption in a property called fs.s3a.client-side-encryption-algorithm? Or perhaps deprecate the current property and move to just fs.s3a.encryption-algorithm?

Reply (Contributor):
Having two was too complex. As for a single name and deprecating the other: yes, that's a great idea! We haven't shipped yet, so if you can add a patch there it'd be welcome.

(FWIW, in my local test setups I will stay on the old setting so tests on older branches still work. Changing a config name is tricky, even with the deprecation mechanism.)
public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM =
"fs.s3a.server-side-encryption-algorithm";

@@ -25,13 +25,23 @@
import com.amazonaws.SdkClientException;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.handlers.RequestHandler2;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Builder;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.AmazonS3EncryptionClientV2Builder;
import com.amazonaws.services.s3.AmazonS3EncryptionV2;
import com.amazonaws.services.s3.S3ClientOptions;
import com.amazonaws.services.s3.internal.ServiceUtils;
import com.amazonaws.services.s3.model.CryptoConfigurationV2;
import com.amazonaws.services.s3.model.CryptoMode;
import com.amazonaws.services.s3.model.CryptoRangeGetMode;
import com.amazonaws.services.s3.model.EncryptionMaterialsProvider;
import com.amazonaws.services.s3.model.KMSEncryptionMaterialsProvider;
import com.amazonaws.util.AwsHostNameUtils;
import com.amazonaws.util.RuntimeHttpUtils;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +58,8 @@
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CENTRAL_REGION;
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;

/**
@@ -112,15 +124,77 @@ public AmazonS3 createS3Client(
}

try {
return buildAmazonS3Client(
awsConf,
parameters);
if (S3AEncryptionMethods.getMethod(S3AUtils.
lookupPassword(conf, SERVER_SIDE_ENCRYPTION_ALGORITHM, null))
.equals(S3AEncryptionMethods.CSE_KMS)) {
return buildAmazonS3EncryptionClient(
awsConf,
parameters);
} else {
return buildAmazonS3Client(
awsConf,
parameters);
}
} catch (SdkClientException e) {
// SDK refused to build.
throw translateException("creating AWS S3 client", uri.toString(), e);
}
}
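For context, the CSE-KMS branch above is selected purely through existing configuration keys. A hedged usage sketch; the bucket name and key ARN are placeholders:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class CseKmsClientExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Routes createS3Client() into buildAmazonS3EncryptionClient().
    conf.set("fs.s3a.server-side-encryption-algorithm", "CSE-KMS");
    // Mandatory for CSE-KMS; checked by the Preconditions guard below.
    conf.set("fs.s3a.server-side-encryption.key",
        "arn:aws:kms:eu-west-1:111122223333:key/example-key-id");
    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
    System.out.println("client-side encrypted FS: " + fs.getUri());
  }
}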

/**
* Create an {@link AmazonS3} client of type
* {@link AmazonS3EncryptionV2} if CSE is enabled.
*
* @param awsConf AWS configuration.
* @param parameters parameters.
*
* @return new AmazonS3 client.
* @throws IOException if lookupPassword() has any problem.
*/
Review comment (Contributor):
InconsistentS3Client doesn't do this; I can see tests skip it. Should we have that client throw some UnsupportedOperationException here?

protected AmazonS3 buildAmazonS3EncryptionClient(
final ClientConfiguration awsConf,
final S3ClientCreationParameters parameters) throws IOException {

AmazonS3 client;
AmazonS3EncryptionClientV2Builder builder =
new AmazonS3EncryptionClientV2Builder();
Configuration conf = getConf();

//CSE-KMS Method
String kmsKeyId = S3AUtils.lookupPassword(conf,
SERVER_SIDE_ENCRYPTION_KEY, null);
// Check if kmsKeyID is not null
Preconditions.checkArgument(kmsKeyId != null, "CSE-KMS method "
+ "requires KMS key ID. Use " + SERVER_SIDE_ENCRYPTION_KEY
+ " property to set it. ");

EncryptionMaterialsProvider materialsProvider =
new KMSEncryptionMaterialsProvider(kmsKeyId);
builder.withEncryptionMaterialsProvider(materialsProvider);
//Configure basic params of a S3 builder.
configureBasicParams(builder, awsConf, parameters);

// Configuring endpoint.
AmazonS3EncryptionClientV2Builder.EndpointConfiguration epr
= createEndpointConfiguration(parameters.getEndpoint(),
awsConf, getConf().getTrimmed(AWS_REGION));
configureEndpoint(builder, epr);

// Create cryptoConfig.
CryptoConfigurationV2 cryptoConfigurationV2 =
new CryptoConfigurationV2(CryptoMode.AuthenticatedEncryption)
.withRangeGetMode(CryptoRangeGetMode.ALL);
if (epr != null) {
cryptoConfigurationV2
.withAwsKmsRegion(RegionUtils.getRegion(epr.getSigningRegion()));
LOG.debug("KMS region used: {}", cryptoConfigurationV2.getAwsKmsRegion());
}
builder.withCryptoConfiguration(cryptoConfigurationV2);
client = builder.build();

return client;
}
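Picking up the review comment above about InconsistentS3Client: one way to make the fault-injecting factory refuse CSE would be an override along these lines. A sketch of the reviewer's suggestion, not code from this patch:

// Hypothetical override in InconsistentS3ClientFactory, which extends
// DefaultS3ClientFactory; the inconsistent client has no CSE support.
@Override
protected AmazonS3 buildAmazonS3EncryptionClient(
    final ClientConfiguration awsConf,
    final S3ClientCreationParameters parameters) throws IOException {
  throw new UnsupportedOperationException(
      "S3 client-side encryption is not supported by the inconsistent client");
}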

/**
* Use the Builder API to create an AWS S3 client.
* <p>
@@ -137,41 +211,68 @@ protected AmazonS3 buildAmazonS3Client(
final ClientConfiguration awsConf,
final S3ClientCreationParameters parameters) {
AmazonS3ClientBuilder b = AmazonS3Client.builder();
b.withCredentials(parameters.getCredentialSet());
b.withClientConfiguration(awsConf);
b.withPathStyleAccessEnabled(parameters.isPathStyleAccess());
configureBasicParams(b, awsConf, parameters);

// endpoint set up is a PITA
AwsClientBuilder.EndpointConfiguration epr
= createEndpointConfiguration(parameters.getEndpoint(),
awsConf, getConf().getTrimmed(AWS_REGION));
configureEndpoint(b, epr);
final AmazonS3 client = b.build();
return client;
}

/**
* A method to configure basic AmazonS3Builder parameters.
*
* @param builder Instance of AmazonS3Builder used.
* @param awsConf ClientConfiguration used.
* @param parameters Parameters used to set in the builder.
*/
private void configureBasicParams(AmazonS3Builder builder,
ClientConfiguration awsConf, S3ClientCreationParameters parameters) {
builder.withCredentials(parameters.getCredentialSet());
builder.withClientConfiguration(awsConf);
builder.withPathStyleAccessEnabled(parameters.isPathStyleAccess());

if (parameters.getMetrics() != null) {
b.withMetricsCollector(
builder.withMetricsCollector(
new AwsStatisticsCollector(parameters.getMetrics()));
}
if (parameters.getRequestHandlers() != null) {
b.withRequestHandlers(
builder.withRequestHandlers(
parameters.getRequestHandlers().toArray(new RequestHandler2[0]));
}
if (parameters.getMonitoringListener() != null) {
b.withMonitoringListener(parameters.getMonitoringListener());
builder.withMonitoringListener(parameters.getMonitoringListener());
}

// endpoint set up is a PITA
AwsClientBuilder.EndpointConfiguration epr
= createEndpointConfiguration(parameters.getEndpoint(),
awsConf, getConf().getTrimmed(AWS_REGION));
}

/**
* A method to configure endpoint and Region for an AmazonS3Builder.
*
* @param builder Instance of AmazonS3Builder used.
* @param epr EndpointConfiguration used to set in builder.
*/
private void configureEndpoint(
AmazonS3Builder builder,
AmazonS3Builder.EndpointConfiguration epr) {
if (epr != null) {
// an endpoint binding was constructed: use it.
b.withEndpointConfiguration(epr);
builder.withEndpointConfiguration(epr);
} else {
// no idea what the endpoint is, so tell the SDK
// to work it out at the cost of an extra HEAD request
b.withForceGlobalBucketAccessEnabled(true);
builder.withForceGlobalBucketAccessEnabled(true);
// HADOOP-17771 force set the region so the build process doesn't halt.
String region = getConf().getTrimmed(AWS_REGION, AWS_S3_CENTRAL_REGION);
LOG.debug("fs.s3a.endpoint.region=\"{}\"", region);
if (!region.isEmpty()) {
// there's either an explicit region or we have fallen back
// to the central one.
LOG.debug("Using default endpoint; setting region to {}", region);
b.setRegion(region);
builder.setRegion(region);
} else {
// no region.
// allow this if people really want it; it is OK to rely on this
@@ -180,8 +281,6 @@ protected AmazonS3 buildAmazonS3Client(
LOG.debug(SDK_REGION_CHAIN_IN_USE);
}
}
final AmazonS3 client = b.build();
return client;
}
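The HADOOP-17771 fallback above only kicks in when no region is configured; pinning one explicitly avoids the guesswork. A hedged example; the region value is a placeholder:

import org.apache.hadoop.conf.Configuration;

public class RegionPinExample {
  public static Configuration withPinnedRegion() {
    Configuration conf = new Configuration();
    // The AWS_REGION key read by configureEndpoint(); explicit region,
    // used when no endpoint configuration is constructed.
    conf.set("fs.s3a.endpoint.region", "us-west-2");
    return conf;
  }
}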

@@ -90,16 +90,18 @@
public class Listing extends AbstractStoreOperation {

private static final Logger LOG = S3AFileSystem.LOG;
private final boolean isCSEEnabled;

static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N =
new AcceptAllButS3nDirs();

private final ListingOperationCallbacks listingOperationCallbacks;

public Listing(ListingOperationCallbacks listingOperationCallbacks,
StoreContext storeContext) {
StoreContext storeContext) {
super(storeContext);
this.listingOperationCallbacks = listingOperationCallbacks;
this.isCSEEnabled = storeContext.isCSEEnabled();
}

/**
@@ -687,7 +689,7 @@ private boolean buildNextStatusBatch(S3ListResult objects) {
S3AFileStatus status = createFileStatus(keyPath, summary,
listingOperationCallbacks.getDefaultBlockSize(keyPath),
getStoreContext().getUsername(),
summary.getETag(), null);
summary.getETag(), null, isCSEEnabled);
LOG.debug("Adding: {}", status);
stats.add(status);
added++;
@@ -961,7 +963,7 @@ public AcceptFilesOnly(Path qualifiedPath) {
public boolean accept(Path keyPath, S3ObjectSummary summary) {
return !keyPath.equals(qualifiedPath)
&& !summary.getKey().endsWith(S3N_FOLDER_SUFFIX)
&& !objectRepresentsDirectory(summary.getKey(), summary.getSize());
&& !objectRepresentsDirectory(summary.getKey());
}

/**
@@ -1049,6 +1051,7 @@ public boolean accept(FileStatus status) {
}
}

@SuppressWarnings("unchecked")
public static RemoteIterator<LocatedFileStatus> toLocatedFileStatusIterator(
RemoteIterator<? extends LocatedFileStatus> iterator) {
return (RemoteIterator < LocatedFileStatus >) iterator;
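The isCSEEnabled flag threaded into createFileStatus above matters because client-side encryption inflates the stored object length relative to the plaintext. A hedged sketch of the kind of adjustment involved, assuming a fixed 16-byte CSE padding; the constant and method names are illustrative:

// Illustrative only: names and the padding size are assumptions.
final class CsePaddingSketch {
  // AES-GCM authenticated encryption appends a fixed-size tag per object.
  private static final int CSE_PADDING_LENGTH = 16;

  static long plaintextLength(long reportedSize, boolean isCSEEnabled) {
    if (isCSEEnabled && reportedSize >= CSE_PADDING_LENGTH) {
      return reportedSize - CSE_PADDING_LENGTH;
    }
    return reportedSize;
  }
}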
@@ -155,6 +155,9 @@ class S3ABlockOutputStream extends OutputStream implements
private static final LogExactlyOnce WARN_ON_SYNCABLE =
new LogExactlyOnce(LOG);

/** is client side encryption enabled? */
private final boolean isCSEEnabled;

/**
* An S3A output stream which uploads partitions in a separate pool of
* threads; different {@link S3ADataBlocks.BlockFactory}
@@ -189,6 +192,7 @@ class S3ABlockOutputStream extends OutputStream implements
LOG.debug("Put tracker requests multipart upload");
initMultipartUpload();
}
this.isCSEEnabled = builder.isCSEEnabled;
}

/**
@@ -307,29 +311,34 @@ public synchronized void write(byte[] source, int offset, int len)
// of capacity
// Trigger an upload then process the remainder.
LOG.debug("writing more data than block has capacity -triggering upload");
uploadCurrentBlock();
uploadCurrentBlock(false);
// tail recursion is mildly expensive, but given buffer sizes must be MB.
// it's unlikely to recurse very deeply.
this.write(source, offset + written, len - written);
} else {
if (remainingCapacity == 0) {
if (remainingCapacity == 0 && !isCSEEnabled) {
// the whole buffer is done, trigger an upload
uploadCurrentBlock();
uploadCurrentBlock(false);
}
}
}
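Why the extra !isCSEEnabled in the exact-fit branch above: an exactly-full block might turn out to be the final one, and with client side encryption the final part must be flagged before upload. A sketch of the resulting flow, assuming the fields and helpers of S3ABlockOutputStream are in scope:

// Illustrative restatement, not patch code: the decision for an exactly
// full block.
private void onBlockExactlyFull() throws IOException {
  if (!isCSEEnabled) {
    // Safe to ship now: without CSE, no part needs a last-part flag.
    uploadCurrentBlock(false);
  }
  // With CSE the block is retained; if no further write() arrives,
  // close() uploads it via uploadCurrentBlock(true), which marks the
  // request with setLastPart(true) so the encryption client can finalize.
}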

/**
 * Start an asynchronous upload of the current block.
 *
 * @param isLast true, if part being uploaded is last and client side
 * encryption is enabled.
 * @throws IOException Problems opening the destination for upload,
 * initializing the upload, or if a previous operation
 * has failed.
 */

Review comment (Contributor):
'and', or should it be 'or'?
private synchronized void uploadCurrentBlock() throws IOException {
private synchronized void uploadCurrentBlock(boolean isLast)
throws IOException {
Preconditions.checkState(hasActiveBlock(), "No active block");
LOG.debug("Writing block # {}", blockCount);
initMultipartUpload();
try {
multiPartUpload.uploadBlockAsync(getActiveBlock());
multiPartUpload.uploadBlockAsync(getActiveBlock(), isLast);
bytesSubmitted += getActiveBlock().dataSize();
} finally {
// set the block to null, so the next write will create a new block.
@@ -389,8 +398,9 @@ public void close() throws IOException {
// PUT the final block
if (hasBlock &&
(block.hasData() || multiPartUpload.getPartsSubmitted() == 0)) {
//send last part
uploadCurrentBlock();
// send last part and set the value of isLastPart to true.
// Necessary to set this "true" in case of client side encryption.
uploadCurrentBlock(true);
}
// wait for the partial uploads to finish
final List<PartETag> partETags =
@@ -760,7 +770,8 @@ public void maybeRethrowUploadFailure() throws IOException {
* @throws IOException upload failure
* @throws PathIOException if too many blocks were written
*/
private void uploadBlockAsync(final S3ADataBlocks.DataBlock block)
private void uploadBlockAsync(final S3ADataBlocks.DataBlock block,
Boolean isLast)
throws IOException {
LOG.debug("Queueing upload of {} for upload {}", block, uploadId);
Preconditions.checkNotNull(uploadId, "Null uploadId");
@@ -781,6 +792,7 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block)
uploadData.getUploadStream(),
uploadData.getFile(),
0L);
request.setLastPart(isLast);
} catch (SdkBaseException aws) {
// catch and translate
IOException e = translateException("upload", key, aws);
@@ -1042,6 +1054,9 @@ public static final class BlockOutputStreamBuilder {
/** Should Syncable calls be downgraded? */
private boolean downgradeSyncableExceptions;

/** is Client side Encryption enabled? */
private boolean isCSEEnabled;

private BlockOutputStreamBuilder() {
}

@@ -1157,5 +1172,15 @@ public BlockOutputStreamBuilder withDowngradeSyncableExceptions(
downgradeSyncableExceptions = value;
return this;
}

/**
* Set the client side encryption flag.
* @param value new value
* @return the builder
*/
public BlockOutputStreamBuilder withCSEEnabled(boolean value) {
isCSEEnabled = value;
return this;
}
}
}
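A hedged sketch of how the new flag would be wired up when the file system constructs an output stream; the builder() factory and the surrounding calls are assumed for illustration:

// Illustrative wiring, assuming a static builder() factory on the stream.
S3ABlockOutputStream.BlockOutputStreamBuilder builder =
    S3ABlockOutputStream.builder()
        .withDowngradeSyncableExceptions(true)
        .withCSEEnabled(isCSEEnabled); // flag sourced from S3AFileSystem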