
Commit 4f3ce06

viirya authored and cloud-fan committed
[SPARK-19082][SQL] Make ignoreCorruptFiles work for Parquet
## What changes were proposed in this pull request?

We have a config `spark.sql.files.ignoreCorruptFiles` which can be used to ignore corrupt files when reading files in SQL. Currently the `ignoreCorruptFiles` config has two issues and can't work for Parquet:

1. We only ignore corrupt files in `FileScanRDD`. Actually, we begin to read those files as early as inferring the data schema from the files. For corrupt files, we can't read the schema and the program fails. A related issue was reported at http://apache-spark-developers-list.1001551.n3.nabble.com/Skip-Corrupted-Parquet-blocks-footer-tc20418.html
2. In `FileScanRDD`, we assume that we only begin to read the files when starting to consume the iterator. However, it is possible that the files are read before that. In this case, the `ignoreCorruptFiles` config doesn't work either.

This patch targets the Parquet datasource. If this direction is ok, we can address the same issue for other datasources like Orc.

Two main changes in this patch:

1. Replace `ParquetFileReader.readAllFootersInParallel` by implementing the logic to read footers in a multi-threaded manner. We can't ignore corrupt files if we use `ParquetFileReader.readAllFootersInParallel`, so this patch implements similar logic in `readParquetFootersInParallel`.
2. In `FileScanRDD`, we also need to ignore corrupt files when we call `readFunction` to return the iterator.

One thing to notice: we read the schema from the Parquet file's footer. The method that reads the footer, `ParquetFileReader.readFooter`, throws `RuntimeException` instead of `IOException` if it can't successfully read the footer. Please check out https://github.com/apache/parquet-mr/blob/df9d8e415436292ae33e1ca0b8da256640de9710/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L470. So this patch catches `RuntimeException`. One concern is that it might also shadow runtime exceptions other than those caused by reading corrupt files.

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #16474 from viirya/fix-ignorecorrupted-parquet-files.

(cherry picked from commit 61e48f5)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
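For context, a minimal usage sketch of the flag this patch fixes. The paths and session setup below are hypothetical, not part of the patch; the config itself defaults to `false`:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Session-level switch; with it on, files whose Parquet footer can't be read
// are logged and skipped during both schema inference and the actual scan.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

// "/tmp/mixed-dir" stands for a directory that may contain truncated or
// non-Parquet files; the readable files are still returned.
val df = spark.read.parquet("/tmp/mixed-dir")
df.show()
```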
1 parent bf2f233 commit 4f3ce06

File tree

5 files changed: 140 additions, 8 deletions


core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala

Lines changed: 1 addition & 1 deletion

@@ -20,7 +20,7 @@ package org.apache.spark.rdd
 import java.io.{IOException, ObjectOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.parallel.{ForkJoinTaskSupport, ThreadPoolTaskSupport}
+import scala.collection.parallel.ForkJoinTaskSupport
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.reflect.ClassTag
 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala

Lines changed: 11 additions & 1 deletion

@@ -134,7 +134,17 @@ class FileScanRDD(
       try {
         if (ignoreCorruptFiles) {
           currentIterator = new NextIterator[Object] {
-            private val internalIter = readFunction(currentFile)
+            private val internalIter = {
+              try {
+                // The readFunction may read files before consuming the iterator.
+                // E.g., vectorized Parquet reader.
+                readFunction(currentFile)
+              } catch {
+                case e @(_: RuntimeException | _: IOException) =>
+                  logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
+                  Iterator.empty
+              }
+            }
 
             override def getNext(): AnyRef = {
               try {
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 39 additions & 6 deletions

@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import java.io.IOException
 import java.net.URI
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.parallel.ForkJoinTaskSupport
+import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.{Failure, Try}
 
 import org.apache.hadoop.conf.Configuration
@@ -30,6 +33,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.codec.CodecConfig
 import org.apache.parquet.hadoop.util.ContextUtil
@@ -151,7 +155,7 @@ class ParquetFileFormat
     }
   }
 
-  def inferSchema(
+  override def inferSchema(
       sparkSession: SparkSession,
      parameters: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] = {
@@ -547,6 +551,36 @@ object ParquetFileFormat extends Logging {
     StructType(parquetSchema ++ missingFields)
   }
 
+  /**
+   * Reads Parquet footers in multi-threaded manner.
+   * If the config "spark.sql.files.ignoreCorruptFiles" is set to true, we will ignore the corrupted
+   * files when reading footers.
+   */
+  private[parquet] def readParquetFootersInParallel(
+      conf: Configuration,
+      partFiles: Seq[FileStatus],
+      ignoreCorruptFiles: Boolean): Seq[Footer] = {
+    val parFiles = partFiles.par
+    parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
+    parFiles.flatMap { currentFile =>
+      try {
+        // Skips row group information since we only need the schema.
+        // ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
+        // when it can't read the footer.
+        Some(new Footer(currentFile.getPath(),
+          ParquetFileReader.readFooter(
+            conf, currentFile, SKIP_ROW_GROUPS)))
+      } catch { case e: RuntimeException =>
+        if (ignoreCorruptFiles) {
+          logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
+          None
+        } else {
+          throw new IOException(s"Could not read footer for file: $currentFile", e)
+        }
+      }
+    }.seq
+  }
+
   /**
    * Figures out a merged Parquet schema with a distributed Spark job.
    *
@@ -587,6 +621,8 @@
     val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
       sparkSession.sparkContext.defaultParallelism)
 
+    val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
+
     // Issues a Spark job to read Parquet schema in parallel.
     val partiallyMergedSchemas =
       sparkSession
@@ -598,13 +634,10 @@
           new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
         }.toSeq
 
-        // Skips row group information since we only need the schema
-        val skipRowGroups = true
-
         // Reads footers in multi-threaded manner within each task
         val footers =
-          ParquetFileReader.readAllFootersInParallel(
-            serializedConf.value, fakeFileStatuses.asJava, skipRowGroups).asScala
+          ParquetFileFormat.readParquetFootersInParallel(
+            serializedConf.value, fakeFileStatuses, ignoreCorruptFiles)
 
         // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
         val converter =
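The helper added above replaces `ParquetFileReader.readAllFootersInParallel` with a hand-rolled parallel collection so that per-file failures can be dropped instead of failing the whole batch. Below is a small standalone sketch of that pattern, assuming the Scala 2.11 parallel-collections API the patch itself uses; the helper name and sample data are illustrative:

```scala
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

// Maps items on a bounded thread pool; items for which `f` returns None are
// simply dropped, which is how corrupt footers get skipped in the patch.
def flatMapInParallel[A, B](items: Seq[A], parallelism: Int = 8)(f: A => Option[B]): Seq[B] = {
  val parItems = items.par
  parItems.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(parallelism))
  parItems.flatMap(f).seq
}

// Hypothetical usage: parse integers, skipping the one that can't be parsed.
val parsed = flatMapInParallel(Seq("1", "2", "oops", "4")) { s =>
  try Some(s.toInt) catch { case _: NumberFormatException => None }
}
// parsed == Seq(1, 2, 4); parallel collections preserve the original ordering.
```

Capping the pool at a fixed size (8 in the patch) rather than using the default fork-join pool keeps footer reading from spawning one thread per core on large executors.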
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala

Lines changed: 59 additions & 0 deletions

@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetFileFormatSuite extends QueryTest with ParquetTest with SharedSQLContext {
+
+  test("read parquet footers in parallel") {
+    def testReadFooters(ignoreCorruptFiles: Boolean): Unit = {
+      withTempDir { dir =>
+        val fs = FileSystem.get(sparkContext.hadoopConfiguration)
+        val basePath = dir.getCanonicalPath
+
+        val path1 = new Path(basePath, "first")
+        val path2 = new Path(basePath, "second")
+        val path3 = new Path(basePath, "third")
+
+        spark.range(1).toDF("a").coalesce(1).write.parquet(path1.toString)
+        spark.range(1, 2).toDF("a").coalesce(1).write.parquet(path2.toString)
+        spark.range(2, 3).toDF("a").coalesce(1).write.json(path3.toString)
+
+        val fileStatuses =
+          Seq(fs.listStatus(path1), fs.listStatus(path2), fs.listStatus(path3)).flatten
+
+        val footers = ParquetFileFormat.readParquetFootersInParallel(
+          sparkContext.hadoopConfiguration, fileStatuses, ignoreCorruptFiles)
+
+        assert(footers.size == 2)
+      }
+    }
+
+    testReadFooters(true)
+    val exception = intercept[java.io.IOException] {
+      testReadFooters(false)
+    }
+    assert(exception.getMessage().contains("Could not read footer for file"))
+  }
+}

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala

Lines changed: 30 additions & 0 deletions

@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -212,6 +213,35 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
+  test("Enabling/disabling ignoreCorruptFiles") {
+    def testIgnoreCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
+        spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
+        val df = spark.read.parquet(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString,
+          new Path(basePath, "third").toString)
+        checkAnswer(
+          df,
+          Seq(Row(0), Row(1)))
+      }
+    }
+
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+      testIgnoreCorruptFiles()
+    }
+
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+      val exception = intercept[SparkException] {
+        testIgnoreCorruptFiles()
+      }
+      assert(exception.getMessage().contains("is not a Parquet file"))
+    }
+  }
+
   test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") {
     withTempPath { dir =>
       val basePath = dir.getCanonicalPath
