[SPARK-14914] Fix Resource not closed after using, mostly for unit tests #12693
Conversation
@@ -243,7 +243,16 @@ private[spark] object ReliableCheckpointRDD extends Logging {
     if (fs.exists(partitionerFilePath)) {
       val fileInputStream = fs.open(partitionerFilePath, bufferSize)
       val serializer = SparkEnv.get.serializer.newInstance()
-      val deserializeStream = serializer.deserializeStream(fileInputStream)
+      // make sure that the file is closed if an error occurs during deserialization
+      val deserializeStream =
You should use `finally` in contexts like this.
@srowen The `deserializeStream` will be closed. We only need this for the case where the `deserializeStream` method throws an exception and we never reach line 258, where the `deserializeStream`, along with the inner `fileInputStream`, is closed.
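For context, the guard under discussion looks roughly like the sketch below. This is only an illustration of the pattern, not a claim about the exact code in the diff above.

```scala
// Sketch: guard the construction of the deserialization stream so that
// fileInputStream is not leaked if serializer.deserializeStream itself throws.
val deserializeStream =
  try {
    serializer.deserializeStream(fileInputStream)
  } catch {
    case e: Throwable =>
      fileInputStream.close() // release the file handle before rethrowing
      throw e
  }
```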
Sorry, I mean put this in the block below that closes the deserialization stream, at best.
> Sorry, I mean put this in the block below that closes the deserialization stream, at best.

@srowen It seems we would need extra code to deal with the scoping problem if the close were put in another clause. It's probably cleaner to stick with this solution.
I'm not sure what you mean -- it's less code. Just open the `deserializeStream` inside the block and close it in the `finally` block that follows. Actually, closing `deserializeStream` will already (should already) close the underlying stream anyway. It doesn't handle errors while making the stream from the original stream, but constructors of the deserializer streams aren't reading the stream anyway. I suspect it's fine as-is, but it would be extra-defensive to also close the `fileInputStream`, yes.
@srowen Probably a code snippet will explain things better. Do you mean this:
var deserializeStream: DeserializationStream = null
val partitioner = Utils.tryWithSafeFinally[Partitioner] {
  deserializeStream = serializer.deserializeStream(fileInputStream)
  deserializeStream.readObject[Partitioner]()
} {
  deserializeStream.close()
}
There are a few things I am concerned about:

- The semantics have changed slightly (`deserializeStream` is now a `var` instead of a `val`).
- I'm not sure it is always safe to close a partially initialized `deserializeStream` when the deserialization throws an exception. I'm fairly sure that closing the `fileInputStream` first may cause the close of `deserializeStream` to throw an exception complaining that the input has already been closed.
FYI, I found this problem while running the test case "checkpointing partitioners" in `CheckpointSuite`, in which the `corruptPartitionerFile` flag is turned on.
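One null-guarded variant of the snippet above that might address the partial-initialization concern (again only a sketch, not code proposed in this PR):

```scala
var deserializeStream: DeserializationStream = null
val partitioner: Partitioner =
  try {
    deserializeStream = serializer.deserializeStream(fileInputStream)
    deserializeStream.readObject[Partitioner]()
  } finally {
    if (deserializeStream != null) {
      // closing the wrapper is expected to close fileInputStream as well
      deserializeStream.close()
    } else {
      // construction failed before a wrapper existed; close the raw stream directly
      fileInputStream.close()
    }
  }
```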
Yeah, that's what I mean. Well, if I were right that `serializer.deserializeStream` can't fail, then the existing code would already be fine, so this suggestion doesn't help unless `fileInputStream` gets a similar treatment, and that's too complex. I'm curious how that fails given the current implementation, but even if it couldn't now, it could in the future.

You might normally resolve this with nested try blocks; I think streams aren't supposed to fail if closed twice, so it's safe to close the underlying stream for good measure. Still, at that point it's no less complex, so I can see that this is as clear as anything.
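The nested-try shape mentioned here would look roughly like the following sketch, which assumes that closing a stream twice is a harmless no-op:

```scala
// Outer block guards fileInputStream, inner block guards deserializeStream.
// If deserializeStream.close() already closes the underlying stream, the
// outer close is redundant but (by assumption) harmless.
val partitioner: Partitioner =
  try {
    val deserializeStream = serializer.deserializeStream(fileInputStream)
    try {
      deserializeStream.readObject[Partitioner]()
    } finally {
      deserializeStream.close()
    }
  } finally {
    fileInputStream.close()
  }
```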
This cleanup looks generally good.
@@ -39,18 +39,15 @@ class MapWithStateSuite extends SparkFunSuite
   before {
     StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) }
Just noticed. I think `foreach(_.stop(stopSparkContext = false))` might be better, although this is not part of this PR.
eventSet.remove(event)
try {
  val logStart = SparkListenerLogStart(SPARK_VERSION)
  val lines = readLines(logData)
Here, I think `readLines` is the only thing that reads `logData`? After it's done you could close the stream. Maybe that's tidier than wrapping so much in the try-finally block.
@srowen I tried:
val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
var lines: Seq[String] = null
try {
  lines = readLines(logData)
} finally {
  logData.close()
}
It threw an IOException complaining that the stream was closed. I think readLines is lazy, in that it won't actually read the file until we advance the iterator.
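A standalone illustration of that laziness, assuming readLines is built on scala.io.Source (the helper below is a hypothetical stand-in, not the suite's actual implementation):

```scala
import java.io.{File, FileInputStream, PrintWriter}
import scala.io.Source

// Hypothetical stand-in for the suite's readLines: getLines() returns a lazy
// iterator, so nothing is read from the stream until the iterator is advanced.
def readLines(in: java.io.InputStream): Iterator[String] =
  Source.fromInputStream(in).getLines()

val file = File.createTempFile("lines", ".txt")
new PrintWriter(file) { write("a\nb\nc"); close() }

val in = new FileInputStream(file)
val lazyLines = readLines(in)
in.close()
// Advancing lazyLines at this point throws an IOException ("Stream Closed"),
// which matches the failure described above.

// Forcing the iterator before closing the stream avoids the problem:
val in2 = new FileInputStream(file)
val lines = try readLines(in2).toList finally in2.close()
```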
OK, sounds fine. How about pulling out `eventSet` at least?
add to whitelist
Test build #58158 has finished for PR 12693 at commit
@andrewor14 I've fixed the failure. Could you please try it again? Thanks.
Test build #58206 has finished for PR 12693 at commit
Well, it's weird; I can't repro the test failure locally.
I can't repro the failure in my environment. It seems my changes in this commit shouldn't change the logic of the Launcher part. Is it possible that it's a random failure?
@andrewor14 Could you please schedule another test for me?
retest this please
You should be able to trigger it too.
Test build #58652 has finished for PR 12693 at commit
@taoli91 if you want to rebase this, I'd be happy to finish the review too. It looked like it was almost there or already done.
Oh, sorry, this was not a Windows-specific issue. Sorry for the comment above.
@HyukjinKwon could you rebase this PR? Thanks for the detailed work to find these issues. This is a very useful PR and it would be good to merge once you rebase it.
Sure, I definitely will.
Closes apache#11610 Closes apache#15411 Closes apache#15501 Closes apache#12613 Closes apache#12518 Closes apache#12026 Closes apache#15524 Closes apache#12693 Closes apache#12358 Closes apache#15588 Closes apache#15635 Closes apache#15678 Closes apache#14699 Closes apache#9008
What changes were proposed in this pull request?
Close `FileStream`s, `ZipFile`s, etc. to release the resources after use. Not closing the resources will cause an IOException to be raised while deleting temp files.

How was this patch tested?
Ran unit tests on both Windows and Linux. Note that this fix can't resolve all of the problems with the unit tests on Windows.
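As a general illustration of why these leaks matter on Windows (a sketch with hypothetical names, not code taken from this PR): an unclosed stream keeps a handle on the file, so a later attempt to delete the temp file or directory fails.

```scala
import java.io.{File, FileInputStream}

// Sketch: an unclosed stream holds a Windows file handle, so deleting the
// temp file afterwards fails until the stream is closed.
val tmp = File.createTempFile("example", ".dat")

val in = new FileInputStream(tmp)
try {
  in.read() // ... use the stream ...
} finally {
  in.close() // without this, tmp.delete() below tends to fail on Windows
}

// Succeeds once the handle is released.
assert(tmp.delete())
```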