-
Notifications
You must be signed in to change notification settings - Fork 1.5k
feat: Add Aggregate UDF to FFI crate #14775
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
9aae315
to
77fd002
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is my first substantial review of someone's else code. Please let me know if I am not following proper steps in reviewing!
@m09526 Thank you for the review! I've been out of country all last week, so I will try to get back to this soon now that I'm back. |
Let me know when this is ready for review |
Moved to draft until I get time to address the comments above. Additionally I discovered that |
The above |
Now that #15487 is resolved, this is unblocked. |
Shall we mark it as "ready for review"? |
@timsaucer @alamb Should be good for review now. Thanks! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excellent work! Thank you so much for pulling this over the line.
I have a few suggestions, but the core functionality looks correct to me. As one of the authors I cannot approve the pull request.
pub struct FFI_GroupsAccumulator { | ||
pub update_batch: unsafe extern "C" fn( | ||
accumulator: &mut Self, | ||
values: RVec<WrappedArray>, | ||
group_indices: RVec<usize>, | ||
opt_filter: ROption<WrappedArray>, | ||
total_num_groups: usize, | ||
) -> RResult<(), RString>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to add documentation for the struct, but it should be very similar to the others except pointing to GroupsAccumulator
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would personally suggest not implementing the GroupsAccumulator in the first FFI implementation -- it is hard to implement correctly as is.
Perhaps we can avoid this for now 🤔
Though I suppose GroupsAccumulator is basically required for high performance aggregation 🤔
@robtandy As someone keenly interested in the FFI work, would you be able to do a review? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
THanks @timsaucer -- this looks like it is pretty close. It has a few unresolved comments, but then would be ready to go it seems
pub struct FFI_GroupsAccumulator { | ||
pub update_batch: unsafe extern "C" fn( | ||
accumulator: &mut Self, | ||
values: RVec<WrappedArray>, | ||
group_indices: RVec<usize>, | ||
opt_filter: ROption<WrappedArray>, | ||
total_num_groups: usize, | ||
) -> RResult<(), RString>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would personally suggest not implementing the GroupsAccumulator in the first FFI implementation -- it is hard to implement correctly as is.
Perhaps we can avoid this for now 🤔
Though I suppose GroupsAccumulator is basically required for high performance aggregation 🤔
ec4dda9
to
4282c2a
Compare
@alamb I believe this one is ready. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @timsaucer -- I think this looks good to me. I didn't have time to review all the APIs in detail, but I think we need to get the big initial PR in and then work on fixing any blemishes over time
Thank you very much @m09526 for the review |
I merged up one more time to make sure the CI tests pass but assuming they do I plan to merge this one in |
gogogo |
* Work in progress adding user defined aggregate function FFI support * Intermediate work. Going through groups accumulator * MVP for aggregate udf via FFI * Clean up after rebase * Add unit test for FFI Accumulator Args * Adding unit tests and fixing memory errors in aggregate ffi udf * Working through additional unit and integration tests for UDAF ffi * Switch to a accumulator that supports convert to state to get a little better coverage * Set feature so we do not get an error warning in stable rustc * Add more options to test * Add unit test for FFI RecordBatchStream * Add a few more args to ffi accumulator test fn * Adding more unit tests on ffi aggregate udaf * taplo format * Update code comment * Correct function name * Temp fix record batch test dependencies * Address some comments * Revise comments and address PR comments * Remove commented code * Refactor GroupsAccumulator * Add documentation * Split integration tests * Address comments to refactor error handling for opt filter * Fix linting errors * Fix linting and add deref * Remove extra tests and unnecessary code * Adjustments to FFI aggregate functions after rebase on main * cargo fmt * cargo clippy * Re-implement cleaned up code that was removed in last push * Minor review comments --------- Co-authored-by: Crystal Zhou <crystal.zhouxiaoyue@hotmail.com> Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Which issue does this PR close?
This PR addresses part of #14562
Rationale for this change
This change allows for using user defined aggregate functions across FFI boundaries. It is useful for enabling shared libraries to pass functions back and forth. This feature will unlock:
datafusion-python
What changes are included in this PR?
This PR follows the same pattern as the previous FFI code. It exposes AggregateUDF and the Accumulators and AccumulatorArgs that go along with it.
Are these changes tested?
Included in this PR are both unit tests and two integration tests. The integration tests cover both grouping and non-grouping UDAFs. For the unit tests here is a tarpaulin report showing the coverage. The 12% not covered is almost entirely error handling.
Are there any user-facing changes?
There are no changes to existing API but new functions are exposed in the
datafusion-ffi
crate.