Skip to content

[SPARK-17666] Ensure that RecordReaders are closed by data source file scans #15245

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed

Conversation

JoshRosen
Copy link
Contributor

What changes were proposed in this pull request?

This patch addresses a potential cause of resource leaks in data source file scans. As reported in SPARK-17666, tasks which do not fully-consume their input may cause file handles / network connections (e.g. S3 connections) to be leaked. Spark's NewHadoopRDD uses a TaskContext callback to close its record readers, but the new data source file scans will only close record readers once their iterators are fully-consumed.

This patch modifies RecordReaderIterator and HadoopFileLinesReader to add close() methods and modifies all six implementations of FileFormat.buildReader() to register TaskContext task completion callbacks to guarantee that cleanup is eventually performed.

How was this patch tested?

Tested manually for now.

@@ -38,7 +41,7 @@ class RecordReaderIterator[T](rowReader: RecordReader[_, T]) extends Iterator[T]
// Close and release the reader here; close() will also be called when the task
// completes, but for tasks that read from many files, it helps to release the
// resources early.
rowReader.close()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is CompletionIterator-style cleanup.

override def close(): Unit = {
if (rowReader != null) {
try {
// Close the reader and release it. Note: it's very important that we don't close the
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is copied from NewHadoopRdd, which contains similar defensive programming.

@srowen
Copy link
Member

srowen commented Sep 26, 2016

Nice one, looks like something that can only help.

@@ -27,7 +29,8 @@ import org.apache.spark.sql.catalyst.InternalRow
* Note that this returns [[Object]]s instead of [[InternalRow]] because we rely on erasure to pass
* column batches by pretending they are rows.
*/
class RecordReaderIterator[T](rowReader: RecordReader[_, T]) extends Iterator[T] {
class RecordReaderIterator[T](
private[this] var rowReader: RecordReader[_, T]) extends Iterator[T] with Closeable {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By nulling out the rowReader I think that this will prevent memory consumption from becoming too high in the list of task completion callbacks.

@@ -30,7 +31,8 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
* An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which are all of the lines
* in that file.
*/
class HadoopFileLinesReader(file: PartitionedFile, conf: Configuration) extends Iterator[Text] {
class HadoopFileLinesReader(
file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable {
private val iterator = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a RecordReaderIterator, whose memory footprint should become practically nothing after close() is called, hence my decision to not null things out here.

@SparkQA
Copy link

SparkQA commented Sep 26, 2016

Test build #65931 has finished for PR 15245 at commit e4b8577.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Copy link
Contributor

davies commented Sep 28, 2016

LGTM

@asfgit asfgit closed this in b03b4ad Sep 28, 2016
@rxin
Copy link
Contributor

rxin commented Sep 28, 2016

Merging in master. There is a conflict with 2.0. Can you make a backport pull request?

JoshRosen added a commit to JoshRosen/spark that referenced this pull request Sep 28, 2016
…e scans

This patch addresses a potential cause of resource leaks in data source file scans. As reported in [SPARK-17666](https://issues.apache.org/jira/browse/SPARK-17666), tasks which do not fully-consume their input may cause file handles / network connections (e.g. S3 connections) to be leaked. Spark's `NewHadoopRDD` uses a TaskContext callback to [close its record readers](https://github.com/apache/spark/blame/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L208), but the new data source file scans will only close record readers once their iterators are fully-consumed.

This patch modifies `RecordReaderIterator` and `HadoopFileLinesReader` to add `close()` methods and modifies all six implementations of `FileFormat.buildReader()` to register TaskContext task completion callbacks to guarantee that cleanup is eventually performed.

Tested manually for now.

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#15245 from JoshRosen/SPARK-17666-close-recordreader.
@JoshRosen JoshRosen deleted the SPARK-17666-close-recordreader branch September 28, 2016 01:36
@JoshRosen
Copy link
Contributor Author

I've opened #15271 to backport.

asfgit pushed a commit that referenced this pull request Sep 28, 2016
…e scans (backport)

This is a branch-2.0 backport of #15245.

## What changes were proposed in this pull request?

This patch addresses a potential cause of resource leaks in data source file scans. As reported in [SPARK-17666](https://issues.apache.org/jira/browse/SPARK-17666), tasks which do not fully-consume their input may cause file handles / network connections (e.g. S3 connections) to be leaked. Spark's `NewHadoopRDD` uses a TaskContext callback to [close its record readers](https://github.com/apache/spark/blame/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L208), but the new data source file scans will only close record readers once their iterators are fully-consumed.

This patch modifies `RecordReaderIterator` and `HadoopFileLinesReader` to add `close()` methods and modifies all six implementations of `FileFormat.buildReader()` to register TaskContext task completion callbacks to guarantee that cleanup is eventually performed.

## How was this patch tested?

Tested manually for now.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #15271 from JoshRosen/SPARK-17666-backport.
ghost pushed a commit to dbtsai/spark that referenced this pull request Oct 1, 2016
… streams are closed

## What changes were proposed in this pull request?

As a followup to SPARK-17666, ensure filesystem connections are not leaked at least in unit tests. This is done here by intercepting filesystem calls as suggested by JoshRosen . At the end of each test, we assert no filesystem streams are left open.

This applies to all tests using SharedSQLContext or SharedSparkContext.

## How was this patch tested?

I verified that tests in sql and core are indeed using the filesystem backend, and fixed the detected leaks. I also checked that reverting apache#15245 causes many actual test failures due to connection leaks.

Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>

Closes apache#15306 from ericl/sc-4672.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants