
simplify QueryStage #5


Merged

merged 7 commits into carsonwang:AE_1 from help2 on Jan 22, 2019

Conversation

cloud-fan

No description provided.

@cloud-fan force-pushed the help2 branch 4 times, most recently from 3129cd7 to 62709d5 on January 17, 2019 07:25
@@ -37,44 +33,25 @@ import org.apache.spark.sql.types.StructType
case class PlanQueryStage(conf: SQLConf) extends Rule[SparkPlan] {


conf: SQLConf is no longer needed.
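
That is, the rule could drop the parameter entirely (a minimal sketch of the suggested cleanup; the rule body is unchanged):

// Sketch: PlanQueryStage without the unused SQLConf parameter.
case class PlanQueryStage() extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = ??? // existing logic unchanged
}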

cachedRDD
}

def planToRun: SparkPlan = finalPlan


What's the thinking behind planToRun? Maybe using finalPlan directly would give cleaner semantics.

override def outputPartitioning: Partitioning = child.outputPartitioning

override def outputOrdering: Seq[SortOrder] = child.outputOrdering
abstract class QueryStage extends LeafExecNode {


I understand the reasons for changing QueryStage from UnaryExecNode to LeafExecNode are:

  • Getting rid of the var in the parameter list.
  • The original UnaryExecNode approach didn't override child.

Is that right?

Author


because I removed QueryStageInput
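
To make that concrete, the new shape is roughly as follows (a sketch assembled from the diff context in this review; exact members may differ):

// Sketch: with QueryStageInput removed, QueryStage itself is the leaf node
// that hides its materialized sub-tree (`plan`) from the parent plan, so
// there is no child to override and no var constructor parameter.
abstract class QueryStage extends LeafExecNode {
  def plan: SparkPlan
  override def output: Seq[Attribute] = plan.output
  override def outputPartitioning: Partitioning = plan.outputPartitioning
  override def outputOrdering: Seq[SortOrder] = plan.outputOrdering
}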

protected def adaptivePreparations: Seq[Rule[SparkPlan]] = Seq(
PlanSubqueries(sparkSession),
EnsureRequirements(sparkSession.sessionState.conf),
ReuseExchange(sparkSession.sessionState.conf),
ReuseSubquery(sparkSession.sessionState.conf),
// PlanQueryStage needs to be the last rule because it divides the plan into multiple sub-trees
// by inserting leaf node QueryStageInput. Transforming the plan after applying this rule will
// only transform nodes within a sub-tree.

QueryStageInput -> QueryStage

cachedShuffleRDD
}
}
}

Do we still need cachedShuffleRDD? How is it reused?


Never mind; if we execute the plan twice, cachedShuffleRDD can be reused.
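
For reference, the reuse pattern is roughly this (a sketch; executeShuffle is an illustrative stand-in for however the stage materializes its shuffle):

// Sketch: materialize the shuffle once, and hand back the cached RDD on
// any subsequent execute() call on the same plan.
private var cachedShuffleRDD: ShuffledRowRDD = null

override protected def doExecute(): RDD[InternalRow] = {
  if (cachedShuffleRDD == null) {
    cachedShuffleRDD = executeShuffle() // illustrative helper
  }
  cachedShuffleRDD
}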

}

case StageReady(stage) =>
stageToParentStage.remove(stage.id).foreach { parentStage =>
Owner

A stage being reused can have multiple parent stages. We need to decrease numPendingChildStages for all of its parent stages.

Owner

It is also possible that when the reused stage is ready, another parent has not yet been triggered, right? So before triggering the child stages, we may also need to check whether any of them is already ready.
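
Both points might be handled along these lines (a sketch; stageToParentStages, a map from stage id to all parent stages, and submitStage are illustrative names, while numPendingChildStages comes from this PR):

case StageReady(stage) =>
  // A reused stage can have several parents: decrement every parent's
  // pending-children counter, not just one.
  stageToParentStages.remove(stage.id).foreach { parentStages =>
    parentStages.foreach { parentStage =>
      parentStage.numPendingChildStages -= 1
      // Submit a parent only once all of its children are ready; this also
      // covers a reused child that was ready before the parent registered.
      if (parentStage.numPendingChildStages == 0) {
        submitStage(parentStage)
      }
    }
  }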

@carsonwang left a comment

Thanks @cloud-fan for the efforts!

override def output: Seq[Attribute] = plan.output
override def outputPartitioning: Partitioning = plan.outputPartitioning
override def outputOrdering: Seq[SortOrder] = plan.outputOrdering
override def executeCollect(): Array[InternalRow] = plan.executeCollect()
Owner

Also add executeToIterator?

Author

done
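
Presumably mirroring the existing delegations (a sketch of the added override):

override def executeToIterator(): Iterator[InternalRow] = plan.executeToIterator()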

currentQueryStage
}

override def executeCollect(): Array[InternalRow] = finalPlan.executeCollect()
Owner

Also add executeToIterator?

@yucai commented Jan 18, 2019

The new approach avoids the recursive invocation of operators. Great implementation!

An open question: could we introduce an AdaptiveExchange? As adaptive execution (AE) proceeds, both the plan and the partitioning change dynamically, so even some pre-planned Exchanges might be elided at runtime.

select *
from store_sales join store on (ss_store_sk = s_store_sk)
join item on (ss_item_sk = i_item_sk)
where s_store_name like 'us%'

The join between store_sales and store starts out as a sort merge join. Because we filter store, it is very likely to change from a sort merge join to a broadcast join. After the broadcast join, the partitioning is the same as store_sales's, so if store_sales is already bucketed by ss_item_sk, then store_sales_join_store needs no shuffle when it joins item.

More aggressively, store_sales itself doesn't need a shuffle either when store_sales joins store.

The key to AdaptiveExchange is that this Exchange does not necessarily happen: it dynamically inspects the current partitioning to decide whether to shuffle, and in some cases it can degrade into a no-op.

This way we can go from the original 4 shuffles down to 2, and the shuffles saved are the ones on large tables.
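
A rough sketch of what such a node could look like (AdaptiveExchange is the proposed, hypothetical node; Distribution.createPartitioning, Partitioning.satisfies, SQLConf.get, and ShuffleExchangeExec are existing Spark APIs):

// Hypothetical node for the proposal above: decide at execution time
// whether a shuffle is actually needed.
case class AdaptiveExchange(
    requiredDistribution: Distribution,
    child: SparkPlan) extends UnaryExecNode {

  override def output: Seq[Attribute] = child.output

  override protected def doExecute(): RDD[InternalRow] = {
    if (child.outputPartitioning.satisfies(requiredDistribution)) {
      // The runtime partitioning already satisfies the requirement:
      // degrade into a no-op and skip the shuffle.
      child.execute()
    } else {
      ShuffleExchangeExec(
        requiredDistribution.createPartitioning(SQLConf.get.numShufflePartitions),
        child).execute()
    }
  }
}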

s"coordinator[target post-shuffle partition size: $advisoryTargetPostShuffleInputSize]"
case class CoalescedShuffleReaderExec(
child: ShuffleQueryStage,
partitionStartIndices: Array[Int]) extends LeafExecNode {
@yucai Jan 20, 2019

Should we make it a UnaryExecNode? Otherwise, we cannot see its child in the UI.
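
The suggested change would look roughly like this (a sketch; the execution body is elided):

// Sketch: as a UnaryExecNode the reader keeps its child stage visible in
// the plan tree, and therefore in the UI.
case class CoalescedShuffleReaderExec(
    child: ShuffleQueryStage,
    partitionStartIndices: Array[Int]) extends UnaryExecNode {

  override def output: Seq[Attribute] = child.output

  // The real implementation would build a coalesced ShuffledRowRDD from
  // partitionStartIndices; elided here.
  override protected def doExecute(): RDD[InternalRow] = ???
}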

ThreadUtils.awaitResult(metricsFuture, Duration.Zero)
}.filter(_ != null) // ShuffleQueryStage may give null mapOutputStatistics, skip it.

if (shuffleMetrics.nonEmpty) {

When a bucketed table joins a non-bucketed table, it can fail.

For example:

sql("drop table bucketed_table1").collect
sql("drop table table2").collect

val df1 = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1")
val df2 = (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2")
df1.write.format("parquet").bucketBy(20, "i").saveAsTable("bucketed_table1")
df2.write.format("parquet").saveAsTable("table2")

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
spark.conf.set("spark.sql.adaptive.maxNumPostShufflePartitions", 10)
sql("select * from bucketed_table1 t1 join table2 t2 on t1.i = t2.i").collect()
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(20, 1)
  at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:58)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:254)
  at scala.Option.getOrElse(Option.scala:138)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:252)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:254)
  at scala.Option.getOrElse(Option.scala:138)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:252)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:254)

* statistics.
*/
case class AdaptiveSparkPlan(resultStage: ResultQueryStage, session: SparkSession)
extends LeafExecNode {

Do we need special consideration in SparkPlanInfo.fromSparkPlan?

  def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
    val children = plan match {
      case ReusedExchangeExec(_, child) => child :: Nil
      case stage: QueryStage => stage.plan :: Nil
      case adaptive: AdaptiveSparkPlan => adaptive.plan :: Nil
      case _ => plan.children ++ plan.subqueries
    }

Author

Yes we do, good catch!

@cloud-fan
Author

As adaptive execution (AE) proceeds, both the plan and the partitioning change dynamically, so even some pre-planned Exchanges might be elided at runtime.

Good idea! With the current framework we would have to merge query stages dynamically at runtime, which is quite tricky. Thinking about it more deeply, the root cause is that we only start splitting query stages after EnsureRequirements, so some exchanges become redundant later. An elegant approach would be to plan bottom-up: whenever a shuffle is needed, wrap the current subtree into a query stage and materialize it, then continue planning and creating query stages upwards. But that is a fairly big change; we can do it later.
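
In pseudocode, that bottom-up idea might look like this (all helpers here are hypothetical):

// Sketch: plan bottom-up; at each shuffle boundary, wrap the sub-tree
// below it into a query stage and materialize it, so planning above the
// boundary sees real runtime statistics and may drop the exchange.
def planBottomUp(plan: SparkPlan): SparkPlan = plan transformUp {
  case exchange: ShuffleExchangeExec =>
    val stage = createQueryStage(exchange.child) // hypothetical helper
    stage.materialize()                          // hypothetical: run and wait
    replanAbove(exchange, stage)                 // hypothetical: may elide the shuffle
}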

} else {
// If not all leaf nodes are shuffle query stages, it's not safe to reduce the number of
// shuffle partitions, because we may break the assumption that all children of a spark plan
// have the same number of output partitions.
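
A minimal sketch of that guard (collectLeaves is an existing TreeNode method; the rest is illustrative):

// Only coalesce shuffle partitions when every leaf is a ShuffleQueryStage;
// otherwise sibling operators could end up with mismatched partition
// counts (see the zip failure reported above).
val safeToCoalesce = plan.collectLeaves().forall(_.isInstanceOf[ShuffleQueryStage])
if (safeToCoalesce) {
  // estimate partition start indices and install CoalescedShuffleReaderExec
}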

@carsonwang
Owner

Thanks @cloud-fan very much. Let's merge this.

@carsonwang merged commit ea93dbf into carsonwang:AE_1 on Jan 22, 2019