
SPARK-4454 Fix race condition in DAGScheduler #3345


Closed
wants to merge 1 commit

Conversation

@mag- commented Nov 18, 2014

Simple fix to race condition

@AmplabJenkins

Can one of the admins verify this patch?

  private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
    cacheLocs.getOrElseUpdate(rdd.id, getLocs(rdd))
  }
Member

Hm, is this actually thread-safe either? It's not a concurrent Map. Even if it were, I'm not clear that it stops multiple values from being cached and computed for a single ID, but if that's cheap and they're immutable, that could be fine.

Contributor

Even if this were a concurrent map, I'm not sure that getOrElseUpdate would be thread-safe (surprisingly!):

https://issues.scala-lang.org/browse/SI-7943
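
For context, here is a minimal, self-contained sketch (not code from this PR) of why a default getOrElseUpdate is a check-then-act sequence rather than an atomic operation; two threads that miss the lookup at the same time will both evaluate the default and both write:

  import scala.collection.mutable

  // Roughly what the default getOrElseUpdate does: a read followed by a
  // separate write, with nothing making the two steps atomic. A concurrent
  // map only makes each individual step safe, not the pair.
  def getOrElseUpdateSketch[K, V](map: mutable.Map[K, V], key: K, default: => V): V =
    map.get(key) match {
      case Some(v) => v
      case None =>
        val v = default     // two racing threads can both reach this point,
        map.update(key, v)  // each computing and inserting its own value
        v
    }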

@markhamstra
Contributor

Where is the race condition? cacheLocs is state local to the DAGScheduler and should only be used within the actions of DAGSchedulerEventProcessActor, which by their nature are not concurrent.

@andrewor14
Contributor

ok to test

@SparkQA commented Nov 19, 2014

Test build #23584 has started for PR 3345 at commit 9d5d758.

  • This patch merges cleanly.

@SparkQA commented Nov 19, 2014

Test build #23584 has finished for PR 3345 at commit 9d5d758.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23584/

@JoshRosen
Contributor

@markhamstra I noticed that there's a mailing list post (which never made it to the list, I think, because the author had not completed the mailing list registration and posted through Nabble): http://apache-spark-user-list.1001560.n3.nabble.com/Confirming-race-condition-in-DagScheduler-NoSuchElementException-td20691.html

According to that post:

[...] looking at the DagScheduler code (even in master), it seems like the problem was due to getCacheLocs being called from two places: getPreferredLocation and getMissingParentStages. One of them is sync'd, while the other is not.

I can confirm that this is the case. It looks like getPreferredLocs was marked as synchronized because it's called from SparkContext:

  /**
   * Synchronized method that might be called from other threads.
   * @param rdd whose partitions are to be looked at
   * @param partition to lookup locality information for
   * @return list of machines that are preferred by the partition
   */
  private[spark]
  def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = synchronized {
    getPreferredLocsInternal(rdd, partition, new HashSet)
  }

However, it's not obvious why this is synchronized. There are no other synchronized blocks or methods in DAGScheduler, so I think this synchronization is only serving to prevent multiple threads from being in getPreferredLocs calls at the same time, not to guard access to DAGScheduler state.

So, I think that there could be a legitimate synchronization issue here, but I don't think that this PR is the right fix. I think that a clean fix would be to replace this synchronized with a message send / reply to the DAGScheduler actor, which ensures that all accesses of the DAGScheduler state are processed through the actor.
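
For illustration, a rough sketch of what that message-based alternative could look like, using Akka's ask pattern. The GetPreferredLocs message and the wiring are hypothetical (the real DAGSchedulerEventProcessActor does not define such a message), and the actor's receive would also need a case that computes the locations on the event-processing thread and replies to the sender:

  import scala.concurrent.Await
  import scala.concurrent.duration._
  import akka.pattern.ask
  import akka.util.Timeout

  // Hypothetical message; names are illustrative, not the actual DAGScheduler API.
  case class GetPreferredLocs(rdd: RDD[_], partition: Int)

  private[spark]
  def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
    implicit val timeout = Timeout(30.seconds)
    // Route the request through the actor so that cacheLocs is only ever
    // touched from the event-processing thread, then block for the reply.
    val reply = (eventProcessActor ? GetPreferredLocs(rdd, partition))
      .mapTo[Seq[TaskLocation]]
    Await.result(reply, timeout.duration)
  }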

@markhamstra
Contributor

Ah yes, I see now. Thanks for coming back to this one, Josh.

DAGScheduler#getPreferredLocs is definitely broken. You're correct that the access to and potential update of the cacheLocs needs to be routed through the actor. But because of the need to return the preferred locations, this will be a little different than the fire-and-forget messages that are currently sent to the eventProcessActor, and will need to be an ask pattern instead.

Something that also concerns me in looking at the usages of SparkContext#getPreferredLocs in CoalescedRDD and PartitionerAwareUnionRDD is that they both have a currPrefLocs method with a comment saying it is supposed to "Get the current preferred locations from the DAGScheduler". I'm not sure what the expectation or requirement for "current" is there: "current" when the RDD is defined, when actions are run on it, or something else? This feels like a potential race condition to me, and I am wondering whether it might make sense to make this retrieval of current preferred locations as lazy as possible, resolved during the execution of a job. That's just speculation as to the need for or desirability of that laziness, but I think it deserves a look.

@pwendell
Contributor

Let's close this issue; some people were confused about why we didn't merge it... but it was not correct.

@JoshRosen
Contributor

I've opened #4660 as an alternative fix for this issue; please take a look.

asfgit pushed a commit that referenced this pull request Feb 18, 2015
This patch addresses a race condition in DAGScheduler by properly synchronizing accesses to its `cacheLocs` map.

This map is accessed by the `getCacheLocs` and `clearCacheLocs()` methods, which can be called by separate threads, since DAGScheduler's `getPreferredLocs()` method is called by SparkContext and indirectly calls `getCacheLocs()`.  If this map is cleared by the DAGScheduler event processing thread while a user thread is submitting a job and computing preferred locations, then this can cause the user thread to throw "NoSuchElementException: key not found" errors.

Most accesses to DAGScheduler's internal state do not need synchronization because that state is only accessed from the event processing loop's thread.  An alternative approach to fixing this bug would be to refactor this code so that SparkContext sends the DAGScheduler a message in order to get the list of preferred locations.  However, this would involve more extensive changes to this code and would be significantly harder to backport to maintenance branches since some of the related code has undergone significant refactoring (e.g. the introduction of EventLoop).  Since `cacheLocs` is the only state that's accessed in this way, adding simple synchronization seems like a better short-term fix.

See #3345 for additional context.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #4660 from JoshRosen/SPARK-4454 and squashes the following commits:

12d64ba [Josh Rosen] Properly synchronize accesses to DAGScheduler cacheLocs map.

(cherry picked from commit d46d624)
Signed-off-by: Patrick Wendell <patrick@databricks.com>
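
For reference, the approach taken in #4660 is to guard the cacheLocs map itself. A simplified sketch of that kind of guard (the location computation is elided here; see the actual patch for the real method bodies):

  import scala.collection.mutable

  // Both the user thread (via getPreferredLocs) and the event-processing
  // thread (via clearCacheLocs) go through the map's own lock, so the map
  // can never be cleared while another thread is reading or populating it.
  private val cacheLocs = new mutable.HashMap[Int, Array[Seq[TaskLocation]]]

  private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = cacheLocs.synchronized {
    cacheLocs.getOrElseUpdate(rdd.id, {
      // Placeholder for the real lookup of cached block locations.
      Array.fill(rdd.partitions.size)(Nil)
    })
  }

  private def clearCacheLocs(): Unit = cacheLocs.synchronized {
    cacheLocs.clear()
  }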
@tiongl commented Feb 19, 2015

FYI, the related post (on Nabble) also exists here (on the developer list) with subsequent comments. (I'm the OP.)
http://apache-spark-developers-list.1001551.n3.nabble.com/Confirming-race-condition-in-DagScheduler-NoSuchElementException-td9798.html

#4660 is similar to the fix attempt I made to patch Spark, but in some way it slows down our concurrent processing greatly. Is there a benchmark to make sure the fix would have no severe performance penalty? Personally I don't believe so, but I will try #4660 to make sure.
