Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline-friendly Bounded Memory Window Executor #4777

Merged
merged 64 commits into from
Jan 4, 2023
Merged

Pipeline-friendly Bounded Memory Window Executor #4777

merged 64 commits into from
Jan 4, 2023

Conversation

mustafasrepo
Copy link
Contributor

Which issue does this PR close?

Improves the situation on #4285.

Rationale for this change

NOTE: Below discussion is a simplification of a more detailed exposition in the streaming execution proposal.

Unlike how the current implementation works, queries involving window expressions can actually be executed without materializing the entire table in memory when certain conditions are met. These conditions for a bounded implementation are as follows:

  • In order to run WindowExec with bounded memory (without seeing the whole table), window frame boundaries of the given window expression should be bounded; i.e. we cannot run queries involving either UNBOUNDED PRECEDING or UNBOUNDED FOLLOWING.
  • We should be able to produce query results as we scan the table incrementally. For this to be possible, columns used in the ORDER BY clauses should already be aligned with the ORDER BY specification. With this condition is met, we can remove the PhysicalSort expression before WindowExec and generate results as we scan the table.

If the above conditions are met, we can run a query like the one below

SELECT
    SUM(inc_col) OVER(ORDER BY inc_col ASC RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING)
FROM annotated_data

with a bounded memory algorithm. Consider the physical plan of the above query:

+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                        |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+                                                                                                                                       |
| physical_plan | ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER BY [annotated_data.inc_col ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@0 as SUM(annotated_data.inc_col)]             |
|               |   WindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: "SUM(annotated_data.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None })] |
|               |     SortExec: [inc_col@0 ASC NULLS LAST]                                                                                                                                                    |
|               |       MemoryExec: partitions=1, partition_sizes=[51]                                                                                                                                        |
|               |                                                                                                                                                                                             |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

