SPARK-1627: Support external aggregation by using Aggregator in Spark SQL #867
Conversation
First try. There are some test errors, but they match another baseline result; I will double-check later whether all test cases currently pass. Error info below:

```
[info] Passed: Total 197, Failed 0, Errors 0, Passed 195, Skipped 2
[error] (repl/test:test) sbt.TestsFailedException: Tests unsuccessful
[error] (streaming/test:test) sbt.TestsFailedException: Tests unsuccessful
[error] (core/test:test) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 1755 s, completed May 1, 2014 10:49:27 PM
```
Also fixed some style problems.
Eliminate the saving of rows in interim data.
Can one of the admins verify this patch?
Is it possible to forward this to Reynold? Thanks!
Hi, please add more description of the desired effect.
```scala
//     AggregateFunction.update(agg: AggregateFunction) in the future.
def createCombiner(row: Row) = mergeValue(newAggregateBuffer(), row)
def mergeValue(buffer: Array[AggregateFunction], row: Row) = {
  for (i <- 0 to buffer.length - 1) {
```
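For context, the createCombiner / mergeValue pair above follows the three-function combiner contract that Spark's Aggregator[K, V, C] is built around (the third function, mergeCombiners, merges two partial buffers). Below is a self-contained toy sketch of that contract with made-up types (String keys, Int values, a (sum, count) buffer) rather than this patch's Row and AggregateFunction:

```scala
// Toy sketch (not this patch's code) of the Aggregator[K, V, C] combiner contract.
object CombinerSketch {
  type Buffer = (Long, Int) // running (sum, count)

  val createCombiner: Int => Buffer = v => (v.toLong, 1)
  val mergeValue: (Buffer, Int) => Buffer = { case ((s, c), v) => (s + v, c + 1) }
  val mergeCombiners: (Buffer, Buffer) => Buffer = {
    case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2)
  }

  // Map-side combine over one partition, analogous in spirit to
  // Aggregator.combineValuesByKey: fold each row into a per-key buffer.
  def combineByKey(rows: Iterator[(String, Int)]): Map[String, Buffer] =
    rows.foldLeft(Map.empty[String, Buffer]) { case (acc, (k, v)) =>
      acc.updated(k, acc.get(k).map(mergeValue(_, v)).getOrElse(createCombiner(v)))
    }

  def main(args: Array[String]): Unit =
    // prints Map(a -> (4,2), b -> (2,1))
    println(combineByKey(Iterator("a" -> 1, "b" -> 2, "a" -> 3)))
}
```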
It'd be better to rewrite the for loop in mergeValue using a while loop, since while loops perform much better than for loops in Scala.
Done
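For reference, a sketch of the kind of rewrite being asked for, against the mergeValue excerpt above. The excerpt is truncated, so the buffer(i).update(row) body is an assumption; AggregateFunction and Row are the surrounding Aggregate.scala types:

```scala
// Before: the for comprehension desugars to Range.foreach, costing a closure
// call per element on this hot path.
def mergeValue(buffer: Array[AggregateFunction], row: Row) = {
  for (i <- 0 to buffer.length - 1) {
    buffer(i).update(row) // assumed body; the excerpt is truncated here
  }
  buffer
}

// After: a plain while loop compiles down to an index counter and a jump.
def mergeValue(buffer: Array[AggregateFunction], row: Row) = {
  var i = 0
  while (i < buffer.length) {
    buffer(i).update(row)
    i += 1
  }
  buffer
}
```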
@lamuguo the issue is https://issues.apache.org/jira/browse/SPARK-1627 Please update the title of the pull request to: SPARK-1627: Support external aggregation by using Aggregator in Spark SQL
```scala
val group = entry._1
val data = entry._2

for (i <- 0 to data.length - 1) {
```
again, while loop here.
Done
Hi there,

I made some changes per the comments a couple of days ago here: #867 (comment). Please take another look. Thanks!

Best Regards,
Xiaofeng

On Sat, May 24, 2014 at 8:52 PM, Reynold Xin notifications@github.com wrote:
In sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala:

```scala
override final def next(): Row = {
  val currentEntry = hashTableIter.next()
  val currentGroup = currentEntry.getKey
  val currentBuffer = currentEntry.getValue
  var i = 0
  while (i < currentBuffer.length) {
    // Evaluating an aggregate buffer returns the result. No row is required since we
    // already added all rows in the group using update.
    aggregateResults(i) = currentBuffer(i).eval(EmptyRow)
    i += 1
```

and, on the hunk below:

```scala
val entry = aggIter.next()
val group = entry._1
val data = entry._2
for (i <- 0 to data.length - 1) {
```

again, while loop here.
View it on GitHub: https://github.com/apache/spark/pull/867/files#r13029115
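For readers skimming the quoted hunk: it shows the result-iteration pattern under discussion, i.e. walk the hash table of per-group aggregate buffers and evaluate each buffer to emit output. Below is a standalone toy version of that pattern, with a stand-in Agg trait instead of Catalyst's AggregateFunction and plain values instead of Row:

```scala
import java.util.{HashMap => JHashMap}

// Stand-in for Catalyst's AggregateFunction: fold rows in via update,
// then read the final value out via eval (no input row needed).
trait Agg { def update(v: Int): Unit; def eval(): Long }
class SumAgg extends Agg {
  private var sum = 0L
  def update(v: Int): Unit = sum += v
  def eval(): Long = sum
}

// Walk the group -> buffers hash table and evaluate each buffer, mirroring
// the hashTableIter / eval(EmptyRow) loop in the quoted diff.
def results(table: JHashMap[String, Array[Agg]]): Iterator[(String, Array[Long])] = {
  val entries = table.entrySet().iterator()
  new Iterator[(String, Array[Long])] {
    def hasNext: Boolean = entries.hasNext
    def next(): (String, Array[Long]) = {
      val entry = entries.next()
      val buffer = entry.getValue
      val out = new Array[Long](buffer.length)
      var i = 0
      while (i < buffer.length) {
        out(i) = buffer(i).eval() // all rows already folded in via update
        i += 1
      }
      (entry.getKey, out)
    }
  }
}
```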
Can one of the admins verify this patch?
This is very helpful when a partition cannot fit in memory. However, I think we'd better keep the previous implementation: Spark's Aggregator targets the typical case, but there are many algorithms and ways to optimize aggregation in SQL for performance, so coupling tightly with the Spark Aggregator may not be a good idea for further improvement.
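To make "external aggregation" concrete for readers of this thread: the idea is to spill partial aggregates out of memory when a partition's hash table grows too large, then merge the spilled runs at the end (Spark's mechanism at the time was ExternalAppendOnlyMap). Here is a toy sketch of the idea, where an in-memory buffer stands in for disk and all names are made up:

```scala
import scala.collection.mutable

// Toy external aggregator: merge values into an in-memory map, "spill" sorted
// runs when the map passes a size threshold (a real implementation writes runs
// to disk), and merge the runs to produce final results.
class ToyExternalAgg[K: Ordering](mergeValue: (Long, Long) => Long, spillAfter: Int) {
  private val current = mutable.HashMap.empty[K, Long]
  private val spilled = mutable.ArrayBuffer.empty[Seq[(K, Long)]]

  def insert(key: K, value: Long): Unit = {
    current(key) = current.get(key).map(mergeValue(_, value)).getOrElse(value)
    if (current.size >= spillAfter) spill()
  }

  private def spill(): Unit = {
    spilled += current.toSeq.sortBy(_._1) // sorted so runs could be merge-joined
    current.clear()
  }

  def result(): Map[K, Long] = {
    if (current.nonEmpty) spill()
    // Simplified merge of all runs; a real implementation streams a k-way
    // merge over the sorted spill files instead of materializing everything.
    spilled.flatten.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).reduce(mergeValue) }
  }
}
```

Usage would look like `val agg = new ToyExternalAgg[String](_ + _, spillAfter = 1000)`, then `agg.insert(key, value)` per row and `agg.result()` at the end.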
I think this has been subsumed by #1822, so we should close this issue for now. If there is anything missing from the implementation there, please let us know!
Can one of the admins verify this patch?
No description provided.