[KYUUBI #4664] Fix empty relation when kill executors
### _Why are the changes needed?_

This PR fixes a corner case when repartitioning on top of a local relation, e.g.,
```
Repartition
      |
LocalRelation
```

It would throw an exception since no shuffle actually happens:
```
java.util.NoSuchElementException: key not found: 3
	at scala.collection.MapLike.default(MapLike.scala:235)
	at scala.collection.MapLike.default$(MapLike.scala:234)
	at scala.collection.AbstractMap.default(Map.scala:63)
	at scala.collection.MapLike.apply(MapLike.scala:144)
	at scala.collection.MapLike.apply$(MapLike.scala:143)
	at scala.collection.AbstractMap.apply(Map.scala:63)
	at org.apache.spark.sql.FinalStageResourceManager.findExecutorToKill(FinalStageResourceManager.scala:122)
	at org.apache.spark.sql.FinalStageResourceManager.killExecutors(FinalStageResourceManager.scala:175)
```
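
For illustration, a minimal reproduction sketch (hypothetical, not part of this patch): it assumes a Spark 3.x session with AQE enabled and the Kyuubi extension that installs `FinalStageResourceManager` configured separately, and drives a repartition over an empty in-memory Dataset so the final stage is planned over a local/empty relation and no shuffle output is ever registered.

```scala
// Hypothetical reproduction sketch, not taken from the patch. Assumes AQE is enabled
// and the Kyuubi Spark extension (which installs FinalStageResourceManager) is
// configured on the session separately.
import org.apache.spark.sql.SparkSession

object Kyuubi4664Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("kyuubi-4664-repro")
      .config("spark.sql.adaptive.enabled", "true")
      .getOrCreate()
    import spark.implicits._

    // An empty in-memory Dataset plans as a local relation, so the repartition on
    // top of it never produces real shuffle output for the final stage.
    val df = Seq.empty[(Int, String)].toDF("id", "name")

    // Before this fix, findExecutorToKill looked up the shuffle status with
    // Map.apply and hit java.util.NoSuchElementException for queries like this.
    df.repartition(3).collect()

    spark.stop()
  }
}
```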

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4664 from ulysses-you/kill-executors-followup.

Closes #4664

3811eae [ulysses-you] Fix empty relation

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: ulyssesyou <ulyssesyou@apache.org>
ulysses-you committed Apr 4, 2023
1 parent 82c5392 commit 061545b
Showing 1 changed file with 10 additions and 2 deletions.
```diff
@@ -69,6 +69,7 @@ case class FinalStageResourceManager(session: SparkSession)
       return plan
     }
 
+    // TODO: move this to query stage optimizer when updating Spark to 3.5.x
     // Since we are in `prepareQueryStage`, the AQE shuffle read has not been applied.
     // So we need to apply it by self.
     val shuffleRead = queryStageOptimizerRules.foldLeft(stageOpt.get.asInstanceOf[SparkPlan]) {
@@ -119,7 +120,11 @@
       shuffleId: Int,
       numReduce: Int): Seq[String] = {
     val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
-    val shuffleStatus = tracker.shuffleStatuses(shuffleId)
+    val shuffleStatusOpt = tracker.shuffleStatuses.get(shuffleId)
+    if (shuffleStatusOpt.isEmpty) {
+      return Seq.empty
+    }
+    val shuffleStatus = shuffleStatusOpt.get
     val executorToBlockSize = new mutable.HashMap[String, Long]
     shuffleStatus.withMapStatuses { mapStatus =>
       mapStatus.foreach { status =>
@@ -175,6 +180,9 @@
     val executorsToKill = findExecutorToKill(sc, targetExecutors, shuffleId, numReduce)
     logInfo(s"Request to kill executors, total count ${executorsToKill.size}, " +
       s"[${executorsToKill.mkString(", ")}].")
+    if (executorsToKill.isEmpty) {
+      return
+    }
 
     // Note, `SparkContext#killExecutors` does not allow with DRA enabled,
     // see `https://github.com/apache/spark/pull/20604`.
@@ -201,7 +209,7 @@ trait FinalRebalanceStageHelper {
       case f: FilterExec => findFinalRebalanceStage(f.child)
       case s: SortExec if !s.global => findFinalRebalanceStage(s.child)
       case stage: ShuffleQueryStageExec
-          if stage.isMaterialized &&
+          if stage.isMaterialized && stage.mapStats.isDefined &&
             stage.plan.isInstanceOf[ShuffleExchangeExec] &&
             stage.plan.asInstanceOf[ShuffleExchangeExec].shuffleOrigin != ENSURE_REQUIREMENTS =>
         Some(stage)
```
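
The core of the fix in `findExecutorToKill` is switching from `Map.apply`, which throws `java.util.NoSuchElementException` for a missing key, to `Map.get`, which returns an `Option` and lets the caller bail out with an empty result. A standalone sketch of that pattern (the map and helper below are hypothetical stand-ins, not the real `MapOutputTrackerMaster` API):

```scala
// Standalone sketch of the defensive-lookup pattern; the map here is a stand-in for
// MapOutputTrackerMaster.shuffleStatuses, not the real tracker API.
object DefensiveLookupSketch {
  final case class ShuffleStatus(numMaps: Int)

  val shuffleStatuses: Map[Int, ShuffleStatus] = Map(0 -> ShuffleStatus(4))

  def findExecutorsToKill(shuffleId: Int): Seq[String] = {
    // Before the fix: shuffleStatuses(shuffleId) throws NoSuchElementException when
    // the shuffle never materialized (e.g. the final stage came from a local/empty
    // relation). Map.get makes the missing-key case explicit.
    shuffleStatuses.get(shuffleId) match {
      case None =>
        // No shuffle output registered: nothing to inspect, so kill no executors.
        Seq.empty
      case Some(status) =>
        // The real code aggregates map-output sizes per executor here; this sketch
        // just returns a placeholder value.
        Seq(s"executor-hosting-${status.numMaps}-map-outputs")
    }
  }

  def main(args: Array[String]): Unit = {
    println(findExecutorsToKill(0)) // known shuffle id -> placeholder result
    println(findExecutorsToKill(3)) // missing shuffle id -> empty result, no exception
  }
}
```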
