[SPARK-13511][SQL] Add wholestage codegen for limit #11391

Closed
viirya wants to merge 10 commits into apache:master from viirya:wholestage-limit

Conversation

viirya
Member

@viirya viirya commented Feb 26, 2016

JIRA: https://issues.apache.org/jira/browse/SPARK-13511

What changes were proposed in this pull request?

The current limit operators don't support wholestage codegen. This PR adds support for it.

In the doConsume of GlobalLimit and LocalLimit, we use a count term to count the processed rows. Once the row count reaches the limit, we set the stopEarly variable of BufferedRowIterator (newly added in this PR) to true, indicating that we want to stop processing the remaining rows. When the wholestage codegen framework then checks shouldStop(), it stops processing the row iterator.
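The mechanism above can be sketched in plain Java (illustrative only; the class and method names below are hypothetical stand-ins, not Spark's actual generated code, though `shouldStop()` mirrors the real BufferedRowIterator hook):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch of how a count-based early stop interacts with the framework's
// shouldStop() polling. SketchLimitIterator is a hypothetical name.
class SketchLimitIterator {
    private boolean stopEarly = false; // flipped once the limit is reached
    private int count = 0;
    private final int limit;
    private final List<Integer> out = new ArrayList<>();

    SketchLimitIterator(int limit) { this.limit = limit; }

    // Mirrors BufferedRowIterator.shouldStop(): the processing loop polls
    // this between rows and exits once it returns true.
    private boolean shouldStop() { return stopEarly; }

    List<Integer> processAll(Iterator<Integer> input) {
        while (input.hasNext() && !shouldStop()) {
            int row = input.next();
            // Corresponds to the body emitted by doConsume: count the row,
            // consume it downstream, and stop early at the limit.
            count += 1;
            out.add(row);
            if (count >= limit) {
                stopEarly = true;
            }
        }
        return out;
    }
}
```

With a limit of 3 and a five-element input, only the first three rows are consumed; the rest of the iterator is never touched.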

Before this, the executed plan for a query sqlContext.range(N).limit(100).groupBy().sum() is:

TungstenAggregate(key=[], functions=[(sum(id#5L),mode=Final,isDistinct=false)], output=[sum(id)#6L])
+- TungstenAggregate(key=[], functions=[(sum(id#5L),mode=Partial,isDistinct=false)], output=[sum#9L])
   +- GlobalLimit 100
      +- Exchange SinglePartition, None
         +- LocalLimit 100
            +- Range 0, 1, 1, 524288000, [id#5L]

After adding wholestage codegen support:

WholeStageCodegen
:  +- TungstenAggregate(key=[], functions=[(sum(id#40L),mode=Final,isDistinct=false)], output=[sum(id)#41L])
:     +- TungstenAggregate(key=[], functions=[(sum(id#40L),mode=Partial,isDistinct=false)], output=[sum#44L])
:        +- GlobalLimit 100
:           +- INPUT
+- Exchange SinglePartition, None
   +- WholeStageCodegen
      :  +- LocalLimit 100
      :     +- Range 0, 1, 1, 524288000, [id#40L]

How was this patch tested?

A test is added into BenchmarkWholeStageCodegen.

@SparkQA

SparkQA commented Feb 26, 2016

Test build #52046 has finished for PR 11391 at commit a213ed1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode with CodegenSupport
    • trait BaseLimit extends UnaryNode with CodegenSupport

@viirya
Member Author

viirya commented Feb 26, 2016

Just realized this implementation is wrong. Will fix it later.

@hvanhovell
Copy link
Contributor

Mark it as WIP?

| $countTerm += 1;
| ${consume(ctx, ctx.currentVars)}
| }
| if (true) return;
Contributor

This is special?
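(Editorial note on the `if (true) return;` being asked about: this is a common idiom in Java code generators. The compiler rejects any statement that follows a bare `return;` as unreachable code, but code after `if (true) return;` is still treated as reachable, so the generator can append further statements without tracking whether a `return` was already emitted. A minimal sketch:)

```java
// Why generated code emits `if (true) return;` rather than `return;`:
// javac's reachability analysis flags statements after an unconditional
// `return` as a compile error, but does not do so after `if (true) return;`.
class ReturnTrick {
    static int withTrick() {
        if (true) return 1;
        return 2; // compiles: javac still treats this line as reachable
    }
    // static int withoutTrick() {
    //     return 1;
    //     return 2; // would NOT compile: "unreachable statement"
    // }
}
```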

@viirya viirya changed the title [SPARK-13511][SQL] Add wholestage codegen for limit [SPARK-13511][SQL][WIP] Add wholestage codegen for limit Feb 26, 2016
@SparkQA

SparkQA commented Feb 26, 2016

Test build #52053 has finished for PR 11391 at commit fa38e7c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Feb 26, 2016

Can you update the pr description to describe briefly how you are supporting it?

@SparkQA

SparkQA commented Feb 27, 2016

Test build #52107 has finished for PR 11391 at commit 8bac699.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 27, 2016

Test build #52110 has finished for PR 11391 at commit 9c072aa.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode

@viirya
Member Author

viirya commented Feb 27, 2016

The failed test ParquetHadoopFsRelationSuite is due to the lack of short type support in UnsafeRowParquetRecordReader. I submitted another PR #11412 to fix it. The change is also included here to show that the tests pass.

@SparkQA

SparkQA commented Feb 27, 2016

Test build #52118 has finished for PR 11391 at commit 8e69d8d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya viirya changed the title [SPARK-13511][SQL][WIP] Add wholestage codegen for limit [SPARK-13511][SQL] Add wholestage codegen for limit Feb 28, 2016
@viirya
Member Author

viirya commented Feb 29, 2016

cc @davies

@@ -765,6 +765,9 @@ private void readIntBatch(int rowId, int num, ColumnVector column) throws IOExce
} else if (DecimalType.is64BitDecimalType(column.dataType())) {
defColumn.readIntsAsLongs(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else if (column.dataType() == DataTypes.ShortType) {
Contributor

Could you pull this out as a separate PR, so we can merge it quickly?

Member Author

Ah, it has been merged, let me sync with it.

@SparkQA

SparkQA commented Mar 1, 2016

Test build #52213 has finished for PR 11391 at commit c887cf4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -35,6 +35,8 @@
// used when there is no column in output
protected UnsafeRow unsafeRow = new UnsafeRow(0);

protected boolean stopEarly = false;
Contributor

Since stopEarly is only accessed from generated functions, we don't need this anymore.

Contributor

We could use addMutableState

Member Author

yeah. I am updating it.
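(Editorial note: the suggestion above is to register `stopEarly` via Spark's `CodegenContext.addMutableState`, which collects per-operator field declarations and initializers to be emitted into the generated class, instead of putting the field on the shared BufferedRowIterator base class. A hypothetical miniature of that idea, in Java rather than Spark's actual Scala API:)

```java
import java.util.ArrayList;
import java.util.List;

// MiniCodegenContext is a made-up illustration of the addMutableState
// pattern: operators register mutable fields, and the code generator later
// splices the collected declarations and init statements into the
// generated class, so state like stopEarly lives in generated code.
class MiniCodegenContext {
    private final List<String> declarations = new ArrayList<>();
    private final List<String> inits = new ArrayList<>();

    void addMutableState(String javaType, String name, String initCode) {
        declarations.add("private " + javaType + " " + name + ";");
        inits.add(initCode);
    }

    String declareMutableStates() { return String.join("\n", declarations); }
    String initMutableStates() { return String.join("\n", inits); }
}
```

For example, `ctx.addMutableState("boolean", "stopEarly", "stopEarly = false;")` would yield the field declaration `private boolean stopEarly;` plus its initializer in the generated class.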


assert(plan.find(p =>
p.isInstanceOf[WholeStageCodegen] &&
p.asInstanceOf[WholeStageCodegen].plan.isInstanceOf[Sort] &&
Contributor

The sort is not related to limit; could you remove it from this PR? (We may revert the commit for sort.)

Member Author

Yeah, since we can't leave limit as the last operator (otherwise it gets transformed into a collect limit), I added a sort here. I will remove it once I am back at my laptop (in a few hours).

Contributor

These kinds of tests are easy to break; we may not need this.

Member Author

Agreed. Let me remove this later.

@SparkQA

SparkQA commented Mar 1, 2016

Test build #52219 has finished for PR 11391 at commit 8d254d2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 1, 2016

Test build #52222 has finished for PR 11391 at commit b64e52d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 1, 2016

Test build #52237 has finished for PR 11391 at commit 3d1e397.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Contributor

davies commented Mar 1, 2016

LGTM, merging this into master, thanks!

@asfgit asfgit closed this in c43899a Mar 1, 2016
roygao94 pushed a commit to roygao94/spark that referenced this pull request Mar 22, 2016
Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes apache#11391 from viirya/wholestage-limit.
@viirya viirya deleted the wholestage-limit branch December 27, 2023 18:33