
[SPARK-14914] Fix Resource not closed after using, mostly for unit tests #12693


Closed · taoli91 wants to merge 16 commits

Conversation

@taoli91 (Contributor) commented Apr 26, 2016:

What changes were proposed in this pull request?

Close FileStreams, ZipFiles, etc., to release their resources after use. Not closing these resources causes an IOException to be raised when deleting temp files.

How was this patch tested?

Ran unit tests on both Windows and Linux. Note that this fix does not resolve all of the unit-test problems on Windows.
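
For context, the pattern the PR applies throughout is the standard close-in-finally idiom. A minimal, self-contained sketch — the helper name and its parameter below are illustrative, not code from this PR:

    import java.util.zip.ZipFile
    import scala.collection.JavaConverters._

    // Illustrative helper (not from the PR): read entry names and always
    // release the underlying file handle, even if reading throws.
    def readEntryNames(path: String): Seq[String] = {
      val zipFile = new ZipFile(path)
      try {
        // materialize the names eagerly, before the file is closed
        zipFile.entries().asScala.map(_.getName).toList
      } finally {
        // without this close, deleting the temp file later can fail
        // with an IOException (notably on Windows)
        zipFile.close()
      }
    }

Spark's Utils.tryWithSafeFinally, which appears in the discussion below, wraps this same try/finally idiom.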

@@ -243,7 +243,16 @@ private[spark] object ReliableCheckpointRDD extends Logging {
       if (fs.exists(partitionerFilePath)) {
         val fileInputStream = fs.open(partitionerFilePath, bufferSize)
         val serializer = SparkEnv.get.serializer.newInstance()
-        val deserializeStream = serializer.deserializeStream(fileInputStream)
+        // make sure that the file is closed if an error occurs during deserialization
+        val deserializeStream =
@srowen (Member):

You should use finally in contexts like this.

@taoli91 (Contributor, author) commented Apr 26, 2016:

@srowen The deserializeStream will be closed. We only need this for the case where serializer.deserializeStream throws an exception; then we never reach line 258, where deserializeStream, along with the inner fileInputStream, would be closed.

@srowen (Member):

Sorry, I mean put this in the block below that closes the deserialization stream, at best.

@taoli91 (Contributor, author):

> Sorry, I mean put this in the block below that closes the deserialization stream, at best.

@srowen It seems we would need extra code to handle the scoping problem if we put the close in another clause. It's probably cleaner to stick with this solution.

@srowen (Member):

I'm not sure what you mean -- it's less code. Just open the deserializeStream inside the block and close it in the finally block that follows. Actually, closing deserializeStream will already (should already) close the underlying stream anyway. It doesn't handle errors while making the stream from the original stream, but, constructors of the deserializer streams aren't reading the stream anyway. I suspect it's fine as-is, but it would be extra-defensive to also close the fileInputStream, yes.

@taoli91 (Contributor, author) commented May 4, 2016:

@srowen Probably a code snippet will explain things better. Do you mean this:

        var deserializeStream : DeserializationStream = null

        val partitioner = Utils.tryWithSafeFinally[Partitioner] {
          deserializeStream = serializer.deserializeStream(fileInputStream)
          deserializeStream.readObject[Partitioner]
        } {
          deserializeStream.close()
        }

There are a few things I'm concerned about:

  1. The semantics have slightly changed (deserializeStream is now a var instead of a val).
  2. I'm not sure it is always safe to close a partially initialized deserializeStream when deserialization throws an exception. I'm fairly sure that closing the fileInputStream first may cause the close of deserializeStream to throw, complaining that the input has already been closed.

FYI, I found this problem while running the test case "checkpointing partitioners" in CheckpointSuite, with the corruptPartitionerFile flag turned on.

@srowen (Member) commented May 4, 2016:

Yeah that's what I mean. Well, if I were right that serializer.deserializeStream can't fail, then the existing code would already be fine, so this suggestion doesn't help unless fileInputStream gets a similar treatment, and that's too complex. I'm curious how that fails given the current implementation, but, even if it couldn't now, it could in the future.

You might normally resolve this with nested try blocks; I think streams aren't supposed to fail if closed twice, so it's safe to close the underlying stream for good measure. Still, at that point it's no less complex, so I can see that this is as clear as anything.
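
For reference, the nested shape being discussed would look roughly like this with Spark's Utils.tryWithSafeFinally — a sketch of the idea, not necessarily the code as merged:

    val partitioner = Utils.tryWithSafeFinally {
      val deserializeStream = serializer.deserializeStream(fileInputStream)
      Utils.tryWithSafeFinally {
        deserializeStream.readObject[Partitioner]
      } {
        // closing the wrapper normally also closes the underlying stream
        deserializeStream.close()
      }
    } {
      // extra-defensive: also close the file stream; this covers the case
      // where constructing deserializeStream itself throws, and closing an
      // already-closed stream is expected to be a no-op
      fileInputStream.close()
    }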

@srowen (Member) commented Apr 26, 2016:

This cleanup looks generally good

@@ -39,18 +39,15 @@ class MapWithStateSuite extends SparkFunSuite

  before {
    StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) }
Reviewer (Member):

Just noticed. I think

foreach(_.stop(stopSparkContext = false))

might be better, although this is not part of this PR.

    eventSet.remove(event)
    try {
      val logStart = SparkListenerLogStart(SPARK_VERSION)
      val lines = readLines(logData)
@srowen (Member):

Here, I think readLines is the only thing that reads logData? After it's done, you could close the stream. Maybe that's tidier than wrapping so much in the try-finally block.

@taoli91 (Contributor, author) commented May 4, 2016:

@srowen I tried:

    val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
    var lines : Seq[String] = null
    try {
      lines = readLines(logData)
    } finally {
      logData.close()
    }

It threw an IOException complaining that the stream was closed.

I think readLines is lazy, in the sense that it won't read the file until we actually advance the iterator.
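
For context, a small self-contained sketch of that laziness pitfall, assuming readLines is backed by something like scala.io.Source.getLines() — the setup below is illustrative:

    import java.io.{File, FileInputStream, PrintWriter}
    import scala.io.Source

    // write a small temp file to read back (illustrative setup)
    val file = File.createTempFile("lazy-lines", ".txt")
    new PrintWriter(file) { write("a\nb\nc"); close() }

    val in = new FileInputStream(file)
    val lines = Source.fromInputStream(in).getLines() // no bytes read yet
    in.close()
    // advancing the iterator now reads from the closed stream and throws
    // java.io.IOException: Stream closed
    lines.toList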

@srowen (Member):

OK sounds fine. How about pulling out eventSet at least?

@andrewor14 (Contributor):

add to whitelist

SparkQA commented May 9, 2016

Test build #58158 has finished for PR 12693 at commit 69dbceb.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@taoli91 (Contributor, author) commented May 10, 2016:

@andrewor14 I've fixed the failure. Could you please try it again? Thanks

SparkQA commented May 10, 2016

Test build #58206 has finished for PR 12693 at commit 3ae2396.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@taoli91 (Contributor, author) commented May 10, 2016:

Well, it's weird; I can't repro the test failure locally.

@taoli91 (Contributor, author) commented May 12, 2016:

I can't repro the failure in my environment. It seems the changes in this commit shouldn't affect the logic of the Launcher part. Is it possible that it's a random failure?

@taoli91 (Contributor, author) commented May 16, 2016:

@andrewor14 Could you please schedule another test for me?

@andrewor14 (Contributor):

retest this please

@andrewor14 (Contributor):

you should be able to trigger it too

SparkQA commented May 16, 2016

Test build #58652 has finished for PR 12693 at commit 3ae2396.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member) commented Sep 21, 2016:

@taoli91 Hi. Maybe we could close this and #12696, and then start to resolve https://issues.apache.org/jira/browse/SPARK-17591 step by step? If we could verify the tests, I guess it'd be faster to merge each PR.

@srowen (Member) commented Sep 21, 2016:

@taoli91 If you want to rebase this, I'd be happy to finish the review too. It looked like it was almost there, or already done.

@HyukjinKwon (Member) commented Oct 5, 2016:

Oh, sorry, this was not a Windows-specific issue. Sorry for the comment above.

@tdas (Contributor) commented Oct 25, 2016:

@HyukjinKwon could you rebase this PR? Thanks for the detailed work in finding these issues. This is a very useful PR, and it would be good to merge once rebased.

@HyukjinKwon (Member):
Sure, I definitely will.
