SNAP-1840 TPCH Q22 failure #68


Closed
wants to merge 3 commits into from

Conversation


@ymahajan ymahajan commented Aug 8, 2017

What changes were proposed in this pull request?

With less data, TPCH Q22 generates a plan involving BroadcastHashJoin, but if you put more data into those tables the planner picks SortMergeJoinExec instead. Whenever SortMergeJoinExec (with LeftAnti) is selected, the query is executed incorrectly through InputAdapter. The changes that came in with #16 appear to be the problem: they cause different execution paths to be taken, and the behaviour will also differ between embedded and split mode. I tried to revert the WholeStageCodeGen changes from that checkin, but then the AQP dunits start failing. Ideally our WholeStageCodeGen#insertWholeStageCodegen should match that of stock Spark. For now I have avoided the cases involving SortMergeJoinExec. The bug was reproduced using the same Q22 in local mode after ingesting more data (so that SortMergeJoin is chosen).
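
To make the workaround concrete, here is a minimal sketch of the idea, modelled on stock Spark's CollapseCodegenStages rather than the actual diff in this PR (the object name and helper structure are simplified assumptions): SortMergeJoinExec is simply treated as not supporting codegen, so it gets wrapped in an InputAdapter while its children are collapsed into their own codegen stages.

import org.apache.spark.sql.execution.{CodegenSupport, InputAdapter, SparkPlan, WholeStageCodegenExec}
import org.apache.spark.sql.execution.joins.SortMergeJoinExec

// Simplified sketch of the insertion logic; not the actual SnappyData change.
object InsertCodegenSketch {

  // Treat SortMergeJoinExec as not supporting codegen so the problematic
  // path is avoided; everything else defers to the operator itself.
  private def supportCodegen(plan: SparkPlan): Boolean = plan match {
    case _: SortMergeJoinExec => false
    case p: CodegenSupport    => p.supportCodegen
    case _                    => false
  }

  // Children that cannot join the current codegen stage are wrapped in an
  // InputAdapter and collapsed into their own stages recursively.
  private def insertInputAdapter(plan: SparkPlan): SparkPlan =
    if (supportCodegen(plan)) {
      plan.withNewChildren(plan.children.map(insertInputAdapter))
    } else {
      InputAdapter(insertWholeStageCodegen(plan))
    }

  def insertWholeStageCodegen(plan: SparkPlan): SparkPlan =
    if (supportCodegen(plan)) {
      WholeStageCodegenExec(insertInputAdapter(plan))
    } else {
      plan.withNewChildren(plan.children.map(insertWholeStageCodegen))
    }
}
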

  • query plan with less data - where query execution succeeds as it selects
    BroadcastHashJoin
== Physical Plan ==
*CachedPlanHelper
+- *Sort [CNTRYCODE#807 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(CNTRYCODE#807 ASC NULLS FIRST, 5)
      +- *CachedPlanHelper
         +- *SnappyHashAggregate(keys=[CNTRYCODE#807], modes=Final, functions=[count(1), sum(C_ACCTBAL#461)])
            +- Exchange hashpartitioning(CNTRYCODE#807, 5)
               +- *CachedPlanHelper
                  +- *SnappyHashAggregate(keys=[CNTRYCODE#807], modes=Partial, functions=[partial_count(1), partial_sum(C_ACCTBAL#461)])
                     +- *Project [substring(C_PHONE#460, 1, 2) AS CNTRYCODE#807, C_ACCTBAL#461]
                        +- *BroadcastHashJoin [C_CUSTKEY#456], [O_CUSTKEY#51], LeftAnti, BuildRight
                           :- *Filter (substring(C_PHONE#460, 1, 2) IN (13,31,23,29,30,18,17) && (C_ACCTBAL#461 > Subquery subquery808))
                           :  :  +- Subquery subquery808
                           :  :     +- CollectAggregate SnappyHashAggregate(keys=[], modes=Final, functions=[avg(C_ACCTBAL#461)])
                           :  :        +- *SnappyHashAggregate(keys=[], modes=Partial, functions=[partial_avg(C_ACCTBAL#461)])
                           :  :           +- *Project [C_ACCTBAL#461]
                           :  :              +- *Filter ((C_ACCTBAL#461 > DynamicExpression(DynamicExpression(cast(0.00 as double)))) && substring(C_PHONE#460, 1, 2) IN (13,31,23,29,30,18,17))
                           :  :                 +- *Partitioned Scan RowFormatRelation[APP.CUSTOMER] , Requested Columns = [C_ACCTBAL#461,C_PHONE#460] partitionColumns = [C_CUSTKEY#456] numBuckets= 5 numPartitions= 5
                           :  +- *Partitioned Scan RowFormatRelation[APP.CUSTOMER] , Requested Columns = [C_CUSTKEY#456,C_NAME#457,C_ADDRESS#458,C_NATIONKEY#459,C_PHONE#460,C_ACCTBAL#461,C_MKTSEGMENT#462,C_COMMENT#463] partitionColumns = [C_CUSTKEY#456] numBuckets= 5 numPartitions= 5
                           +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                              +- *CachedPlanHelper
                                 +- *Partitioned Scan RowFormatRelation[APP.ORDERS] , Requested Columns = [O_CUSTKEY#51] partitionColumns = [O_ORDERKEY#50L] numBuckets= 5 numPartitions= 5

  • successful query execution codepath -
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:82)
	at org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:86)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:87)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:82)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:82)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:38)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:44)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:87)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:82)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:82)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:34)
	at org.apache.spark.sql.execution.aggregate.SnappyHashAggregateExec.doProduceWithKeys(SnappyHashAggregateExec.scala:534)
	at org.apache.spark.sql.execution.aggregate.SnappyHashAggregateExec.doProduce(SnappyHashAggregateExec.scala:234)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:87)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:82)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:82)
	at org.apache.spark.sql.execution.aggregate.SnappyHashAggregateExec.produce(SnappyHashAggregateExec.scala:62)
	at org.apache.spark.sql.execution.CachedPlanHelperExec.doProduce(CachedPlanHelperExec.scala:108)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:87)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:82)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:82)
	at org.apache.spark.sql.execution.CachedPlanHelperExec.produce(CachedPlanHelperExec.scala:40)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:317)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:358)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)

  • query plan with more data - where query execution fails (SortMergeJoin case)
== Physical Plan ==
*CachedPlanHelper
+- *Sort [CNTRYCODE#38047 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(CNTRYCODE#38047 ASC NULLS FIRST, 5)
      +- *CachedPlanHelper
         +- *SnappyHashAggregate(keys=[CNTRYCODE#38047], modes=Final, functions=[count(1), sum(C_ACCTBAL#20061)])
            +- Exchange hashpartitioning(CNTRYCODE#38047, 5)
               +- *CachedPlanHelper
                  +- *SnappyHashAggregate(keys=[CNTRYCODE#38047], modes=Partial, functions=[partial_count(1), partial_sum(C_ACCTBAL#20061)])
                     +- *Project [substring(C_PHONE#20060, 1, 2) AS CNTRYCODE#38047, C_ACCTBAL#20061]
                        +- SortMergeJoin [C_CUSTKEY#20056], [O_CUSTKEY#51], LeftAnti
                           :- Sort [C_CUSTKEY#20056 ASC NULLS FIRST], false, 0
                           :  +- Filter (substring(C_PHONE#20060, 1, 2) IN (13,31,23,29,30,18,17) && (C_ACCTBAL#20061 > Subquery subquery38048))
                           :     :  +- Subquery subquery38048
                           :     :     +- CollectAggregate SnappyHashAggregate(keys=[], modes=Final, functions=[avg(C_ACCTBAL#20061)])
                           :     :        +- *SnappyHashAggregate(keys=[], modes=Partial, functions=[partial_avg(C_ACCTBAL#20061)])
                           :     :           +- *Project [C_ACCTBAL#20061]
                           :     :              +- *Filter ((C_ACCTBAL#20061 > DynamicExpression(DynamicExpression(cast(0.00 as double)))) && substring(C_PHONE#20060, 1, 2) IN (13,31,23,29,30,18,17))
                           :     :                 +- *Partitioned Scan RowFormatRelation[APP.CUSTOMER] , Requested Columns = [C_ACCTBAL#20061,C_PHONE#20060] partitionColumns = [C_CUSTKEY#20056] numBuckets= 5 numPartitions= 5
                           :     +- Partitioned Scan RowFormatRelation[APP.CUSTOMER] , Requested Columns = [C_CUSTKEY#20056,C_NAME#20057,C_ADDRESS#20058,C_NATIONKEY#20059,C_PHONE#20060,C_ACCTBAL#20061,C_MKTSEGMENT#20062,C_COMMENT#20063] partitionColumns = [C_CUSTKEY#20056] numBuckets= 5 numPartitions= 5
                           +- Sort [O_CUSTKEY#51 ASC NULLS FIRST], false, 0
                              +- Exchange hashpartitioning(O_CUSTKEY#51, 5)
                                 +- *CachedPlanHelper
                                    +- *Partitioned Scan RowFormatRelation[APP.ORDERS] , Requested Columns = [O_CUSTKEY#51] partitionColumns = [O_ORDERKEY#50L] numBuckets= 5 numPartitions= 5
  • query execution - failure case codepath
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
	at org.apache.spark.sql.execution.SortExec.doExecute(SortExec.scala:101)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
	at org.apache.spark.sql.execution.joins.SortMergeJoinExec.doExecute(SortMergeJoinExec.scala:111)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:239)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:40)
	at org.apache.spark.sql.execution.aggregate.SnappyHashAggregateExec.inputRDDs(SnappyHashAggregateExec.scala:224)
	at org.apache.spark.sql.execution.CachedPlanHelperExec.inputRDDs(CachedPlanHelperExec.scala:52)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:372)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)

How was this patch tested?

clean precheckin

Other PRs?

None

@ymahajan ymahajan requested review from hbhanawat and sumwale August 8, 2017 07:23

sumwale commented Aug 9, 2017

@ymahajan The changes in #16 have always seemed dicey to me, without enough rationale beyond getting bootstrap to work or improving its performance. The blanket changes to WholeStage and GenerateSafeProjection were especially troubling, though I have insufficient background to tell off-hand what the proper solution should be, as @ahshahid asked for.

Unfortunately I didn't have enough time to delve into the details to tell exactly what was going on, and from the explanation I could not tell why those changes were required in the first place. Some of them looked like perf improvements while others looked like fixes for some issue. The latter should have been demonstrated with the simplest possible plan first and then proposed upstream. It rather looks like those changes were just a band-aid to hide some other issue with the code generation of bootstrap and related code.

This particular fix is just another band-aid. While I do not clearly understand why this change "fixes" the original issue introduced by #16, it looks harmless for now. However, I would like to track, as a highest-priority issue, what changes #16 introduced, why they were necessary (and whether they still are), and what the other implications of that change are. We now know SNAP-1840 is one of the implications, but I fear it is just the tip of the iceberg.

@hbhanawat do you have any comments on this? Do you understand what #16 actually tried to do, why it was required, and whether my fear that it was just a band-aid to hide some other issues with bootstrap code generation has any merit? To my mind #16 lacks sufficient reasoning and should be taken out and replaced with the proper fix (unfortunately figuring that out might be more time consuming than I can afford at this point).


ymahajan commented Aug 9, 2017

Let me do this - I will back out all the changes in #16.
I am assuming the corresponding PRs on the other repos are the following ones, based on the dates (#16 doesn't mention anything about other PRs):

AQP - https://github.com/SnappyDataInc/snappy-aqp/pull/102. This has 48 commits.
And there is one more - https://github.com/SnappyDataInc/snappy-aqp/pull/109

On SnappyData - TIBCOSoftware/snappydata#397

I am assuming no changes were made on the store and spark-jobserver side for #16.


sumwale commented Aug 10, 2017

@ymahajan it might be slightly involved, so we can do this after 1.0. For now the workaround in this PR is ok, but let's track the cleanup for 1.1.


ymahajan commented Aug 12, 2017

@sumwale I have reverted the #16 changes and moved the AQP-specific handling to a separate rule,
so it will not impact regular SnappyData functionality. The handling of SampleTablePlan and its codegen is specific to certain conditions in supportCodegen. The other corresponding changes for bootstrap perf are not reverted. precheckin is clean.
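
A rough sketch of what such a separate rule might look like (the rule name, the InputAdapter wrapping, and matching SampleTablePlan by class name are illustrative assumptions, not the actual AQP code): it bails out unless the plan actually contains a SampleTablePlan, so regular SnappyData queries pass through untouched.

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{InputAdapter, SparkPlan}

// Hypothetical rule; the real one lives on the AQP side.
object HandleSampleTablePlan extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = {
    // Do nothing for plans that do not involve AQP sampling at all.
    val involvesSampling =
      plan.find(_.getClass.getSimpleName == "SampleTablePlan").isDefined
    if (!involvesSampling) {
      plan
    } else {
      plan.transformUp {
        // Keep the sample operator out of the surrounding codegen stage.
        case p if p.getClass.getSimpleName == "SampleTablePlan" => InputAdapter(p)
      }
    }
  }
}

Gating on the presence of the sample operator in the plan, rather than on which session created the rule, is one way of keeping such a rule inert for normal queries even if it is installed unconditionally.
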

@ymahajan ymahajan requested a review from ahshahid August 12, 2017 03:50

sumwale commented Aug 12, 2017

@ymahajan These changes look good. However, I am not sure how the new rule added on the AQP side will come into play only for AQP queries, because SnappyAQPSessionState is the one that is always created (at least in embedded mode, and I think in connector mode too).

@ymahajan
Author

@sumwale, yeah, I didn't realize that. Even though precheckin is clean, the original bug's test is still failing, so the above changes won't be sufficient. I tried adding an additional rule which inserts an InputAdapter only for SampleTablePlan, but that doesn't work either. I can't think of any solution other than supporting codegen for SampleTablePlan in all cases, so that supportCodegen is always true for SampleTablePlan.
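
For illustration only, a minimal pass-through sketch of what "supportCodegen always true" could mean for such an operator (this is not the real SampleTablePlan, whose sampling logic lives on the AQP side; the class name and the simplifications are assumptions):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.execution.{CodegenSupport, SparkPlan, UnaryExecNode}

// Hypothetical stand-in for SampleTablePlan: a pass-through operator that
// always participates in whole-stage codegen, so it never forces an
// InputAdapter into the generated stage.
case class SampleTablePlanSketch(child: SparkPlan)
    extends UnaryExecNode with CodegenSupport {

  override def output: Seq[Attribute] = child.output

  // The point of the proposal: never fall back to the interpreted path.
  override def supportCodegen: Boolean = true

  // Assumes the child itself supports codegen; a real implementation would
  // also have to handle a non-codegen child.
  override def inputRDDs(): Seq[RDD[InternalRow]] =
    child.asInstanceOf[CodegenSupport].inputRDDs()

  override protected def doProduce(ctx: CodegenContext): String =
    child.asInstanceOf[CodegenSupport].produce(ctx, this)

  // Forward each row unchanged to the parent operator in the stage.
  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode],
      row: ExprCode): String = consume(ctx, input)

  override protected def doExecute(): RDD[InternalRow] = child.execute()
}
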


sumwale commented Feb 15, 2018

The changes in 9d7e2ba cleaned up this area quite a bit, with the snappy-spark test suite being fully clean, so closing this PR.

@sumwale sumwale closed this Feb 15, 2018
@sumwale sumwale deleted the SNAP-1840 branch February 15, 2018 17:03