Skip to content

Commit c613296

Browse files
committed
MAPREDUCE-7241. FileInputFormat listStatus with less memory footprint. Contributed by Zhihua Deng
1 parent c162648 commit c613296

File tree

4 files changed

+114
-11
lines changed

4 files changed

+114
-11
lines changed

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,8 @@ protected void addInputPathRecursively(List<FileStatus> result,
       if (stat.isDirectory()) {
         addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
       } else {
-        result.add(stat);
+        result.add(org.apache.hadoop.mapreduce.lib.input.
+            FileInputFormat.shrinkStatus(stat));
       }
     }
   }
@@ -290,7 +291,8 @@ private List<FileStatus> singleThreadedListStatus(JobConf job, Path[] dirs,
           addInputPathRecursively(result, fs, stat.getPath(),
               inputFilter);
         } else {
-          result.add(stat);
+          result.add(org.apache.hadoop.mapreduce.lib.input.
+              FileInputFormat.shrinkStatus(stat));
         }
       }
     }

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,8 @@ public Result call() throws Exception {
         if (recursive && stat.isDirectory()) {
           result.dirsNeedingRecursiveCalls.add(stat);
         } else {
-          result.locatedFileStatuses.add(stat);
+          result.locatedFileStatuses.add(org.apache.hadoop.mapreduce.lib.
+              input.FileInputFormat.shrinkStatus(stat));
         }
       }
     }

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs,
           addInputPathRecursively(result, fs, stat.getPath(),
               inputFilter);
         } else {
-          result.add(stat);
+          result.add(shrinkStatus(stat));
        }
      }
    }
@@ -364,13 +364,42 @@ protected void addInputPathRecursively(List<FileStatus> result,
         if (stat.isDirectory()) {
           addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
         } else {
-          result.add(stat);
+          result.add(shrinkStatus(stat));
         }
       }
     }
   }
