[SPARK-13511][SQL] Add wholestage codegen for limit #11391
Test build #52046 has finished for PR 11391 at commit
Just realized this implementation is wrong. Will fix it later.
Mark it as WIP?
| $countTerm += 1;
| ${consume(ctx, ctx.currentVars)}
| }
| if (true) return;
This is special?
Test build #52053 has finished for PR 11391 at commit
Can you update the PR description to describe briefly how you are supporting it?
Force-pushed from 8bac699 to 9c072aa
Test build #52107 has finished for PR 11391 at commit
Test build #52110 has finished for PR 11391 at commit
The failed test
Test build #52118 has finished for PR 11391 at commit
cc @davies
@@ -765,6 +765,9 @@ private void readIntBatch(int rowId, int num, ColumnVector column) throws IOExce
} else if (DecimalType.is64BitDecimalType(column.dataType())) {
defColumn.readIntsAsLongs(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else if (column.dataType() == DataTypes.ShortType) {
Could you pull this out as a separate PR so we can merge it quickly?
Ah, it has been merged, let me sync with it.
Test build #52213 has finished for PR 11391 at commit
@@ -35,6 +35,8 @@
// used when there is no column in output
protected UnsafeRow unsafeRow = new UnsafeRow(0);

protected boolean stopEarly = false;
Since `stopEarly` is only accessed by generated functions, we don't need this anymore.
We could use addMutableState
yeah. I am updating it.
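To illustrate the suggestion above, here is a minimal, self-contained sketch of how a codegen context might register per-operator mutable state so the flag lives in the generated class rather than in a shared base class. `MiniCodegenContext`, `LimitCodegenSketch`, `generateClass`, and the `GeneratedIterator` name are hypothetical stand-ins made up for this example; they are not Spark's `CodegenContext`, and the real `addMutableState` may differ in signature and behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of a codegen context; NOT Spark's CodegenContext.
final class MiniCodegenContext {
    private final List<String> declarations = new ArrayList<>();
    private final List<String> initializers = new ArrayList<>();
    private int nextId = 0;

    // Returns a name that is unique within this context.
    String freshName(String prefix) {
        return prefix + (nextId++);
    }

    // Records a field declaration and the code that initializes it.
    void addMutableState(String javaType, String name, String initCode) {
        declarations.add("private " + javaType + " " + name + ";");
        initializers.add(initCode);
    }

    // Emits Java source for a class holding all registered state.
    String generateClass(String className, String processBody) {
        StringBuilder sb = new StringBuilder();
        sb.append("final class ").append(className).append(" {\n");
        for (String d : declarations) {
            sb.append("  ").append(d).append("\n");
        }
        sb.append("  ").append(className).append("() {\n");
        for (String i : initializers) {
            sb.append("    ").append(i).append("\n");
        }
        sb.append("  }\n");
        sb.append("  void processRow() {\n").append(processBody).append("  }\n");
        sb.append("}\n");
        return sb.toString();
    }
}

// Hypothetical limit operator that keeps its counter and stop flag as generated state.
final class LimitCodegenSketch {
    static String generate(int limit) {
        MiniCodegenContext ctx = new MiniCodegenContext();
        String stopEarly = ctx.freshName("stopEarly");
        String count = ctx.freshName("count");
        ctx.addMutableState("boolean", stopEarly, stopEarly + " = false;");
        ctx.addMutableState("int", count, count + " = 0;");
        String body =
            "    if (" + count + " < " + limit + ") {\n"
            + "      " + count + " += 1;\n"
            + "    } else {\n"
            + "      " + stopEarly + " = true;\n"
            + "    }\n";
        return ctx.generateClass("GeneratedIterator", body);
    }
}
```

Calling `LimitCodegenSketch.generate(100)` returns source text in which both the counter and the stop flag are private fields of the generated class, so nothing needs to be added to a shared iterator base class.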
assert(plan.find(p =>
p.isInstanceOf[WholeStageCodegen] &&
p.asInstanceOf[WholeStageCodegen].plan.isInstanceOf[Sort] &&
The sort is not related to limit; could you remove it from this PR? (We may revert the commit for sort.)
Yeah. We can't leave limit as the last operator, otherwise it will be transformed into a collect limit, so I added a sort here. I will remove it once I am back at my laptop (a few hours from now).
These kinds of tests are easy to break; we may not need this.
Agreed. Let me remove this later.
Test build #52219 has finished for PR 11391 at commit
Test build #52222 has finished for PR 11391 at commit
Test build #52237 has finished for PR 11391 at commit
LGTM, merging this into master, thanks!
JIRA: https://issues.apache.org/jira/browse/SPARK-13511

## What changes were proposed in this pull request?

The current limit operator doesn't support wholestage codegen. This PR adds support for it.

In the `doConsume` of `GlobalLimit` and `LocalLimit`, we use a count term to count the processed rows. Once the row count reaches the limit, we set the variable `stopEarly` of `BufferedRowIterator` (newly added in this PR) to `true`, which indicates that we want to stop processing the remaining rows. Then, when the wholestage codegen framework checks `shouldStop()`, it stops processing the row iterator.

Before this, the executed plan for the query `sqlContext.range(N).limit(100).groupBy().sum()` is:

    TungstenAggregate(key=[], functions=[(sum(id#5L),mode=Final,isDistinct=false)], output=[sum(id)#6L])
    +- TungstenAggregate(key=[], functions=[(sum(id#5L),mode=Partial,isDistinct=false)], output=[sum#9L])
       +- GlobalLimit 100
          +- Exchange SinglePartition, None
             +- LocalLimit 100
                +- Range 0, 1, 1, 524288000, [id#5L]

After adding wholestage codegen support:

    WholeStageCodegen
    :  +- TungstenAggregate(key=[], functions=[(sum(id#40L),mode=Final,isDistinct=false)], output=[sum(id)#41L])
    :     +- TungstenAggregate(key=[], functions=[(sum(id#40L),mode=Partial,isDistinct=false)], output=[sum#44L])
    :        +- GlobalLimit 100
    :           +- INPUT
    +- Exchange SinglePartition, None
       +- WholeStageCodegen
          :  +- LocalLimit 100
          :     +- Range 0, 1, 1, 524288000, [id#40L]

## How was this patch tested?

A test is added into BenchmarkWholeStageCodegen.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes apache#11391 from viirya/wholestage-limit.
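The early-stop mechanism described above can be sketched outside of Spark as follows. This is only an illustration under stated assumptions: `LimitSketch`, `run`, and `rowsPulled` are hypothetical names invented for the example, the class is not Spark's `BufferedRowIterator`, and the code is hand-written Java rather than actual generated code. It shows a count term, a `stopEarly` flag set once the limit is reached, and a driving loop that checks `shouldStop()` before pulling another row.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a generated row iterator; not Spark's BufferedRowIterator.
final class LimitSketch {
    private final Iterator<Long> input;   // upstream rows, e.g. the output of a range
    private final int limit;
    private final List<Long> emitted = new ArrayList<>();
    private int count = 0;                // plays the role of the count term
    private boolean stopEarly = false;    // set once the limit has been reached
    private int rowsPulled = 0;           // bookkeeping for the example only

    LimitSketch(Iterator<Long> input, int limit) {
        this.input = input;
        this.limit = limit;
    }

    // The check the driving loop makes between rows.
    private boolean shouldStop() {
        return stopEarly;
    }

    // Per-row logic of the limit operator.
    private void consume(long row) {
        if (count < limit) {
            count += 1;
            emitted.add(row);
        }
        if (count >= limit) {
            stopEarly = true;             // no need to look at the remaining rows
        }
    }

    // Pulls rows from upstream until the input is exhausted or the limit asks to stop.
    List<Long> run() {
        while (!shouldStop() && input.hasNext()) {
            rowsPulled += 1;
            consume(input.next());
        }
        return emitted;
    }

    int rowsPulled() {
        return rowsPulled;
    }
}
```

With a limit of 3 over an input of 1000 rows, `run()` emits the first three rows and the loop pulls only three rows from upstream, which is the point of stopping early instead of draining the iterator.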