
[SPARK-17850][Core] Add a flag to ignore corrupt files (branch 1.6) #15454


Closed · wants to merge 1 commit
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -139,6 +139,9 @@ class HadoopRDD[K, V](

private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)

private val ignoreCorruptFiles =
sparkContext.conf.getBoolean("spark.files.ignoreCorruptFiles", true)

// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
val conf: Configuration = broadcastedConf.value.value
@@ -245,8 +248,7 @@ class HadoopRDD[K, V](
try {
finished = !reader.next(key, value)
} catch {
- case eof: EOFException =>
-   finished = true
+ case _: EOFException if ignoreCorruptFiles => finished = true
Member Author

I didn't use IOException so that the default behavior stays the same as before.

Contributor

sounds good

}
if (!finished) {
inputMetrics.incRecordsRead(1)
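
To make the EOFException-versus-IOException point above concrete, here is a minimal, hypothetical sketch of the guarded read loop the patch gives HadoopRDD (the countRecords helper and its signature are illustrative, not part of the patch). Only an EOFException from a truncated or corrupt stream is swallowed, and only when the flag is on; any other IOException still propagates and fails the task, so the default error handling for those errors is unchanged.

import java.io.EOFException

import org.apache.hadoop.mapred.RecordReader

// Hypothetical helper mirroring HadoopRDD's guarded read loop.
def countRecords[K, V](reader: RecordReader[K, V], ignoreCorruptFiles: Boolean): Long = {
  val key = reader.createKey()
  val value = reader.createValue()
  var count = 0L
  var finished = false
  while (!finished) {
    try {
      finished = !reader.next(key, value)
    } catch {
      // Tolerate only a truncated/corrupt stream, and only when the flag is on.
      case _: EOFException if ignoreCorruptFiles =>
        finished = true // stop reading this file instead of failing the task
    }
    if (!finished) {
      count += 1
    }
  }
  count
}

In the patch itself the flag value comes from sparkContext.conf under spark.files.ignoreCorruptFiles, as shown in the hunk above.
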
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -17,6 +17,7 @@

package org.apache.spark.rdd

import java.io.EOFException
import java.text.SimpleDateFormat
import java.util.Date

@@ -84,6 +85,9 @@ class NewHadoopRDD[K, V](

private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)

private val ignoreCorruptFiles =
sparkContext.conf.getBoolean("spark.files.ignoreCorruptFiles", true)

def getConf: Configuration = {
val conf: Configuration = confBroadcast.value.value
if (shouldCloneJobConf) {
@@ -171,7 +175,11 @@

override def hasNext: Boolean = {
if (!finished && !havePair) {
- finished = !reader.nextKeyValue
+ try {
+   finished = !reader.nextKeyValue
+ } catch {
+   case _: EOFException if ignoreCorruptFiles => finished = true
Member Author

This is a behavior change to NewHadoopRDD, which may surprise existing 1.6 users.

Contributor

Yeah, I am slightly worried about this change of behavior too.
Though I think it should be fine.

}
if (finished) {
// Close and release the reader here; close() will also be called when the task
// completes, but for tasks that read from many files, it helps to release the
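
Regarding the behavior-change concern above: because this backport defaults spark.files.ignoreCorruptFiles to true, a 1.6 user who depends on NewHadoopRDD's old behavior, where a truncated gzip file fails the job, can switch the flag off explicitly. A minimal sketch, assuming a local master and a hypothetical input path:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}

import org.apache.spark.{SparkConf, SparkContext}

// With the flag off, reading a corrupt gzip file surfaces as a SparkException
// whose cause is an EOFException, on both the old and the new Hadoop RDD paths.
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("strict-corrupt-files")
  .set("spark.files.ignoreCorruptFiles", "false")
val sc = new SparkContext(conf)

val path = "/tmp/maybe-corrupt.gz" // hypothetical path
sc.textFile(path).count() // HadoopRDD
sc.newAPIHadoopFile(path,
  classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() // NewHadoopRDD

The same setting can also be passed at submit time, e.g. --conf spark.files.ignoreCorruptFiles=false.
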
62 changes: 61 additions & 1 deletion core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -17,7 +17,8 @@

package org.apache.spark

- import java.io.{File, FileWriter}
+ import java.io._
+ import java.util.zip.GZIPOutputStream

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.input.PortableDataStream
@@ -540,4 +541,63 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
}.collect()
assert(inputPaths.toSet === Set(s"$outDir/part-00000", s"$outDir/part-00001"))
}

test("spark.files.ignoreCorruptFiles should work both HadoopRDD and NewHadoopRDD") {
val inputFile = File.createTempFile("input-", ".gz")
try {
// Create a corrupt gzip file
val byteOutput = new ByteArrayOutputStream()
val gzip = new GZIPOutputStream(byteOutput)
try {
gzip.write(Array[Byte](1, 2, 3, 4))
} finally {
gzip.close()
}
val bytes = byteOutput.toByteArray
val o = new FileOutputStream(inputFile)
try {
// It's corrupt since we only write half of the bytes into the file.
o.write(bytes.take(bytes.length / 2))
} finally {
o.close()
}

// Spark job should ignore corrupt files by default
sc = new SparkContext("local", "test")
// Test HadoopRDD
assert(sc.textFile(inputFile.toURI.toString).collect().isEmpty)
// Test NewHadoopRDD
assert {
sc.newAPIHadoopFile(
inputFile.toURI.toString,
classOf[NewTextInputFormat],
classOf[LongWritable],
classOf[Text]).collect().isEmpty
}
sc.stop()

// Reading a corrupt gzip file should throw EOFException
val conf = new SparkConf().set("spark.files.ignoreCorruptFiles", "false")
sc = new SparkContext("local", "test", conf)
// Test HadoopRDD
var e = intercept[SparkException] {
sc.textFile(inputFile.toURI.toString).collect()
}
assert(e.getCause.isInstanceOf[EOFException])
assert(e.getCause.getMessage === "Unexpected end of input stream")
// Test NewHadoopRDD
e = intercept[SparkException] {
sc.newAPIHadoopFile(
inputFile.toURI.toString,
classOf[NewTextInputFormat],
classOf[LongWritable],
classOf[Text]).collect()
}
assert(e.getCause.isInstanceOf[EOFException])
assert(e.getCause.getMessage === "Unexpected end of input stream")
} finally {
inputFile.delete()
}
}

}