[SPARK-29227][SS] Track rule info in optimization phase #25914
Conversation
Just for my own understanding: where can such tracking info be seen after it's collected?

@gaborgsomogyi Thanks for your reply.
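As an aside (not from this thread), one hypothetical way to inspect the collected info for a batch query, assuming the QueryPlanningTracker API of the time (queryExecution.tracker with topRulesByTime and per-rule summaries), would be roughly:

// Hypothetical inspection snippet, e.g. in spark-shell; names outside the
// tracker API (df, doubled) are just for illustration.
val df = spark.range(100).filter("id > 10").selectExpr("id * 2 AS doubled")
df.collect()  // force analysis, optimization and execution

// Top rules by accumulated time across the tracked phases.
df.queryExecution.tracker.topRulesByTime(5).foreach { case (rule, summary) =>
  println(s"$rule: total=${summary.totalTimeNs} ns, " +
    s"effective=${summary.numEffectiveInvocations}/${summary.numInvocations}")
}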
HeartSaVioR left a comment
I see what's missing here.
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
Lines 91 to 110 in a1b90bf
  /**
   * Executes the batches of rules defined by the subclass, and also tracks timing info for each
   * rule using the provided tracker.
   * @see [[execute]]
   */
  def executeAndTrack(plan: TreeType, tracker: QueryPlanningTracker): TreeType = {
    QueryPlanningTracker.withTracker(tracker) {
      execute(plan)
    }
  }

  /**
   * Executes the batches of rules defined by the subclass. The batches are executed serially
   * using the defined execution strategy. Within each batch, rules are also executed serially.
   */
  def execute(plan: TreeType): TreeType = {
    var curPlan = plan
    val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
    val planChangeLogger = new PlanChangeLogger()
    val tracker: Option[QueryPlanningTracker] = QueryPlanningTracker.get
Currently, the elapsed time for each rule is not measured in streaming queries, and this patch fixes that.
Batch queries already do this correctly, see below:
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
Lines 77 to 81 in a1b90bf
  lazy val optimizedPlan: LogicalPlan = tracker.measurePhase(QueryPlanningTracker.OPTIMIZATION) {
    // clone the plan to avoid sharing the plan instance between different stages like analyzing,
    // optimizing and planning.
    sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
  }
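Under the hood, executeAndTrack installs the tracker in a thread-local via QueryPlanningTracker.withTracker, and execute reads it back via QueryPlanningTracker.get. Below is a simplified, standalone sketch of that pattern; the names and structure are illustrative only, not the actual Spark internals.

object TrackerPatternDemo {
  final class Tracker {
    private val timesNs = scala.collection.mutable.Map.empty[String, Long]
    def record(rule: String, ns: Long): Unit =
      timesNs(rule) = timesNs.getOrElse(rule, 0L) + ns
    def dump(): Unit = timesNs.foreach { case (r, t) => println(s"$r: $t ns") }
  }

  private val localTracker = new ThreadLocal[Tracker]

  // Analogue of executeAndTrack: install the tracker for the duration of the
  // block so code deeper in the call stack can look it up.
  def withTracker[T](t: Tracker)(body: => T): T = {
    val old = localTracker.get()
    localTracker.set(t)
    try body finally localTracker.set(old)
  }

  // Analogue of QueryPlanningTracker.get inside RuleExecutor.execute.
  def currentTracker: Option[Tracker] = Option(localTracker.get())

  // Stand-in for running one optimizer rule: record elapsed time if a tracker
  // is installed, otherwise skip tracking (the pre-fix streaming case).
  def runRule(name: String): Unit = {
    val start = System.nanoTime()
    Thread.sleep(1) // pretend the rule does some work
    currentTracker.foreach(_.record(name, System.nanoTime() - start))
  }

  def main(args: Array[String]): Unit = {
    val tracker = new Tracker
    withTracker(tracker) { runRule("ExampleRule") } // timing is recorded
    runRule("ExampleRule")                          // no tracker installed: nothing recorded
    tracker.dump()
  }
}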
LGTM
gaborgsomogyi left a comment
LGTM. Checked a query output manually.
Haven't had a super deep look but I think it can be unit tested.
@HyukjinKwon @dongjoon-hyun Is there anything I can do to move this forward? Thanks.

ok to test
HyukjinKwon left a comment
Yes, can we add a test? (see #23096)
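For illustration, here is a hedged sketch of what such an end-to-end check could look like, assuming Spark's internal StreamTest/MemoryStream harness; the suite name, test name, and assertion are hypothetical and not necessarily what the PR ultimately added.

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamTest

class RuleTrackingStreamingSuite extends StreamTest {
  import testImplicits._

  test("optimizer rule timings are tracked for streaming micro-batches") {
    val input = MemoryStream[Int]
    val df = input.toDF().filter("value > 1")

    testStream(df)(
      AddData(input, 1, 2, 3),
      CheckAnswer(2, 3),
      AssertOnQuery { q =>
        // lastExecution is the IncrementalExecution of the latest micro-batch;
        // after this patch its tracker should also contain per-rule summaries
        // from the optimizer run.
        q.lastExecution.tracker.rules.nonEmpty
      }
    )
  }
}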
Force-pushed from 19d988b to 060b329.
Add commit for UT.

Test build #112394 has finished for PR 25914 at commit
(Resolved review comments on sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala.)
Test build #112433 has finished for PR 25914 at commit
retest this please

Test build #112450 has finished for PR 25914 at commit
HeartSaVioR left a comment
LGTM again
Merged master to re-run the checks; nothing changed.

Test build #112541 has finished for PR 25914 at commit
Merged to master.

@HeartSaVioR @gaborgsomogyi @HyukjinKwon @dongjoon-hyun Thank you all for the review and merge.
    lazy val optimizedPlan: LogicalPlan = tracker.measurePhase(QueryPlanningTracker.OPTIMIZATION) {
-     sparkSession.sessionState.optimizer.execute(withCachedData) transformAllExpressions {
+     sparkSession.sessionState.optimizer.executeAndTrack(withCachedData,
+       tracker) transformAllExpressions {
nitpick: in most cases, we do not break a function call in the middle of its parameter list. We can change it to

  val sessionState = sparkSession.sessionState
  sessionState.optimizer.executeAndTrack(withCachedData, tracker).transformAllExpressions {

or

  sparkSession.sessionState.optimizer
    .executeAndTrack(withCachedData, tracker).transformAllExpressions {
What changes were proposed in this pull request?

Track timing info for each rule in the optimization phase using QueryPlanningTracker in Structured Streaming.

Why are the changes needed?

In Structured Streaming we only track rule info in the analysis phase, not in the optimization phase.

Does this PR introduce any user-facing change?

No