-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-24935][SQL][followup] support INIT -> UPDATE -> MERGE -> FINISH in Hive UDAF adapter #24459
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Test build #104904 has finished for PR 24459 at commit
|
// Deserializes an `AggregationBuffer` from the shuffled partial aggregation phase to prepare | ||
// for global aggregation by merging multiple partial aggregation results within a single group. | ||
aggBufferSerDe.deserialize(bytes) | ||
HiveUDAFBuffer(aggBufferSerDe.deserialize(bytes), false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Once the value of canDoMerge is always set false after deserialization, in the merge() function, the aggregationBuffer will be always re-created even the passed buffer parameter is actually a Partial2 or Final state. This, correct me if I am wrong, is a flaw causing performance downgrade.
May need to do none-trivial work in serialize() to include the state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the deserialized buffer can only appear as the second parameter in merge
, so canDoMerge
doesn't matter here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, except for the case of falling back from hash agg, and that's what you want to address here, and this is not impacting spark udaf. The logic looks clear and good to me, thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Logic looks good to me, will perform a few quick tests of my own on the PR and get back to you soon @cloud-fan |
@cloud-fan The fix LGTM. Had a question though. After this change, do we still need to fix the initialization of aggregate buffer for SortBasedAggregate that #24149 originally addressed in the first commit 400db3d ? |
@pgandhi999 Yes we need, but your patch can be simplified after the Hive UDAF issue is fixed. |
…H in Hive UDAF adapter ## What changes were proposed in this pull request? This is a followup of #24144 . #24144 missed one case: when hash aggregate fallback to sort aggregate, the life cycle of UDAF is: INIT -> UPDATE -> MERGE -> FINISH. However, not all Hive UDAF can support it. Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the [sketches library](https://github.com/DataSketches/sketches-hive/blob/7f9e76e9e03807277146291beb2c7bec40e8672b/src/main/java/com/yahoo/sketches/hive/cpc/DataToSketchUDAF.java#L107). The buffer for UPDATE may not support MERGE. This PR updates the Hive UDAF adapter in Spark to support INIT -> UPDATE -> MERGE -> FINISH, by turning it to INIT -> UPDATE -> FINISH + IINIT -> MERGE -> FINISH. ## How was this patch tested? a new test case Closes #24459 from cloud-fan/hive-udaf. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 7432e7d) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
thanks, merging to master/2.4! |
Since, apache#24459 fixes the init-update-merge issue, the fix here is reverted.
…H in Hive UDAF adapter ## What changes were proposed in this pull request? This is a followup of apache#24144 . apache#24144 missed one case: when hash aggregate fallback to sort aggregate, the life cycle of UDAF is: INIT -> UPDATE -> MERGE -> FINISH. However, not all Hive UDAF can support it. Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the [sketches library](https://github.com/DataSketches/sketches-hive/blob/7f9e76e9e03807277146291beb2c7bec40e8672b/src/main/java/com/yahoo/sketches/hive/cpc/DataToSketchUDAF.java#L107). The buffer for UPDATE may not support MERGE. This PR updates the Hive UDAF adapter in Spark to support INIT -> UPDATE -> MERGE -> FINISH, by turning it to INIT -> UPDATE -> FINISH + IINIT -> MERGE -> FINISH. ## How was this patch tested? a new test case Closes apache#24459 from cloud-fan/hive-udaf. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request? backport #24144 and #24459 to 2.3. ## How was this patch tested? existing tests Closes #24539 from cloud-fan/backport. Lead-authored-by: pgandhi <pgandhi@verizonmedia.com> Co-authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…H in Hive UDAF adapter ## What changes were proposed in this pull request? This is a followup of apache#24144 . apache#24144 missed one case: when hash aggregate fallback to sort aggregate, the life cycle of UDAF is: INIT -> UPDATE -> MERGE -> FINISH. However, not all Hive UDAF can support it. Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the [sketches library](https://github.com/DataSketches/sketches-hive/blob/7f9e76e9e03807277146291beb2c7bec40e8672b/src/main/java/com/yahoo/sketches/hive/cpc/DataToSketchUDAF.java#L107). The buffer for UPDATE may not support MERGE. This PR updates the Hive UDAF adapter in Spark to support INIT -> UPDATE -> MERGE -> FINISH, by turning it to INIT -> UPDATE -> FINISH + IINIT -> MERGE -> FINISH. ## How was this patch tested? a new test case Closes apache#24459 from cloud-fan/hive-udaf. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 7432e7d) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…H in Hive UDAF adapter ## What changes were proposed in this pull request? This is a followup of apache#24144 . apache#24144 missed one case: when hash aggregate fallback to sort aggregate, the life cycle of UDAF is: INIT -> UPDATE -> MERGE -> FINISH. However, not all Hive UDAF can support it. Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the [sketches library](https://github.com/DataSketches/sketches-hive/blob/7f9e76e9e03807277146291beb2c7bec40e8672b/src/main/java/com/yahoo/sketches/hive/cpc/DataToSketchUDAF.java#L107). The buffer for UPDATE may not support MERGE. This PR updates the Hive UDAF adapter in Spark to support INIT -> UPDATE -> MERGE -> FINISH, by turning it to INIT -> UPDATE -> FINISH + IINIT -> MERGE -> FINISH. ## How was this patch tested? a new test case Closes apache#24459 from cloud-fan/hive-udaf. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 7432e7d) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…H in Hive UDAF adapter ## What changes were proposed in this pull request? This is a followup of apache#24144 . apache#24144 missed one case: when hash aggregate fallback to sort aggregate, the life cycle of UDAF is: INIT -> UPDATE -> MERGE -> FINISH. However, not all Hive UDAF can support it. Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the [sketches library](https://github.com/DataSketches/sketches-hive/blob/7f9e76e9e03807277146291beb2c7bec40e8672b/src/main/java/com/yahoo/sketches/hive/cpc/DataToSketchUDAF.java#L107). The buffer for UPDATE may not support MERGE. This PR updates the Hive UDAF adapter in Spark to support INIT -> UPDATE -> MERGE -> FINISH, by turning it to INIT -> UPDATE -> FINISH + IINIT -> MERGE -> FINISH. ## How was this patch tested? a new test case Closes apache#24459 from cloud-fan/hive-udaf. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 7432e7d) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
This is a followup of #24144 . #24144 missed one case: when hash aggregate fallback to sort aggregate, the life cycle of UDAF is: INIT -> UPDATE -> MERGE -> FINISH.
However, not all Hive UDAF can support it. Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the sketches library. The buffer for UPDATE may not support MERGE.
This PR updates the Hive UDAF adapter in Spark to support INIT -> UPDATE -> MERGE -> FINISH, by turning it to INIT -> UPDATE -> FINISH + IINIT -> MERGE -> FINISH.
How was this patch tested?
a new test case