
Commit cb245da

finally found the issue ... clean up debug stuff
1 parent 8c29707 commit cb245da
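
This commit removes the temporary println debugging from DAGScheduler and DAGSchedulerSuite. The one substantive change, inferred from the test diff below rather than stated in the commit message, is in DAGSchedulerSuite: makeMapStatus is now called with 2 instead of 1, matching the two partitions of reduceRdd, and a new assertion also checks the map output locations for reduce partition 1. A minimal sketch of the corrected completion, assuming makeMapStatus(host, n) builds a MapStatus covering n reduce partitions:

    complete(taskSets(0), Seq(
      (Success, makeMapStatus("hostA", 2)),   // 2 = number of partitions of reduceRdd (assumed semantics)
      (Success, makeMapStatus("hostB", 2))))
    // With output sizes recorded for both reduce partitions, the tracker can
    // answer for partition 1 as well as partition 0:
    assert(mapOutputTracker.getServerStatuses(shuffleId, 1).map(_._1.host) ===
      Array("hostA", "hostB"))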

File tree

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

2 files changed: +7 −35 lines


core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 0 additions & 5 deletions
@@ -851,7 +851,6 @@ class DAGScheduler(
     stage.pendingTasks.clear()

     // First figure out the indexes of partition ids to compute.
-    println(s"finding partitions to compute for $stage")
     val partitionsToCompute: Seq[Int] = {
       stage match {
         case stage: ShuffleMapStage =>
@@ -945,7 +944,6 @@ class DAGScheduler(
         s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
       }
       logDebug(debugString)
-      println(debugString)
     }
   }

@@ -1064,7 +1062,6 @@ class DAGScheduler(

           if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) {
             markStageAsFinished(shuffleStage)
-            println(s"marking $shuffleStage as finished")
             logInfo("looking for newly runnable stages")
             logInfo("running: " + runningStages)
             logInfo("waiting: " + waitingStages)
@@ -1091,7 +1088,6 @@ class DAGScheduler(
               .map(_._2).mkString(", "))
             submitStage(shuffleStage)
           } else {
-            println(s"looking for newly runnable stage")
             val newlyRunnable = new ArrayBuffer[Stage]
             for (shuffleStage <- waitingStages) {
               logInfo("Missing parents for " + shuffleStage + ": " +
@@ -1102,7 +1098,6 @@ class DAGScheduler(
               newlyRunnable += shuffleStage
             }
             val newlyRunnableWithJob = newlyRunnable.map{x => x -> activeJobForStage(x)}
-            println(s"newly runnable stages = $newlyRunnableWithJob")
             waitingStages --= newlyRunnable
             runningStages ++= newlyRunnable
             for {

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 7 additions & 30 deletions
@@ -88,7 +88,6 @@ class DAGSchedulerSuite
       // normally done by TaskSetManager
       taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
       taskSets += taskSet
-      println(s"submitting taskSet $taskSet. taskSets = $taskSets")
     }
     override def cancelTasks(stageId: Int, interruptThread: Boolean) {
       cancelledStages += stageId
@@ -558,18 +557,11 @@ class DAGSchedulerSuite
   /** This tests the case where another FetchFailed comes in while the map stage is getting
    * re-run. */
   test("late fetch failures don't cause multiple concurrent attempts for the same map stage") {
-    println("begin late fetch failure")
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
     val shuffleId = shuffleDep.shuffleId
     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
-    val jobId = submit(reduceRdd, Array(0, 1))
-    println(s"late fetch failure: jobId = $jobId")
-    println(s"late fetch failure: jobToStages = ${scheduler.jobIdToStageIds}")
-    println(s"late fetch failure: jobToActiveJob = ${scheduler.jobIdToActiveJob}")
-    println(s"late fetch failure: waitingStages = ${scheduler.waitingStages}")
-    println(s"late fetch failure: runningStages = ${scheduler.runningStages}")
-    println(s"late fetch failure: failedStages = ${scheduler.failedStages}")
+    submit(reduceRdd, Array(0, 1))

     val mapStageId = 0
     def countSubmittedMapStageAttempts(): Int = {
@@ -579,26 +571,14 @@ class DAGSchedulerSuite
     // The map stage should have been submitted.
     assert(countSubmittedMapStageAttempts() === 1)

-    println("late fetch failure: taskSets = " + taskSets)
-    println(s"late fetch failure: jobToStages = ${scheduler.jobIdToStageIds}")
-    println(s"late fetch failure: jobToActiveJob = ${scheduler.jobIdToActiveJob}")
-    println(s"late fetch failure: waitingStages = ${scheduler.waitingStages}")
-    println(s"late fetch failure: runningStages = ${scheduler.runningStages}")
-    println(s"late fetch failure: failedStages = ${scheduler.failedStages}")
     complete(taskSets(0), Seq(
-      (Success, makeMapStatus("hostA", 1)),
-      (Success, makeMapStatus("hostB", 1))))
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
     // The MapOutputTracker should know about both map output locations.
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
       Array("hostA", "hostB"))
-
-    println("late fetch failure: taskSets = " + taskSets)
-    println("late fetch failure: submittedStages = " + sparkListener.submittedStageInfos)
-    println(s"late fetch failure: jobToStages = ${scheduler.jobIdToStageIds}")
-    println(s"late fetch failure: jobToActiveJob = ${scheduler.jobIdToActiveJob}")
-    println(s"late fetch failure: waitingStages = ${scheduler.waitingStages}")
-    println(s"late fetch failure: runningStages = ${scheduler.runningStages}")
-    println(s"late fetch failure: failedStages = ${scheduler.failedStages}")
+    assert(mapOutputTracker.getServerStatuses(shuffleId, 1).map(_._1.host) ===
+      Array("hostA", "hostB"))

     // The first result task fails, with a fetch failure for the output from the first mapper.
     runEvent(CompletionEvent(
@@ -643,7 +623,6 @@ class DAGSchedulerSuite
    */
   test("extremely late fetch failures don't cause multiple concurrent attempts for " +
     "the same stage") {
-    println("begin extremely late fetch failure")
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
     val shuffleId = shuffleDep.shuffleId
@@ -661,17 +640,15 @@ class DAGSchedulerSuite
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 1)

-    println("extremely late fetch failure: taskSets = " + taskSets)
     // Complete the map stage.
     complete(taskSets(0), Seq(
-      (Success, makeMapStatus("hostA", 1)),
-      (Success, makeMapStatus("hostB", 1))))
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))

     // The reduce stage should have been submitted.
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedReduceStageAttempts() === 1)

-    println("extremely late fetch failure: taskSets = " + taskSets)
     // The first result task fails, with a fetch failure for the output from the first mapper.
     runEvent(CompletionEvent(
       taskSets(1).tasks(0),
