@@ -697,13 +697,7 @@ private[spark] class TaskSetManager(
     val index = info.index
     info.markFinished(TaskState.FINISHED)
     removeRunningTask(tid)
-    // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
-    // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
-    // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
-    // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
-    // Note: "result.value()" only deserializes the value when it's called at the first time, so
-    // here "result.value()" just returns the value and won't block other threads.
-    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
+
     // Kill any other attempts for the same task (since those are unnecessary now that one
     // attempt completed successfully).
     for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
@@ -726,6 +720,13 @@ private[spark] class TaskSetManager(
       logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
         " because task " + index + " has already completed successfully")
     }
+    // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
+    // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
+    // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
+    // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
+    // Note: "result.value()" only deserializes the value when it's called at the first time, so
+    // here "result.value()" just returns the value and won't block other threads.
+    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
     maybeFinishTaskSet()
   }
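The two hunks together move the sched.dagScheduler.taskEnded(...) call, along with its accompanying comment block, from immediately after removeRunningTask(tid) to after the loop that kills redundant attempts, so the DAGScheduler is notified of the success only once the other running copies of the task have been handled.

The comment's safety argument hinges on result.value() being memoized: the first call (made in TaskResultGetter.enqueueSuccessfulTask, before the TaskSchedulerImpl lock is taken) performs the actual deserialization, so the call inside handleSuccessfulTask is just a cheap cache read. Below is a minimal Scala sketch of that deserialize-once pattern; the names (MemoizedResult, deserialize) are hypothetical illustrations, not Spark's actual DirectTaskResult internals.

// Sketch of a memoized, deserialize-once task result (hypothetical names,
// not Spark's real classes).
class MemoizedResult[T](bytes: Array[Byte], deserialize: Array[Byte] => T) {
  private var cached: Option[T] = None

  // The first call pays the (potentially slow) deserialization cost; later
  // calls just return the cached object, so they stay cheap even when the
  // caller happens to be holding a scheduler lock.
  def value(): T = cached match {
    case Some(v) => v
    case None =>
      val v = deserialize(bytes) // do this OFF the scheduler lock
      cached = Some(v)
      v
  }
}

// Usage mirroring the protocol the comment describes:
//   val result = new MemoizedResult[String](bytes, b => new String(b, "UTF-8"))
//   result.value()                  // 1. result-getter thread, no lock held: deserializes
//   lock.synchronized { result.value() } // 2. under the lock: cache hit, no blocking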