SPARK-1627: Support external aggregation by using Aggregator in Spark SQL #867

Closed
wants to merge 5 commits into from

@lamuguo lamuguo commented May 23, 2014

No description provided.

lamuguo added 4 commits May 1, 2014 23:04
First try.

Some tests fail, but the failures match another baseline result. Will
double-check later whether all test cases currently pass.

Error info below:
[info] Passed: Total 197, Failed 0, Errors 0, Passed 195, Skipped 2
[error] (repl/test:test) sbt.TestsFailedException: Tests unsuccessful
[error] (streaming/test:test) sbt.TestsFailedException: Tests unsuccessful
[error] (core/test:test) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 1755 s, completed May 1, 2014 10:49:27 PM
And fixed some style problems.
To eliminate saving of rows in interim data.
@AmplabJenkins

Can one of the admins verify this patch?

@lamuguo
Author

lamuguo commented May 23, 2014

Is it possible to forward this to Reynold? Thanks!

@hsaputra
Contributor

Hi,

Please add more description of the desired effect.
It would also be appreciated if you filed an ASF JIRA ticket [1] to help track the changes.

[1] https://issues.apache.org/jira/secure/Dashboard.jspa

// AggregateFunction.update(agg: AggregateFunction) in the future.
def createCombiner(row: Row) = mergeValue(newAggregateBuffer(), row)
def mergeValue(buffer: Array[AggregateFunction], row: Row) = {
for (i <- 0 to buffer.length - 1) {
Contributor

It'd be better to rewrite this using a while loop, since while loops perform much better than for loops in Scala.
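
As a minimal sketch of this suggestion (not the code from this patch), the two loop forms below sum an array. In Scala 2, a `for` over a `Range` is desugared into a `foreach` call that takes a closure, whereas a `while` loop compiles to a plain counter and jump. How large the difference is depends on the Scala version and the JIT. `LoopForms`, `sumFor`, and `sumWhile` are hypothetical names used only for illustration.

```scala
object LoopForms {
  // for over a Range: desugars to Range.foreach with a closure.
  def sumFor(values: Array[Int]): Long = {
    var total = 0L
    for (i <- 0 to values.length - 1) {
      total += values(i)
    }
    total
  }

  // Equivalent while loop: explicit index, no closure involved.
  def sumWhile(values: Array[Int]): Long = {
    var total = 0L
    var i = 0
    while (i < values.length) {
      total += values(i)
      i += 1
    }
    total
  }
}
```

Both methods return the same result for any input; only the generated bytecode differs.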

Author

Done

@rxin
Contributor

rxin commented May 25, 2014

@lamuguo the issue is https://issues.apache.org/jira/browse/SPARK-1627

Please update the title of the pull request to:

SPARK-1627: Support external aggregation by using Aggregator in Spark SQL

val group = entry._1
val data = entry._2

for (i <- 0 to data.length - 1) {
Contributor

Again, use a while loop here.

Author

Done

Author

Hi there,

I made some changes in response to the comments a couple of days ago; see
#867 (comment). Please take another look. Thanks!

Best Regards,
Xiaofeng

On Sat, May 24, 2014 at 8:52 PM, Reynold Xin notifications@github.com wrote:

In sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala:

       override final def next(): Row = {
    -        val currentEntry = hashTableIter.next()
    -        val currentGroup = currentEntry.getKey
    -        val currentBuffer = currentEntry.getValue
    -        var i = 0
    -        while (i < currentBuffer.length) {
    -          // Evaluating an aggregate buffer returns the result.  No row is required since we
    -          // already added all rows in the group using update.
    -          aggregateResults(i) = currentBuffer(i).eval(EmptyRow)
    -          i += 1
    +        val entry = aggIter.next()
    +        val group = entry._1
    +        val data = entry._2
    +        for (i <- 0 to data.length - 1) {
    

again, while loop here.



@lamuguo lamuguo changed the title from "Use Aggregator for Spark SQL" to "SPARK-1627: Support external aggregation by using Aggregator in Spark SQL" May 30, 2014
@AmplabJenkins

Can one of the admins verify this patch?

@chenghao-intel
Contributor

This is very helpful when a partition cannot fit in memory. However, I think we'd better keep the previous implementation. Spark's Aggregator targets the typical scenario, but in SQL we have many algorithms and other ways to optimize aggregation for performance, so coupling tightly to Spark's Aggregator may not be a good idea for further improvement.
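
A minimal sketch of the combiner-style interface under discussion, assuming the three-function shape that Spark's Aggregator takes (`createCombiner`, `mergeValue`, `mergeCombiners`). Everything else here (`CombinerSketch`, `SumCount`, `aggregate`, `mergePartials`) is hypothetical, and nothing is spilled to disk. The sketch only shows that partial results built from separate chunks can be merged afterwards, which is the property external aggregation relies on.

```scala
import scala.collection.mutable

object CombinerSketch {
  // Hypothetical aggregation buffer: running sum and count for one group.
  final case class SumCount(sum: Long, count: Long)

  // The three functions a combiner-style aggregator is built from.
  def createCombiner(v: Int): SumCount = SumCount(v.toLong, 1L)
  def mergeValue(c: SumCount, v: Int): SumCount = SumCount(c.sum + v, c.count + 1)
  def mergeCombiners(a: SumCount, b: SumCount): SumCount =
    SumCount(a.sum + b.sum, a.count + b.count)

  // Aggregate one chunk of (group, value) rows into per-group buffers.
  def aggregate(rows: Iterator[(String, Int)]): mutable.Map[String, SumCount] = {
    val buffers = mutable.HashMap.empty[String, SumCount]
    while (rows.hasNext) {
      val (group, value) = rows.next()
      buffers.get(group) match {
        case Some(c) => buffers(group) = mergeValue(c, value)
        case None    => buffers(group) = createCombiner(value)
      }
    }
    buffers
  }

  // Merge partial results, e.g. chunks that were aggregated separately.
  def mergePartials(
      left: mutable.Map[String, SumCount],
      right: mutable.Map[String, SumCount]): mutable.Map[String, SumCount] = {
    val merged = mutable.HashMap.empty[String, SumCount] ++= left
    right.foreach { case (group, c) =>
      merged(group) = merged.get(group).map(mergeCombiners(_, c)).getOrElse(c)
    }
    merged
  }
}
```

A specialized SQL aggregation path, as the comment suggests, would be free to replace the generic buffers with something tailored to the query.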

@marmbrus
Contributor

marmbrus commented Sep 3, 2014

I think this has been subsumed by #1822, so we should close this issue for now. If anything is missing from the implementation there, please let us know!

@SparkQA

SparkQA commented Sep 5, 2014

Can one of the admins verify this patch?

@asfgit asfgit closed this in eae81b0 Sep 12, 2014
wangyum pushed a commit that referenced this pull request May 26, 2023

### What changes were proposed in this pull request?

Bug fix

### Why are the changes needed?


### Does this PR introduce any user-facing change?


### How was this patch tested?

Added UT
7 participants