Skip to content

insert query stages dynamically #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 31, 2019
Merged

Conversation

cloud-fan
Copy link

No description provided.

@cloud-fan cloud-fan force-pushed the help branch 2 times, most recently from d1a3c60 to 5e233e4 Compare January 28, 2019 13:57
@cloud-fan cloud-fan force-pushed the help branch 2 times, most recently from 65fd25a to 8f20f69 Compare January 29, 2019 14:42
private val postStageCreationOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
ReduceNumShufflePartitions(conf),
CollapseCodegenStages(conf),
ReuseSubquery(conf))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReuseSubquery will only be applied to a single stage, it seems it can't be reused by another stage now.

case class InsertAdaptiveSparkPlan(session: SparkSession) extends Rule[SparkPlan] {

override def apply(plan: SparkPlan): SparkPlan = plan match {
case _: ExecutedCommandExec => plan
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need support AE for ExecutedCommandExec. For example the last node may be a CreateHiveTableAsSelectCommand, we also want to run the plan in AE mode.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CreateHiveTableAsSelectCommand is not a ExecutedCommandExec any more.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CreateHiveTableAsSelectCommand is not a ExecutedCommandExec any more.

stop()
} else {
readyStages += stage.id
currentPlan = createQueryStages(currentPlan)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The initial call and this call to createQueryStages are invoked from two different threads? Only one thread should call it, right?

val results = plan.children.map(createQueryStages0)
CreateStageResult(
newPlan = plan.withNewChildren(results.map(_.newPlan)),
allChildStagesReady = results.forall(_.allChildStagesReady))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a case for leaf nodes? This also works but is not obvious to me at the first look..

@carsonwang
Copy link
Owner

Great work, @cloud-fan ! I just left a few comments.

@carsonwang carsonwang merged commit 068ef94 into carsonwang:AE_1 Jan 31, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants