[SPARK-1415] Hadoop min split for wholeTextFiles() #376

Closed · wants to merge 4 commits
17 changes: 12 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -454,14 +454,21 @@ class SparkContext(config: SparkConf) extends Logging {
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*
* @note Small files are preferred, as each file will be loaded fully in memory.
* @note Small files are preferred; large files are also allowed, but may cause bad performance.
*
* @param minSplits A suggested minimum number of splits for the input data.
*/
def wholeTextFiles(path: String): RDD[(String, String)] = {
newAPIHadoopFile(
path,
def wholeTextFiles(path: String, minSplits: Int = defaultMinSplits): RDD[(String, String)] = {
val job = new NewHadoopJob(hadoopConfiguration)
NewFileInputFormat.addInputPath(job, new Path(path))
val updateConf = job.getConfiguration
new WholeTextFileRDD(
this,
classOf[WholeTextFileInputFormat],
classOf[String],
classOf[String])
classOf[String],
updateConf,
minSplits)
}

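For reference, a minimal usage sketch of the new overload, assuming an existing SparkContext named sc and a hypothetical input directory (this example is not part of the patch):

    // Each file under the directory becomes one (path, content) record; the
    // minSplits hint asks for the input to be cut into at least 10 splits.
    val files = sc.wholeTextFiles("hdfs://namenode/data/small-files", minSplits = 10)
    val sizes = files.mapValues(_.length)  // (path, number of characters)
    sizes.collect().foreach { case (path, len) => println(s"$path -> $len") }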
JavaSparkContext.scala
@@ -177,7 +177,19 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*
* @note Small files are preferred, as each file will be loaded fully in memory.
* @note Small files are preferred; large files are also allowed, but may cause bad performance.
*
* @param minSplits A suggested minimum number of splits for the input data.
*/
def wholeTextFiles(path: String, minSplits: Int): JavaPairRDD[String, String] =
new JavaPairRDD(sc.wholeTextFiles(path, minSplits))

/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI. Each file is read as a single record and returned in a
* key-value pair, where the key is the path of each file, the value is the content of each file.
*
* @see `wholeTextFiles(path: String, minSplits: Int)`.
*/
def wholeTextFiles(path: String): JavaPairRDD[String, String] =
new JavaPairRDD(sc.wholeTextFiles(path))
WholeTextFileInputFormat.scala
@@ -17,6 +17,8 @@

package org.apache.spark.input

import scala.collection.JavaConversions._

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.JobContext
@@ -44,4 +46,16 @@ private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[Str
context,
classOf[WholeTextFileRecordReader])
}

/**
* Allow the end user to set minSplits, in order to keep compatibility with the old Hadoop API.
*/
def setMaxSplitSize(context: JobContext, minSplits: Int) {
val files = listStatus(context)
val totalLen = files.map { file =>
if (file.isDir) 0L else file.getLen
}.sum
val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minSplits == 0) 1 else minSplits)).toLong
super.setMaxSplitSize(maxSplitSize)
}
}
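To make the arithmetic in setMaxSplitSize concrete, here is a small self-contained sketch of the same computation with made-up file sizes (the values are hypothetical and only illustrate the formula):

    // With files of 10 MB, 20 MB and 30 MB and minSplits = 3, the total input
    // is 60 MB, so each combined split is capped at ceil(60 MB / 3) = 20 MB.
    val fileLengths = Seq(10L, 20L, 30L).map(_ * 1024 * 1024)
    val minSplits = 3
    val totalLen = fileLengths.sum
    val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minSplits == 0) 1 else minSplits)).toLong
    println(maxSplitSize)  // 20971520 bytes; CombineFileInputFormat then packs
                           // whole files into splits no larger than this bound.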
60 changes: 49 additions & 11 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -24,11 +24,18 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._

import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext}
import org.apache.spark.annotation.DeveloperApi

private[spark]
class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
import org.apache.spark.input.WholeTextFileInputFormat
import org.apache.spark.InterruptibleIterator
import org.apache.spark.Logging
import org.apache.spark.Partition
import org.apache.spark.SerializableWritable
import org.apache.spark.{SparkContext, TaskContext}

private[spark] class NewHadoopPartition(
rddId: Int,
val index: Int,
@transient rawSplit: InputSplit with Writable)
extends Partition {

val serializableHadoopSplit = new SerializableWritable(rawSplit)
@@ -65,17 +72,19 @@ class NewHadoopRDD[K, V](
private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
// private val serializableConf = new SerializableWritable(conf)

private val jobtrackerId: String = {
private val jobTrackerId: String = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
formatter.format(new Date())
}

@transient private val jobId = new JobID(jobtrackerId, id)
@transient protected val jobId = new JobID(jobTrackerId, id)

override def getPartitions: Array[Partition] = {
val inputFormat = inputFormatClass.newInstance
if (inputFormat.isInstanceOf[Configurable]) {
inputFormat.asInstanceOf[Configurable].setConf(conf)
inputFormat match {
case configurable: Configurable =>
configurable.setConf(conf)
case _ =>
}
val jobContext = newJobContext(conf, jobId)
val rawSplits = inputFormat.getSplits(jobContext).toArray
@@ -91,11 +100,13 @@
val split = theSplit.asInstanceOf[NewHadoopPartition]
logInfo("Input split: " + split.serializableHadoopSplit)
val conf = confBroadcast.value.value
val attemptId = newTaskAttemptID(jobtrackerId, id, isMap = true, split.index, 0)
val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
val format = inputFormatClass.newInstance
if (format.isInstanceOf[Configurable]) {
format.asInstanceOf[Configurable].setConf(conf)
format match {
case configurable: Configurable =>
configurable.setConf(conf)
case _ =>
}
val reader = format.createRecordReader(
split.serializableHadoopSplit.value, hadoopAttemptContext)
@@ -141,3 +152,30 @@
def getConf: Configuration = confBroadcast.value.value
}

private[spark] class WholeTextFileRDD(
sc : SparkContext,
inputFormatClass: Class[_ <: WholeTextFileInputFormat],
keyClass: Class[String],
valueClass: Class[String],
@transient conf: Configuration,
minSplits: Int)
extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {

override def getPartitions: Array[Partition] = {
val inputFormat = inputFormatClass.newInstance
inputFormat match {
case configurable: Configurable =>
configurable.setConf(conf)
case _ =>
}
val jobContext = newJobContext(conf, jobId)
inputFormat.setMaxSplitSize(jobContext, minSplits)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {
result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
}
result
}
}
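
Put together, WholeTextFileRDD overrides getPartitions so that the minSplits hint reaches WholeTextFileInputFormat.setMaxSplitSize before getSplits runs, and the resulting partition count follows the hint. A rough way to observe this (sketch only; the SparkContext sc and the directory path are assumed):

    // With several small files in the directory and minSplits = 3, the combined
    // input format caps each split at roughly totalLen / 3, so the RDD should
    // come back with about three partitions.
    val rdd = sc.wholeTextFiles("hdfs://namenode/data/small-files", minSplits = 3)
    println(s"partitions = ${rdd.partitions.length}")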

2 changes: 1 addition & 1 deletion core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -626,7 +626,7 @@ public void wholeTextFiles() throws IOException {
container.put(tempDirName+"/part-00000", new Text(content1).toString());
container.put(tempDirName+"/part-00001", new Text(content2).toString());

JavaPairRDD<String, String> readRDD = sc.wholeTextFiles(tempDirName);
JavaPairRDD<String, String> readRDD = sc.wholeTextFiles(tempDirName, 3);
List<Tuple2<String, String>> result = readRDD.collect();

for (Tuple2<String, String> res : result) {
WholeTextFileRecordReaderSuite.scala
@@ -73,7 +73,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
createNativeFile(dir, filename, contents)
}

val res = sc.wholeTextFiles(dir.toString).collect()
val res = sc.wholeTextFiles(dir.toString, 3).collect()

assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
"Number of files read out does not fit with the actual value.")