[SPARK-17666] Ensure that RecordReaders are closed by data source file scans #15245
Conversation
@@ -38,7 +41,7 @@ class RecordReaderIterator[T](rowReader: RecordReader[_, T]) extends Iterator[T]
      // Close and release the reader here; close() will also be called when the task
      // completes, but for tasks that read from many files, it helps to release the
      // resources early.
      rowReader.close()
This is `CompletionIterator`-style cleanup.
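For readers following along, here is a minimal sketch of that pattern (illustrative names, not the exact PR code): the reader is closed the moment its last record is consumed, instead of waiting for the task-completion callback.

```scala
import org.apache.hadoop.mapreduce.RecordReader

// Illustrative sketch (not the exact PR code): close the RecordReader the
// moment it runs out of records, so file handles are released before the
// task-completion callback ever fires.
class EagerCloseIterator[T](private[this] var reader: RecordReader[_, T]) extends Iterator[T] {
  private[this] var havePair = false
  private[this] var finished = false

  override def hasNext: Boolean = {
    if (!finished && !havePair) {
      finished = !reader.nextKeyValue()
      if (finished) {
        // Close and release the reader as soon as the input is exhausted; the
        // task-completion callback would close it anyway, but releasing early
        // helps tasks that read many files.
        reader.close()
        reader = null
      }
      havePair = !finished
    }
    !finished
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    havePair = false
    reader.getCurrentValue
  }
}
```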
override def close(): Unit = {
  if (rowReader != null) {
    try {
      // Close the reader and release it. Note: it's very important that we don't close the
This comment is copied from `NewHadoopRDD`, which contains similar defensive programming.
Nice one, looks like something that can only help.
@@ -27,7 +29,8 @@ import org.apache.spark.sql.catalyst.InternalRow
 * Note that this returns [[Object]]s instead of [[InternalRow]] because we rely on erasure to pass
 * column batches by pretending they are rows.
 */
class RecordReaderIterator[T](rowReader: RecordReader[_, T]) extends Iterator[T] {
class RecordReaderIterator[T](
    private[this] var rowReader: RecordReader[_, T]) extends Iterator[T] with Closeable {
By nulling out the `rowReader`, I think that this will prevent memory consumption from becoming too high in the list of task completion callbacks.
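A hedged sketch of what that looks like (the class name here is mine, not the literal PR code): `close()` both closes the reader and drops the reference to it, so a completion callback that keeps this object alive for the rest of the task holds almost nothing.

```scala
import java.io.Closeable
import org.apache.hadoop.mapreduce.RecordReader

// Illustrative sketch: close() releases the reader and nulls out the field so
// that a task-completion callback retaining this object keeps almost no memory.
class CloseableReaderHandle[T](
    private[this] var rowReader: RecordReader[_, T]) extends Closeable {

  override def close(): Unit = {
    if (rowReader != null) {
      try {
        // The null check above also guards against closing the reader twice.
        rowReader.close()
      } finally {
        // Drop the reference so the reader and its buffers become garbage-
        // collectable even though the handle stays registered as a callback.
        rowReader = null
      }
    }
  }
}
```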
@@ -30,7 +31,8 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 * An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which are all of the lines
 * in that file.
 */
class HadoopFileLinesReader(file: PartitionedFile, conf: Configuration) extends Iterator[Text] {
class HadoopFileLinesReader(
    file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable {
  private val iterator = {
This is a `RecordReaderIterator`, whose memory footprint should become practically nothing after `close()` is called, hence my decision not to null things out here.
Test build #65931 has finished for PR 15245 at commit
LGTM
Merging into master. There is a conflict with 2.0. Can you make a backport pull request?
…e scans

This patch addresses a potential cause of resource leaks in data source file scans. As reported in [SPARK-17666](https://issues.apache.org/jira/browse/SPARK-17666), tasks which do not fully-consume their input may cause file handles / network connections (e.g. S3 connections) to be leaked. Spark's `NewHadoopRDD` uses a TaskContext callback to [close its record readers](https://github.com/apache/spark/blame/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L208), but the new data source file scans will only close record readers once their iterators are fully-consumed.

This patch modifies `RecordReaderIterator` and `HadoopFileLinesReader` to add `close()` methods and modifies all six implementations of `FileFormat.buildReader()` to register TaskContext task completion callbacks to guarantee that cleanup is eventually performed.

Tested manually for now.

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#15245 from JoshRosen/SPARK-17666-close-recordreader.
I've opened #15271 to backport.
…e scans (backport)

This is a branch-2.0 backport of #15245.

## What changes were proposed in this pull request?

This patch addresses a potential cause of resource leaks in data source file scans. As reported in [SPARK-17666](https://issues.apache.org/jira/browse/SPARK-17666), tasks which do not fully-consume their input may cause file handles / network connections (e.g. S3 connections) to be leaked. Spark's `NewHadoopRDD` uses a TaskContext callback to [close its record readers](https://github.com/apache/spark/blame/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L208), but the new data source file scans will only close record readers once their iterators are fully-consumed. This patch modifies `RecordReaderIterator` and `HadoopFileLinesReader` to add `close()` methods and modifies all six implementations of `FileFormat.buildReader()` to register TaskContext task completion callbacks to guarantee that cleanup is eventually performed.

## How was this patch tested?

Tested manually for now.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #15271 from JoshRosen/SPARK-17666-backport.
… streams are closed

## What changes were proposed in this pull request?

As a followup to SPARK-17666, ensure filesystem connections are not leaked, at least in unit tests. This is done here by intercepting filesystem calls as suggested by JoshRosen. At the end of each test, we assert no filesystem streams are left open. This applies to all tests using SharedSQLContext or SharedSparkContext.

## How was this patch tested?

I verified that tests in sql and core are indeed using the filesystem backend, and fixed the detected leaks. I also checked that reverting apache#15245 causes many actual test failures due to connection leaks.

Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>

Closes apache#15306 from ericl/sc-4672.
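The leak check described in that follow-up can be sketched roughly as follows (hypothetical helper names, not the follow-up PR's actual classes): record every stream a test opens, and assert at the end of the test that all of them were closed again.

```scala
import java.io.InputStream
import scala.collection.mutable

// Hypothetical sketch of the test-time leak check: streams are registered when
// opened, deregistered when closed, and any survivors fail the assertion.
object OpenStreamTracker {
  private val openStreams = mutable.Set.empty[InputStream]

  def opened(stream: InputStream): Unit = synchronized { openStreams += stream }
  def closed(stream: InputStream): Unit = synchronized { openStreams -= stream }

  def assertNoOpenStreams(): Unit = synchronized {
    assert(openStreams.isEmpty, s"${openStreams.size} stream(s) were never closed")
  }
}
```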
## What changes were proposed in this pull request?

This patch addresses a potential cause of resource leaks in data source file scans. As reported in SPARK-17666, tasks which do not fully consume their input may cause file handles / network connections (e.g. S3 connections) to be leaked. Spark's `NewHadoopRDD` uses a TaskContext callback to close its record readers, but the new data source file scans will only close record readers once their iterators are fully consumed.

This patch modifies `RecordReaderIterator` and `HadoopFileLinesReader` to add `close()` methods and modifies all six implementations of `FileFormat.buildReader()` to register TaskContext task completion callbacks to guarantee that cleanup is eventually performed.

## How was this patch tested?

Tested manually for now.
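For illustration, the callback registration described above looks roughly like this (the helper name is mine; the PR inlines the equivalent call in each `buildReader()` implementation):

```scala
import java.io.Closeable
import org.apache.spark.TaskContext

object ReaderCleanup {
  // Illustrative sketch of the registration pattern: even if the returned
  // iterator is never fully consumed, the task-completion listener guarantees
  // that the underlying reader is eventually closed.
  def closeOnTaskCompletion[T](iter: Iterator[T] with Closeable): Iterator[T] = {
    // TaskContext.get() returns null outside of a running task, hence the guard.
    Option(TaskContext.get()).foreach { ctx =>
      ctx.addTaskCompletionListener(_ => iter.close())
    }
    iter
  }
}
```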