-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Allow AggregateUDF to define retractable batch , implement sliding window functions
#6671
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
6282c54 to
5198ae8
Compare
AggregateUDF to define retractable batch @alambAggregateUDF to define retractable batch
Rationale: The default implementation of the `Accumulator` trait returns an error for the `retract_batch` API.
5198ae8 to
0d5adc4
Compare
0d5adc4 to
579b4d9
Compare
|
|
||
| /// Returns true if this can support sliding accumulators | ||
| pub fn retractable(&self) -> Result<bool> { | ||
| Ok((self.fun.accumulator)(&self.data_type)?.supports_retract_batch()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is non ideal that we need to instanatiate the accumulator just to check if it supports retract batch, but that is consistent with the window function API so I think that is ok
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can fix this and its window function counterpart in a refactor. We discussed this instantiation thing in a meeting recently and we are not super happy with it either. Let's get the basics in and then we can work on the rough edges.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another other formulation I tried looked like the following. The upside is it avoided the instantiation, but the downside is that it an API change and is not consistent with the window functions
pub struct AggregateUDF {
/// name
pub name: String,
/// Signature (input arguments)
pub signature: Signature,
/// Return type
pub return_type: ReturnTypeFunction,
/// Return an accumulator without retract batch
pub accumulator: AccumulatorFunctionImplementation,
/// Return an accumulator with retract batch
pub retractable_accumulator: Option<AccumulatorFunctionImplementation>, <---- this is added
/// the accumulator's state's description as a function of the return type
pub state_type: StateTypeFunction,
}|
|
||
| let TestContext { ctx, test_state } = TestContext::new_with_test_state(test_state); | ||
| let sql = "SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as time_sum from t"; | ||
| // TODO: It is not clear why this is a different value than when retract batch is used |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure why the UDWF gets different answers when implemented with retract_batch 🤔 maybe @mustafasrepo can offer some hints or insights
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Accumulators when they have window frame startings different than UNBOUNDED PRECEDING such as 1 preceding, need to implement retract_batch method, to be able to run correctly (If this method is lacking there is no way to calculate result). Consider the query
SELECT SUM(a) OVER(ORDER BY a ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as sum_a from tFirst sum value will be the sum of rows between [0, 1),
Second sum value will be the sum of rows between [0, 2),
Third sum value will be the sum of rows between [1, 3), etc.
Since accumulator keeps in its state, running sum
for first sum we add to the state sum value between [0, 1)
for second sum we add to the state sum value between [1, 2) ([0, 1) is already in the state sum, hence running sum will cover [0, 2) range)
for third sum we add to the state sum value between [2, 3) ([0, 2) is already in the state sum). Also we need to retract values between [0, 1) by this way we can obtain sum between [1, 3) which is indeed the apropriate range.
When we use UNBOUNDED PRECEDING in the query starting index will always be 0 for the desired range. Hence we will never call retract_batch method. In this case having retract_batch is not a requirement.
This approach is a a bit different than window function approach. In window function(when they use frame) they get all the desired range during evaluation. If we were to have a window_function sum we could have calculated sum from scratch each time inside given range. However, for accumulators this is the case, and above query should give error if retract_batch is not implemented. During review I have pinpointed the section where I think this error is prevented
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you -- I am glad I asked!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought about this some more and have made queries with unbounded windows without retract_batch error as you suggested in 999866e -- much better to get an error rather than the wrong answer.
It seems to me that there is no fundamental reason DataFusion couldn't support sliding windows with user defined aggregate functions without retract_batch -- for example it could make a new Accumulator instance for each row and feed in the entire widow.
However, given it is likely to be much more efficient to implement retract_batch, that is what I think we should encourage people to do. If someone really needs to use a UDAF without retract_batch for a sliding window, we can look into implementing the "full recompute" solution at that time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought about this some more and have made queries with unbounded windows without retract_batch error as you suggested in 999866e -- much better to get an error rather than the wrong answer.
It seems to me that there is no fundamental reason DataFusion couldn't support sliding windows with user defined aggregate functions without
retract_batch-- for example it could make a newAccumulatorinstance for each row and feed in the entire widow.However, given it is likely to be much more efficient to implement
retract_batch, that is what I think we should encourage people to do. If someone really needs to use a UDAF withoutretract_batchfor a sliding window, we can look into implementing the "full recompute" solution at that time
Agreed. Also, users can define their UDWF (User defined window function). When UDWF's uses_window_frame flag is true. evaluate method of PartitionEvaluator is like "full recompute" approach.
| order_by, | ||
| window_frame, | ||
| )), | ||
| WindowFunction::AggregateUDF(fun) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The major difference with this PR and what @stuartcarnie did is that this PR still allows AggregateUDFs to run as window functions even if they don't implement retract_batch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update: it turns out it is not correct to run AggregateUDFs as sliding window function if they don't implement retract_batch which I have made clear in comments and the tests
| )) | ||
| } | ||
|
|
||
| /// Does the accumulator support incrementally updating its value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is a new API addition, but it should be backwards compatible
AggregateUDF to define retractable batch AggregateUDF to define retractable batch , efficiently implement window functions
stuartcarnie
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great, @alamb. I updated my IOx code to point to this branch, and my existing UDAFs work great 👍🏻
| let aggregate = | ||
| udaf::create_aggregate_expr(fun.as_ref(), args, input_schema, name)?; | ||
|
|
||
| if !unbounded_window && aggregate.retractable()? { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this line should be
| if !unbounded_window && aggregate.retractable()? { | |
| if !unbounded_window { |
If aggregate doesn't implement retract we should get an error. There is no way to run aggregator whose start is different than (UNBOUNDED PRECEDING) without retract_batch method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will make this change -- thank you @mustafasrepo -- I didn't appreciate that there is no correct way to run the AggregateUDF correctly without retract_batch. Thus I think the correct solution is to return an error when such a combination is attempted. I will update the PR to reflect this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in 999866e
|
Also I think with the introduction of More detailed explanation and PR can be found in #PR13 Other than the points in my review. This PR is LGTM!. With these changes users can write accumulators with |
…_batch is used as a sliding accumulator
AggregateUDF to define retractable batch , efficiently implement window functionsAggregateUDF to define retractable batch , implement sliding window functions
Which issue does this PR close?
Closes #6611
Rationale for this change
When used as window functions,
AggregateUDFcan not use theretract_batchAPI for sliding window calculationsWhat changes are included in this PR?
supports_retract_batchinAccumulatortrait (to mirror the API added by @mustafasrepo for Window Functions)retract_batch, use the sliding accumulator codeAre these changes tested?
Yes
Are there any user-facing changes?
New API, though they should all be backwards compatible