Skip to content

Conversation

@wenxuanguan
Copy link
Contributor

What changes were proposed in this pull request?

Track timing info for each rule in optimization phase using QueryPlanningTracker in Structured Streaming

Why are the changes needed?

In Structured Streaming we only track rule info in analysis phase, not in optimization phase.

Does this PR introduce any user-facing change?

No

@wenxuanguan wenxuanguan changed the title [SPARK-29227][SS]track rule info in optimization phase [SPARK-29227][SS]Track rule info in optimization phase Sep 24, 2019
@gaborgsomogyi
Copy link
Contributor

Just for my own understanding where such tracking info can be seen after it's collected?

@wenxuanguan
Copy link
Contributor Author

Just for my own understanding where such tracking info can be seen after it's collected?

@gaborgsomogyi Thanks for your reply.
Follow by jira SPARK-26221, it is improvements for better metrics and instrumentation. And the reporting functions can be used for self-test at present.

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what's missing here.

/**
* Executes the batches of rules defined by the subclass, and also tracks timing info for each
* rule using the provided tracker.
* @see [[execute]]
*/
def executeAndTrack(plan: TreeType, tracker: QueryPlanningTracker): TreeType = {
QueryPlanningTracker.withTracker(tracker) {
execute(plan)
}
}
/**
* Executes the batches of rules defined by the subclass. The batches are executed serially
* using the defined execution strategy. Within each batch, rules are also executed serially.
*/
def execute(plan: TreeType): TreeType = {
var curPlan = plan
val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
val planChangeLogger = new PlanChangeLogger()
val tracker: Option[QueryPlanningTracker] = QueryPlanningTracker.get

For now elapsed time for each rule would not be measured in streaming query, and this patch fixes it.

In batch query it does correctly, see below:

lazy val optimizedPlan: LogicalPlan = tracker.measurePhase(QueryPlanningTracker.OPTIMIZATION) {
// clone the plan to avoid sharing the plan instance between different stages like analyzing,
// optimizing and planning.
sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
}

LGTM

Copy link
Contributor

@gaborgsomogyi gaborgsomogyi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Checked a query output manually.
Haven't had a super deep look but I think it can be unit tested.

@wenxuanguan
Copy link
Contributor Author

@HyukjinKwon @dongjoon-hyun Is there any thing I can do to move on, Thanks.

@HyukjinKwon
Copy link
Member

ok to test

Copy link
Member

@HyukjinKwon HyukjinKwon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, can we add a test? (see #23096)

@wenxuanguan
Copy link
Contributor Author

Add commit for UT.
The only change is in QueryPlanningTrackerEndToEndSuite.

@SparkQA
Copy link

SparkQA commented Oct 21, 2019

Test build #112394 has finished for PR 25914 at commit 060b329.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class QueryPlanningTrackerEndToEndSuite extends StreamTest

@SparkQA
Copy link

SparkQA commented Oct 22, 2019

Test build #112433 has finished for PR 25914 at commit 41fddba.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Copy link
Contributor

retest this please

@SparkQA
Copy link

SparkQA commented Oct 22, 2019

Test build #112450 has finished for PR 25914 at commit 41fddba.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM again

@wenxuanguan
Copy link
Contributor Author

merge master to redo check, nothing changed.

@SparkQA
Copy link

SparkQA commented Oct 23, 2019

Test build #112541 has finished for PR 25914 at commit 81b891c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • skeleton_class = type_constructor(name, bases, type_kwargs)
  • enum_class = metacls.__new__(metacls, name, bases, classdict)
  • abstract class AbstractSqlParser(conf: SQLConf) extends ParserInterface with Logging
  • class CatalystSqlParser(conf: SQLConf) extends AbstractSqlParser(conf)
  • case class CreateNamespaceStatement(
  • case class TruncateTableStatement(
  • case class ShowPartitionsStatement(tableName: Seq[String],
  • case class CreateNamespace(
  • class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser(conf)
  • case class CreateNamespaceExec(

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-29227][SS]Track rule info in optimization phase [SPARK-29227][SS] Track rule info in optimization phase Oct 24, 2019
@HyukjinKwon
Copy link
Member

Merged to master.

@wenxuanguan
Copy link
Contributor Author

@HeartSaVioR @gaborgsomogyi @HyukjinKwon @dongjoon-hyun Thank you all for review and merge.

lazy val optimizedPlan: LogicalPlan = tracker.measurePhase(QueryPlanningTracker.OPTIMIZATION) {
sparkSession.sessionState.optimizer.execute(withCachedData) transformAllExpressions {
sparkSession.sessionState.optimizer.executeAndTrack(withCachedData,
tracker) transformAllExpressions {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpit: in most cases, we do not break the function call in the middle of parameter lists. We can change it to

    val sessionState = sparkSession.sessionState
    sessionState.optimizer.executeAndTrack(withCachedData, tracker).transformAllExpressions {

or

    sparkSession.sessionState.optimizer
      .executeAndTrack(withCachedData, tracker).transformAllExpressions {

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants