Skip to content

[SPARK-24935][SQL][followup] support INIT -> UPDATE -> MERGE -> FINISH in Hive UDAF adapter #24459

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from

Conversation

cloud-fan
Copy link
Contributor

What changes were proposed in this pull request?

This is a followup of #24144 . #24144 missed one case: when hash aggregate fallback to sort aggregate, the life cycle of UDAF is: INIT -> UPDATE -> MERGE -> FINISH.

However, not all Hive UDAF can support it. Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the sketches library. The buffer for UPDATE may not support MERGE.

This PR updates the Hive UDAF adapter in Spark to support INIT -> UPDATE -> MERGE -> FINISH, by turning it to INIT -> UPDATE -> FINISH + IINIT -> MERGE -> FINISH.

How was this patch tested?

a new test case

@cloud-fan
Copy link
Contributor Author

cc @pgandhi999 @m44444

@SparkQA
Copy link

SparkQA commented Apr 25, 2019

Test build #104904 has finished for PR 24459 at commit 3d523b6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class HiveUDAFBuffer(buf: AggregationBuffer, canDoMerge: Boolean)

// Deserializes an `AggregationBuffer` from the shuffled partial aggregation phase to prepare
// for global aggregation by merging multiple partial aggregation results within a single group.
aggBufferSerDe.deserialize(bytes)
HiveUDAFBuffer(aggBufferSerDe.deserialize(bytes), false)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once the value of canDoMerge is always set false after deserialization, in the merge() function, the aggregationBuffer will be always re-created even the passed buffer parameter is actually a Partial2 or Final state. This, correct me if I am wrong, is a flaw causing performance downgrade.
May need to do none-trivial work in serialize() to include the state.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the deserialized buffer can only appear as the second parameter in merge, so canDoMerge doesn't matter here.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, except for the case of falling back from hash agg, and that's what you want to address here, and this is not impacting spark udaf. The logic looks clear and good to me, thanks!

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@pgandhi999
Copy link

Logic looks good to me, will perform a few quick tests of my own on the PR and get back to you soon @cloud-fan

@pgandhi999
Copy link

@cloud-fan The fix LGTM. Had a question though. After this change, do we still need to fix the initialization of aggregate buffer for SortBasedAggregate that #24149 originally addressed in the first commit 400db3d ?

@cloud-fan
Copy link
Contributor Author

@pgandhi999 Yes we need, but your patch can be simplified after the Hive UDAF issue is fixed.

cloud-fan added a commit that referenced this pull request Apr 30, 2019
…H in Hive UDAF adapter

## What changes were proposed in this pull request?

This is a followup of #24144 . #24144 missed one case: when hash aggregate fallback to sort aggregate, the life cycle of UDAF is: INIT -> UPDATE -> MERGE -> FINISH.

