[SPARK-14930][SPARK-13693] Fix race condition in CheckpointWriter.stop() #12712
Conversation
Test build #57030 has finished for PR 12712 at commit

Jenkins, retest this please.

Test build #57040 has finished for PR 12712 at commit

LGTM, thanks for fixing this!
```diff
@@ -184,8 +184,7 @@ class CheckpointWriter(
   val executor = Executors.newFixedThreadPool(1)
   val compressionCodec = CompressionCodec.createCodec(conf)
   private var stopped = false
-  private var _fs: FileSystem = _
-
+  @volatile private[this] var fs: FileSystem = null
```
Does this even need to be a member? It looks like it's used entirely in one method.
It's actually referenced by inner classes. I think the idea here is that we'll avoid calling `.getFileSystem` too much by caching the result, but will have the ability to clear the cache in case the cached `FileSystem` seems to be in a bad state.
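The caching idea described above can be sketched as follows. This is an illustrative Java stand-in, not Spark code: the names `getFs`, `reset`, and the string handle are hypothetical, and a `String` stands in for the expensive `Path.getFileSystem(conf)` lookup. The point is the shape of the pattern: a `volatile` field caches the result of an expensive call, and clearing it to `null` forces the next caller to re-create the handle.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CachedResourceDemo {
    // Counts how many times the "expensive" lookup actually runs,
    // so we can observe the cache working.
    private final AtomicInteger lookups = new AtomicInteger();

    // volatile: written by the checkpoint-writer thread, cleared (set to
    // null) by whichever thread decides the cached handle is in a bad state.
    private volatile String fs = null;

    String getFs() {
        if (fs == null) {                  // cheap fast path once cached
            lookups.incrementAndGet();
            fs = "hdfs-handle";            // expensive lookup stands here
        }
        return fs;
    }

    void reset() { fs = null; }            // drop a handle that looks bad

    int lookupCount() { return lookups.get(); }

    public static void main(String[] args) {
        CachedResourceDemo demo = new CachedResourceDemo();
        demo.getFs();
        demo.getFs();                       // second call served from cache
        System.out.println(demo.lookupCount()); // prints 1
        demo.reset();
        demo.getFs();                       // re-created after reset
        System.out.println(demo.lookupCount()); // prints 2
    }
}
```

With a single writer thread, no lock is needed around the null-check: `volatile` alone gives other threads visibility of the latest value.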
minor: `fs` and `latestCheckpointTime` don't need to be `volatile` as we use `Executors.newFixedThreadPool(1)` here. We should also give the thread a name for better debugging.
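Giving the pool's thread a name, as suggested above, is done by passing a `ThreadFactory` to `Executors.newFixedThreadPool`. A minimal sketch (the helper name and the `"checkpoint-writer"` thread name are illustrative, not the actual Spark code, which has its own `ThreadUtils` helpers for this):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class NamedExecutorDemo {
    // Builds a single-thread pool whose worker carries a fixed,
    // descriptive name so it is easy to spot in thread dumps.
    static ExecutorService newNamedSingleThreadExecutor(String name) {
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable, name);
            t.setDaemon(true); // don't keep the JVM alive for this pool
            return t;
        };
        return Executors.newFixedThreadPool(1, factory);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = newNamedSingleThreadExecutor("checkpoint-writer");
        String observed =
            executor.submit(() -> Thread.currentThread().getName()).get();
        System.out.println(observed); // prints "checkpoint-writer"
        executor.shutdown();
    }
}
```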
LGTM. Merging to master
`CheckpointWriter.stop()` is prone to a race condition: if one thread calls `stop()` right as a checkpoint write task begins to execute, that write task may become blocked when trying to access `fs`, the shared Hadoop FileSystem, since both the `fs` getter and the `stop` method synchronize on the same lock. Here's a thread-dump excerpt which illustrates the problem:

We can fix this problem by having `stop` and `fs` be synchronized on different locks: the synchronization on `stop` only needs to guard against multiple threads calling `stop` at the same time, whereas the synchronization on `fs` is only necessary for cross-thread visibility. There's only ever a single active checkpoint writer thread at a time, so we don't need to guard against concurrent access to `fs`. Thus, `fs` can simply become a `@volatile` var, similar to `lastCheckpointTime`.

This change should fix SPARK-13693, a flaky `MapWithStateSuite` test suite which has recently been failing several times per day. It also results in a huge test speedup: prior to this patch, `MapWithStateSuite` took about 80 seconds to run, whereas it now runs in less than 10 seconds. The `streaming` project's tests as a whole now run in ~220 seconds vs. ~354 seconds before.

/cc @zsxwing and @tdas for review.
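The shape of the fix can be sketched as follows. This is an illustrative Java model, not the actual `CheckpointWriter` code: the class, method, and field names are hypothetical, and a `String` stands in for the Hadoop `FileSystem` handle. The key points mirror the description above: `fs` is a plain `volatile` field, so the single writer thread can read and reset it without touching the lock that `stop()` holds, and `stop()`'s synchronization only makes it safe and idempotent under concurrent calls.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class StopRaceSketch {
    private final ExecutorService executor = Executors.newFixedThreadPool(1);
    private volatile String fs = null;   // cross-thread visibility only
    private boolean stopped = false;     // guarded by `this`

    void write(String checkpoint) {
        executor.execute(() -> {
            // No lock taken here, so a concurrent stop() cannot block us.
            if (fs == null) fs = "hdfs-handle";
            // ... write the checkpoint using fs ...
        });
    }

    // synchronized only to guard against concurrent stop() calls;
    // returns true if this call performed the shutdown.
    synchronized boolean stop() throws InterruptedException {
        if (stopped) return false;       // idempotent: later calls are no-ops
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        stopped = true;
        return true;
    }

    public static void main(String[] args) throws Exception {
        StopRaceSketch writer = new StopRaceSketch();
        writer.write("checkpoint-1");
        System.out.println(writer.stop());  // prints true
        System.out.println(writer.stop());  // prints false
    }
}
```

In the pre-fix shape, the `fs` getter and `stop()` synchronized on the same monitor, so a write task entering the getter while `stop()` was waiting on the executor could block indefinitely; decoupling them removes that interaction.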