Skip to content

Conversation

@alamb
Copy link
Contributor

@alamb alamb commented Jun 14, 2023

Which issue does this PR close?

Closes #6611

Rationale for this change

When used as window functions, AggregateUDF can not use the retract_batch API for sliding window calculations

What changes are included in this PR?

  1. Add a new supports_retract_batch in Accumulator trait (to mirror the API added by @mustafasrepo for Window Functions)
  2. If the Accumulator reports it supports retract_batch, use the sliding accumulator code

Are these changes tested?

Yes

Are there any user-facing changes?

New API, though they should all be backwards compatible

@alamb alamb marked this pull request as draft June 14, 2023 19:56
@github-actions github-actions bot added core Core DataFusion crate logical-expr Logical plan and expressions labels Jun 14, 2023
@alamb alamb force-pushed the alamb/sliding_udf_real branch from 6282c54 to 5198ae8 Compare June 14, 2023 19:59
@alamb alamb changed the title Allow AggregateUDF to define retractable batch @alamb Allow AggregateUDF to define retractable batch Jun 14, 2023
Rationale:

The default implementation of the `Accumulator` trait returns an error
for the `retract_batch` API.
@alamb alamb force-pushed the alamb/sliding_udf_real branch from 5198ae8 to 0d5adc4 Compare June 14, 2023 21:14
@alamb alamb force-pushed the alamb/sliding_udf_real branch from 0d5adc4 to 579b4d9 Compare June 14, 2023 21:19

/// Returns true if this can support sliding accumulators
pub fn retractable(&self) -> Result<bool> {
Ok((self.fun.accumulator)(&self.data_type)?.supports_retract_batch())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is non ideal that we need to instanatiate the accumulator just to check if it supports retract batch, but that is consistent with the window function API so I think that is ok

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can fix this and its window function counterpart in a refactor. We discussed this instantiation thing in a meeting recently and we are not super happy with it either. Let's get the basics in and then we can work on the rough edges.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another other formulation I tried looked like the following. The upside is it avoided the instantiation, but the downside is that it an API change and is not consistent with the window functions

pub struct AggregateUDF {
    /// name
    pub name: String,
    /// Signature (input arguments)
    pub signature: Signature,
    /// Return type
    pub return_type: ReturnTypeFunction,
    /// Return an accumulator without retract batch
    pub accumulator: AccumulatorFunctionImplementation,
    /// Return an accumulator with retract batch
    pub retractable_accumulator: Option<AccumulatorFunctionImplementation>,  <---- this is added
    /// the accumulator's state's description as a function of the return type
    pub state_type: StateTypeFunction,
}


let TestContext { ctx, test_state } = TestContext::new_with_test_state(test_state);
let sql = "SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as time_sum from t";
// TODO: It is not clear why this is a different value than when retract batch is used
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure why the UDWF gets different answers when implemented with retract_batch 🤔 maybe @mustafasrepo can offer some hints or insights

Copy link
Contributor

@mustafasrepo mustafasrepo Jun 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accumulators when they have window frame startings different than UNBOUNDED PRECEDING such as 1 preceding, need to implement retract_batch method, to be able to run correctly (If this method is lacking there is no way to calculate result). Consider the query

SELECT SUM(a) OVER(ORDER BY a ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as sum_a from t

First sum value will be the sum of rows between [0, 1),
Second sum value will be the sum of rows between [0, 2),
Third sum value will be the sum of rows between [1, 3), etc.
Since accumulator keeps in its state, running sum
for first sum we add to the state sum value between [0, 1)
for second sum we add to the state sum value between [1, 2) ([0, 1) is already in the state sum, hence running sum will cover [0, 2) range)
for third sum we add to the state sum value between [2, 3) ([0, 2) is already in the state sum). Also we need to retract values between [0, 1) by this way we can obtain sum between [1, 3) which is indeed the apropriate range.

When we use UNBOUNDED PRECEDING in the query starting index will always be 0 for the desired range. Hence we will never call retract_batch method. In this case having retract_batch is not a requirement.

This approach is a a bit different than window function approach. In window function(when they use frame) they get all the desired range during evaluation. If we were to have a window_function sum we could have calculated sum from scratch each time inside given range. However, for accumulators this is the case, and above query should give error if retract_batch is not implemented. During review I have pinpointed the section where I think this error is prevented

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you -- I am glad I asked!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this some more and have made queries with unbounded windows without retract_batch error as you suggested in 999866e -- much better to get an error rather than the wrong answer.

It seems to me that there is no fundamental reason DataFusion couldn't support sliding windows with user defined aggregate functions without retract_batch -- for example it could make a new Accumulator instance for each row and feed in the entire widow.

However, given it is likely to be much more efficient to implement retract_batch, that is what I think we should encourage people to do. If someone really needs to use a UDAF without retract_batch for a sliding window, we can look into implementing the "full recompute" solution at that time

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this some more and have made queries with unbounded windows without retract_batch error as you suggested in 999866e -- much better to get an error rather than the wrong answer.

It seems to me that there is no fundamental reason DataFusion couldn't support sliding windows with user defined aggregate functions without retract_batch -- for example it could make a new Accumulator instance for each row and feed in the entire widow.

However, given it is likely to be much more efficient to implement retract_batch, that is what I think we should encourage people to do. If someone really needs to use a UDAF without retract_batch for a sliding window, we can look into implementing the "full recompute" solution at that time

Agreed. Also, users can define their UDWF (User defined window function). When UDWF's uses_window_frame flag is true. evaluate method of PartitionEvaluator is like "full recompute" approach.

order_by,
window_frame,
)),
WindowFunction::AggregateUDF(fun) => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The major difference with this PR and what @stuartcarnie did is that this PR still allows AggregateUDFs to run as window functions even if they don't implement retract_batch

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: it turns out it is not correct to run AggregateUDFs as sliding window function if they don't implement retract_batch which I have made clear in comments and the tests

))
}

