-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pipeline-friendly Bounded Memory Window Executor #4777
Conversation
* partition by refactor * minor changes * Unnecessary tuple to Range conversion is removed * move transpose under common
…a-ai/arrow-datafusion into feature/sort_removal_rule # Conflicts: # datafusion/physical-expr/src/aggregate/count.rs # datafusion/physical-expr/src/aggregate/mod.rs # datafusion/physical-expr/src/aggregate/sum.rs # datafusion/physical-expr/src/window/aggregate.rs
I am very happy that we are finally sending this upstream. We have been working on making windows pipeline-friendly for a while now, and this provides a great foundation for that. There are a few things we will improve with follow-on PRs (e.g. extending this to support GROUPS mode), but the core functionality is already there. Looking forward to receiving feedback! |
Thank you for this PR -- I will try and review it later this weekend but I may not have time until early next week |
Starting to check this out |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @mustafasrepo
I went through this PR as carefully as I could given its size. Sorry for the delay in review -- finding the contiguous uninterrupted time to review has been hard. I had some suggestions but overall I think it could also be merged as is. As in your past PRs I found it well commented, well tested, and overall a pleasure to read.
I did not review all the logic in the window functions but I did review the tests carefully as well as skimmed all the code.
which shows that overall performance improves. This is due to searching RANGE boundaries in a smaller batch since we maintain a bounded state.
Very impressive benchmark results
datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
Outdated
Show resolved
Hide resolved
Thank you for the detailed review! We will go through your reviews and let you know when we are done so you can merge. |
@alamb, we just did our final review with @mustafasrepo and this is good to go. Feel free to merge after CI passes 🚀 |
Will do -- thank you @mustafasrepo and @ozankabak |
Benchmark runs are scheduled for baseline = e1dc962 and contender = 80abc94. 80abc94 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
Which issue does this PR close?
Improves the situation on #4285.
Rationale for this change
NOTE: Below discussion is a simplification of a more detailed exposition in the streaming execution proposal.
Unlike how the current implementation works, queries involving window expressions can actually be executed without materializing the entire table in memory when certain conditions are met. These conditions for a bounded implementation are as follows:
WindowExec
with bounded memory (without seeing the whole table), window frame boundaries of the given window expression should be bounded; i.e. we cannot run queries involving eitherUNBOUNDED PRECEDING
orUNBOUNDED FOLLOWING
.ORDER BY
clauses should already be aligned with theORDER BY
specification. With this condition is met, we can remove thePhysicalSort
expression beforeWindowExec
and generate results as we scan the table.If the above conditions are met, we can run a query like the one below
with a bounded memory algorithm. Consider the physical plan of the above query:
If we know that the column
inc_col
in the table is monotonically increasing, we can deduce that theSortExec: [inc_col@0 ASC NULLS LAST
] step in the physical plan is unnecessary. Hence, we can remove this step from the physical plan (see #4691). Furthermore, we also know that the frameRANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING
describes a bounded range. Therefore, we can turn the above physical plan into the one below:Performance Indicators
We analyzed the new Bounded Memory Window Executor and compared it with the existing implementation via
Test conditions for both executors is as follows:
SUM (x) OVER (RANGE ORDER BY a RANGE BETWEEN 10 PRECEDING AND 10 FOLLOWING
))RecordBatch
stream. They have varying sizes in the range of 0 to 50.NOTE: We did not include the benchmarking code in this PR.
Benchmarking
We measure the execution duration of each operator. The input size is
100_000
.WindowAggExec
: 226.72 msBoundedWindowAggExec
: 154.71 mswhich shows that overall performance improves. This is due to searching
RANGE
boundaries in a smaller batch since we maintain a bounded state.Heaptrack
We used a simple test case for memory consumption; the input size is
1_000_000
.WindowAggExec Memory Profiling
BoundedWindowAggExec Profiling
The finding supports that the sliding window approach is memory efficient.
What changes are included in this PR?
This PR includes a bounded memory variant of the already-existing
WindowAggExec
. We add a rule to choose betweenWindowAggExec
andBoundedWindowAggExec
. The strategy is as follows: ifwindow_expr
can generate its result without seeing whole table we chooseBoundedWindowAggExec
otherwise we chooseWindowAggExec
. Please note that it is possible (but not certainly trivial) to unify these executors. However, we left this as future work.In this implementation, we also added bounded execution support for
COUNT
,SUM
,MIN
,MAX
amongAggregateFunction
s. AmongBuiltInWindowFunction
s, we added support forROW_NUMBER
,RANK
,DENSE_RANK
,LAG
,LEAD
,FIRST_VALUE
,LAST_VALUE
,NTH_VALUE
. If the window function used is different than these, we fall back toWindowAggExec
.Are these changes tested?
We have added fuzzy tests comparing the results of
WindowAggExec
andBoundedWindowAggExec
. We also added sql tests that runs on already sorted parquet file. Approximately 700 lines of the changes come from test and test utils.Are there any user-facing changes?
None.