Skip to content

Commit

Permalink
HADOOP-13887. Support S3 client side encryption (S3-CSE) using AWS-SDK (
Browse files Browse the repository at this point in the history
#2706)


This (big!) patch adds support for client side encryption in AWS S3,
with keys managed by AWS-KMS.

Read the documentation in encryption.md very, very carefully before
use and consider it unstable.

S3-CSE is enabled in the existing configuration option
"fs.s3a.server-side-encryption-algorithm":

fs.s3a.server-side-encryption-algorithm=CSE-KMS
fs.s3a.server-side-encryption.key=<KMS_KEY_ID>

You cannot enable CSE and SSE in the same client, although
you can still enable a default SSE option in the S3 console. 
  
* Filesystem list/get status operations subtract 16 bytes from the length
  of all files >= 16 bytes long to compensate for the padding which CSE
  adds.
* The SDK always warns about the specific algorithm chosen being
  deprecated. It is critical to use this algorithm for ranged
  GET requests to work (i.e. random IO). Ignore.
* Unencrypted files CANNOT BE READ.
  The entire bucket SHOULD be encrypted with S3-CSE.
* Uploading files may be a bit slower as blocks are now
  written sequentially.
* The Multipart Upload API is disabled when S3-CSE is active.

Contributed by Mehakmeet Singh
  • Loading branch information
mehakmeet authored Jul 27, 2021
1 parent b038042 commit f813554
Show file tree
Hide file tree
Showing 35 changed files with 1,371 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ public void setup() throws Exception {

final FileSystem fs = getFileSystem();
Path testPath = getContract().getTestPath();
Assume.assumeTrue("Multipart uploader is not supported",
fs.hasPathCapability(testPath,
CommonPathCapabilities.FS_MULTIPART_UPLOADER));
uploader0 = fs.createMultipartUploader(testPath).build();
uploader1 = fs.createMultipartUploader(testPath).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,12 @@ private Constants() {
"fs.s3a.multipart.purge.age";
public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 86400;

// s3 server-side encryption, see S3AEncryptionMethods for valid options
/**
* s3 server-side encryption or s3 client side encryption method, see
* {@link S3AEncryptionMethods} for valid options.
*
* {@value}
*/
public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM =
"fs.s3a.server-side-encryption-algorithm";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,23 @@
import com.amazonaws.SdkClientException;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.handlers.RequestHandler2;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Builder;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.AmazonS3EncryptionClientV2Builder;
import com.amazonaws.services.s3.AmazonS3EncryptionV2;
import com.amazonaws.services.s3.S3ClientOptions;
import com.amazonaws.services.s3.internal.ServiceUtils;
import com.amazonaws.services.s3.model.CryptoConfigurationV2;
import com.amazonaws.services.s3.model.CryptoMode;
import com.amazonaws.services.s3.model.CryptoRangeGetMode;
import com.amazonaws.services.s3.model.EncryptionMaterialsProvider;
import com.amazonaws.services.s3.model.KMSEncryptionMaterialsProvider;
import com.amazonaws.util.AwsHostNameUtils;
import com.amazonaws.util.RuntimeHttpUtils;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,6 +58,8 @@
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CENTRAL_REGION;
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;

/**
Expand Down Expand Up @@ -112,15 +124,77 @@ public AmazonS3 createS3Client(
}

try {
return buildAmazonS3Client(
awsConf,
parameters);
if (S3AEncryptionMethods.getMethod(S3AUtils.
lookupPassword(conf, SERVER_SIDE_ENCRYPTION_ALGORITHM, null))
.equals(S3AEncryptionMethods.CSE_KMS)) {
return buildAmazonS3EncryptionClient(
awsConf,
parameters);
} else {
return buildAmazonS3Client(
awsConf,
parameters);
}
} catch (SdkClientException e) {
// SDK refused to build.
throw translateException("creating AWS S3 client", uri.toString(), e);
}
}

/**
* Create an {@link AmazonS3} client of type
* {@link AmazonS3EncryptionV2} if CSE is enabled.
*
* @param awsConf AWS configuration.
* @param parameters parameters.
*
* @return new AmazonS3 client.
* @throws IOException if lookupPassword() has any problem.
*/
protected AmazonS3 buildAmazonS3EncryptionClient(
final ClientConfiguration awsConf,
final S3ClientCreationParameters parameters) throws IOException {

AmazonS3 client;
AmazonS3EncryptionClientV2Builder builder =
new AmazonS3EncryptionClientV2Builder();
Configuration conf = getConf();

//CSE-KMS Method
String kmsKeyId = S3AUtils.lookupPassword(conf,
SERVER_SIDE_ENCRYPTION_KEY, null);
// Check if kmsKeyID is not null
Preconditions.checkArgument(kmsKeyId != null, "CSE-KMS method "
+ "requires KMS key ID. Use " + SERVER_SIDE_ENCRYPTION_KEY
+ " property to set it. ");

EncryptionMaterialsProvider materialsProvider =
new KMSEncryptionMaterialsProvider(kmsKeyId);
builder.withEncryptionMaterialsProvider(materialsProvider);
//Configure basic params of a S3 builder.
configureBasicParams(builder, awsConf, parameters);

// Configuring endpoint.
AmazonS3EncryptionClientV2Builder.EndpointConfiguration epr
= createEndpointConfiguration(parameters.getEndpoint(),
awsConf, getConf().getTrimmed(AWS_REGION));
configureEndpoint(builder, epr);

// Create cryptoConfig.
CryptoConfigurationV2 cryptoConfigurationV2 =
new CryptoConfigurationV2(CryptoMode.AuthenticatedEncryption)
.withRangeGetMode(CryptoRangeGetMode.ALL);
if (epr != null) {
cryptoConfigurationV2
.withAwsKmsRegion(RegionUtils.getRegion(epr.getSigningRegion()));
LOG.debug("KMS region used: {}", cryptoConfigurationV2.getAwsKmsRegion());
}
builder.withCryptoConfiguration(cryptoConfigurationV2);
client = builder.build();

return client;
}

/**
* Use the Builder API to create an AWS S3 client.
* <p>
Expand All @@ -137,41 +211,68 @@ protected AmazonS3 buildAmazonS3Client(
final ClientConfiguration awsConf,
final S3ClientCreationParameters parameters) {
AmazonS3ClientBuilder b = AmazonS3Client.builder();
b.withCredentials(parameters.getCredentialSet());
b.withClientConfiguration(awsConf);
b.withPathStyleAccessEnabled(parameters.isPathStyleAccess());
configureBasicParams(b, awsConf, parameters);

// endpoint set up is a PITA
AwsClientBuilder.EndpointConfiguration epr
= createEndpointConfiguration(parameters.getEndpoint(),
awsConf, getConf().getTrimmed(AWS_REGION));
configureEndpoint(b, epr);
final AmazonS3 client = b.build();
return client;
}

/**
* A method to configure basic AmazonS3Builder parameters.
*
* @param builder Instance of AmazonS3Builder used.
* @param awsConf ClientConfiguration used.
* @param parameters Parameters used to set in the builder.
*/
private void configureBasicParams(AmazonS3Builder builder,
ClientConfiguration awsConf, S3ClientCreationParameters parameters) {
builder.withCredentials(parameters.getCredentialSet());
builder.withClientConfiguration(awsConf);
builder.withPathStyleAccessEnabled(parameters.isPathStyleAccess());

if (parameters.getMetrics() != null) {
b.withMetricsCollector(
builder.withMetricsCollector(
new AwsStatisticsCollector(parameters.getMetrics()));
}
if (parameters.getRequestHandlers() != null) {
b.withRequestHandlers(
builder.withRequestHandlers(
parameters.getRequestHandlers().toArray(new RequestHandler2[0]));
}
if (parameters.getMonitoringListener() != null) {
b.withMonitoringListener(parameters.getMonitoringListener());
builder.withMonitoringListener(parameters.getMonitoringListener());
}

// endpoint set up is a PITA
AwsClientBuilder.EndpointConfiguration epr
= createEndpointConfiguration(parameters.getEndpoint(),
awsConf, getConf().getTrimmed(AWS_REGION));
}

/**
* A method to configure endpoint and Region for an AmazonS3Builder.
*
* @param builder Instance of AmazonS3Builder used.
* @param epr EndpointConfiguration used to set in builder.
*/
private void configureEndpoint(
AmazonS3Builder builder,
AmazonS3Builder.EndpointConfiguration epr) {
if (epr != null) {
// an endpoint binding was constructed: use it.
b.withEndpointConfiguration(epr);
builder.withEndpointConfiguration(epr);
} else {
// no idea what the endpoint is, so tell the SDK
// to work it out at the cost of an extra HEAD request
b.withForceGlobalBucketAccessEnabled(true);
builder.withForceGlobalBucketAccessEnabled(true);
// HADOOP-17771 force set the region so the build process doesn't halt.
String region = getConf().getTrimmed(AWS_REGION, AWS_S3_CENTRAL_REGION);
LOG.debug("fs.s3a.endpoint.region=\"{}\"", region);
if (!region.isEmpty()) {
// there's either an explicit region or we have fallen back
// to the central one.
LOG.debug("Using default endpoint; setting region to {}", region);
b.setRegion(region);
builder.setRegion(region);
} else {
// no region.
// allow this if people really want it; it is OK to rely on this
Expand All @@ -180,8 +281,6 @@ protected AmazonS3 buildAmazonS3Client(
LOG.debug(SDK_REGION_CHAIN_IN_USE);
}
}
final AmazonS3 client = b.build();
return client;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,18 @@
public class Listing extends AbstractStoreOperation {

private static final Logger LOG = S3AFileSystem.LOG;
private final boolean isCSEEnabled;

static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N =
new AcceptAllButS3nDirs();

private final ListingOperationCallbacks listingOperationCallbacks;

public Listing(ListingOperationCallbacks listingOperationCallbacks,
StoreContext storeContext) {
StoreContext storeContext) {
super(storeContext);
this.listingOperationCallbacks = listingOperationCallbacks;
this.isCSEEnabled = storeContext.isCSEEnabled();
}

/**
Expand Down Expand Up @@ -687,7 +689,7 @@ private boolean buildNextStatusBatch(S3ListResult objects) {
S3AFileStatus status = createFileStatus(keyPath, summary,
listingOperationCallbacks.getDefaultBlockSize(keyPath),
getStoreContext().getUsername(),
summary.getETag(), null);
summary.getETag(), null, isCSEEnabled);
LOG.debug("Adding: {}", status);
stats.add(status);
added++;
Expand Down Expand Up @@ -961,7 +963,7 @@ public AcceptFilesOnly(Path qualifiedPath) {
public boolean accept(Path keyPath, S3ObjectSummary summary) {
return !keyPath.equals(qualifiedPath)
&& !summary.getKey().endsWith(S3N_FOLDER_SUFFIX)
&& !objectRepresentsDirectory(summary.getKey(), summary.getSize());
&& !objectRepresentsDirectory(summary.getKey());
}

/**
Expand Down Expand Up @@ -1049,6 +1051,7 @@ public boolean accept(FileStatus status) {
}
}

@SuppressWarnings("unchecked")
public static RemoteIterator<LocatedFileStatus> toLocatedFileStatusIterator(
RemoteIterator<? extends LocatedFileStatus> iterator) {
return (RemoteIterator < LocatedFileStatus >) iterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ class S3ABlockOutputStream extends OutputStream implements
private static final LogExactlyOnce WARN_ON_SYNCABLE =
new LogExactlyOnce(LOG);

/** is client side encryption enabled? */
private final boolean isCSEEnabled;

/**
* An S3A output stream which uploads partitions in a separate pool of
* threads; different {@link S3ADataBlocks.BlockFactory}
Expand Down Expand Up @@ -189,6 +192,7 @@ class S3ABlockOutputStream extends OutputStream implements
LOG.debug("Put tracker requests multipart upload");
initMultipartUpload();
}
this.isCSEEnabled = builder.isCSEEnabled;
}

/**
Expand Down Expand Up @@ -307,29 +311,34 @@ public synchronized void write(byte[] source, int offset, int len)
// of capacity
// Trigger an upload then process the remainder.
LOG.debug("writing more data than block has capacity -triggering upload");
uploadCurrentBlock();
uploadCurrentBlock(false);
// tail recursion is mildly expensive, but given buffer sizes must be MB.
// it's unlikely to recurse very deeply.
this.write(source, offset + written, len - written);
} else {
if (remainingCapacity == 0) {
if (remainingCapacity == 0 && !isCSEEnabled) {
// the whole buffer is done, trigger an upload
uploadCurrentBlock();
uploadCurrentBlock(false);
}
}
}

/**
* Start an asynchronous upload of the current block.
*
* @param isLast true, if part being uploaded is last and client side
* encryption is enabled.
* @throws IOException Problems opening the destination for upload,
* initializing the upload, or if a previous operation has failed.
* initializing the upload, or if a previous operation
* has failed.
*/
private synchronized void uploadCurrentBlock() throws IOException {
private synchronized void uploadCurrentBlock(boolean isLast)
throws IOException {
Preconditions.checkState(hasActiveBlock(), "No active block");
LOG.debug("Writing block # {}", blockCount);
initMultipartUpload();
try {
multiPartUpload.uploadBlockAsync(getActiveBlock());
multiPartUpload.uploadBlockAsync(getActiveBlock(), isLast);
bytesSubmitted += getActiveBlock().dataSize();
} finally {
// set the block to null, so the next write will create a new block.
Expand Down Expand Up @@ -389,8 +398,9 @@ public void close() throws IOException {
// PUT the final block
if (hasBlock &&
(block.hasData() || multiPartUpload.getPartsSubmitted() == 0)) {
//send last part
uploadCurrentBlock();
// send last part and set the value of isLastPart to true.
// Necessary to set this "true" in case of client side encryption.
uploadCurrentBlock(true);
}
// wait for the partial uploads to finish
final List<PartETag> partETags =
Expand Down Expand Up @@ -760,7 +770,8 @@ public void maybeRethrowUploadFailure() throws IOException {
* @throws IOException upload failure
* @throws PathIOException if too many blocks were written
*/
private void uploadBlockAsync(final S3ADataBlocks.DataBlock block)
private void uploadBlockAsync(final S3ADataBlocks.DataBlock block,
Boolean isLast)
throws IOException {
LOG.debug("Queueing upload of {} for upload {}", block, uploadId);
Preconditions.checkNotNull(uploadId, "Null uploadId");
Expand All @@ -781,6 +792,7 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block)
uploadData.getUploadStream(),
uploadData.getFile(),
0L);
request.setLastPart(isLast);
} catch (SdkBaseException aws) {
// catch and translate
IOException e = translateException("upload", key, aws);
Expand Down Expand Up @@ -1042,6 +1054,9 @@ public static final class BlockOutputStreamBuilder {
/** Should Syncable calls be downgraded? */
private boolean downgradeSyncableExceptions;

/** is Client side Encryption enabled? */
private boolean isCSEEnabled;

private BlockOutputStreamBuilder() {
}

Expand Down Expand Up @@ -1157,5 +1172,15 @@ public BlockOutputStreamBuilder withDowngradeSyncableExceptions(
downgradeSyncableExceptions = value;
return this;
}

/**
* Set builder value.
* @param value new value
* @return the builder
*/
public BlockOutputStreamBuilder withCSEEnabled(boolean value) {
isCSEEnabled = value;
return this;
}
}
}
Loading

0 comments on commit f813554

Please sign in to comment.