2 files changed, +4 -0 lines changed.

main/scala/org/apache/spark/scheduler:

@@ -1046,6 +1046,7 @@ class DAGScheduler(
           if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) {
             markStageAsFinished(shuffleStage)
+            println(s"marking $shuffleStage as finished")
             logInfo("looking for newly runnable stages")
             logInfo("running: " + runningStages)
             logInfo("waiting: " + waitingStages)
@@ -1072,6 +1073,7 @@ class DAGScheduler(
                 .map(_._2).mkString(", "))
             submitStage(shuffleStage)
           } else {
+            println(s"looking for newly runnable stage")
             val newlyRunnable = new ArrayBuffer[Stage]
             for (shuffleStage <- waitingStages) {
               logInfo("Missing parents for " + shuffleStage + ": " +
@@ -1081,6 +1083,7 @@ class DAGScheduler(
             {
               newlyRunnable += shuffleStage
             }
+            println(s"newly runnable stages = $newlyRunnable")
             waitingStages --= newlyRunnable
             runningStages ++= newlyRunnable
             for {
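
For context on what these printlns trace: once a shuffle stage has no pending tasks and is marked finished, the scheduler scans waitingStages for stages whose parents are all satisfied and promotes them to runningStages. A minimal, self-contained sketch of that selection step (using a simplified, hypothetical Stage type and plain mutable sets, not Spark's actual scheduler classes) might look like this:

import scala.collection.mutable

// Hypothetical, simplified stand-in for a scheduler stage: just an id and parent stage ids.
case class Stage(id: Int, parents: Set[Int])

object NewlyRunnableSketch {
  def main(args: Array[String]): Unit = {
    val finished      = mutable.Set(1)                  // stages whose output is now available
    val waitingStages = mutable.Set(Stage(2, Set(1)), Stage(3, Set(2)))
    val runningStages = mutable.Set.empty[Stage]

    // A waiting stage becomes runnable once none of its parents are missing.
    def missingParents(s: Stage): Set[Int] = s.parents.filterNot(finished.contains)
    val newlyRunnable = waitingStages.filter(missingParents(_).isEmpty)

    println(s"newly runnable stages = $newlyRunnable")  // only Stage(2, Set(1)) qualifies here
    waitingStages --= newlyRunnable
    runningStages ++= newlyRunnable
  }
}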
test/scala/org/apache/spark/scheduler:

@@ -88,6 +88,7 @@ class DAGSchedulerSuite
       // normally done by TaskSetManager
       taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
       taskSets += taskSet
+      println(s"submitting taskSet $taskSet. taskSets = $taskSets")
     }
     override def cancelTasks(stageId: Int, interruptThread: Boolean) {
       cancelledStages += stageId
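
The suite-side change only adds visibility into the test's recording stub: the fake task scheduler runs nothing and simply appends each submitted TaskSet to a buffer the test body can inspect. A tiny sketch of that pattern (hypothetical trait and String task sets, not the real DAGSchedulerSuite types) is:

import scala.collection.mutable.ArrayBuffer

// Hypothetical minimal interface; the real suite stubs Spark's TaskScheduler.
trait TaskSubmitter {
  def submitTasks(taskSet: String): Unit
}

// Recording stub: no work is executed, submissions are only captured for later assertions.
class RecordingSubmitter extends TaskSubmitter {
  val taskSets = ArrayBuffer.empty[String]
  override def submitTasks(taskSet: String): Unit = {
    taskSets += taskSet
    println(s"submitting taskSet $taskSet. taskSets = $taskSets")
  }
}

object RecordingSubmitterDemo {
  def main(args: Array[String]): Unit = {
    val scheduler = new RecordingSubmitter
    scheduler.submitTasks("taskSet-0")
    scheduler.submitTasks("taskSet-1")
    assert(scheduler.taskSets.size == 2)  // the test can now assert on what was submitted
  }
}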