insert query stages dynamically #6
d1a3c60 to 5e233e4
65fd25a to 8f20f69
private val postStageCreationOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
  ReduceNumShufflePartitions(conf),
  CollapseCodegenStages(conf),
  ReuseSubquery(conf))
ReuseSubquery will only be applied to a single stage, so it seems a subquery can't be reused by another stage now.
case class InsertAdaptiveSparkPlan(session: SparkSession) extends Rule[SparkPlan] {

  override def apply(plan: SparkPlan): SparkPlan = plan match {
    case _: ExecutedCommandExec => plan
We also need to support AE for ExecutedCommandExec. For example, the last node may be a CreateHiveTableAsSelectCommand, and we also want to run that plan in AE mode.
CreateHiveTableAsSelectCommand is not an ExecutedCommandExec any more.
  stop()
} else {
  readyStages += stage.id
  currentPlan = createQueryStages(currentPlan)
Are the initial call and this call to createQueryStages invoked from two different threads? Only one thread should call it, right?
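To make the threading concern concrete, here is a minimal sketch of one way to funnel every re-planning call onto a single thread. It is not taken from this PR: the object name, the `replanOnPlannerThread` method, and the use of a single-thread executor are all assumptions for illustration. The method only returns the name of the thread it ran on, which stands in for the real call to createQueryStages.

```scala
import java.util.concurrent.{Callable, Executors}

// Hypothetical illustration only; none of these names exist in the PR.
object SingleThreadedReplanSketch {
  // Every re-planning request is executed by this one worker thread,
  // no matter which thread asks for it.
  private val planner = Executors.newSingleThreadExecutor()

  // Stand-in for "currentPlan = createQueryStages(currentPlan)".
  // Returns the name of the thread that performed the work.
  def replanOnPlannerThread(): String =
    planner.submit(new Callable[String] {
      override def call(): String = Thread.currentThread().getName
    }).get() // block until the planner thread has finished

  // The worker thread is non-daemon, so it must be shut down explicitly.
  def shutdown(): Unit = planner.shutdown()
}
```

With this shape, the initial call and the call made from a stage-completion callback both end up on the same planner thread, so the plan is never mutated concurrently. Whether the PR should use an executor, an event loop, or a lock is a separate design decision.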
val results = plan.children.map(createQueryStages0)
CreateStageResult(
  newPlan = plan.withNewChildren(results.map(_.newPlan)),
  allChildStagesReady = results.forall(_.allChildStagesReady))
Can we add a case for leaf nodes? This also works, but it is not obvious to me at first look.
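As a rough illustration of the suggestion, the sketch below compares the generic branch with a version that has an explicit leaf case. It uses toy stand-ins rather than Spark's classes: `Node`, `Leaf`, `Inner`, `Result`, and both function names are hypothetical. The generic branch already handles leaves because `forall` on an empty collection returns true; the explicit case only states that outright.

```scala
// Toy stand-ins for SparkPlan and CreateStageResult; every name here is hypothetical.
sealed trait Node { def children: Seq[Node] }
final case class Leaf(name: String) extends Node { val children: Seq[Node] = Nil }
final case class Inner(name: String, children: Seq[Node]) extends Node
final case class Result(newPlan: Node, allChildStagesReady: Boolean)

object LeafCaseSketch {
  // Toy replacement for plan.withNewChildren(...).
  private def withNewChildren(plan: Node, newChildren: Seq[Node]): Node = plan match {
    case leaf: Leaf   => leaf
    case inner: Inner => inner.copy(children = newChildren)
  }

  // Generic branch only: a leaf has no children, so results is empty
  // and results.forall(...) is vacuously true.
  def implicitLeaf(plan: Node): Result = {
    val results = plan.children.map(implicitLeaf)
    Result(withNewChildren(plan, results.map(_.newPlan)),
      results.forall(_.allChildStagesReady))
  }

  // Same behavior, with the leaf case spelled out for readers.
  def explicitLeaf(plan: Node): Result = plan match {
    case leaf: Leaf =>
      Result(leaf, allChildStagesReady = true)
    case other =>
      val results = other.children.map(explicitLeaf)
      Result(withNewChildren(other, results.map(_.newPlan)),
        results.forall(_.allChildStagesReady))
  }
}
```

Both functions return the same result for any tree built from these toy nodes, so the explicit case is purely a readability change.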
Great work, @cloud-fan! I just left a few comments.