-
-
+
+  /**
+   * The HdfsBlockLocation includes a LocatedBlock which contains messages
+   * for issuing more detailed queries to datanodes about a block, but these
+   * messages are useless during job submission currently. This method tries
+   * to exclude the LocatedBlock from HdfsBlockLocation by creating a new
+   * BlockLocation from original, reshaping the LocatedFileStatus,
+   * allowing {@link #listStatus(JobContext)} to scan more files with less
+   * memory footprint.
+   * @see BlockLocation
+   * @see org.apache.hadoop.fs.HdfsBlockLocation
+   * @param origStat The fat FileStatus.
+   * @return The FileStatus that has been shrunk.
+   */
+  public static FileStatus shrinkStatus(FileStatus origStat) {
+    if (origStat.isDirectory() || origStat.getLen() == 0 ||
+        !(origStat instanceof LocatedFileStatus)) {
+      return origStat;
+    } else {
+      BlockLocation[] blockLocations =
+          ((LocatedFileStatus)origStat).getBlockLocations();
+      BlockLocation[] locs = new BlockLocation[blockLocations.length];
+      int i = 0;
+      for (BlockLocation location : blockLocations) {
+        locs[i++] = new BlockLocation(location);
+      }
+      LocatedFileStatus newStat = new LocatedFileStatus(origStat, locs);
+      return newStat;
+    }
+  }
+
   /**
    * A factory that makes the split for this class. It can be overridden
    * by sub-classes to make sub-types

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java

Lines changed: 75 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,17 @@
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.HdfsBlockLocation;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.mapred.SplitLocationInfo;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -238,6 +244,50 @@ public void testListStatusErrorOnNonExistantDir() throws IOException {
     }
   }
 
+  @Test
+  public void testShrinkStatus() throws IOException {
+    Configuration conf = getConfiguration();
+    MockFileSystem mockFs =
+        (MockFileSystem) new Path("test:///").getFileSystem(conf);
+    Path dir1 = new Path("test:/a1");
+    RemoteIterator<LocatedFileStatus> statuses = mockFs.listLocatedStatus(dir1);
+    boolean verified = false;
+    while (statuses.hasNext()) {
+      LocatedFileStatus orig = statuses.next();
+      LocatedFileStatus shrink =
+          (LocatedFileStatus)FileInputFormat.shrinkStatus(orig);
+      Assert.assertTrue(orig.equals(shrink));
+      if (shrink.getBlockLocations() != null) {
+        Assert.assertEquals(orig.getBlockLocations().length,
+            shrink.getBlockLocations().length);
+        for (int i = 0; i < shrink.getBlockLocations().length; i++) {
+          verified = true;
+          BlockLocation location = shrink.getBlockLocations()[i];
+          BlockLocation actual = orig.getBlockLocations()[i];
+          Assert.assertNotNull(((HdfsBlockLocation)actual).getLocatedBlock());
+          Assert.assertEquals(BlockLocation.class.getName(),
+              location.getClass().getName());
+          Assert.assertArrayEquals(actual.getHosts(), location.getHosts());
+          Assert.assertArrayEquals(actual.getCachedHosts(),
+              location.getCachedHosts());
+          Assert.assertArrayEquals(actual.getStorageIds(),
+              location.getStorageIds());
+          Assert.assertArrayEquals(actual.getStorageTypes(),
+              location.getStorageTypes());
+          Assert.assertArrayEquals(actual.getTopologyPaths(),
+              location.getTopologyPaths());
+          Assert.assertArrayEquals(actual.getNames(), location.getNames());
+          Assert.assertEquals(actual.getLength(), location.getLength());
+          Assert.assertEquals(actual.getOffset(), location.getOffset());
+          Assert.assertEquals(actual.isCorrupt(), location.isCorrupt());
+        }
+      } else {
+        Assert.assertTrue(orig.getBlockLocations() == null);
+      }
+    }
+    Assert.assertTrue(verified);
+  }
+
   public static List<Path> configureTestSimple(Configuration conf, FileSystem localFs)
       throws IOException {
     Path base1 = new Path(TEST_ROOT_DIR, "input1");
@@ -437,10 +487,31 @@ public FileStatus[] listStatus(Path f, PathFilter filter)
     @Override
     public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len)
         throws IOException {
-      return new BlockLocation[] {
-          new BlockLocation(new String[] { "localhost:9866", "otherhost:9866" },
-              new String[] { "localhost", "otherhost" }, new String[] { "localhost" },
-              new String[0], 0, len, false) }; }
+      DatanodeInfo[] ds = new DatanodeInfo[2];
+      ds[0] = new DatanodeDescriptor(
+          new DatanodeID("127.0.0.1", "localhost", "abcd",
+              9866, 9867, 9868, 9869));
+      ds[1] = new DatanodeDescriptor(
+          new DatanodeID("1.0.0.1", "otherhost", "efgh",
+              9866, 9867, 9868, 9869));
+      long blockLen = len / 3;
+      ExtendedBlock b1 = new ExtendedBlock("bpid", 0, blockLen, 0);
+      ExtendedBlock b2 = new ExtendedBlock("bpid", 1, blockLen, 1);
+      ExtendedBlock b3 = new ExtendedBlock("bpid", 2, len - 2 * blockLen, 2);
+      String[] names = new String[]{ "localhost:9866", "otherhost:9866" };
+      String[] hosts = new String[]{ "localhost", "otherhost" };
+      String[] cachedHosts = {"localhost"};
+      BlockLocation loc1 = new BlockLocation(names, hosts, cachedHosts,
+          new String[0], 0, blockLen, false);
+      BlockLocation loc2 = new BlockLocation(names, hosts, cachedHosts,
+          new String[0], blockLen, blockLen, false);
+      BlockLocation loc3 = new BlockLocation(names, hosts, cachedHosts,
+          new String[0], 2 * blockLen, len - 2 * blockLen, false);
+      return new BlockLocation[]{
+          new HdfsBlockLocation(loc1, new LocatedBlock(b1, ds)),
+          new HdfsBlockLocation(loc2, new LocatedBlock(b2, ds)),
+          new HdfsBlockLocation(loc3, new LocatedBlock(b3, ds)) };
+    }
 
     @Override
     protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,

0 commit comments

Comments
 (0)