Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic Join Pushdown #12706

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@

[workspace]
# datafusion-cli is excluded because of its Cargo.lock. See datafusion-cli/README.md.
exclude = ["datafusion-cli", "dev/depcheck"]
exclude = ["dev/depcheck"]
members = [
"datafusion-cli",
"datafusion/common",
"datafusion/common-runtime",
"datafusion/catalog",
Expand Down
5 changes: 5 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,11 @@ config_namespace! {
/// then the output will be coerced to a non-view.
/// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
pub expand_views_at_output: bool, default = false

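/// When set to true, enables dynamic join filter pushdown: the filter
/// information attached to a join is handed to the physical hash join
/// so that it can be passed down to the scan.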
pub dynamic_join_pushdown: bool, default = true
}
}

Expand Down
76 changes: 56 additions & 20 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ use datafusion_expr::{
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_plan::joins::utils::PhysicalDynamicFiltersInfo;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::unnest::ListUnnest;
use datafusion_sql::utils::window_expr_common_partition_keys;
Expand Down Expand Up @@ -858,6 +859,7 @@ impl DefaultPhysicalPlanner {
join_type,
null_equals_null,
schema: join_schema,
filter_pushdown_info,
..
}) => {
let null_equals_null = *null_equals_null;
Expand Down Expand Up @@ -1051,6 +1053,34 @@ impl DefaultPhysicalPlanner {
_ => None,
};

// Lower the logical dynamic-filter description attached to the join (if
// any) into physical expressions resolved against the join schema.
let physical_dynamic_filter_info = if let Some(filter_pushdown_info) =
    filter_pushdown_info
{
    // `?` cannot be applied inside the `map` closure (the closure does not
    // return a `Result`), so collect the per-item results into a single
    // `Result<Vec<_>, _>` and propagate the first error from here instead.
    let aggregates = filter_pushdown_info
        .min_max_aggregates
        .iter()
        .map(|aggr| self.create_physical_expr(aggr, &join_schema, session_state))
        .collect::<Result<Vec<_>, _>>()?;

    // NOTE(review): `filter.column` is an `Arc<Column>` while the call above
    // passes an expression reference to `create_physical_expr`; confirm the
    // expected argument type and wrap/borrow the column accordingly.
    let columns = filter_pushdown_info
        .filters
        .iter()
        .map(|filter| {
            self.create_physical_expr(filter.column, &join_schema, session_state)
        })
        .collect::<Result<Vec<_>, _>>()?;

    Some(PhysicalDynamicFiltersInfo::new(aggregates, columns))
} else {
    None
};

let prefer_hash_join =
session_state.config_options().optimizer.prefer_hash_join;

Expand Down Expand Up @@ -1091,27 +1121,33 @@ impl DefaultPhysicalPlanner {
PartitionMode::Partitioned
}
};
Arc::new(HashJoinExec::try_new(
physical_left,
physical_right,
join_on,
join_filter,
join_type,
None,
partition_mode,
null_equals_null,
)?)
Arc::new(
HashJoinExec::try_new(
physical_left,
physical_right,
join_on,
join_filter,
join_type,
None,
partition_mode,
null_equals_null,
)?
.with_dynamic_filter_info(physical_dynamic_filter_info),
)
} else {
Arc::new(HashJoinExec::try_new(
physical_left,
physical_right,
join_on,
join_filter,
join_type,
None,
PartitionMode::CollectLeft,
null_equals_null,
)?)
Arc::new(
HashJoinExec::try_new(
physical_left,
physical_right,
join_on,
join_filter,
join_type,
None,
PartitionMode::CollectLeft,
null_equals_null,
)?
.with_dynamic_filter_info(physical_dynamic_filter_info),
)
};

// If plan was mutated previously then need to create the ExecutionPlan
Expand Down
3 changes: 3 additions & 0 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,7 @@ impl LogicalPlanBuilder {
join_constraint: JoinConstraint::On,
schema: DFSchemaRef::new(join_schema),
null_equals_null,
filter_pushdown_info: None,
})))
}

Expand Down Expand Up @@ -941,6 +942,7 @@ impl LogicalPlanBuilder {
join_constraint: JoinConstraint::Using,
schema: DFSchemaRef::new(join_schema),
null_equals_null: false,
filter_pushdown_info: None,
})))
}
}
Expand Down Expand Up @@ -1164,6 +1166,7 @@ impl LogicalPlanBuilder {
join_constraint: JoinConstraint::On,
schema: DFSchemaRef::new(join_schema),
null_equals_null: false,
filter_pushdown_info: None,
})))
}

Expand Down
17 changes: 16 additions & 1 deletion datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::logical_plan::{DmlStatement, Statement};
use crate::utils::{
enumerate_grouping_sets, exprlist_len, exprlist_to_fields, find_base_plan,
find_out_reference_exprs, grouping_set_expr_count, grouping_set_to_exprlist,
split_conjunction,
split_conjunction, DynamicJoinFilterPushdownInfo,
};
use crate::{
build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Expr,
Expand Down Expand Up @@ -669,6 +669,7 @@ impl LogicalPlan {
on,
schema: _,
null_equals_null,
filter_pushdown_info,
}) => {
let schema =
build_join_schema(left.schema(), right.schema(), &join_type)?;
Expand All @@ -690,6 +691,7 @@ impl LogicalPlan {
filter,
schema: DFSchemaRef::new(schema),
null_equals_null,
filter_pushdown_info,
}))
}
LogicalPlan::CrossJoin(CrossJoin {
Expand Down Expand Up @@ -954,6 +956,7 @@ impl LogicalPlan {
filter: filter_expr,
schema: DFSchemaRef::new(schema),
null_equals_null: *null_equals_null,
filter_pushdown_info: None,
}))
}
LogicalPlan::CrossJoin(_) => {
Expand Down Expand Up @@ -3173,6 +3176,9 @@ pub struct Join {
pub schema: DFSchemaRef,
/// If null_equals_null is true, null == null else null != null
pub null_equals_null: bool,
/// Stores the filters that should be passed to the scan if dynamic filter
/// pushdown is enabled
pub filter_pushdown_info: Option<Arc<DynamicJoinFilterPushdownInfo>>,
}

impl Join {
Expand Down Expand Up @@ -3206,8 +3212,17 @@ impl Join {
join_constraint: original_join.join_constraint,
schema: Arc::new(join_schema),
null_equals_null: original_join.null_equals_null,
filter_pushdown_info: None,
})
}
/// assign filter pushdown struct
pub fn with_filter_pushdown_info(
mut self,
filter_pushdown: Arc<DynamicJoinFilterPushdownInfo>,
) -> Self {
self.filter_pushdown_info = Some(filter_pushdown);
self
}
}

// Manual implementation needed because of `schema` field. Comparison excludes this field.
Expand Down
4 changes: 4 additions & 0 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ impl TreeNode for LogicalPlan {
join_constraint,
schema,
null_equals_null,
filter_pushdown_info,
}) => map_until_stop_and_collect!(
rewrite_arc(left, &mut f),
right,
Expand All @@ -157,6 +158,7 @@ impl TreeNode for LogicalPlan {
join_constraint,
schema,
null_equals_null,
filter_pushdown_info,
})
}),
LogicalPlan::CrossJoin(CrossJoin {
Expand Down Expand Up @@ -639,6 +641,7 @@ impl LogicalPlan {
join_constraint,
schema,
null_equals_null,
filter_pushdown_info,
}) => map_until_stop_and_collect!(
on.into_iter().map_until_stop_and_collect(
|on| map_until_stop_and_collect!(f(on.0), on.1, f(on.1))
Expand All @@ -658,6 +661,7 @@ impl LogicalPlan {
join_constraint,
schema,
null_equals_null,
filter_pushdown_info,
})
}),
LogicalPlan::Sort(Sort { expr, input, fetch }) => {
Expand Down
61 changes: 61 additions & 0 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1324,6 +1324,67 @@ pub fn format_state_name(name: &str, state_name: &str) -> String {
format!("{name}[{state_name}]")
}

/// Dynamic join filter information that can be attached to a logical `Join`
/// (see `Join::filter_pushdown_info`) and is read by the physical planner
/// when it builds the hash join.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DynamicJoinFilterPushdownInfo {
/// Optional table-level dynamic filters; `None` when built with `new()`.
pub dynamic_filters: Option<DynamicTableFilters>,
/// Columns registered via `push_filter`; the physical planner turns each
/// entry's `column` into a physical expression.
pub filters: Vec<DynamicJoinFilterPushdownColumn>,
/// Expressions the physical planner resolves against the join schema.
/// NOTE(review): presumably min/max aggregate expressions, going by the
/// field name — confirm against the rule that populates this.
pub min_max_aggregates: Vec<Expr>,
}

impl DynamicJoinFilterPushdownInfo {
    /// Creates an instance with every field supplied by the caller.
    pub fn new_with_all(
        dynamic_filters: DynamicTableFilters,
        filters: Vec<DynamicJoinFilterPushdownColumn>,
        min_max_aggregates: Vec<Expr>,
    ) -> Self {
        Self {
            dynamic_filters: Some(dynamic_filters),
            filters,
            min_max_aggregates,
        }
    }

    /// Creates an instance holding `dynamic_filters` and no pushdown
    /// columns or aggregate expressions.
    pub fn new_with_dynamic_filter(dynamic_filters: DynamicTableFilters) -> Self {
        Self {
            dynamic_filters: Some(dynamic_filters),
            ..Self::new()
        }
    }

    /// Creates an empty instance: no dynamic filters, columns or aggregates.
    pub fn new() -> Self {
        Self {
            dynamic_filters: None,
            filters: Vec::new(),
            min_max_aggregates: Vec::new(),
        }
    }

    /// Appends one pushdown column to `filters`.
    pub fn push_filter(&mut self, filter: DynamicJoinFilterPushdownColumn) {
        self.filters.push(filter);
    }

    /// Appends `aggs` to `min_max_aggregates`.
    ///
    /// The parameter is `Vec<Expr>` so that it matches the element type of
    /// `min_max_aggregates`; a `Vec<Aggregate>` cannot be used to extend a
    /// `Vec<Expr>`.
    pub fn push_aggregates(&mut self, aggs: Vec<Expr>) {
        self.min_max_aggregates.extend(aggs);
    }
}

impl Default for DynamicJoinFilterPushdownInfo {
    /// Equivalent to [`DynamicJoinFilterPushdownInfo::new`].
    fn default() -> Self {
        Self::new()
    }
}

/// Table-level dynamic filters carried by `DynamicJoinFilterPushdownInfo`.
///
/// Currently an empty struct with no fields.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DynamicTableFilters {}

/// A single column participating in dynamic join filter pushdown.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DynamicJoinFilterPushdownColumn {
/// NOTE(review): presumably the index of the join condition this column
/// belongs to — confirm against the code that constructs this value.
pub condition_idx: usize,
/// The column itself, shared behind an `Arc`.
pub column: Arc<Column>,
}

impl DynamicJoinFilterPushdownColumn {
pub fn new(condition_idx: usize, column: Arc<Column>) -> Self {
Self {
condition_idx,
column,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ fn find_inner_join(
filter: None,
schema: join_schema,
null_equals_null: false,
filter_pushdown_info: None,
}));
}
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/eliminate_outer_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ impl OptimizerRule for EliminateOuterJoin {
filter: join.filter.clone(),
schema: Arc::clone(&join.schema),
null_equals_null: join.null_equals_null,
filter_pushdown_info: None,
}));
Filter::try_new(filter.predicate, new_join)
.map(|f| Transformed::yes(LogicalPlan::Filter(f)))
Expand Down
3 changes: 3 additions & 0 deletions datafusion/optimizer/src/extract_equijoin_predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ impl OptimizerRule for ExtractEquijoinPredicate {
join_constraint,
schema,
null_equals_null,
filter_pushdown_info,
}) => {
let left_schema = left.schema();
let right_schema = right.schema();
Expand All @@ -93,6 +94,7 @@ impl OptimizerRule for ExtractEquijoinPredicate {
join_constraint,
schema,
null_equals_null,
filter_pushdown_info,
})))
} else {
Ok(Transformed::no(LogicalPlan::Join(Join {
Expand All @@ -104,6 +106,7 @@ impl OptimizerRule for ExtractEquijoinPredicate {
join_constraint,
schema,
null_equals_null,
filter_pushdown_info,
})))
}
}
Expand Down
Loading
Loading