If we know that the column inc_col in the table is monotonically increasing, we can deduce that the SortExec: [inc_col@0 ASC NULLS LAST] step in the physical plan is unnecessary. Hence, we can remove this step from the physical plan (see #4691). Furthermore, we also know that the frame RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING describes a bounded range. Therefore, we can turn the above physical plan into the one below:

+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                              |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+                                                                                                                                             |
| physical_plan | ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER BY [annotated_data.inc_col ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@0 as SUM(annotated_data.inc_col)]                   |
|               |   BoundedWindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: "SUM(annotated_data.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None })] |
|               |     MemoryExec: partitions=1, partition_sizes=[51]                                                                                                                                                |
|               |                                                                                                                                                                                                   |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Performance Indicators

We analyzed the new Bounded Memory Window Executor and compared it with the existing implementation via

  • Benchmarking (criterion) for CPU-time analysis, and
  • Memory profiling the binary executable (Heaptrack).

Test conditions for both executors is as follows:

  • No partition
  • Single query (SUM (x) OVER (RANGE ORDER BY a RANGE BETWEEN 10 PRECEDING AND 10 FOLLOWING))
  • No sorting is included since the input data is already sorted.
  • The input is generated as a RecordBatch stream. They have varying sizes in the range of 0 to 50.

NOTE: We did not include the benchmarking code in this PR.

Benchmarking

We measure the execution duration of each operator. The input size is 100_000.

  • Average execution time for WindowAggExec: 226.72 ms
  • Average execution time for BoundedWindowAggExec: 154.71 ms

which shows that overall performance improves. This is due to searching RANGE boundaries in a smaller batch since we maintain a bounded state.

Heaptrack

We used a simple test case for memory consumption; the input size is 1_000_000.

WindowAggExec Memory Profiling

  • peak heap memory consumption: 161,9MB after 1min23s
  • peak RSS (including headtrack overhead): 202,7MB

Memory usage of WindowAggExec

BoundedWindowAggExec Profiling

  • peak heap memory consumption: 78,6MB after 08.633s
  • peak RSS (including heaptrack overhead):115,4MB

The finding supports that the sliding window approach is memory efficient.

Memory usage of BoundedWindowAggExec

What changes are included in this PR?

This PR includes a bounded memory variant of the already-existing WindowAggExec. We add a rule to choose between WindowAggExec and BoundedWindowAggExec. The strategy is as follows: if window_expr can generate its result without seeing whole table we choose BoundedWindowAggExec otherwise we choose WindowAggExec. Please note that it is possible (but not certainly trivial) to unify these executors. However, we left this as future work.

In this implementation, we also added bounded execution support for COUNT, SUM, MIN, MAX among AggregateFunctions. Among BuiltInWindowFunctions, we added support for ROW_NUMBER, RANK, DENSE_RANK, LAG, LEAD, FIRST_VALUE, LAST_VALUE, NTH_VALUE. If the window function used is different than these, we fall back to WindowAggExec.

Are these changes tested?

We have added fuzzy tests comparing the results of WindowAggExec and BoundedWindowAggExec. We also added sql tests that runs on already sorted parquet file. Approximately 700 lines of the changes come from test and test utils.

Are there any user-facing changes?

None.

mustafasrepo and others added 30 commits December 13, 2022 16:38
* partition by refactor

* minor changes

* Unnecessary tuple to Range conversion is removed

* move transpose under common
…a-ai/arrow-datafusion into feature/sort_removal_rule

# Conflicts:
#	datafusion/physical-expr/src/aggregate/count.rs
#	datafusion/physical-expr/src/aggregate/mod.rs
#	datafusion/physical-expr/src/aggregate/sum.rs
#	datafusion/physical-expr/src/window/aggregate.rs
@github-actions github-actions bot added core Core DataFusion crate logical-expr Logical plan and expressions physical-expr Physical Expressions labels Dec 30, 2022
@ozankabak
Copy link
Contributor

I am very happy that we are finally sending this upstream. We have been working on making windows pipeline-friendly for a while now, and this provides a great foundation for that. There are a few things we will improve with follow-on PRs (e.g. extending this to support GROUPS mode), but the core functionality is already there.

Looking forward to receiving feedback!

@alamb
Copy link
Contributor

alamb commented Dec 31, 2022

Thank you for this PR -- I will try and review it later this weekend but I may not have time until early next week

@github-actions github-actions bot removed the logical-expr Logical plan and expressions label Jan 2, 2023
@alamb
Copy link
Contributor

alamb commented Jan 3, 2023

Starting to check this out

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @mustafasrepo

I went through this PR as carefully as I could given its size. Sorry for the delay in review -- finding the contiguous uninterrupted time to review has been hard. I had some suggestions but overall I think it could also be merged as is. As in your past PRs I found it well commented, well tested, and overall a pleasure to read.

I did not review all the logic in the window functions but I did review the tests carefully as well as skimmed all the code.

which shows that overall performance improves. This is due to searching RANGE boundaries in a smaller batch since we maintain a bounded state.

Very impressive benchmark results

.github/workflows/rust.yml Show resolved Hide resolved
datafusion/core/Cargo.toml Show resolved Hide resolved
datafusion/core/src/execution/context.rs Outdated Show resolved Hide resolved
datafusion/core/src/physical_plan/common.rs Show resolved Hide resolved
datafusion/physical-expr/src/window/partition_evaluator.rs Outdated Show resolved Hide resolved
datafusion/physical-expr/src/window/rank.rs Show resolved Hide resolved
@ozankabak
Copy link
Contributor

Thank you for the detailed review! We will go through your reviews and let you know when we are done so you can merge.

@ozankabak
Copy link
Contributor

@alamb, we just did our final review with @mustafasrepo and this is good to go. Feel free to merge after CI passes 🚀

@alamb
Copy link
Contributor

alamb commented Jan 4, 2023

@alamb, we just did our final review with @mustafasrepo and this is good to go. Feel free to merge after CI passes 🚀

Will do -- thank you @mustafasrepo and @ozankabak

@ursabot
Copy link

ursabot commented Jan 4, 2023

Benchmark runs are scheduled for baseline = e1dc962 and contender = 80abc94. 80abc94 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@mustafasrepo mustafasrepo deleted the feature/bounded_window_exec branch January 10, 2023 11:32
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate physical-expr Physical Expressions
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants