From 061545b2bd632586d8d8367ff6d90dea3949e52f Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Tue, 4 Apr 2023 17:06:57 +0800 Subject: [PATCH] [KYUUBI #4664] Fix empty relation when kill executors ### _Why are the changes needed?_ This PR fixes a corner case when repartitioning a local relation. e.g., ``` Repartition | LocalRelation ``` it would throw an exception since no shuffle actually happens ``` java.util.NoSuchElementException: key not found: 3 at scala.collection.MapLike.default(MapLike.scala:235) at scala.collection.MapLike.default$(MapLike.scala:234) at scala.collection.AbstractMap.default(Map.scala:63) at scala.collection.MapLike.apply(MapLike.scala:144) at scala.collection.MapLike.apply$(MapLike.scala:143) at scala.collection.AbstractMap.apply(Map.scala:63) at org.apache.spark.sql.FinalStageResourceManager.findExecutorToKill(FinalStageResourceManager.scala:122) at org.apache.spark.sql.FinalStageResourceManager.killExecutors(FinalStageResourceManager.scala:175) ``` ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request Closes #4664 from ulysses-you/kill-executors-followup. 
Closes #4664 3811eaee9 [ulysses-you] Fix empty relation Authored-by: ulysses-you Signed-off-by: ulyssesyou --- .../apache/spark/sql/FinalStageResourceManager.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala index 2bf7ae6b75e..ca3f762e169 100644 --- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala @@ -69,6 +69,7 @@ case class FinalStageResourceManager(session: SparkSession) return plan } + // TODO: move this to query stage optimizer when updating Spark to 3.5.x // Since we are in `prepareQueryStage`, the AQE shuffle read has not been applied. // So we need to apply it by self. 
val shuffleRead = queryStageOptimizerRules.foldLeft(stageOpt.get.asInstanceOf[SparkPlan]) { @@ -119,7 +120,11 @@ case class FinalStageResourceManager(session: SparkSession) shuffleId: Int, numReduce: Int): Seq[String] = { val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] - val shuffleStatus = tracker.shuffleStatuses(shuffleId) + val shuffleStatusOpt = tracker.shuffleStatuses.get(shuffleId) + if (shuffleStatusOpt.isEmpty) { + return Seq.empty + } + val shuffleStatus = shuffleStatusOpt.get val executorToBlockSize = new mutable.HashMap[String, Long] shuffleStatus.withMapStatuses { mapStatus => mapStatus.foreach { status => @@ -175,6 +180,9 @@ case class FinalStageResourceManager(session: SparkSession) val executorsToKill = findExecutorToKill(sc, targetExecutors, shuffleId, numReduce) logInfo(s"Request to kill executors, total count ${executorsToKill.size}, " + s"[${executorsToKill.mkString(", ")}].") + if (executorsToKill.isEmpty) { + return + } // Note, `SparkContext#killExecutors` does not allow with DRA enabled, // see `https://github.com/apache/spark/pull/20604`. @@ -201,7 +209,7 @@ trait FinalRebalanceStageHelper { case f: FilterExec => findFinalRebalanceStage(f.child) case s: SortExec if !s.global => findFinalRebalanceStage(s.child) case stage: ShuffleQueryStageExec - if stage.isMaterialized && + if stage.isMaterialized && stage.mapStats.isDefined && stage.plan.isInstanceOf[ShuffleExchangeExec] && stage.plan.asInstanceOf[ShuffleExchangeExec].shuffleOrigin != ENSURE_REQUIREMENTS => Some(stage)