Introduce Sum UDAF #10651
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
alamb
left a comment
Looks great to me @jayzhan211 -- thank you. I left some small comments, but overall I think it looks really nice.
| // Note if this query ever does start working | ||
| let err = execute(&ctx, sql).await.unwrap_err(); | ||
| assert_contains!(err.to_string(), "This feature is not implemented: Aggregate can not be used as a sliding accumulator because `retract_batch` is not implemented: AggregateUDF { inner: AggregateUDF { name: \"time_sum\", signature: Signature { type_signature: Exact([Timestamp(Nanosecond, None)]), volatility: Immutable }, fun: \"<FUNC>\" } }(t.time) ORDER BY [t.time ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING"); | ||
| assert_contains!(err.to_string(), "This feature is not implemented: Aggregate can not be used as a sliding accumulator because `retract_batch` is not implemented: time_sum(t.time) ORDER BY [t.time ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING"); |
that is certainly nicer
| /// Coerce arguments of a function call to types that the function can evaluate. | ||
| /// | ||
| /// This function is only called if [`AggregateUDFImpl::signature`] returns [`crate::TypeSignature::UserDefined`]. Most |
👍
| &[] | ||
| } | ||
| fn create_sliding_accumulator( |
Could we please add some documentation here explaining what this is?
Basically, I think this is to allow returning a different Accumulator instance that is optimized for sliding windows (e.g. incrementally computing output via https://docs.rs/datafusion/latest/datafusion/logical_expr/trait.Accumulator.html#method.retract_batch).
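To illustrate the idea in this comment, here is a minimal sketch of an accumulator that supports retraction, so a sliding window can be advanced incrementally instead of being recomputed per row. `SketchAccumulator`, `SlidingSum`, and `sliding_sums` are hypothetical names; this is a simplified stand-in over `i64` slices, not DataFusion's actual `Accumulator` trait.

```rust
/// Simplified stand-in for an accumulator interface (an assumption for
/// illustration; NOT DataFusion's real `Accumulator` trait).
pub trait SketchAccumulator {
    /// Add values that entered the window frame.
    fn update_batch(&mut self, values: &[i64]);
    /// Remove values that left the window frame.
    fn retract_batch(&mut self, values: &[i64]);
    /// Current aggregate value.
    fn evaluate(&self) -> i64;
}

/// A sum that can be maintained incrementally as the frame slides.
#[derive(Default)]
pub struct SlidingSum {
    sum: i64,
}

impl SketchAccumulator for SlidingSum {
    fn update_batch(&mut self, values: &[i64]) {
        self.sum += values.iter().sum::<i64>();
    }

    fn retract_batch(&mut self, values: &[i64]) {
        self.sum -= values.iter().sum::<i64>();
    }

    fn evaluate(&self) -> i64 {
        self.sum
    }
}

/// Sum over `ROWS BETWEEN preceding PRECEDING AND following FOLLOWING`,
/// touching each input value at most twice (once in, once out).
pub fn sliding_sums(input: &[i64], preceding: usize, following: usize) -> Vec<i64> {
    let mut acc = SlidingSum::default();
    let mut out = Vec::with_capacity(input.len());
    // The current frame is input[start..end].
    let (mut start, mut end) = (0usize, 0usize);
    for i in 0..input.len() {
        let new_start = i.saturating_sub(preceding);
        let new_end = (i + following + 1).min(input.len());
        // Rows entering at the leading edge are added ...
        acc.update_batch(&input[end..new_end]);
        // ... and rows falling off the trailing edge are retracted.
        acc.retract_batch(&input[start..new_start]);
        start = new_start;
        end = new_end;
        out.push(acc.evaluate());
    }
    out
}
```

An accumulator without a meaningful `retract_batch` cannot be driven this way, which is the situation the error message in the test above reports.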
| NotSupported, | ||
| /// The expression is different from the original expression | ||
| Reversed(Arc<dyn AggregateUDFImpl>), | ||
| Reversed(Arc<AggregateUDF>), |
👍
| if fun.to_string() == "first_value" || fun.to_string() == "last_value" { | ||
| assert_eq!(fun.to_string(), name); | ||
| } else { | ||
| assert_eq!(fun.to_string(), name.to_uppercase()); |
I wonder whether we should treat UDF names in a case-insensitive way. What do you think?
#10695 tracks the issue of renaming the function name to lowercase.
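As a rough sketch of what case-insensitive name handling could look like, the registry below normalizes names to lowercase on both registration and lookup. `FunctionRegistry` and its methods are hypothetical and only illustrate the idea; they are not DataFusion's registry API, and the `u32` ids stand in for real function objects.

```rust
use std::collections::HashMap;

/// Hypothetical function registry that stores names in lowercase so that
/// `SUM`, `Sum`, and `sum` all resolve to the same entry.
pub struct FunctionRegistry {
    by_name: HashMap<String, u32>,
}

impl FunctionRegistry {
    pub fn new() -> Self {
        Self { by_name: HashMap::new() }
    }

    /// Register a function id under the lowercased form of `name`.
    pub fn register(&mut self, name: &str, id: u32) {
        self.by_name.insert(name.to_lowercase(), id);
    }

    /// Look a function up regardless of the caller's casing.
    pub fn lookup(&self, name: &str) -> Option<u32> {
        self.by_name.get(&name.to_lowercase()).copied()
    }
}
```

With a single canonical casing, a test like the one above would no longer need a special case for `first_value` and `last_value`.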
datafusion/expr/src/expr_schema.rs
Outdated
| let data_types = args | ||
| .iter() | ||
| .map(|e| e.get_type(schema)) | ||
| .collect::<Result<Vec<_>>>()?; |
Do we need this section? It seems that data_types is already calculated in the outer scope.
| WindowFunctionDefinition::AggregateUDF(udf) => { | ||
| let new_types = data_types_with_aggregate_udf(&data_types, udf).map_err(|err| { | ||
| plan_datafusion_err!( | ||
| "{} and {}", | ||
| err, | ||
| utils::generate_signature_error_msg( | ||
| fun.name(), | ||
| fun.signature().clone(), | ||
| &data_types | ||
| ) | ||
| ) | ||
| })?; | ||
| Ok(fun.return_type(&new_types)?) | ||
| } |
Maybe we should bury this check and conversion inside the fun.return_type implementation for WindowFunctionDefinition::AggregateUDF. Not sure, though.
I prefer to keep coerce_types and return_types separate, given the difference between the two.
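A minimal sketch of the two-step shape being discussed: first coerce the argument type to something the function can evaluate, then derive the return type from the coerced type. `SketchType`, `coerce_sum_arg`, `sum_return_type`, and `plan_sum` are hypothetical, and the widening rules shown are illustrative assumptions rather than the exact rules in this PR.

```rust
/// Simplified stand-in for a data type (not Arrow's `DataType`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SketchType {
    Int32,
    Int64,
    UInt32,
    UInt64,
    Float32,
    Float64,
    Utf8,
}

/// Step 1: coerce the argument type to one the function can evaluate.
/// The widening rules here are assumptions for illustration.
pub fn coerce_sum_arg(arg: SketchType) -> Result<SketchType, String> {
    use SketchType::*;
    match arg {
        Int32 | Int64 => Ok(Int64),
        UInt32 | UInt64 => Ok(UInt64),
        Float32 | Float64 => Ok(Float64),
        Utf8 => Err(format!("sum does not support {:?}", arg)),
    }
}

/// Step 2: derive the return type, assuming the input was already coerced.
pub fn sum_return_type(coerced: SketchType) -> Result<SketchType, String> {
    use SketchType::*;
    match coerced {
        Int64 | UInt64 | Float64 => Ok(coerced),
        other => Err(format!("type {:?} was not coerced before return_type", other)),
    }
}

/// The caller composes the two steps, keeping each one simple to test.
pub fn plan_sum(arg: SketchType) -> Result<SketchType, String> {
    let coerced = coerce_sum_arg(arg)?;
    sum_return_type(coerced)
}
```

Keeping the steps apart means the coercion error and the return-type error can be reported separately, at the cost of the caller having to remember to run both.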
| let new_types = data_types_with_aggregate_udf(&data_types, fun).map_err(|err| { | ||
| plan_datafusion_err!( | ||
| "{} and {}", | ||
| err, | ||
| utils::generate_signature_error_msg( | ||
| fun.name(), | ||
| fun.signature().clone(), | ||
| &data_types | ||
| ) | ||
| ) | ||
| })?; |
A similar comment to the one above applies here.
| fn order_bys(&self) -> Option<&[PhysicalSortExpr]> { | ||
| (!self.ordering_req.is_empty()).then_some(&self.ordering_req) | ||
| if self.fun.has_ordering_requirements() && !self.ordering_req.is_empty() { |
Although has_ordering_requirements was not introduced in this PR, I think it is redundant now that we have the order_sensitivity API. It would be better to remove it (in this PR or in a subsequent one).
I agree. We can return None for AggregateOrderSensitivity::Insensitive, and ordering_req otherwise.
Should we change the default order sensitivity to AggregateOrderSensitivity::Insensitive? For example, I think Sum would expect AggregateOrderSensitivity::Insensitive, and probably only first/last, nth value, and agg_order would expect other kinds of AggregateOrderSensitivity.
I agree that AggregateOrderSensitivity::Insensitive is the correct behavior for most aggregate functions. However, I think the safest default is AggregateOrderSensitivity::HardRequirement. Also, as long as there is no ordering requirement, the default does not matter, so I think we can be strict in this choice.
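The proposal in this thread can be sketched roughly as follows: `order_bys` returns `None` for an order-insensitive aggregate and the ordering requirement otherwise. `OrderSensitivity` is a local mirror of the enum discussed here, and `SketchAggregate` with its `String` sort keys is hypothetical; none of this is the actual DataFusion code.

```rust
/// Local mirror of the order-sensitivity enum discussed in this thread
/// (a stand-in for illustration only).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OrderSensitivity {
    /// Output does not depend on input order (e.g. a sum).
    Insensitive,
    /// Output is only correct if the requested order is satisfied.
    HardRequirement,
    /// Order is not required but can be exploited if present.
    Beneficial,
}

/// Hypothetical aggregate expression; sort keys are plain strings here.
pub struct SketchAggregate {
    pub sensitivity: OrderSensitivity,
    pub ordering_req: Vec<String>,
}

impl SketchAggregate {
    /// Report an ordering requirement only when the aggregate cares about
    /// order and a requirement was actually specified.
    pub fn order_bys(&self) -> Option<&[String]> {
        match self.sensitivity {
            OrderSensitivity::Insensitive => None,
            _ if self.ordering_req.is_empty() => None,
            _ => Some(self.ordering_req.as_slice()),
        }
    }
}
```

Under this shape, a strict default such as `HardRequirement` only changes behavior when an ordering was actually requested, which matches the argument that the default matters little when there is no requirement.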
mustafasrepo
left a comment
LGTM! Thanks @jayzhan211 for this PR.
🚀
Are we tracking the follow-on work (to remove the built-in sum) anywhere?
| (AggregateFunction::Sum, _) => { | ||
| return internal_err!("Builtin Sum will be removed"); | ||
| } |
Hi @jayzhan211! I saw @alamb already asked the same question here last week. I'd like to follow up and check whether you have any plans to remove this.
@appletreeisyellow You can check out the main branch; it should have been removed in #10831.
@jayzhan211 Thank you so much! 💯
* move accumulate
* move prim_op
* move test to slt
* remove sum distinct
* move sum aggregate
* fix args
* add sum
* merge fix
* fix sum sig
* todo: wait ahash merge
* rebase
* disable ordering req by default
* check arg count
* rm old workflow
* fmt
* fix failed test
* doc and fmt
* check udaf first
* fmt
* fix ci
* fix ci
* fix ci
* fix err msg AGAIN
* rm sum in builtin test which covered in sql
* proto for window with udaf
* fix slt
* fmt
* fix err msg
* fix exprfn
* fix ciy
* fix ci
* rename first/last to lowercase
* skip sum
* fix firstvalue
* clippy
* add doc
* rm has_ordering_req
* default hard req
* insensitive for sum
* cleanup duplicate code
* Re-introduce check

---------

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
apache/datafusion#10651 Don't know if the built-in sum used to work on intervals, but the UDAF does not.
Which issue does this PR close?
Closes #.
Rationale for this change
What changes are included in this PR?
AccumulatorArgs to create_groups_accumulator
This PR only introduces the Sum UDAF; removing the builtin is not included, to keep the PR small.
Are these changes tested?
Are there any user-facing changes?