However, not all Hive UDAF can support it. Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the [sketches library](https://github.com/DataSketches/sketches-hive/blob/7f9e76e9e03807277146291beb2c7bec40e8672b/src/main/java/com/yahoo/sketches/hive/cpc/DataToSketchUDAF.java#L107). The buffer for UPDATE may not support MERGE.

This PR updates the Hive UDAF adapter in Spark to support INIT -> UPDATE -> MERGE -> FINISH, by turning it to  INIT -> UPDATE -> FINISH + IINIT -> MERGE -> FINISH.

## How was this patch tested?

a new test case

Closes #24459 from cloud-fan/hive-udaf.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 7432e7d)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan
Copy link
Contributor Author

thanks, merging to master/2.4!

@cloud-fan cloud-fan closed this in 7432e7d Apr 30, 2019
pgandhi999 pushed a commit to pgandhi999/spark that referenced this pull request Apr 30, 2019
Since, apache#24459 fixes the init-update-merge issue, the fix here is reverted.
cloud-fan added a commit to cloud-fan/spark that referenced this pull request May 6, 2019
…H in Hive UDAF adapter

## What changes were proposed in this pull request?

This is a followup of apache#24144 . apache#24144 missed one case: when hash aggregate fallback to sort aggregate, the life cycle of UDAF is: INIT -> UPDATE -> MERGE -> FINISH.

However, not all Hive UDAF can support it. Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the [sketches library](https://github.com/DataSketches/sketches-hive/blob/7f9e76e9e03807277146291beb2c7bec40e8672b/src/main/java/com/yahoo/sketches/hive/cpc/DataToSketchUDAF.java#L107). The buffer for UPDATE may not support MERGE.

This PR updates the Hive UDAF adapter in Spark to support INIT -> UPDATE -> MERGE -> FINISH, by turning it to  INIT -> UPDATE -> FINISH + IINIT -> MERGE -> FINISH.

## How was this patch tested?

a new test case

Closes apache#24459 from cloud-fan/hive-udaf.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
dongjoon-hyun pushed a commit that referenced this pull request May 6, 2019
## What changes were proposed in this pull request?

backport #24144 and #24459 to 2.3.

## How was this patch tested?

existing tests

Closes #24539 from cloud-fan/backport.

Lead-authored-by: pgandhi <pgandhi@verizonmedia.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Jul 23, 2019
…H in Hive UDAF adapter

## What changes were proposed in this pull request?

This is a followup of apache#24144 . apache#24144 missed one case: when hash aggregate fallback to sort aggregate, the life cycle of UDAF is: INIT -> UPDATE -> MERGE -> FINISH.

However, not all Hive UDAF can support it. Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the [sketches library](https://github.com/DataSketches/sketches-hive/blob/7f9e76e9e03807277146291beb2c7bec40e8672b/src/main/java/com/yahoo/sketches/hive/cpc/DataToSketchUDAF.java#L107). The buffer for UPDATE may not support MERGE.

This PR updates the Hive UDAF adapter in Spark to support INIT -> UPDATE -> MERGE -> FINISH, by turning it to  INIT -> UPDATE -> FINISH + IINIT -> MERGE -> FINISH.

## How was this patch tested?

a new test case

Closes apache#24459 from cloud-fan/hive-udaf.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 7432e7d)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Jul 25, 2019
…H in Hive UDAF adapter

## What changes were proposed in this pull request?

This is a followup of apache#24144 . apache#24144 missed one case: when hash aggregate fallback to sort aggregate, the life cycle of UDAF is: INIT -> UPDATE -> MERGE -> FINISH.

However, not all Hive UDAF can support it. Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the [sketches library](https://github.com/DataSketches/sketches-hive/blob/7f9e76e9e03807277146291beb2c7bec40e8672b/src/main/java/com/yahoo/sketches/hive/cpc/DataToSketchUDAF.java#L107). The buffer for UPDATE may not support MERGE.

This PR updates the Hive UDAF adapter in Spark to support INIT -> UPDATE -> MERGE -> FINISH, by turning it to  INIT -> UPDATE -> FINISH + IINIT -> MERGE -> FINISH.

## How was this patch tested?

a new test case

Closes apache#24459 from cloud-fan/hive-udaf.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 7432e7d)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Aug 1, 2019
…H in Hive UDAF adapter

## What changes were proposed in this pull request?

This is a followup of apache#24144 . apache#24144 missed one case: when hash aggregate fallback to sort aggregate, the life cycle of UDAF is: INIT -> UPDATE -> MERGE -> FINISH.

However, not all Hive UDAF can support it. Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the [sketches library](https://github.com/DataSketches/sketches-hive/blob/7f9e76e9e03807277146291beb2c7bec40e8672b/src/main/java/com/yahoo/sketches/hive/cpc/DataToSketchUDAF.java#L107). The buffer for UPDATE may not support MERGE.

This PR updates the Hive UDAF adapter in Spark to support INIT -> UPDATE -> MERGE -> FINISH, by turning it to  INIT -> UPDATE -> FINISH + IINIT -> MERGE -> FINISH.

## How was this patch tested?

a new test case

Closes apache#24459 from cloud-fan/hive-udaf.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 7432e7d)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants