Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-17851. Support user specified content encoding for S3A #3498

Merged
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,12 @@ private Constants() {
public static final String CANNED_ACL = "fs.s3a.acl.default";
public static final String DEFAULT_CANNED_ACL = "";

/**
* Content encoding: gzip, deflate, compress, br, etc.
* Value {@value}.
*/
public static final String CONTENT_ENCODING = "fs.s3a.object.content.encoding";

// should we try to purge old multipart uploads when starting up
public static final String PURGE_EXISTING_MULTIPART =
"fs.s3a.multipart.purge";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -938,12 +938,16 @@ protected RequestFactory createRequestFactory() {
// request factory.
initCannedAcls(getConf());

// Any encoding type
String contentEncoding = getConf().getTrimmed(CONTENT_ENCODING, null);

return RequestFactoryImpl.builder()
.withBucket(requireNonNull(bucket))
.withCannedACL(getCannedACL())
.withEncryptionSecrets(requireNonNull(encryptionSecrets))
.withMultipartPartCountLimit(partCountLimit)
.withRequestPreparer(getAuditManager()::requestCreated)
.withContentEncoding(contentEncoding)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ public interface RequestFactory {
*/
S3AEncryptionMethods getServerSideEncryptionAlgorithm();

/**
* Get the content encoding (e.g. gzip) or return null if none.
* @return content encoding
*/
String getContentEncoding();

/**
* Create a new object metadata instance.
* Any standard metadata headers are added here, for example:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public class HeaderProcessing extends AbstractStoreOperation {
XA_HEADER_PREFIX + Headers.CONTENT_DISPOSITION;

/**
* Standard HTTP header found on some S3 objects: {@value}.
* Content encoding; can be configured: {@value}.
*/
public static final String XA_CONTENT_ENCODING =
XA_HEADER_PREFIX + Headers.CONTENT_ENCODING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ public class RequestFactoryImpl implements RequestFactory {
*/
private final PrepareRequest requestPreparer;

/**
* Content encoding (null for none).
*/
private final String contentEncoding;

/**
* Constructor.
* @param builder builder with all the configuration.
Expand All @@ -130,6 +135,7 @@ protected RequestFactoryImpl(
this.multipartPartCountLimit = builder.multipartPartCountLimit;
this.requesterPays = builder.requesterPays;
this.requestPreparer = builder.requestPreparer;
this.contentEncoding = builder.contentEncoding;
}

/**
Expand Down Expand Up @@ -193,6 +199,15 @@ public S3AEncryptionMethods getServerSideEncryptionAlgorithm() {
return encryptionSecrets.getEncryptionMethod();
}

/**
* Get the content encoding (e.g. gzip) or return null if none.
* @return content encoding
*/
@Override
public String getContentEncoding() {
return contentEncoding;
}

/**
* Sets server side encryption parameters to the part upload
* request when encryption is enabled.
Expand Down Expand Up @@ -236,13 +251,18 @@ protected void setOptionalPutRequestParameters(PutObjectRequest request) {
/**
* Set the optional metadata for an object being created or copied.
* @param metadata to update.
* @param isDirectoryMarker is this for a directory marker?
*/
protected void setOptionalObjectMetadata(ObjectMetadata metadata) {
protected void setOptionalObjectMetadata(ObjectMetadata metadata,
boolean isDirectoryMarker) {
final S3AEncryptionMethods algorithm
= getServerSideEncryptionAlgorithm();
if (S3AEncryptionMethods.SSE_S3 == algorithm) {
metadata.setSSEAlgorithm(algorithm.getMethod());
}
if (contentEncoding != null && !isDirectoryMarker) {
metadata.setContentEncoding(contentEncoding);
}
}

/**
Expand All @@ -255,8 +275,21 @@ protected void setOptionalObjectMetadata(ObjectMetadata metadata) {
*/
@Override
public ObjectMetadata newObjectMetadata(long length) {
return createObjectMetadata(length, false);
}

/**
* Create a new object metadata instance.
* Any standard metadata headers are added here, for example:
* encryption.
*
* @param length length of data to set in header; Ignored if negative
* @param isDirectoryMarker is this for a directory marker?
* @return a new metadata instance
*/
private ObjectMetadata createObjectMetadata(long length, boolean isDirectoryMarker) {
final ObjectMetadata om = new ObjectMetadata();
setOptionalObjectMetadata(om);
setOptionalObjectMetadata(om, isDirectoryMarker);
if (length >= 0) {
om.setContentLength(length);
}
Expand All @@ -271,7 +304,7 @@ public CopyObjectRequest newCopyObjectRequest(String srcKey,
new CopyObjectRequest(getBucket(), srcKey, getBucket(), dstKey);
ObjectMetadata dstom = newObjectMetadata(srcom.getContentLength());
HeaderProcessing.cloneObjectMetadata(srcom, dstom);
setOptionalObjectMetadata(dstom);
setOptionalObjectMetadata(dstom, false);
copyEncryptionParameters(srcom, copyObjectRequest);
copyObjectRequest.setCannedAccessControlList(cannedACL);
copyObjectRequest.setNewObjectMetadata(dstom);
Expand Down Expand Up @@ -371,7 +404,7 @@ public int read() throws IOException {
}
};
// preparation happens in here
final ObjectMetadata md = newObjectMetadata(0L);
final ObjectMetadata md = createObjectMetadata(0L, true);
md.setContentType(HeaderProcessing.CONTENT_TYPE_X_DIRECTORY);
PutObjectRequest putObjectRequest =
newPutObjectRequest(key, md, im);
Expand Down Expand Up @@ -586,6 +619,9 @@ public static final class RequestFactoryBuilder {
/** Requester Pays flag. */
private boolean requesterPays = false;

/** Content Encoding. */
private String contentEncoding;

/**
* Multipart limit.
*/
Expand All @@ -607,6 +643,16 @@ public RequestFactory build() {
return new RequestFactoryImpl(this);
}

/**
* Content encoding.
* @param value new value
* @return the builder
*/
public RequestFactoryBuilder withContentEncoding(final String value) {
contentEncoding = value;
return this;
}

/**
* Target bucket.
* @param value new value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,17 @@ options are covered in [Testing](./testing.md).
client has permission to read the bucket.
</description>
</property>

<property>
<name>fs.s3a.object.content.encoding</name>
<value></value>
<description>
Content encoding: gzip, deflate, compress, br, etc.
This will be set in the "Content-Encoding" header of the object,
and returned in HTTP HEAD/GET requests.
</description>
</property>

```

## <a name="retry_and_recovery"></a>Retry and Recovery
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a;

import java.io.IOException;
import java.util.Map;

import org.assertj.core.api.Assertions;
import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;

import static org.apache.hadoop.fs.s3a.Constants.CONTENT_ENCODING;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_CONTENT_ENCODING;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.decodeBytes;

/**
* Tests of content encoding object meta data.
*/
public class ITestS3AContentEncoding extends AbstractS3ATestBase {

private static final String GZIP = "gzip";

@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
removeBaseAndBucketOverrides(conf, CONTENT_ENCODING);
conf.set(CONTENT_ENCODING, GZIP);

return conf;
}

@Test
public void testCreatedObjectsHaveEncoding() throws Throwable {
S3AFileSystem fs = getFileSystem();
Path dir = methodPath();
fs.mkdirs(dir);
// even with content encoding enabled, directories do not have
// encoding.
Assertions.assertThat(getEncoding(dir))
.describedAs("Encoding of object %s", dir)
.isNull();
Path path = new Path(dir, "1");
ContractTestUtils.touch(fs, path);
assertObjectHasEncoding(path);
Path path2 = new Path(dir, "2");
fs.rename(path, path2);
assertObjectHasEncoding(path2);
}

/**
* Assert that a given object has gzip encoding specified.
* @param path path
*
*/
private void assertObjectHasEncoding(Path path) throws Throwable {
Assertions.assertThat(getEncoding(path))
.describedAs("Encoding of object %s", path)
.isEqualTo(GZIP);
}

/**
* Get the encoding of a path.
* @param path path
* @return encoding string or null
* @throws IOException IO Failure.
*/
private String getEncoding(Path path) throws IOException {
S3AFileSystem fs = getFileSystem();

Map<String, byte[]> xAttrs = fs.getXAttrs(path);
return decodeBytes(xAttrs.get(XA_CONTENT_ENCODING));
}
}