-
Notifications
You must be signed in to change notification settings - Fork 1.8k
add window expression stream, delegated window aggregation to aggregate functions, and implement row_number
#375
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
90daf11 to
4ad6e4d
Compare
Codecov Report
@@ Coverage Diff @@
## master #375 +/- ##
==========================================
+ Coverage 74.85% 75.34% +0.49%
==========================================
Files 146 147 +1
Lines 24565 24782 +217
==========================================
+ Hits 18387 18673 +286
+ Misses 6178 6109 -69
Continue to review full report at Codecov.
|
|
Thank @jimexist -- I plan to review this carefully tomorrow |
90a22fa to
6e4d662
Compare
| pub trait WindowAccumulator: Send + Sync + Debug { | ||
| /// scans the accumulator's state from a vector of scalars, similar to Accumulator it also | ||
| /// optionally generates values. | ||
| fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would help me if you could explain where the "window" for the window function appears in this trait. I assume you already have a design in mind, so I figured I would ask here
I am thinking about a query like the following
select sum(value) OVER (ROWS 5 PRECEDING) FROM ....
I think in this case, you end up with 10 aggregate values from 10 different windows, in the the following manner:
1 2 3 4 5 6 7 8 9 input
window 1 ─
window 2 ───
window 3 ─────
window 4 ───────
window 5 ─────────
window 6 ─────────
window 7 ─────────
window 8 ─────────
window 9 ─────────
I would have expected the WindowAccumulator to have functions like
/// Add a new row to the current window
fn new_row_in_window(ScalarValue);
/// remove a row from the current window
fn remove_row_from_window(ScalarValue);
/// The current value of this function for the given window
fn current_value(ScalarValue);Or possibly something like
evaluate(window: &[ArrayRef]) -> ScalarValueThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alamb good question!
The window word in this trait is purely indicating the fact that window functions will use this. it can be of a better name but...
for a design, there are two complications:
- multiple window functions, each having different window frames, can be scanning batches at the same time, so i'd want each to create its own window accumulator, remembering to push_back, and remove front, on their own; this trait would not put that into the API, it just scans
- specifically for window that peeks ahead, because batches arrive in async stream, it is not feasible to peek, so my plan is to allow them to optionally execute a one time shift upon finishing up (e.g.
leadis just producing the same value in situ, but with a one time shift at the end)
Due to 1 and 2, a best possible state vector for window accumulator is possibly VecDequeue. And the name scan is because accumulators iterate the list and either optionally produce one value at a time (max with order by), or exactly one accumulated value upon finish scanning (max with empty over), but not both
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see -- looking at how these functions are used for nth_value here helped me get a better sense for how these traits work
I think as you go through the implementation, adding some additional details to help future implementers of this trait. For example, when it is ok to return values from scan() or scan_batch() and what the expected total number of rows produced?
However having several examples of of implemented window functions I think will help too so no need to change anything more at this time.
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| // use super::*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think tests would be good to have
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i try to showcase the design here, so if the interface is generally okay with reviewers, i can go ahead and write down tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
8e6c181 to
b439b0e
Compare
row_number
|
local test: |
1b82f25 to
563192a
Compare
alamb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I personally think this is a great start. The structure is clear and I think the follow on PRs will make it even better. Thank you so much @jimexist
I am still a bit confused about what the requirements are while implementing a WindowAccumulator but I think they will become clearer as we add more window aggregate support.
I think we should merge this one in. cc @Dandandan @andygrove @jorgecarleitao
| pub trait WindowAccumulator: Send + Sync + Debug { | ||
| /// scans the accumulator's state from a vector of scalars, similar to Accumulator it also | ||
| /// optionally generates values. | ||
| fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see -- looking at how these functions are used for nth_value here helped me get a better sense for how these traits work
I think as you go through the implementation, adding some additional details to help future implementers of this trait. For example, when it is ok to return values from scan() or scan_batch() and what the expected total number of rows produced?
However having several examples of of implemented window functions I think will help too so no need to change anything more at this time.
| min(c3) over () \ | ||
| from aggregate_test_100 limit 5"; | ||
| let actual = execute(&mut ctx, sql).await; | ||
| let expected = vec![ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
|
|
||
| for i in 0..(schema.fields().len() - window_expr.len()) { | ||
| let col = concat( | ||
| &original_batches |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this is only needed for the empty over clause right, as in other cases we probably only want to access the rows.
Also for functions like RowNumber I think we could be streaming/emit batches instead of "caching" them as we don't need to scan all the rows in order to compute the result. Both good in this PR or an issue / follow up work!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes I am aware of the potential changes on how rows are scanned but plan to leave this as is because down the road (when adding partition by and order by) things could change a lot.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like this to gradually fledge out.
datafusion/tests/sql.rs
Outdated
| count(c3) over (), \ | ||
| max(c3) over (), \ | ||
| min(c3) over () \ | ||
| from aggregate_test_100 limit 5"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think an explicit order by is required (or sorting of results) to make this deterministic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now the ordering is "just" a implementation detail of the batches getting merged / concatenated into 1 batch and keeping the same ordering.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will add sort by c1
| &mut self, | ||
| num_rows: usize, | ||
| values: &[ArrayRef], | ||
| ) -> Result<Option<ArrayRef>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't get completely when this scan_batch is being used instead of the scan_batch of the implementations? Or can this be removed by now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's a catch-all implementations. with more window functions implemented it'll be clearer and by that time we can feel free to remove this.
Dandandan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is a good start / continuation - thanks @jimexist for this amazing contribution!
I think this is good to go - also feel free addressing some of the comments within this PR if you want.
commit 7fb3640 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 16:38:25 2021 +0800 row number done commit 1723926 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 16:05:50 2021 +0800 add row number commit bf5b8a5 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 15:04:49 2021 +0800 save commit d2ce852 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 14:53:05 2021 +0800 add streams commit 0a861a7 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 20 22:28:34 2021 +0800 save stream commit a9121af Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 20 22:01:51 2021 +0800 update unit test commit 2af2a27 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 14:25:12 2021 +0800 fix unit test commit bb57c76 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 14:23:34 2021 +0800 use upper case commit 5d96e52 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 14:16:16 2021 +0800 fix unit test commit 1ecae8f Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 12:27:26 2021 +0800 fix unit test commit bc2271d Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 10:04:29 2021 +0800 fix error commit 880b94f Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 08:24:00 2021 +0800 fix unit test commit 4e792e1 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 08:05:17 2021 +0800 fix test commit c36c04a Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 00:07:54 2021 +0800 add more tests commit f5e64de Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 20 23:41:36 2021 +0800 update commit a1eae86 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 20 23:36:15 2021 +0800 enrich unit test commit 0d2a214 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 20 23:25:43 2021 +0800 adding filter by todo commit 8b486d5 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 20 23:17:22 2021 +0800 adding more built-in functions commit abf08cd Author: Jiayu Liu <Jimexist@users.noreply.github.com> Date: Thu May 20 22:36:27 2021 +0800 Update datafusion/src/physical_plan/window_functions.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> commit 0cbca53 Author: Jiayu Liu <Jimexist@users.noreply.github.com> Date: Thu May 20 22:34:57 2021 +0800 Update datafusion/src/physical_plan/window_functions.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> commit 831c069 Author: Jiayu Liu <Jimexist@users.noreply.github.com> Date: Thu May 20 22:34:04 2021 +0800 Update datafusion/src/logical_plan/builder.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> commit f70c739 Author: Jiayu Liu <Jimexist@users.noreply.github.com> Date: Thu May 20 22:33:04 2021 +0800 Update datafusion/src/logical_plan/builder.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> commit 3ee87aa Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Wed May 19 22:55:08 2021 +0800 fix unit test commit 5c4d92d Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Wed May 19 22:48:26 2021 +0800 fix clippy commit a0b7526 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Wed May 19 22:46:38 2021 +0800 fix unused imports commit 1d3b076 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 13 18:51:14 2021 +0800 add window expr
Closes apache#375 - enable decimal to decimal - remove hard coded castoptions to pass to native execution - fixed castTest to match arrow invalid argument error with spark's Number out of range error.
This partly fix-es #298