
Commit c59ad42

viirya authored and cloud-fan committed
[SPARK-20848][SQL] Shutdown the pool after reading parquet files
## What changes were proposed in this pull request?

From JIRA:

> On each call to spark.read.parquet, a new ForkJoinPool is created. One of the threads in the pool is kept in the WAITING state and never stopped, which leads to unbounded growth in the number of threads.

We should shut down the pool after reading parquet files.

## How was this patch tested?

Added a test to ParquetFileFormatSuite.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #18073 from viirya/SPARK-20848.

(cherry picked from commit f72ad30)

Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 83aeac9 commit c59ad42

File tree

1 file changed: +4 −1


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 4 additions & 1 deletion
```diff
@@ -479,7 +479,8 @@ object ParquetFileFormat extends Logging {
       partFiles: Seq[FileStatus],
       ignoreCorruptFiles: Boolean): Seq[Footer] = {
     val parFiles = partFiles.par
-    parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
+    val pool = new ForkJoinPool(8)
+    parFiles.tasksupport = new ForkJoinTaskSupport(pool)
     parFiles.flatMap { currentFile =>
       try {
         // Skips row group information since we only need the schema.
@@ -495,6 +496,8 @@ object ParquetFileFormat extends Logging {
         } else {
           throw new IOException(s"Could not read footer for file: $currentFile", e)
         }
+      } finally {
+        pool.shutdown()
       }
     }.seq
   }
```
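The fix is the general create-use-shutdown pattern for a dedicated `ForkJoinPool`: instead of constructing the pool inline (where no reference survives to stop it), bind it to a variable and shut it down in a `finally` block. The patched Spark code is Scala, but the same pattern can be sketched in plain Java; `readFooter` below is a hypothetical stand-in for the per-file footer read (`ParquetFileReader.readFooter` in the real code):

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class FooterReader {
    // Hypothetical stand-in for reading one Parquet footer.
    static String readFooter(String path) {
        return "footer-of-" + path;
    }

    static List<String> readFooters(List<String> paths) {
        // Dedicated pool with parallelism 8, mirroring `new ForkJoinPool(8)` in the patch.
        ForkJoinPool pool = new ForkJoinPool(8);
        try {
            // Submitting the parallel stream from inside the pool makes its
            // workers (not the common pool) execute the per-file tasks.
            return pool.submit(() ->
                paths.parallelStream()
                     .map(FooterReader::readFooter)
                     .collect(Collectors.toList())
            ).join();
        } finally {
            // The one-line fix: release the pool's worker threads. Without this,
            // each call leaves pool threads behind, as described in SPARK-20848.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(readFooters(List.of("a.parquet", "b.parquet")));
    }
}
```

`shutdown()` lets in-flight tasks finish and then terminates the workers; since the parallel computation has already completed by the time the `finally` runs here, nothing is cut short.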
