Pipeline-friendly Bounded Memory Window Executor (#4777)

* Sort Removal rule initial commit

* move ordering satisfaction check to the util

* update test and change repartition maintain_input_order impl

* simplifications

* partition by refactor (#28)

* partition by refactor

* minor changes

* Unnecessary tuple to Range conversion is removed

* move transpose under common

* Add naive sort removal rule

* Add todo for finer Sort removal handling

* Refactors to improve readability and reduce nesting

* reverse expr returns Option (no need for support check)

* fix tests

* partition by and order by no longer end up in the same window group

* Bounded window exec

* solve merge problems

* Refactor to simplify code

* Better comments, change method names

* resolve merge conflicts

* Resolve errors introduced by syncing

* remove set_state, make ntile debuggable

* remove locked flag

* address reviews

* address reviews

* Resolve merge conflict

* address reviews

* address reviews

* address reviews

* Add new tests

* Update tests

* add support for bounded min max

* address reviews

* rename sort rule

* Resolve merge conflicts

* refactors

* Update fuzzy tests + minor changes

* Simplify code and improve comments

* Fix imports, make create_schema more functional

* address reviews

* undo yml change

* minor change to pass CI

* resolve merge conflicts

* rename some members

* Move rule to physical planning

* Minor stylistic/comment changes

* Simplify batch-merging utility functions

* Remove unnecessary clones, simplify code

* update cargo lock file

* address reviews

* update comments

* resolve linter error

* Tidy up comments after final review

Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
mustafasrepo and ozankabak authored Jan 4, 2023
1 parent e1dc962 commit 80abc94
Showing 31 changed files with 2,205 additions and 133 deletions.
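
For context, here is a minimal sketch — not part of this commit — of the kind of sliding-window query the new executor targets. It assumes the `datafusion` and `tokio` crates; the table name, column names, and file path are made up. Because the frame has a finite lookahead, every window expression can report `uses_bounded_memory() == true` and the planner can pick the new `BoundedWindowAggExec` instead of buffering whole partitions:

```rust
// A minimal sketch, assuming a local `events.csv` file with `ts` and `amount`
// columns (illustrative names, not from this commit).
use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("events", "events.csv", CsvReadOptions::new())
        .await?;
    // The frame below has a finite lookahead, so it can stream with a
    // bounded memory window:
    let df = ctx
        .sql(
            "SELECT ts,
                    SUM(amount) OVER (ORDER BY ts
                        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS running
             FROM events",
        )
        .await?;
    df.show().await?;
    Ok(())
}
```
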
1 change: 1 addition & 0 deletions .github/workflows/rust.yml
@@ -64,6 +64,7 @@ jobs:
       - name: Check Cargo.lock for datafusion-cli
         run: |
           # If this test fails, try running `cargo update` in the `datafusion-cli` directory
+          # and check in the updated Cargo.lock file.
           cargo check --manifest-path datafusion-cli/Cargo.toml --locked
       # test the crate
2 changes: 2 additions & 0 deletions datafusion-cli/Cargo.lock

(Generated lockfile; diff not shown.)

1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
@@ -75,6 +75,7 @@ flate2 = { version = "1.0.24", optional = true }
 futures = "0.3"
 glob = "0.3.0"
 hashbrown = { version = "0.13", features = ["raw"] }
+indexmap = "1.9.2"
 itertools = "0.10"
 lazy_static = { version = "^1.4.0" }
 log = "^0.4"
43 changes: 21 additions & 22 deletions datafusion/core/src/execution/context.rs
@@ -1453,44 +1453,43 @@ impl SessionState
         // We need to take care of the rule ordering. They may influence each other.
         let physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
             Arc::new(AggregateStatistics::new()),
-            // - In order to increase the parallelism, it will change the output partitioning
-            //   of some operators in the plan tree, which will influence other rules.
-            //   Therefore, it should be run as soon as possible.
-            // - The reason to make it optional is
-            //   - it's not used for the distributed engine, Ballista.
-            //   - it's conflicted with some parts of the BasicEnforcement, since it will
-            //     introduce additional repartitioning while the BasicEnforcement aims at
-            //     reducing unnecessary repartitioning.
+            // In order to increase the parallelism, the Repartition rule will change the
+            // output partitioning of some operators in the plan tree, which will influence
+            // other rules. Therefore, it should run as soon as possible. It is optional because:
+            // - It's not used for the distributed engine, Ballista.
+            // - It's conflicted with some parts of the BasicEnforcement, since it will
+            //   introduce additional repartitioning while the BasicEnforcement aims at
+            //   reducing unnecessary repartitioning.
             Arc::new(Repartition::new()),
-            // - Currently it will depend on the partition number to decide whether to change the
-            //   single node sort to parallel local sort and merge. Therefore, it should be run
-            //   after the Repartition.
-            // - Since it will change the output ordering of some operators, it should be run
+            // - Currently it will depend on the partition number to decide whether to change the
+            //   single node sort to parallel local sort and merge. Therefore, GlobalSortSelection
+            //   should run after the Repartition.
+            // - Since it will change the output ordering of some operators, it should run
             //   before JoinSelection and BasicEnforcement, which may depend on that.
             Arc::new(GlobalSortSelection::new()),
-            // Statistics-base join selection will change the Auto mode to real join implementation,
+            // Statistics-based join selection will change the Auto mode to a real join implementation,
             // like collect left, or hash join, or future sort merge join, which will
             // influence the BasicEnforcement to decide whether to add additional repartition
             // and local sort to meet the distribution and ordering requirements.
-            // Therefore, it should be run before BasicEnforcement
+            // Therefore, it should run before BasicEnforcement.
             Arc::new(JoinSelection::new()),
+            // If the query is processing infinite inputs, the PipelineFixer rule applies the
+            // necessary transformations to make the query runnable (if it is not already runnable).
+            // If the query can not be made runnable, the rule emits an error with a diagnostic message.
+            // Since the transformations it applies may alter output partitioning properties of operators
+            // (e.g. by swapping hash join sides), this rule runs before BasicEnforcement.
+            Arc::new(PipelineFixer::new()),
-            // It's for adding essential repartition and local sorting operator to satisfy the
-            // required distribution and local sort.
+            // BasicEnforcement is for adding essential repartition and local sorting operators
+            // to satisfy the required distribution and local sort requirements.
             // Please make sure that the whole plan tree is determined.
             Arc::new(BasicEnforcement::new()),
-            // `BasicEnforcement` stage conservatively inserts `SortExec`s to satisfy ordering requirements.
-            // However, a deeper analysis may sometimes reveal that such a `SortExec` is actually unnecessary.
-            // These cases typically arise when we have reversible `WindowAggExec`s or deep subqueries. The
-            // rule below performs this analysis and removes unnecessary `SortExec`s.
+            // The BasicEnforcement stage conservatively inserts sorts to satisfy ordering requirements.
+            // However, a deeper analysis may sometimes reveal that such a sort is actually unnecessary.
+            // These cases typically arise when we have reversible window expressions or deep subqueries.
+            // The rule below performs this analysis and removes unnecessary sorts.
             Arc::new(OptimizeSorts::new()),
-            // It will not influence the distribution and ordering of the whole plan tree.
-            // Therefore, to avoid influencing other rules, it should be run at last.
+            // The CoalesceBatches rule will not influence the distribution and ordering of the
+            // whole plan tree. Therefore, to avoid influencing other rules, it should run last.
             Arc::new(CoalesceBatches::new()),
             // The PipelineChecker rule will reject non-runnable query plans that use
             // pipeline-breaking operators on infinite input(s). The rule generates a
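
The comments above stress that rule order matters because each rule rewrites the plan the previous rule produced. A toy sketch of that sequencing follows (the `Rule` trait and rule structs are hypothetical, not DataFusion's `PhysicalOptimizerRule` API):

```rust
// Toy model of a sequential physical-optimizer pass: each rule sees the plan
// produced by the previous one, so relative order is significant.
trait Rule {
    fn optimize(&self, plan: String) -> String;
}

struct Repartition;
impl Rule for Repartition {
    fn optimize(&self, plan: String) -> String {
        // Changes output partitioning, so it must run early.
        format!("Repartition({plan})")
    }
}

struct BasicEnforcement;
impl Rule for BasicEnforcement {
    fn optimize(&self, plan: String) -> String {
        // Relies on seeing the final partitioning, so it must run later.
        format!("Enforce({plan})")
    }
}

fn main() {
    let rules: Vec<Box<dyn Rule>> =
        vec![Box::new(Repartition), Box::new(BasicEnforcement)];
    let plan = rules.iter().fold("Scan".to_string(), |p, r| r.optimize(p));
    assert_eq!(plan, "Enforce(Repartition(Scan))");
}
```
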
64 changes: 47 additions & 17 deletions datafusion/core/src/physical_optimizer/optimize_sorts.rs
@@ -33,10 +33,11 @@ use crate::physical_optimizer::utils::{
 use crate::physical_optimizer::PhysicalOptimizerRule;
 use crate::physical_plan::rewrite::TreeNodeRewritable;
 use crate::physical_plan::sorts::sort::SortExec;
-use crate::physical_plan::windows::WindowAggExec;
+use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
 use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
 use arrow::datatypes::SchemaRef;
 use datafusion_common::{reverse_sort_options, DataFusionError};
+use datafusion_physical_expr::window::WindowExpr;
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
 use itertools::izip;
 use std::iter::zip;
@@ -181,17 +182,32 @@ fn optimize_sorts(
             sort_exec.input().equivalence_properties()
         }) {
             update_child_to_remove_unnecessary_sort(child, sort_onwards)?;
-        } else if let Some(window_agg_exec) =
+        }
+        // For window expressions, we can remove some sorts when we can
+        // calculate the result in reverse:
+        else if let Some(exec) =
             requirements.plan.as_any().downcast_ref::<WindowAggExec>()
         {
-            // For window expressions, we can remove some sorts when we can
-            // calculate the result in reverse:
-            if let Some(res) = analyze_window_sort_removal(
-                window_agg_exec,
+            if let Some(result) = analyze_window_sort_removal(
+                exec.window_expr(),
+                &exec.partition_keys,
+                sort_exec,
+                sort_onwards,
+            )? {
+                return Ok(Some(result));
+            }
+        } else if let Some(exec) = requirements
+            .plan
+            .as_any()
+            .downcast_ref::<BoundedWindowAggExec>()
+        {
+            if let Some(result) = analyze_window_sort_removal(
+                exec.window_expr(),
+                &exec.partition_keys,
                 sort_exec,
                 sort_onwards,
             )? {
-                return Ok(Some(res));
+                return Ok(Some(result));
             }
         }
         // TODO: Once we can ensure that required ordering information propagates with
@@ -273,9 +289,11 @@ fn analyze_immediate_sort_removal(
     Ok(None)
 }
 
-/// Analyzes a `WindowAggExec` to determine whether it may allow removing a sort.
+/// Analyzes a [WindowAggExec] or a [BoundedWindowAggExec] to determine whether
+/// it may allow removing a sort.
 fn analyze_window_sort_removal(
-    window_agg_exec: &WindowAggExec,
+    window_expr: &[Arc<dyn WindowExpr>],
+    partition_keys: &[Arc<dyn PhysicalExpr>],
     sort_exec: &SortExec,
     sort_onward: &mut Vec<(usize, Arc<dyn ExecutionPlan>)>,
 ) -> Result<Option<PlanWithCorrespondingSort>> {
@@ -289,7 +307,6 @@ fn analyze_window_sort_removal(
         // If there is no physical ordering, there is no way to remove a sort -- immediately return:
         return Ok(None);
     };
-    let window_expr = window_agg_exec.window_expr();
     let (can_skip_sorting, should_reverse) = can_skip_sort(
         window_expr[0].partition_by(),
         required_ordering,
@@ -308,13 +325,26 @@ fn analyze_window_sort_removal(
         if let Some(window_expr) = new_window_expr {
             let new_child = remove_corresponding_sort_from_sub_plan(sort_onward)?;
             let new_schema = new_child.schema();
-            let new_plan = Arc::new(WindowAggExec::try_new(
-                window_expr,
-                new_child,
-                new_schema,
-                window_agg_exec.partition_keys.clone(),
-                Some(physical_ordering.to_vec()),
-            )?);
+
+            let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory());
+            // If all window exprs can run with bounded memory, choose the bounded window variant:
+            let new_plan = if uses_bounded_memory {
+                Arc::new(BoundedWindowAggExec::try_new(
+                    window_expr,
+                    new_child,
+                    new_schema,
+                    partition_keys.to_vec(),
+                    Some(physical_ordering.to_vec()),
+                )?) as _
+            } else {
+                Arc::new(WindowAggExec::try_new(
+                    window_expr,
+                    new_child,
+                    new_schema,
+                    partition_keys.to_vec(),
+                    Some(physical_ordering.to_vec()),
+                )?) as _
+            };
             return Ok(Some(PlanWithCorrespondingSort::new(new_plan)));
         }
     }
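
The sort-removal analysis above hinges on reversibility: if a window expression can be evaluated in reverse, an input ordered in the exact opposite direction already satisfies it and the `SortExec` can be dropped. A self-contained sketch of that check follows (`SortOptions` and `reverse_sort_options` here are stand-ins for the arrow/DataFusion items of the same names):

```rust
// Self-contained sketch of the reversibility idea behind
// analyze_window_sort_removal; these types mimic, but are not, the real ones.
#[derive(Clone, Copy, Debug, PartialEq)]
struct SortOptions {
    descending: bool,
    nulls_first: bool,
}

fn reverse_sort_options(opts: SortOptions) -> SortOptions {
    SortOptions {
        descending: !opts.descending,
        nulls_first: !opts.nulls_first,
    }
}

fn main() {
    // The ordering the window expression requires:
    let required = SortOptions { descending: true, nulls_first: false };
    // The ordering the input already has:
    let existing = SortOptions { descending: false, nulls_first: true };
    // The existing ordering is the exact reverse of the requirement, so a
    // reversible window expression can consume it without an extra sort:
    assert_eq!(reverse_sort_options(required), existing);
}
```
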
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/pipeline_checker.rs
@@ -301,7 +301,7 @@ mod sql_tests {
         let case = QueryCase {
             sql: "SELECT
                     c9,
-                    SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1
+                    SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1
                   FROM test
                   LIMIT 5".to_string(),
             cases: vec![Arc::new(test1), Arc::new(test2)],
@@ -325,7 +325,7 @@ mod sql_tests {
         let case = QueryCase {
             sql: "SELECT
                     c9,
-                    SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1
+                    SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1
                   FROM test".to_string(),
             cases: vec![Arc::new(test1), Arc::new(test2)],
             error_operator: "Window Error".to_string()
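
These tests switch the frame end from `5 FOLLOWING` to `UNBOUNDED FOLLOWING` because a finite offset needs only a bounded lookahead and can now stream, while an unbounded end must buffer the rest of the input and therefore still trips the checker. A toy model of that distinction (hypothetical types, not DataFusion's window-frame API):

```rust
// Toy model: a finite frame end streams with bounded memory; an unbounded
// end is pipeline-breaking on infinite input.
enum FrameEnd {
    Offset(u64),
    UnboundedFollowing,
}

fn uses_bounded_memory(end: &FrameEnd) -> bool {
    matches!(end, FrameEnd::Offset(_))
}

fn main() {
    // "... AND 5 FOLLOWING" only needs to look 5 rows ahead:
    assert!(uses_bounded_memory(&FrameEnd::Offset(5)));
    // "... AND UNBOUNDED FOLLOWING" can never finish on an infinite stream:
    assert!(!uses_bounded_memory(&FrameEnd::UnboundedFollowing));
}
```
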
42 changes: 42 additions & 0 deletions datafusion/core/src/physical_plan/common.rs
@@ -22,6 +22,7 @@ use crate::error::{DataFusionError, Result};
 use crate::execution::context::TaskContext;
 use crate::physical_plan::metrics::MemTrackingMetrics;
 use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statistics};
+use arrow::compute::concat;
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::ArrowError;
 use arrow::error::Result as ArrowResult;
@@ -95,6 +96,47 @@ pub async fn collect(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatc
         .map_err(DataFusionError::from)
 }
 
+/// Merge two record batch references into a single record batch.
+/// Both input batches must have the same schema.
+pub fn merge_batches(
+    first: &RecordBatch,
+    second: &RecordBatch,
+    schema: SchemaRef,
+) -> ArrowResult<RecordBatch> {
+    let columns = (0..schema.fields.len())
+        .map(|index| {
+            let first_column = first.column(index).as_ref();
+            let second_column = second.column(index).as_ref();
+            concat(&[first_column, second_column])
+        })
+        .collect::<ArrowResult<Vec<_>>>()?;
+    RecordBatch::try_new(schema, columns)
+}
+
+/// Merge a slice of record batch references into a single record batch, or
+/// return None if the slice itself is empty. All the record batches inside the
+/// slice must have the same schema.
+pub fn merge_multiple_batches(
+    batches: &[&RecordBatch],
+    schema: SchemaRef,
+) -> ArrowResult<Option<RecordBatch>> {
+    Ok(if batches.is_empty() {
+        None
+    } else {
+        let columns = (0..schema.fields.len())
+            .map(|index| {
+                concat(
+                    &batches
+                        .iter()
+                        .map(|batch| batch.column(index).as_ref())
+                        .collect::<Vec<_>>(),
+                )
+            })
+            .collect::<ArrowResult<Vec<_>>>()?;
+        Some(RecordBatch::try_new(schema, columns)?)
+    })
+}
+
 /// Recursively builds a list of files in a directory with a given extension
 pub fn build_checked_file_list(dir: &str, ext: &str) -> Result<Vec<String>> {
     let mut filenames: Vec<String> = Vec::new();
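
A usage sketch for the new helper, assuming `merge_batches` is reachable at `datafusion::physical_plan::common::merge_batches`; the column name and values are made up:

```rust
// Concatenate two same-schema batches column by column via the new helper.
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::common::merge_batches;

fn main() -> arrow::error::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let a1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
    let a2: ArrayRef = Arc::new(Int32Array::from(vec![3]));
    let first = RecordBatch::try_new(schema.clone(), vec![a1])?;
    let second = RecordBatch::try_new(schema.clone(), vec![a2])?;
    let merged = merge_batches(&first, &second, schema)?;
    assert_eq!(merged.num_rows(), 3);
    Ok(())
}
```
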
31 changes: 23 additions & 8 deletions datafusion/core/src/physical_plan/planner.rs
@@ -47,7 +47,7 @@ use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use crate::physical_plan::projection::ProjectionExec;
 use crate::physical_plan::repartition::RepartitionExec;
 use crate::physical_plan::sorts::sort::SortExec;
-use crate::physical_plan::windows::WindowAggExec;
+use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
 use crate::physical_plan::{joins::utils as join_utils, Partitioning};
 use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, WindowExpr};
 use crate::{
@@ -614,13 +614,28 @@ impl DefaultPhysicalPlanner {
                     })
                     .collect::<Result<Vec<_>>>()?;
 
-                Ok(Arc::new(WindowAggExec::try_new(
-                    window_expr,
-                    input_exec,
-                    physical_input_schema,
-                    physical_partition_keys,
-                    physical_sort_keys,
-                )?))
+                let uses_bounded_memory = window_expr
+                    .iter()
+                    .all(|e| e.uses_bounded_memory());
+                // If all window expressions can run with bounded memory,
+                // choose the bounded window variant:
+                Ok(if uses_bounded_memory {
+                    Arc::new(BoundedWindowAggExec::try_new(
+                        window_expr,
+                        input_exec,
+                        physical_input_schema,
+                        physical_partition_keys,
+                        physical_sort_keys,
+                    )?)
+                } else {
+                    Arc::new(WindowAggExec::try_new(
+                        window_expr,
+                        input_exec,
+                        physical_input_schema,
+                        physical_partition_keys,
+                        physical_sort_keys,
+                    )?)
+                })
             }
             LogicalPlan::Aggregate(Aggregate {
                 input,
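
The planner change above reduces to a single predicate: use the bounded executor only when every window expression supports bounded-memory evaluation. A stripped-down sketch of that decision (the `Expr` struct is a hypothetical stand-in for `Arc<dyn WindowExpr>`):

```rust
// Sketch of the executor-selection predicate used by the planner above.
struct Expr {
    uses_bounded_memory: bool,
}

fn pick_executor(window_exprs: &[Expr]) -> &'static str {
    // The bounded variant is only safe when *every* window expression can be
    // evaluated with a bounded memory window:
    if window_exprs.iter().all(|e| e.uses_bounded_memory) {
        "BoundedWindowAggExec"
    } else {
        "WindowAggExec"
    }
}

fn main() {
    let sliding = Expr { uses_bounded_memory: true };
    let running = Expr { uses_bounded_memory: true };
    let unbounded = Expr { uses_bounded_memory: false };
    assert_eq!(pick_executor(&[sliding, running]), "BoundedWindowAggExec");
    // A single unbounded expression forces the buffering executor:
    assert_eq!(pick_executor(&[unbounded]), "WindowAggExec");
}
```
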