/// Does the accumulator support incrementally updating its value
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a new API addition, but it should be backwards compatible

@alamb alamb changed the title Allow AggregateUDF to define retractable batch Allow AggregateUDF to define retractable batch , efficiently implement window functions Jun 14, 2023
@alamb alamb marked this pull request as ready for review June 14, 2023 21:20
Copy link
Contributor

@stuartcarnie stuartcarnie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great, @alamb. I updated my IOx code to point to this branch, and my existing UDAFs work great 👍🏻

let aggregate =
udaf::create_aggregate_expr(fun.as_ref(), args, input_schema, name)?;

if !unbounded_window && aggregate.retractable()? {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this line should be

Suggested change
if !unbounded_window && aggregate.retractable()? {
if !unbounded_window {

If aggregate doesn't implement retract we should get an error. There is no way to run aggregator whose start is different than (UNBOUNDED PRECEDING) without retract_batch method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will make this change -- thank you @mustafasrepo -- I didn't appreciate that there is no correct way to run the AggregateUDF correctly without retract_batch. Thus I think the correct solution is to return an error when such a combination is attempted. I will update the PR to reflect this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in 999866e

@mustafasrepo
Copy link
Contributor

mustafasrepo commented Jun 15, 2023

Also I think with the introduction of supports_retract_batch method, supports_bounded_execution method is no longer necessary for the AggregateExpr trait. I filed a small PR on top this PR to show required changes for removing supports_bounded_execution method from AggregateExpr trait.

More detailed explanation and PR can be found in #PR13

Other than the points in my review. This PR is LGTM!. With these changes users can write accumulators with retract_batch method, then can use it in aggregate and/or window queries.

@alamb alamb changed the title Allow AggregateUDF to define retractable batch , efficiently implement window functions Allow AggregateUDF to define retractable batch , implement sliding window functions Jun 15, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate logical-expr Logical plan and expressions

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support sliding windows in AggregateUDF / User Defined Window Functions

4 participants