Commit ddc45b8

Author: Anuj Modi
Production Code changes to support Checksum

1 parent b6d06c8 commit ddc45b8

6 files changed: +138 -2 lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 12 additions & 0 deletions

@@ -337,6 +337,10 @@ public class AbfsConfiguration{
       FS_AZURE_ABFS_RENAME_RESILIENCE, DefaultValue = DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE)
   private boolean renameResilience;
 
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey =
+      FS_AZURE_ABFS_ENABLE_CHECKSUM, DefaultValue = DEFAULT_ENABLE_ABFS_CHECKSUM)
+  private boolean isChecksumEnabled;
+
   public AbfsConfiguration(final Configuration rawConfig, String accountName)
       throws IllegalAccessException, InvalidConfigurationValueException, IOException {
     this.rawConfig = ProviderUtils.excludeIncompatibleCredentialProviders(

@@ -1150,4 +1154,12 @@ public boolean getRenameResilience() {
   void setRenameResilience(boolean actualResilience) {
     renameResilience = actualResilience;
   }
+
+  public boolean getIsChecksumEnabled() {
+    return isChecksumEnabled;
+  }
+
+  void setIsChecksumEnabled(boolean isChecksumEnabled) {
+    this.isChecksumEnabled = isChecksumEnabled;
+  }
 }
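
For orientation, the new flag is an ordinary Hadoop boolean configuration property. A minimal sketch of how a client could enable it before creating the filesystem, assuming only hadoop-common on the classpath; the key string and its false default come from the diffs in this commit, while the class name below is made up for illustration:

import org.apache.hadoop.conf.Configuration;

// Minimal sketch: turning on the checksum validation flag added by this
// commit. The key "fs.azure.enable.checksum" defaults to false
// (DEFAULT_ENABLE_ABFS_CHECKSUM), so it must be enabled explicitly.
public class EnableChecksumExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setBoolean("fs.azure.enable.checksum", true);
    // AbfsConfiguration picks this up through the
    // @BooleanConfigurationValidatorAnnotation shown above.
    System.out.println(conf.getBoolean("fs.azure.enable.checksum", false));
  }
}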

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

Lines changed: 3 additions & 0 deletions

@@ -241,6 +241,9 @@ public final class ConfigurationKeys {
   /** Add extra resilience to rename failures, at the expense of performance. */
   public static final String FS_AZURE_ABFS_RENAME_RESILIENCE = "fs.azure.enable.rename.resilience";
 
+  /** Add extra integrity checks on data read and written, using MD5 hash validation. */
+  public static final String FS_AZURE_ABFS_ENABLE_CHECKSUM = "fs.azure.enable.checksum";
+
   public static String accountProperty(String property, String account) {
     return property + "." + account;
   }
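
The accountProperty helper visible in the context lines builds per-account variants of configuration keys. A sketch of the key form it would produce for the new flag; the account host name below is hypothetical, and whether this particular key honors per-account overrides depends on AbfsConfiguration's lookup logic:

// Mirrors ConfigurationKeys.accountProperty(property, account) above.
public class AccountPropertyExample {
  static String accountProperty(String property, String account) {
    return property + "." + account;
  }

  public static void main(String[] args) {
    // Hypothetical account host name, for illustration only.
    System.out.println(accountProperty("fs.azure.enable.checksum",
        "myaccount.dfs.core.windows.net"));
    // Prints: fs.azure.enable.checksum.myaccount.dfs.core.windows.net
  }
}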

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

Lines changed: 1 addition & 0 deletions

@@ -119,6 +119,7 @@ public final class FileSystemConfigurations {
   public static final int STREAM_ID_LEN = 12;
   public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true;
   public static final boolean DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE = true;
+  public static final boolean DEFAULT_ENABLE_ABFS_CHECKSUM = false;
 
   /**
    * Limit of queued block upload operations before writes

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java

Lines changed: 1 addition & 0 deletions

@@ -71,6 +71,7 @@ public final class HttpHeaderConfigurations {
   public static final String X_MS_PROPOSED_LEASE_ID = "x-ms-proposed-lease-id";
   public static final String X_MS_LEASE_BREAK_PERIOD = "x-ms-lease-break-period";
   public static final String EXPECT = "Expect";
+  public static final String X_MS_RANGE_GET_CONTENT_MD5 = "x-ms-range-get-content-md5";
 
   private HttpHeaderConfigurations() {}
 }
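
As background (a service-side fact, not part of this diff): when x-ms-range-get-content-md5 is set to true together with a Range header, Azure Storage returns a Content-MD5 computed over the returned range, and it only honors the request for ranges of at most 4 MiB. That matches the bufferLength <= 4 * ONE_MB guard added in AbfsClient below, sketched here in isolation:

// Standalone sketch of the guard addCheckSumHeaderForRead applies before
// requesting a range MD5. ONE_MB stands in for
// FileSystemConfigurations.ONE_MB; the 4 MiB ceiling is Azure's documented
// limit for x-ms-range-get-content-md5.
public class RangeMd5Guard {
  static final int ONE_MB = 1024 * 1024;

  static boolean shouldRequestRangeMd5(boolean checksumEnabled, int bufferLength) {
    return checksumEnabled && bufferLength <= 4 * ONE_MB;
  }

  public static void main(String[] args) {
    System.out.println(shouldRequestRangeMd5(true, 2 * ONE_MB)); // true
    System.out.println(shouldRequestRangeMd5(true, 8 * ONE_MB)); // false
  }
}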

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

Lines changed: 59 additions & 2 deletions

@@ -49,6 +49,7 @@
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import com.sun.tools.javac.util.Convert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -75,6 +76,7 @@
 import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SERVER_SIDE_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;

@@ -761,6 +763,8 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
       requestHeaders.add(new AbfsHttpHeader(USER_AGENT, userAgentRetry));
     }
 
+    addCheckSumHeaderForWrite(requestHeaders, buffer);
+
     // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
     String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION,
         abfsUriQueryBuilder, cachedSasToken);

@@ -978,9 +982,12 @@ public AbfsRestOperation read(final String path, final long position, final byte
       TracingContext tracingContext) throws AzureBlobFileSystemException {
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
     addCustomerProvidedKeyHeaders(requestHeaders);
-    requestHeaders.add(new AbfsHttpHeader(RANGE,
-        String.format("bytes=%d-%d", position, position + bufferLength - 1)));
+
+    AbfsHttpHeader rangeHeader = new AbfsHttpHeader(RANGE,
+        String.format("bytes=%d-%d", position, position + bufferLength - 1));
+    requestHeaders.add(rangeHeader);
     requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
+    addCheckSumHeaderForRead(requestHeaders, bufferLength, rangeHeader);
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance

@@ -999,6 +1006,8 @@ public AbfsRestOperation read(final String path, final long position, final byte
         bufferLength, sasTokenForReuse);
     op.execute(tracingContext);
 
+    verifyCheckSumForRead(buffer, op.getResult());
+
     return op;
   }
 

@@ -1412,6 +1421,54 @@ private void appendIfNotEmpty(StringBuilder sb, String regEx,
     }
   }
 
+  private void addCheckSumHeaderForRead(List<AbfsHttpHeader> requestHeaders,
+      final int bufferLength, final AbfsHttpHeader rangeHeader) {
+    if (getAbfsConfiguration().getIsChecksumEnabled()
+        && requestHeaders.contains(rangeHeader) && bufferLength <= 4 * ONE_MB) {
+      requestHeaders.add(new AbfsHttpHeader(X_MS_RANGE_GET_CONTENT_MD5, TRUE));
+    }
+  }
+
+  private void addCheckSumHeaderForWrite(List<AbfsHttpHeader> requestHeaders,
+      final byte[] buffer) {
+    if (getAbfsConfiguration().getIsChecksumEnabled()) {
+      try {
+        MessageDigest md5Digest = MessageDigest.getInstance("MD5");
+        byte[] md5Bytes = md5Digest.digest(buffer);
+        String md5Hash = Base64.getEncoder().encodeToString(md5Bytes);
+        requestHeaders.add(new AbfsHttpHeader(CONTENT_MD5, md5Hash));
+      } catch (NoSuchAlgorithmException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  private void verifyCheckSumForRead(final byte[] buffer, final AbfsHttpOperation result)
+      throws AbfsRestOperationException {
+    if (getAbfsConfiguration().getIsChecksumEnabled()) {
+      // The server may return fewer bytes than the caller requested; if so,
+      // the extra bytes in the buffer stay zero-initialized. The MD5 hash
+      // returned by the server covers only the bytes it actually sent, so
+      // compute the local hash over exactly that prefix of the buffer and
+      // compare the two.
+      int numberOfBytesRead = (int) result.getBytesReceived();
+      byte[] dataRead = new byte[numberOfBytesRead];
+      System.arraycopy(buffer, 0, dataRead, 0, numberOfBytesRead);
+
+      try {
+        MessageDigest md5Digest = MessageDigest.getInstance("MD5");
+        byte[] md5Bytes = md5Digest.digest(dataRead);
+        String md5HashComputed = Base64.getEncoder().encodeToString(md5Bytes);
+        String md5HashActual = result.getResponseHeader(CONTENT_MD5);
+        if (!md5HashComputed.equals(md5HashActual)) {
+          throw new AbfsRestOperationException(-1, "-1", "Checksum Check Failed", new IOException());
+        }
+      } catch (NoSuchAlgorithmException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
   @VisibleForTesting
   URL getBaseUrl() {
     return baseUrl;
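
To make the checksum math concrete, here is a small self-contained sketch of the digest computation the new helpers perform: a Base64-encoded MD5 over the full write buffer (what addCheckSumHeaderForWrite sends as Content-MD5), and over only the first getBytesReceived() bytes on the read path (what verifyCheckSumForRead compares against the server's header). The buffer contents are made up, and base64Md5 is a local helper, not part of AbfsClient:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Base64;

// Self-contained sketch of the digest logic used by addCheckSumHeaderForWrite
// and verifyCheckSumForRead above. Sample values are for illustration only.
public class Md5ChecksumSketch {
  static String base64Md5(byte[] data) throws NoSuchAlgorithmException {
    MessageDigest md5Digest = MessageDigest.getInstance("MD5");
    return Base64.getEncoder().encodeToString(md5Digest.digest(data));
  }

  public static void main(String[] args) throws NoSuchAlgorithmException {
    byte[] buffer = new byte[16];
    byte[] payload = "abc".getBytes(StandardCharsets.UTF_8);
    System.arraycopy(payload, 0, buffer, 0, payload.length);

    // Write path: hash the full buffer being appended.
    System.out.println("Content-MD5 sent: " + base64Md5(buffer));

    // Read path: the server may return fewer bytes than the buffer holds,
    // so hash only the prefix actually received (here, 3 bytes), exactly
    // as verifyCheckSumForRead does with getBytesReceived(). Hashing the
    // whole zero-padded buffer would yield a different, mismatching digest.
    int bytesReceived = payload.length;
    byte[] dataRead = Arrays.copyOf(buffer, bytesReceived);
    System.out.println("Computed over received bytes: " + base64Md5(dataRead));
  }
}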
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemChecksum.java

Lines changed: 62 additions & 0 deletions

@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.nio.charset.StandardCharsets;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+
+/**
+ * Test for verifying checksum-related operations.
+ */
+public class ITestAzureBlobFileSystemChecksum extends AbstractAbfsIntegrationTest {
+
+  public ITestAzureBlobFileSystemChecksum() throws Exception {
+    super();
+  }
+
+  @Test
+  public void testWriteReadWithChecksum() throws Exception {
+    AzureBlobFileSystem fs = getFileSystem();
+    AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration();
+    // Enable checksum validations for read and write requests
+    conf.setIsChecksumEnabled(true);
+
+    Path testpath = new Path("a/b.txt");
+    String dataUploaded = "This is Sample Data";
+    FSDataOutputStream out = fs.create(testpath);
+    out.write(dataUploaded.getBytes(StandardCharsets.UTF_8));
+    out.hflush();
+    out.close();
+
+    FSDataInputStream in = fs.open(testpath);
+    byte[] bytesRead = new byte[dataUploaded.length()];
+    in.read(bytesRead);
+
+    // Verify that the data read is the same as the data written
+    Assertions.assertThat(bytesRead).describedAs("Bytes read should match bytes written").containsExactly(dataUploaded.getBytes(StandardCharsets.UTF_8));
+    Assertions.assertThat(new String(bytesRead, StandardCharsets.UTF_8)).describedAs("Data read should match data uploaded").isEqualTo(dataUploaded);
+  }
+}
