simplify QueryStage #5
Conversation
@@ -37,44 +33,25 @@ import org.apache.spark.sql.types.StructType
case class PlanQueryStage(conf: SQLConf) extends Rule[SparkPlan] {
`conf: SQLConf` is no longer needed.
    cachedRDD
  }

  def planToRun: SparkPlan = finalPlan
What's the thinking behind `planToRun`? Maybe using `finalPlan` directly would be cleaner semantically.
  override def outputPartitioning: Partitioning = child.outputPartitioning

  override def outputOrdering: Seq[SortOrder] = child.outputOrdering

abstract class QueryStage extends LeafExecNode {
I understand the reasons for changing QueryStage from UnaryExecNode to LeafExecNode are:

- Getting rid of the `var` in the parameter list.
- The original UnaryExecNode approach didn't override `child`.

Is that right?
Because I removed `QueryStageInput`.
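For context, a minimal sketch of the shape under discussion (simplified stand-ins, not the PR's exact code; the `plan` field follows the accessors quoted later in this review):

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}

// Before (roughly): a unary node whose child gets swapped at runtime,
// forcing a `var` in the parameter list:
//   case class QueryStage(var child: SparkPlan) extends UnaryExecNode
//
// After: the stage is a leaf from the planner's point of view, and the
// wrapped sub-plan lives in a regular field instead of `child`.
abstract class QueryStageSketch extends LeafExecNode {
  def plan: SparkPlan
  override def output: Seq[Attribute] = plan.output
}
```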
  protected def adaptivePreparations: Seq[Rule[SparkPlan]] = Seq(
    PlanSubqueries(sparkSession),
    EnsureRequirements(sparkSession.sessionState.conf),
    ReuseExchange(sparkSession.sessionState.conf),
    ReuseSubquery(sparkSession.sessionState.conf),
    // PlanQueryStage needs to be the last rule because it divides the plan into multiple sub-trees
    // by inserting leaf node QueryStageInput. Transforming the plan after applying this rule will
`QueryStageInput` -> `QueryStage`
      cachedShuffleRDD
    }
  }
}
Do we still need `cachedShuffleRDD`? How can it be reused?
nvm, if we execute the plan twice, the `cachedShuffleRDD` could be reused.
}

case StageReady(stage) =>
  stageToParentStage.remove(stage.id).foreach { parentStage =>
A stage being reused can have multiple parent stages. We need to decrease all parent stages' `numPendingChildStages`.
It is also possible that when the reused stage is ready, another parent has not yet been triggered, right? So before triggering the child stages, we may also need to check whether any of them is already ready.
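A sketch of the bookkeeping these two comments call for, with hypothetical names: a reused stage keeps a list of parents, every parent's pending counter is decremented on readiness, and a parent that registers late checks whether the child is already ready:

```scala
import scala.collection.mutable

class StageTrackerSketch {
  // a reused stage can have several parent stages, hence a list
  private val stageToParentStages = mutable.Map[Int, mutable.Buffer[Int]]()
  private val numPendingChildStages = mutable.Map[Int, Int]()
  private val readyStages = mutable.Set[Int]()

  def registerDependency(parentId: Int, childId: Int): Unit = {
    // if the (reused) child is already ready, there is nothing to wait for
    if (!readyStages.contains(childId)) {
      stageToParentStages.getOrElseUpdate(childId, mutable.Buffer.empty[Int]) += parentId
      numPendingChildStages(parentId) = numPendingChildStages.getOrElse(parentId, 0) + 1
    }
  }

  def onStageReady(stageId: Int): Unit = {
    readyStages += stageId
    // decrement *all* parents, not just one
    stageToParentStages.remove(stageId).foreach { parents =>
      parents.foreach { parentId =>
        numPendingChildStages(parentId) -= 1
        if (numPendingChildStages(parentId) == 0) {
          // all children ready: the parent stage can be submitted
        }
      }
    }
  }
}
```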
Thanks @cloud-fan for the efforts!
  override def output: Seq[Attribute] = plan.output
  override def outputPartitioning: Partitioning = plan.outputPartitioning
  override def outputOrdering: Seq[SortOrder] = plan.outputOrdering
  override def executeCollect(): Array[InternalRow] = plan.executeCollect()
Also add `executeToIterator`?
done
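The resulting delegation, sketched with a simplified stand-in for the PR's QueryStage (both collect paths forward to the wrapped plan):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}

abstract class StageDelegationSketch extends LeafExecNode {
  def plan: SparkPlan
  // forward both collect paths to the wrapped plan
  override def executeCollect(): Array[InternalRow] = plan.executeCollect()
  override def executeToIterator(): Iterator[InternalRow] = plan.executeToIterator()
}
```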
    currentQueryStage
  }

  override def executeCollect(): Array[InternalRow] = finalPlan.executeCollect()
Also add `executeToIterator`?
The new approach avoids recursive invocation of operators, great implementation! One open question: could we introduce […]?

The join between store_sales and store is originally a sort merge join. More aggressively, store_sales actually doesn't need to be shuffled when joining store. Using […], we could reduce the original 4 shuffles to 2, and what we save is the shuffle of the large table.
    s"coordinator[target post-shuffle partition size: $advisoryTargetPostShuffleInputSize]"

case class CoalescedShuffleReaderExec(
    child: ShuffleQueryStage,
    partitionStartIndices: Array[Int]) extends LeafExecNode {
Should we make it `UnaryExecNode`? Otherwise, we cannot see its child in the UI.
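The suggestion, sketched against the Spark version this PR targets (hypothetical name; newer Spark versions would additionally require `withNewChildInternal`):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

// As a UnaryExecNode, the reader's child stage stays visible in the
// SQL UI plan graph instead of being hidden behind a leaf.
case class CoalescedShuffleReaderSketch(
    child: SparkPlan, // the ShuffleQueryStage from the diff above
    partitionStartIndices: Array[Int]) extends UnaryExecNode {

  override def output: Seq[Attribute] = child.output

  protected override def doExecute(): RDD[InternalRow] =
    throw new UnsupportedOperationException("sketch only")
}
```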
      ThreadUtils.awaitResult(metricsFuture, Duration.Zero)
    }.filter(_ != null) // ShuffleQueryStage may give null mapOutputStatistics, skip it.

    if (shuffleMetrics.nonEmpty) {
When a bucketed table joins a non-bucketed table, it could fail. For example:

```scala
sql("drop table bucketed_table1").collect
sql("drop table table2").collect
val df1 = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1")
val df2 = (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2")
df1.write.format("parquet").bucketBy(20, "i").saveAsTable("bucketed_table1")
df2.write.format("parquet").saveAsTable("table2")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
spark.conf.set("spark.sql.adaptive.maxNumPostShufflePartitions", 10)
sql("select * from bucketed_table1 t1 join table2 t2 on t1.i = t2.i").collect()
```

```
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(20, 1)
  at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:58)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:254)
  at scala.Option.getOrElse(Option.scala:138)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:252)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:254)
  at scala.Option.getOrElse(Option.scala:138)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:252)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:254)
```
 * statistics.
 */
case class AdaptiveSparkPlan(resultStage: ResultQueryStage, session: SparkSession)
  extends LeafExecNode {
Do we need special consideration in `SparkPlanInfo.fromSparkPlan`?

```scala
def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
  val children = plan match {
    case ReusedExchangeExec(_, child) => child :: Nil
    case stage: QueryStage => stage.plan :: Nil
    case adaptive: AdaptiveSparkPlan => adaptive.plan :: Nil
    case _ => plan.children ++ plan.subqueries
  }
  // ...
}
```
Yes we need it, good catch!
Good idea! With the current framework, we would have to merge query stages dynamically at runtime, which is rather tricky. Thinking about it more deeply, the root cause is that we only start splitting query stages after EnsureRequirements, so some exchanges become redundant later on. An elegant approach would be to do the planning bottom-up: when a shuffle is needed, wrap the current subtree into a query stage and materialize it, then continue planning and creating query stages upward. That is a fairly big change, though; we can do it later.
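A very rough sketch of that bottom-up idea (all helper names hypothetical, not part of this PR):

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

object BottomUpPlanningSketch {
  // Plan children first; at each shuffle boundary, wrap the subtree
  // planned so far into a stage and materialize it, so stage boundaries
  // are decided during planning rather than patched up after
  // EnsureRequirements.
  def planBottomUp(plan: SparkPlan): SparkPlan =
    plan.mapChildren(planBottomUp) match {
      case e: ShuffleExchangeExec => createAndMaterializeStage(e)
      case other => other
    }

  // stand-in for the stage creation + materialization this would need
  private def createAndMaterializeStage(e: ShuffleExchangeExec): SparkPlan = e
}
```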
} else {
  // If not all leaf nodes are shuffle query stages, it's not safe to reduce the number of
  // shuffle partitions, because we may break the assumption that all children of a spark plan
  // have the same number of output partitions.
This is a bug fix; the test case is https://github.com/carsonwang/spark/pull/5/files#diff-ab139fa5ac5c1a7f4c8bd15970db3567R489
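The guard, sketched (`ShuffleQueryStage` refers to the PR's class): coalescing is only safe when every leaf is a shuffle stage, because a non-shuffle leaf such as the bucketed scan above keeps its fixed partition count (20) and can no longer be zipped with a coalesced shuffle side:

```scala
import org.apache.spark.sql.execution.SparkPlan

object CoalesceGuardSketch {
  // assumes the PR's ShuffleQueryStage is in scope
  def safeToCoalesce(plan: SparkPlan): Boolean =
    plan.collectLeaves().forall(_.isInstanceOf[ShuffleQueryStage])
}
```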
Thanks @cloud-fan very much. Let's merge this.