
Commit f1fa617

yinxusen authored and mateiz committed
[SPARK-1133] Add whole text files reader in MLlib
Here is a pointer to the former [PR164](#164). I add the pull request for the JIRA issue [SPARK-1133](https://spark-project.atlassian.net/browse/SPARK-1133), which brings a new files reader API in MLlib.

Author: Xusen Yin <yinxusen@gmail.com>

Closes #252 from yinxusen/whole-files-input and squashes the following commits:

7191be6 [Xusen Yin] refine comments
0af3faf [Xusen Yin] add JavaAPI test
01745ee [Xusen Yin] fix deletion error
cc97dca [Xusen Yin] move whole text file API to Spark core
d792cee [Xusen Yin] remove the typo character "+"
6bdf2c2 [Xusen Yin] test for small local file system block size
a1f1e7e [Xusen Yin] add two extra spaces
28cb0fe [Xusen Yin] add whole text files reader
1 parent 01cf4c4 commit f1fa617

File tree

6 files changed: 313 additions, 3 deletions


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 34 additions & 0 deletions
@@ -37,6 +37,7 @@ import org.apache.mesos.MesosNativeLibrary
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
+import org.apache.spark.input.WholeTextFileInputFormat
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
@@ -371,6 +372,39 @@
       minSplits).map(pair => pair._2.toString)
   }
 
+  /**
+   * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
+   * Hadoop-supported file system URI. Each file is read as a single record and returned in a
+   * key-value pair, where the key is the path of each file and the value is the content of each file.
+   *
+   * <p> For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-00000
+   *   hdfs://a-hdfs-path/part-00001
+   *   ...
+   *   hdfs://a-hdfs-path/part-nnnnn
+   * }}}
+   *
+   * Do `val rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")`,
+   *
+   * <p> then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-00000, its content)
+   *   (a-hdfs-path/part-00001, its content)
+   *   ...
+   *   (a-hdfs-path/part-nnnnn, its content)
+   * }}}
+   *
+   * @note Small files are preferred; large files are also allowed, but may cause poor performance.
+   */
+  def wholeTextFiles(path: String): RDD[(String, String)] = {
+    newAPIHadoopFile(
+      path,
+      classOf[WholeTextFileInputFormat],
+      classOf[String],
+      classOf[String])
+  }
+
   /**
    * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
    * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
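For reference, a minimal usage sketch of the new Scala API (not part of this commit; it assumes an existing SparkContext `sc`, and the path and variable names are placeholders):

    // Read every file under the directory as one (path, contents) record.
    val files = sc.wholeTextFiles("hdfs://a-hdfs-path")
    // Count lines per file; a plain map avoids relying on the pair-RDD implicits.
    val lineCounts = files.map { case (path, contents) => (path, contents.count(_ == '\n')) }
    lineCounts.collect().foreach { case (path, n) => println(s"$path: $n lines") }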

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 28 additions & 0 deletions
@@ -154,6 +154,34 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    */
   def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)
 
+  /**
+   * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
+   * Hadoop-supported file system URI. Each file is read as a single record and returned in a
+   * key-value pair, where the key is the path of each file and the value is the content of each file.
+   *
+   * <p> For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-00000
+   *   hdfs://a-hdfs-path/part-00001
+   *   ...
+   *   hdfs://a-hdfs-path/part-nnnnn
+   * }}}
+   *
+   * Do `JavaPairRDD<String, String> rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")`,
+   *
+   * <p> then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-00000, its content)
+   *   (a-hdfs-path/part-00001, its content)
+   *   ...
+   *   (a-hdfs-path/part-nnnnn, its content)
+   * }}}
+   *
+   * @note Small files are preferred; large files are also allowed, but may cause poor performance.
+   */
+  def wholeTextFiles(path: String): JavaPairRDD[String, String] =
+    new JavaPairRDD(sc.wholeTextFiles(path))
+
   /** Get an RDD for a Hadoop SequenceFile with given key and value types.
    *
    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
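The Java method above is a thin wrapper that puts the Scala result into a JavaPairRDD. As a hedged sketch (Scala, assuming a live SparkContext `sc` and a placeholder local directory), the returned pair RDD can be collected into a path-to-contents map, which is essentially what the Java test below checks:

    // Pair-RDD operations such as collectAsMap come from this import in this Spark version.
    import org.apache.spark.SparkContext._

    val byPath: scala.collection.Map[String, String] =
      sc.wholeTextFiles("/some/local/dir").collectAsMap()
    byPath.foreach { case (path, contents) => println(s"$path -> ${contents.length} chars") }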
core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala

Lines changed: 47 additions & 0 deletions
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+
+/**
+ * A [[org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat CombineFileInputFormat]] for
+ * reading whole text files. Each file is read as a key-value pair, where the key is the file path
+ * and the value is the entire content of the file.
+ */
+
+private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[String, String] {
+  override protected def isSplitable(context: JobContext, file: Path): Boolean = false
+
+  override def createRecordReader(
+      split: InputSplit,
+      context: TaskAttemptContext): RecordReader[String, String] = {
+
+    new CombineFileRecordReader[String, String](
+      split.asInstanceOf[CombineFileSplit],
+      context,
+      classOf[WholeTextFileRecordReader])
+  }
+}
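As the SparkContext change above shows, `wholeTextFiles` is a thin wrapper around this input format. The following sketch (assuming an existing SparkContext `sc`; the path is a placeholder) calls the format directly through `newAPIHadoopFile` and is equivalent:

    import org.apache.spark.input.WholeTextFileInputFormat

    // Equivalent to sc.wholeTextFiles("hdfs://a-hdfs-path"): keys are file paths,
    // values are whole-file contents, and WholeTextFileInputFormat combines and splits the files.
    val pairs = sc.newAPIHadoopFile(
      "hdfs://a-hdfs-path",
      classOf[WholeTextFileInputFormat],
      classOf[String],
      classOf[String])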
core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala

Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import com.google.common.io.{ByteStreams, Closeables}
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+/**
+ * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
+ * as a key-value pair, where the key is the file path and the value is the entire content of
+ * the file.
+ */
+private[spark] class WholeTextFileRecordReader(
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
+  extends RecordReader[String, String] {
+
+  private val path = split.getPath(index)
+  private val fs = path.getFileSystem(context.getConfiguration)
+
+  // True means the current file has been processed and should be skipped.
+  private var processed = false
+
+  private val key = path.toString
+  private var value: String = null
+
+  override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
+
+  override def close() = {}
+
+  override def getProgress = if (processed) 1.0f else 0.0f
+
+  override def getCurrentKey = key
+
+  override def getCurrentValue = value
+
+  override def nextKeyValue = {
+    if (!processed) {
+      val fileIn = fs.open(path)
+      val innerBuffer = ByteStreams.toByteArray(fileIn)
+
+      value = new Text(innerBuffer).toString
+      Closeables.close(fileIn, false)
+
+      processed = true
+      true
+    } else {
+      false
+    }
+  }
+}
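For context, Hadoop drives a RecordReader by calling nextKeyValue() until it returns false, reading the current key and value after each successful call; under that contract, WholeTextFileRecordReader yields exactly one (path, contents) pair per file. A small illustrative sketch of such a driver loop (not part of this commit):

    import org.apache.hadoop.mapreduce.RecordReader

    // Generic driver loop: read records until nextKeyValue() reports there are no more.
    def readAll[K, V](reader: RecordReader[K, V]): Seq[(K, V)] = {
      val pairs = scala.collection.mutable.ArrayBuffer.empty[(K, V)]
      while (reader.nextKeyValue()) {
        pairs += ((reader.getCurrentKey, reader.getCurrentValue))
      }
      pairs.toSeq
    }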

core/src/test/java/org/apache/spark/JavaAPISuite.java

Lines changed: 27 additions & 3 deletions
@@ -17,9 +17,7 @@
 
 package org.apache.spark;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
+import java.io.*;
 import java.util.*;
 
 import scala.Tuple2;
@@ -599,6 +597,32 @@ public void textFiles() throws IOException {
     Assert.assertEquals(expected, readRDD.collect());
   }
 
+  @Test
+  public void wholeTextFiles() throws IOException {
+    byte[] content1 = "spark is easy to use.\n".getBytes();
+    byte[] content2 = "spark is also easy to use.\n".getBytes();
+
+    File tempDir = Files.createTempDir();
+    String tempDirName = tempDir.getAbsolutePath();
+    DataOutputStream ds = new DataOutputStream(new FileOutputStream(tempDirName + "/part-00000"));
+    ds.write(content1);
+    ds.close();
+    ds = new DataOutputStream(new FileOutputStream(tempDirName + "/part-00001"));
+    ds.write(content2);
+    ds.close();
+
+    HashMap<String, String> container = new HashMap<String, String>();
+    container.put(tempDirName+"/part-00000", new Text(content1).toString());
+    container.put(tempDirName+"/part-00001", new Text(content2).toString());
+
+    JavaPairRDD<String, String> readRDD = sc.wholeTextFiles(tempDirName);
+    List<Tuple2<String, String>> result = readRDD.collect();
+
+    for (Tuple2<String, String> res : result) {
+      Assert.assertEquals(res._2(), container.get(res._1()));
+    }
+  }
+
   @Test
   public void textFilesCompressed() throws IOException {
     File tempDir = Files.createTempDir();
core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala

Lines changed: 105 additions & 0 deletions
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import java.io.DataOutputStream
+import java.io.File
+import java.io.FileOutputStream
+
+import scala.collection.immutable.IndexedSeq
+
+import com.google.common.io.Files
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+
+import org.apache.hadoop.io.Text
+
+import org.apache.spark.SparkContext
+
+/**
+ * Tests the correctness of
+ * [[org.apache.spark.input.WholeTextFileRecordReader WholeTextFileRecordReader]]. A temporary
+ * directory is created as fake input, and the temporary storage is deleted at the end.
+ */
+class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
+  private var sc: SparkContext = _
+
+  override def beforeAll() {
+    sc = new SparkContext("local", "test")
+
+    // Set the block size of the local file system to test whether files are split correctly.
+    sc.hadoopConfiguration.setLong("fs.local.block.size", 32)
+  }
+
+  override def afterAll() {
+    sc.stop()
+  }
+
+  private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte]) = {
+    val out = new DataOutputStream(new FileOutputStream(s"${inputDir.toString}/$fileName"))
+    out.write(contents, 0, contents.length)
+    out.close()
+  }
+
+  /**
+   * This test checks the behavior of WholeTextFileRecordReader on a local disk. There are
+   * three aspects to verify:
+   *   1) Whether all files are read.
+   *   2) Whether paths are read correctly.
+   *   3) Whether the contents match.
+   */
+  test("Correctness of WholeTextFileRecordReader.") {
+
+    val dir = Files.createTempDir()
+    println(s"Local disk address is ${dir.toString}.")
+
+    WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
+      createNativeFile(dir, filename, contents)
+    }
+
+    val res = sc.wholeTextFiles(dir.toString).collect()
+
+    assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
+      "Number of files read does not match the expected value.")
+
+    for ((filename, contents) <- res) {
+      val shortName = filename.split('/').last
+      assert(WholeTextFileRecordReaderSuite.fileNames.contains(shortName),
+        s"Missing file name $filename.")
+      assert(contents === new Text(WholeTextFileRecordReaderSuite.files(shortName)).toString,
+        s"File $filename contents do not match.")
+    }
+
+    dir.delete()
+  }
+}
+
+/**
+ * Files to be tested are defined here.
+ */
+object WholeTextFileRecordReaderSuite {
+  private val testWords: IndexedSeq[Byte] = "Spark is easy to use.\n".map(_.toByte)
+
+  private val fileNames = Array("part-00000", "part-00001", "part-00002")
+  private val fileLengths = Array(10, 100, 1000)
+
+  private val files = fileLengths.zip(fileNames).map { case (upperBound, filename) =>
+    filename -> Stream.continually(testWords.toList.toStream).flatten.take(upperBound).toArray
+  }.toMap
+}
