SPARK-4454 Fix race condition in DAGScheduler #3345
Conversation
Can one of the admins verify this patch?
    private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
      cacheLocs.getOrElseUpdate(rdd.id, getLocs(rdd))
    }
Hm, is this actually thread-safe either? It's not a concurrent Map. Even if it is, I'm not clear that it stops many values from being cached and computed for an ID, but if that's cheap and they're immutable, that could be fine.
Even if this were a concurrent map, I'm not sure that getOrElseUpdate would be thread-safe (surprisingly!).
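As a hedged illustration of why (a simplified model, not the Scala library source), getOrElseUpdate on the default mutable maps behaves like a separate get followed by an update, so two threads can both miss on the same key and both compute the value:

    import scala.collection.mutable

    // Simplified model of getOrElseUpdate's check-then-act behavior (illustration only).
    def getOrElseUpdateModel[K, V](map: mutable.Map[K, V], key: K)(op: => V): V =
      map.get(key) match {
        case Some(v) => v
        case None =>
          val v = op         // two threads can both reach this point for the same key
          map.update(key, v) // the second writer silently overwrites the first
          v
      }

Without external locking, two callers interleaving between the get and the update duplicate work at best and, on a plain HashMap, risk corrupting the map's internal state.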
Where is the race condition?
ok to test
Test build #23584 has started for PR 3345 at commit
Test build #23584 has finished for PR 3345 at commit
Test PASSed.
@markhamstra I noticed that there's a mailing list post (which never made it to the list, I think, because the author had not completed the mailing list registration and posted through Nabble): http://apache-spark-user-list.1001560.n3.nabble.com/Confirming-race-condition-in-DagScheduler-NoSuchElementException-td20691.html According to that post, concurrently submitted jobs can hit intermittent "NoSuchElementException: key not found" errors in DAGScheduler.

I can confirm that this is the case. It looks like getPreferredLocs is explicitly documented as a synchronized method that might be called from other threads:

    /**
     * Synchronized method that might be called from other threads.
     * @param rdd whose partitions are to be looked at
     * @param partition to lookup locality information for
     * @return list of machines that are preferred by the partition
     */
    private[spark]
    def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = synchronized {
      getPreferredLocsInternal(rdd, partition, new HashSet)
    }

However, it's not obvious why this is synchronized. There are no other synchronized blocks or methods in DAGScheduler, so it's not clear what this lock is supposed to guard against.

So, I think that there could be a legitimate synchronization issue here, but I don't think that this PR is the right fix. I think that a clean fix would be to replace this with proper synchronization of accesses to the shared cacheLocs map.
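To make the failure mode concrete, here is a hedged sketch (simplified; not the actual DAGScheduler code, and the compute helper is a stand-in) of the check-then-act pattern that throws "key not found" when another thread clears the map between the containment check and the final lookup:

    import scala.collection.mutable

    object CacheLocsRaceSketch {
      val cacheLocs = mutable.HashMap.empty[Int, Array[Seq[String]]]

      // Called from a user thread via getPreferredLocs.
      def getCacheLocsUnsafe(rddId: Int, compute: => Array[Seq[String]]): Array[Seq[String]] = {
        if (!cacheLocs.contains(rddId)) { // check
          cacheLocs(rddId) = compute      // act
        }
        cacheLocs(rddId) // throws NoSuchElementException if the map was cleared in between
      }

      // Called from the DAGScheduler event-processing thread.
      def clearCacheLocs(): Unit = cacheLocs.clear()
    }

If clearCacheLocs() runs on the event-loop thread after the user thread's contains check but before its final lookup, the apply on the now-missing key fails with "NoSuchElementException: key not found".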
Ah yes, I see now. Thanks for coming back to this one, Josh. Something that also concerns me in looking at the usages of
Let's close this issue; some people were confused about why we didn't merge it, but it was not correct.
I've opened #4660 as an alternative fix for this issue; please take a look.
This patch addresses a race condition in DAGScheduler by properly synchronizing accesses to its `cacheLocs` map. This map is accessed by the `getCacheLocs` and `clearCacheLocs()` methods, which can be called by separate threads, since DAGScheduler's `getPreferredLocs()` method is called by SparkContext and indirectly calls `getCacheLocs()`. If this map is cleared by the DAGScheduler event processing thread while a user thread is submitting a job and computing preferred locations, then this can cause the user thread to throw "NoSuchElementException: key not found" errors.

Most accesses to DAGScheduler's internal state do not need synchronization because that state is only accessed from the event processing loop's thread. An alternative approach to fixing this bug would be to refactor this code so that SparkContext sends the DAGScheduler a message in order to get the list of preferred locations. However, this would involve more extensive changes to this code and would be significantly harder to backport to maintenance branches since some of the related code has undergone significant refactoring (e.g. the introduction of EventLoop). Since `cacheLocs` is the only state that's accessed in this way, adding simple synchronization seems like a better short-term fix.

See #3345 for additional context.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #4660 from JoshRosen/SPARK-4454 and squashes the following commits:

12d64ba [Josh Rosen] Properly synchronize accesses to DAGScheduler cacheLocs map.

(cherry picked from commit d46d624)

Signed-off-by: Patrick Wendell <patrick@databricks.com>
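For illustration, a minimal sketch of the synchronization described above (an assumption-laden simplification: the class name, the getLocs helper, and the key/value types are stand-ins, not the actual Spark sources). Both the read/populate path and the clear path take the same lock, so a clear can no longer interleave with a lookup on another thread:

    import scala.collection.mutable

    class CacheLocRegistry[Loc](getLocs: Int => Array[Seq[Loc]]) {
      // Guarded by `this`: read from user threads (via getPreferredLocs) and
      // cleared from the event-processing thread.
      private val cacheLocs = new mutable.HashMap[Int, Array[Seq[Loc]]]

      def getCacheLocs(rddId: Int): Array[Seq[Loc]] = synchronized {
        cacheLocs.getOrElseUpdate(rddId, getLocs(rddId))
      }

      def clearCacheLocs(): Unit = synchronized {
        cacheLocs.clear()
      }
    }

Because the critical sections are tiny (a map lookup or a clear), the added lock should be a minor cost relative to job submission, which relates to the performance question raised below.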
FYI, the related post (on Nabble) exists here (in the developer list) with subsequent comments. (I'm the OP.) #4660 is similar to the fix attempt I made to patch Spark, but in some ways it slows down our concurrent processing greatly. Is there a benchmark to make sure the fix would have no severe performance penalty? Personally I don't believe it will, but I will try #4660 to make sure.
Simple fix to race condition