[SPARK-14930][SPARK-13693] Fix race condition in CheckpointWriter.stop() #12712

Closed

Conversation

JoshRosen
Contributor

CheckpointWriter.stop() is prone to a race condition: if one thread calls stop() right as a checkpoint write task begins to execute, that write task may become blocked when trying to access fs, the shared Hadoop FileSystem, since both the fs getter and the stop() method synchronize on the same lock. Because stop() holds that lock while it waits for the executor to terminate, the write task cannot proceed until the wait times out. Here's a thread-dump excerpt which illustrates the problem:

"pool-31-thread-1" #156 prio=5 os_prio=31 tid=0x00007fea02cd2000 nid=0x5c0b waiting for monitor entry [0x000000013bc4c000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.apache.spark.streaming.CheckpointWriter.org$apache$spark$streaming$CheckpointWriter$$fs(Checkpoint.scala:302)
    - waiting to lock <0x00000007bf53ee78> (a org.apache.spark.streaming.CheckpointWriter)
    at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:224)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"pool-1-thread-1-ScalaTest-running-MapWithStateSuite" #11 prio=5 os_prio=31 tid=0x00007fe9ff879800 nid=0x5703 waiting on condition [0x000000012e54c000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007bf564568> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1465)
    at org.apache.spark.streaming.CheckpointWriter.stop(Checkpoint.scala:291)
    - locked <0x00000007bf53ee78> (a org.apache.spark.streaming.CheckpointWriter)
    at org.apache.spark.streaming.scheduler.JobGenerator.stop(JobGenerator.scala:159)
    - locked <0x00000007bf53ea90> (a org.apache.spark.streaming.scheduler.JobGenerator)
    at org.apache.spark.streaming.scheduler.JobScheduler.stop(JobScheduler.scala:115)
    - locked <0x00000007bf53d3f0> (a org.apache.spark.streaming.scheduler.JobScheduler)
    at org.apache.spark.streaming.StreamingContext$$anonfun$stop$1.apply$mcV$sp(StreamingContext.scala:680)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1219)
    at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:679)
    - locked <0x00000007bf516a70> (a org.apache.spark.streaming.StreamingContext)
    at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:644)
    - locked <0x00000007bf516a70> (a org.apache.spark.streaming.StreamingContext)
[...]

We can fix this problem by having stop and fs be synchronized on different locks: the synchronization on stop only needs to guard against multiple threads calling stop at the same time, whereas the synchronization on fs is only necessary for cross-thread visibility. There's only ever a single active checkpoint writer thread at a time, so we don't need to guard against concurrent access to fs. Thus, fs can simply become a @volatile var, similar to latestCheckpointTime.
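
For concreteness, here is a minimal sketch of the resulting locking scheme, under assumed names (CheckpointWriterSketch, write, hadoopConf); it illustrates the pattern described above, not the actual CheckpointWriter source:

import java.util.concurrent.{Executors, TimeUnit}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical sketch of the fix: `fs` is a @volatile field touched only by the
// single writer thread, while stop() synchronizes solely to guard against
// concurrent stop() calls. No lock is shared with the fs accessor, so an
// in-flight write task can no longer block while stop() holds a lock and
// awaits executor termination.
class CheckpointWriterSketch(hadoopConf: Configuration) {
  private val executor = Executors.newFixedThreadPool(1)
  @volatile private var fs: FileSystem = null
  private var stopped = false

  // Runs on the single writer thread submitted to `executor`.
  def write(bytes: Array[Byte], path: Path): Unit = {
    if (fs == null) {
      fs = path.getFileSystem(hadoopConf) // cache the FileSystem across writes
    }
    val out = fs.create(path, true)       // overwrite any existing checkpoint file
    try out.write(bytes) finally out.close()
  }

  def stop(): Unit = synchronized {       // only guards against a double stop()
    if (!stopped) {
      executor.shutdown()
      executor.awaitTermination(10, TimeUnit.SECONDS)
      stopped = true
    }
  }
}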

This change should fix SPARK-13693, a flaky MapWithStateSuite test suite which has recently been failing several times per day. It also results in a large test speedup: prior to this patch, MapWithStateSuite took about 80 seconds to run, whereas it now runs in less than 10 seconds. The streaming project's test suite as a whole now runs in ~220 seconds, down from ~354 seconds.

/cc @zsxwing and @tdas for review.

@JoshRosen changed the title [SPARK-14930][SPARK-13693 Fix race condition in CheckpointWriter.stop() → [SPARK-14930][SPARK-13693] Fix race condition in CheckpointWriter.stop() on Apr 26, 2016

SparkQA commented Apr 26, 2016

Test build #57030 has finished for PR 12712 at commit 61a388a.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Jenkins, retest this please.


SparkQA commented Apr 26, 2016

Test build #57040 has finished for PR 12712 at commit 61a388a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@marmbrus
Contributor

LGTM, thanks for fixing this!

@@ -184,8 +184,7 @@ class CheckpointWriter(
   val executor = Executors.newFixedThreadPool(1)
   val compressionCodec = CompressionCodec.createCodec(conf)
   private var stopped = false
-  private var _fs: FileSystem = _
+  @volatile private[this] var fs: FileSystem = null

Member

Does this even need to be a member? It looks like it's used entirely in one method.

Contributor Author

It's actually referenced by inner classes. I think the idea here is that we'll avoid calling .getFileSystem too much by caching the result, but will have the ability to clear the cache in case the cached FileSystem seems to be in a bad state.
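
For illustration only, a minimal sketch of that cache-and-reset idea, assuming the @volatile fs field plus hypothetical checkpointDir and hadoopConf members inside the writer class (and import org.apache.hadoop.fs.{FileSystem, Path}):

// Cache the FileSystem to avoid calling getFileSystem on every write, but let
// the writer drop the cached instance if it appears to be in a bad state.
private def fileSystem: FileSystem = {
  if (fs == null) {
    fs = new Path(checkpointDir).getFileSystem(hadoopConf)
  }
  fs
}

private def resetFileSystem(): Unit = {
  fs = null // the next access rebuilds the cached FileSystem
}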

Member

minor: fs and latestCheckpointTime don't need to be volatile as we use Executors.newFixedThreadPool(1) here. We should also give the thread a name for better debugging.
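
A minimal sketch of the thread-naming suggestion (the field name and the "checkpoint-writer" thread name are illustrative, not the actual Spark code):

import java.util.concurrent.{Executors, ThreadFactory}

// A single-thread executor whose worker thread carries a descriptive name,
// making it easy to spot in thread dumps like the one quoted above.
val checkpointWriterExecutor = Executors.newSingleThreadExecutor(
  new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "checkpoint-writer")
      t.setDaemon(true)
      t
    }
  })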

tdas (Contributor) commented Apr 27, 2016

LGTM. Merging to master

@asfgit closed this in 450136e Apr 27, 2016
@JoshRosen deleted the fix-checkpoint-writer-race branch April 27, 2016 18:50
zzcclp added a commit to zzcclp/spark that referenced this pull request Apr 28, 2016