Commit 299b806

MAPREDUCE-7322. revisiting TestMRIntermediateDataEncryption. Contributed by Ahmed Hussein.

1 parent: aa4c17b

21 files changed: +1079 −445 lines

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/JarFinder.java

Lines changed: 3 additions & 2 deletions
@@ -165,10 +165,11 @@ else if ("file".equals(url.getProtocol())) {
       if (!testDir.exists()) {
         testDir.mkdirs();
       }
-      File tempJar = File.createTempFile("hadoop-", "", testDir);
-      tempJar = new File(tempJar.getAbsolutePath() + ".jar");
+      File tempFile = File.createTempFile("hadoop-", "", testDir);
+      File tempJar = new File(tempFile.getAbsolutePath() + ".jar");
       createJar(baseDir, tempJar);
       tempJar.deleteOnExit();
+      tempFile.deleteOnExit();
       return tempJar.getAbsolutePath();
     }
   }
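
The hunk above fixes a quiet temp-file leak: java.io.File.createTempFile does not just reserve a name, it creates a real zero-length file on disk. A commented sketch of the old code's behavior (comments added here for illustration):

    // Creates an empty "hadoop-XXXX" file on disk in testDir.
    File tempJar = File.createTempFile("hadoop-", "", testDir);
    // Reassigns the only reference to a sibling ".jar" path, so the
    // empty placeholder file can no longer be cleaned up...
    tempJar = new File(tempJar.getAbsolutePath() + ".jar");
    createJar(baseDir, tempJar);
    // ...and only the jar was registered for deletion.
    tempJar.deleteOnExit();

The patch keeps both references (tempFile and tempJar) and registers deleteOnExit() on each, so the empty placeholder no longer accumulates in the test directory.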

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java

Lines changed: 17 additions & 6 deletions
@@ -40,6 +40,7 @@

 import java.util.concurrent.TimeoutException;

+import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
 import org.junit.Assert;

@@ -105,6 +106,8 @@
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
+
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.slf4j.Logger;

@@ -114,15 +117,24 @@
 public class TestRecovery {

   private static final Logger LOG = LoggerFactory.getLogger(TestRecovery.class);
-  private static Path outputDir = new Path(new File("target",
-      TestRecovery.class.getName()).getAbsolutePath() +
-      Path.SEPARATOR + "out");
+
+  private static File testRootDir;
+  private static Path outputDir;
   private static String partFile = "part-r-00000";
   private Text key1 = new Text("key1");
   private Text key2 = new Text("key2");
   private Text val1 = new Text("val1");
   private Text val2 = new Text("val2");

+  @BeforeClass
+  public static void setupClass() throws Exception {
+    // setup the test root directory
+    testRootDir =
+        GenericTestUtils.setupTestRootDir(
+            TestRecovery.class);
+    outputDir = new Path(testRootDir.getAbsolutePath(), "out");
+  }
+
   /**
    * AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
    * completely disappears because of failed launch, one attempt gets killed and

@@ -600,14 +612,13 @@ public void testRecoveryWithSpillEncryption() throws Exception {
     MRApp app = new MRAppWithHistory(1, 1, false, this.getClass().getName(),
         true, ++runCount) {
     };
-    Configuration conf = new Configuration();
+    Configuration conf =
+        MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(null);
     conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
     conf.setBoolean("mapred.mapper.new-api", true);
     conf.setBoolean("mapred.reducer.new-api", true);
     conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
     conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
-    conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
-
     // run the MR job at the first attempt
     Job jobAttempt1 = app.submit(conf);
     app.waitForState(jobAttempt1, JobState.RUNNING);
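
The last hunk swaps a hand-built Configuration for the shared MRJobConfUtil helper. Judging from the removed lines, the helper at minimum enables encrypted intermediate data; a minimal sketch of the manual setup it subsumes (any further defaults the helper applies are an assumption here):

    // Manual equivalent of what the removed line did; the helper centralizes
    // this so every encryption test configures spill encryption the same way.
    Configuration conf = new Configuration();
    conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);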

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java

Lines changed: 3 additions & 2 deletions
@@ -42,7 +42,8 @@
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.CryptoUtils;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -576,7 +577,7 @@ private Writer<K,V> createSpillFile() throws IOException {
     file = lDirAlloc.getLocalPathForWrite(tmp.toUri().getPath(),
         -1, conf);
     FSDataOutputStream out = fs.create(file);
-    out = CryptoUtils.wrapIfNecessary(conf, out);
+    out = IntermediateEncryptedStream.wrapIfNecessary(conf, out, tmp);
     return new Writer<K, V>(conf, out, null, null, null, null, true);
   }

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java

Lines changed: 24 additions & 10 deletions
@@ -63,6 +63,7 @@
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
 import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.mapreduce.task.MapContextImpl;
 import org.apache.hadoop.mapreduce.CryptoUtils;

@@ -1630,7 +1631,9 @@ private void sortAndSpill() throws IOException, ClassNotFoundException,
         IFile.Writer<K, V> writer = null;
         try {
           long segmentStart = out.getPos();
-          partitionOut = CryptoUtils.wrapIfNecessary(job, out, false);
+          partitionOut =
+              IntermediateEncryptedStream.wrapIfNecessary(job, out, false,
+                  filename);
           writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
               spilledRecordsCounter);
           if (combinerRunner == null) {

@@ -1687,6 +1690,7 @@ private void sortAndSpill() throws IOException, ClassNotFoundException,
           Path indexFilename =
               mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                   * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+          IntermediateEncryptedStream.addSpillIndexFile(indexFilename, job);
           spillRec.writeToFile(indexFilename, job);
         } else {
           indexCacheList.add(spillRec);

@@ -1727,7 +1731,9 @@ private void spillSingleRecord(final K key, final V value,
       try {
         long segmentStart = out.getPos();
         // Create a new codec, don't care!
-        partitionOut = CryptoUtils.wrapIfNecessary(job, out, false);
+        partitionOut =
+            IntermediateEncryptedStream.wrapIfNecessary(job, out, false,
+                filename);
         writer = new IFile.Writer<K,V>(job, partitionOut, keyClass, valClass, codec,
             spilledRecordsCounter);

@@ -1761,6 +1767,7 @@ private void spillSingleRecord(final K key, final V value,
           Path indexFilename =
               mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                   * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+          IntermediateEncryptedStream.addSpillIndexFile(indexFilename, job);
           spillRec.writeToFile(indexFilename, job);
         } else {
           indexCacheList.add(spillRec);

@@ -1854,22 +1861,27 @@ private void mergeParts() throws IOException, InterruptedException,
       finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
     }
     if (numSpills == 1) { //the spill is the final output
+      Path indexFileOutput =
+          mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]);
       sameVolRename(filename[0],
           mapOutputFile.getOutputFileForWriteInVolume(filename[0]));
       if (indexCacheList.size() == 0) {
-        sameVolRename(mapOutputFile.getSpillIndexFile(0),
-            mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));
+        Path indexFilePath = mapOutputFile.getSpillIndexFile(0);
+        IntermediateEncryptedStream.validateSpillIndexFile(
+            indexFilePath, job);
+        sameVolRename(indexFilePath, indexFileOutput);
       } else {
-        indexCacheList.get(0).writeToFile(
-            mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job);
+        indexCacheList.get(0).writeToFile(indexFileOutput, job);
       }
+      IntermediateEncryptedStream.addSpillIndexFile(indexFileOutput, job);
       sortPhase.complete();
       return;
     }

     // read in paged indices
     for (int i = indexCacheList.size(); i < numSpills; ++i) {
       Path indexFileName = mapOutputFile.getSpillIndexFile(i);
+      IntermediateEncryptedStream.validateSpillIndexFile(indexFileName, job);
       indexCacheList.add(new SpillRecord(indexFileName, job));
     }

@@ -1881,7 +1893,7 @@ private void mergeParts() throws IOException, InterruptedException,
         mapOutputFile.getOutputFileForWrite(finalOutFileSize);
     Path finalIndexFile =
         mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
-
+    IntermediateEncryptedStream.addSpillIndexFile(finalIndexFile, job);
     //The output stream for the final single output file
     FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
     FSDataOutputStream finalPartitionOut = null;

@@ -1893,8 +1905,9 @@ private void mergeParts() throws IOException, InterruptedException,
     try {
       for (int i = 0; i < partitions; i++) {
         long segmentStart = finalOut.getPos();
-        finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut,
-            false);
+        finalPartitionOut =
+            IntermediateEncryptedStream.wrapIfNecessary(job, finalOut,
+                false, finalOutputFile);
         Writer<K, V> writer =
             new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec, null);
         writer.close();

@@ -1957,7 +1970,8 @@ private void mergeParts() throws IOException, InterruptedException,

       //write merged output to disk
       long segmentStart = finalOut.getPos();
-      finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut, false);
+      finalPartitionOut = IntermediateEncryptedStream.wrapIfNecessary(job,
+          finalOut, false, finalOutputFile);
       Writer<K, V> writer =
           new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec,
               spilledRecordsCounter);
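
Across these hunks the pattern is uniform: every spill index file the map task writes is first registered via addSpillIndexFile, and every index file read back passes through validateSpillIndexFile, giving a test-time injector a chance to verify that intermediate data really is encrypted on disk. The write/read pairing, distilled from the hunks above (no APIs beyond those shown):

    // Write side: register the index path before persisting the spill record.
    Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
        numSpills, partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
    IntermediateEncryptedStream.addSpillIndexFile(indexFilename, job);
    spillRec.writeToFile(indexFilename, job);

    // Read side: let the injector validate the file before it is trusted.
    Path indexFileName = mapOutputFile.getSpillIndexFile(i);
    IntermediateEncryptedStream.validateSpillIndexFile(indexFileName, job);
    indexCacheList.add(new SpillRecord(indexFileName, job));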

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java

Lines changed: 4 additions & 2 deletions
@@ -40,6 +40,7 @@
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.CryptoUtils;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
 import org.apache.hadoop.util.PriorityQueue;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;

@@ -302,7 +303,7 @@ void init(Counters.Counter readsCounter) throws IOException {
       FSDataInputStream in = fs.open(file);

       in.seek(segmentOffset);
-      in = CryptoUtils.wrapIfNecessary(conf, in);
+      in = IntermediateEncryptedStream.wrapIfNecessary(conf, in, file);
       reader = new Reader<K, V>(conf, in,
           segmentLength - CryptoUtils.cryptoPadding(conf),
           codec, readsCounter);

@@ -730,7 +731,8 @@ RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
           approxOutputSize, conf);

       FSDataOutputStream out = fs.create(outputFile);
-      out = CryptoUtils.wrapIfNecessary(conf, out);
+      out = IntermediateEncryptedStream.wrapIfNecessary(conf, out,
+          outputFile);
       Writer<K, V> writer = new Writer<K, V>(conf, out, keyClass, valueClass,
           codec, writesCounter, true);
       writeFile(this, writer, reporter, conf);
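
On the read path the wrapper works together with CryptoUtils.cryptoPadding: an encrypted segment is longer on disk than its plaintext, so the reader subtracts the padding from the segment length. The pattern from the first hunk above (that cryptoPadding returns 0 when encryption is disabled is an assumption consistent with this call site):

    FSDataInputStream in = fs.open(file);
    in.seek(segmentOffset);
    // Decrypting wrapper; expected to pass the stream through untouched
    // when intermediate encryption is off.
    in = IntermediateEncryptedStream.wrapIfNecessary(conf, in, file);
    reader = new Reader<K, V>(conf, in,
        // plaintext length = on-disk segment length minus crypto padding
        segmentLength - CryptoUtils.cryptoPadding(conf),
        codec, readsCounter);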
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/IntermediateEncryptedStream.java

Lines changed: 89 additions & 0 deletions

@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.security;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.CryptoUtils;
+
+/**
+ * Used to wrap helpers while spilling intermediate files.
+ * Setting the {@link SpillCallBackInjector} helps in:
+ * 1- adding callbacks to capture the path of the spilled files.
+ * 2- Verifying the encryption when intermediate encryption is enabled.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class IntermediateEncryptedStream {
+
+  private static SpillCallBackInjector prevSpillCBInjector = null;
+
+  public static FSDataOutputStream wrapIfNecessary(Configuration conf,
+      FSDataOutputStream out, Path outPath) throws IOException {
+    SpillCallBackInjector.get().writeSpillFileCB(outPath, out, conf);
+    return CryptoUtils.wrapIfNecessary(conf, out, true);
+  }
+
+  public static FSDataOutputStream wrapIfNecessary(Configuration conf,
+      FSDataOutputStream out, boolean closeOutputStream,
+      Path outPath) throws IOException {
+    SpillCallBackInjector.get().writeSpillFileCB(outPath, out, conf);
+    return CryptoUtils.wrapIfNecessary(conf, out, closeOutputStream);
+  }
+
+  public static FSDataInputStream wrapIfNecessary(Configuration conf,
+      FSDataInputStream in, Path inputPath) throws IOException {
+    SpillCallBackInjector.get().getSpillFileCB(inputPath, in, conf);
+    return CryptoUtils.wrapIfNecessary(conf, in);
+  }
+
+  public static InputStream wrapIfNecessary(Configuration conf,
+      InputStream in, long length, Path inputPath) throws IOException {
+    SpillCallBackInjector.get().getSpillFileCB(inputPath, in, conf);
+    return CryptoUtils.wrapIfNecessary(conf, in, length);
+  }
+
+  public static void addSpillIndexFile(Path indexFilename, Configuration conf) {
+    SpillCallBackInjector.get().addSpillIndexFileCB(indexFilename, conf);
+  }
+
+  public static void validateSpillIndexFile(Path indexFilename,
+      Configuration conf) {
+    SpillCallBackInjector.get().validateSpillIndexFileCB(indexFilename, conf);
+  }
+
+  public static SpillCallBackInjector resetSpillCBInjector() {
+    return setSpillCBInjector(prevSpillCBInjector);
+  }
+
+  public synchronized static SpillCallBackInjector setSpillCBInjector(
+      SpillCallBackInjector spillInjector) {
+    prevSpillCBInjector =
+        SpillCallBackInjector.getAndSet(spillInjector);
+    return spillInjector;
+  }
+
+  private IntermediateEncryptedStream() {}
+}
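
Every wrapper above notifies the current SpillCallBackInjector and then delegates to CryptoUtils, so production code paths are unchanged; the injector is purely a test seam. A minimal sketch of how a test might hook in; SpillCallBackInjector itself is not part of this excerpt, so the overridable no-op callbacks and the recording subclass are assumptions:

    // Hypothetical injector that records each spilled file path.
    class RecordingInjector extends SpillCallBackInjector {
      final List<Path> spilledFiles = new ArrayList<>();
      @Override
      public void writeSpillFileCB(Path path, FSDataOutputStream out,
          Configuration conf) {
        spilledFiles.add(path);
      }
    }

    // Swap the injector in for the duration of the test...
    IntermediateEncryptedStream.setSpillCBInjector(new RecordingInjector());
    try {
      // ... run a job that spills intermediate map output ...
    } finally {
      // ...then restore whatever injector was active before.
      IntermediateEncryptedStream.resetSpillCBInjector();
    }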
