Support for Sliding Windows Joins with Symmetric Hash Join (SHJ) #5322
@@ -29,11 +29,19 @@ use crate::physical_optimizer::pipeline_checker::{
     check_finiteness_requirements, PipelineStatePropagator,
 };
 use crate::physical_optimizer::PhysicalOptimizerRule;
-use crate::physical_plan::joins::{HashJoinExec, PartitionMode};
+use crate::physical_plan::joins::utils::JoinSide;
+use crate::physical_plan::joins::{
+    convert_sort_expr_with_filter_schema, HashJoinExec, PartitionMode,
+    SymmetricHashJoinExec,
+};
 use crate::physical_plan::rewrite::TreeNodeRewritable;
 use crate::physical_plan::ExecutionPlan;
+use datafusion_common::DataFusionError;
 use datafusion_expr::logical_plan::JoinType;
+use datafusion_physical_expr::expressions::{BinaryExpr, CastExpr, Column, Literal};
+use datafusion_physical_expr::intervals::{is_datatype_supported, is_operator_supported};
+use datafusion_physical_expr::PhysicalExpr;

 use std::sync::Arc;

 /// The [PipelineFixer] rule tries to modify a given plan so that it can
@@ -48,17 +56,24 @@ impl PipelineFixer {
         Self {}
     }
 }

+/// [PipelineFixer] subrules are functions of this type. Such functions take a
+/// single [PipelineStatePropagator] argument, which stores state variables
+/// indicating the unboundedness status of the current [ExecutionPlan] as
+/// the [PipelineFixer] rule traverses the entire plan tree.
 type PipelineFixerSubrule =
-    dyn Fn(&PipelineStatePropagator) -> Option<Result<PipelineStatePropagator>>;
+    dyn Fn(PipelineStatePropagator) -> Option<Result<PipelineStatePropagator>>;
Review comment (on lines 63 to +64):

  Reviewer: Could you please add some comments for the type definition?

  Author: Sure, will do.
 impl PhysicalOptimizerRule for PipelineFixer {
     fn optimize(
         &self,
         plan: Arc<dyn ExecutionPlan>,
         _config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let pipeline = PipelineStatePropagator::new(plan);
-        let physical_optimizer_subrules: Vec<Box<PipelineFixerSubrule>> =
-            vec![Box::new(hash_join_swap_subrule)];
+        let physical_optimizer_subrules: Vec<Box<PipelineFixerSubrule>> = vec![
+            Box::new(hash_join_convert_symmetric_subrule),
+            Box::new(hash_join_swap_subrule),
+        ];
         let state = pipeline.transform_up(&|p| {
             apply_subrules_and_check_finiteness_requirements(
                 p,

@@ -77,6 +92,104 @@ impl PhysicalOptimizerRule for PipelineFixer {
     }
 }
+/// Indicates whether interval arithmetic is supported for the given expression.
+/// Currently, we do not support all [PhysicalExpr]s for interval calculations.
+/// We do not support every type of [Operator] either. Over time, this check
+/// will relax as more types of [PhysicalExpr]s and [Operator]s are supported.
+/// Currently, [CastExpr], [BinaryExpr], [Column] and [Literal] are supported.
+fn check_support(expr: &Arc<dyn PhysicalExpr>) -> bool {
+    let expr_any = expr.as_any();
+    let expr_supported = if let Some(binary_expr) = expr_any.downcast_ref::<BinaryExpr>()
+    {
+        is_operator_supported(binary_expr.op())
+    } else {
+        expr_any.is::<Column>() || expr_any.is::<Literal>() || expr_any.is::<CastExpr>()
+    };
+    expr_supported && expr.children().iter().all(check_support)
+}
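The recursive pattern in `check_support` (a node qualifies only if the node itself is a supported kind AND every child qualifies) can be illustrated with a toy expression tree. The types below are illustrative stand-ins, not DataFusion's:

```rust
// Toy model mimicking the shape of check_support: an expression tree is
// "supported" only if its own node kind is supported and all children are.
enum Expr {
    Column(String),
    Literal(i64),
    Binary { op: Op, left: Box<Expr>, right: Box<Expr> },
    Negative(Box<Expr>), // stands in for a node kind without interval support
}

#[derive(PartialEq)]
enum Op {
    Plus,
    Or,
}

fn is_operator_supported(op: &Op) -> bool {
    // Pretend only arithmetic operators have interval semantics here.
    *op == Op::Plus
}

fn check_support(expr: &Expr) -> bool {
    match expr {
        // Leaves of a supported kind are always fine.
        Expr::Column(_) | Expr::Literal(_) => true,
        // A binary node needs a supported operator AND supported children.
        Expr::Binary { op, left, right } => {
            is_operator_supported(op) && check_support(left) && check_support(right)
        }
        // One unsupported node disqualifies the whole tree.
        Expr::Negative(_) => false,
    }
}

fn main() {
    // a + 1 -> supported
    let ok = Expr::Binary {
        op: Op::Plus,
        left: Box::new(Expr::Column("a".into())),
        right: Box::new(Expr::Literal(1)),
    };
    // a OR 1 -> unsupported operator
    let bad = Expr::Binary {
        op: Op::Or,
        left: Box::new(Expr::Column("a".into())),
        right: Box::new(Expr::Literal(1)),
    };
    assert!(check_support(&ok));
    assert!(!check_support(&bad));
    println!("ok");
}
```

This mirrors why the unit tests further below reject `Operator::Or` and `NegativeExpr`: the check short-circuits as soon as any node in the tree falls outside the supported set.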
+/// This function returns whether a given hash join is replaceable by a
+/// symmetric hash join. Basically, the requirement is that involved
+/// [PhysicalExpr]s, [Operator]s and data types need to be supported,
+/// and order information must cover every column in the filter expression.
+fn is_suitable_for_symmetric_hash_join(hash_join: &HashJoinExec) -> Result<bool> {
+    if let Some(filter) = hash_join.filter() {
+        let left = hash_join.left();
+        if let Some(left_ordering) = left.output_ordering() {
+            let right = hash_join.right();
+            if let Some(right_ordering) = right.output_ordering() {
+                let expr_supported = check_support(filter.expression());
+                let left_convertible = convert_sort_expr_with_filter_schema(
+                    &JoinSide::Left,
+                    filter,
+                    &left.schema(),
+                    &left_ordering[0],
+                )?
+                .is_some();
+                let right_convertible = convert_sort_expr_with_filter_schema(
+                    &JoinSide::Right,
+                    filter,
+                    &right.schema(),
+                    &right_ordering[0],
+                )?
+                .is_some();
+                let fields_supported = filter
+                    .schema()
+                    .fields()
+                    .iter()
+                    .all(|f| is_datatype_supported(f.data_type()));
+                return Ok(expr_supported
+                    && fields_supported
+                    && left_convertible
+                    && right_convertible);
+            }
+        }
+    }
+    Ok(false)
+}
+/// This subrule checks if one can replace a hash join with a symmetric hash
+/// join so that the pipeline does not break due to the join operation in
+/// question. If possible, it makes this replacement; otherwise, it has no
+/// effect.
+fn hash_join_convert_symmetric_subrule(
+    input: PipelineStatePropagator,
+) -> Option<Result<PipelineStatePropagator>> {
+    let plan = input.plan;
+    if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
+        let ub_flags = input.children_unbounded;
+        let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
Review comment:

  Reviewer: I wonder why this PR limits the use of symmetric hash join to
  unbounded streams? It seems it could be used for any input where the sort
  conditions are reasonable, given …

  Author: As I mentioned in the rule-related comment, this limit is temporary.
  We will enable SHJ for other cases in one of the follow-on PRs 🚀

  Reviewer: Makes sense.

  Reviewer: Should we move the join selection logic to …

  Reviewer: Seems currently we can not move the …

  Author: We will think about this. I don't see a straightforward way right
  away, but I will talk to @metesynnada about it in detail. If this turns out
  to be possible, we can make a follow-on PR about it.
+        let new_plan = if left_unbounded && right_unbounded {
+            match is_suitable_for_symmetric_hash_join(hash_join) {
+                Ok(true) => SymmetricHashJoinExec::try_new(
+                    hash_join.left().clone(),
+                    hash_join.right().clone(),
+                    hash_join
+                        .on()
+                        .iter()
+                        .map(|(l, r)| (l.clone(), r.clone()))
+                        .collect(),
+                    hash_join.filter().unwrap().clone(),
+                    hash_join.join_type(),
+                    hash_join.null_equals_null(),
+                )
+                .map(|e| Arc::new(e) as _),
+                Ok(false) => Ok(plan),
+                Err(e) => return Some(Err(e)),
+            }
+        } else {
+            Ok(plan)
+        };
+        Some(new_plan.map(|plan| PipelineStatePropagator {
+            plan,
+            unbounded: left_unbounded || right_unbounded,
+            children_unbounded: ub_flags,
+        }))
+    } else {
+        None
+    }
+}
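The decision this subrule makes can be reduced to a small truth table: replace the hash join with a symmetric hash join only when BOTH inputs are unbounded and the filter/ordering checks pass, and propagate unboundedness as the OR of the children's flags. A minimal sketch with toy types (not the DataFusion API):

```rust
// Toy model of the subrule's decision: which join operator to keep, and
// what unboundedness flag the join's output should carry.
#[derive(Debug, PartialEq)]
enum Join {
    Hash,
    SymmetricHash,
}

fn choose_join(
    left_unbounded: bool,
    right_unbounded: bool,
    filter_suitable: bool,
) -> (Join, bool) {
    let join = if left_unbounded && right_unbounded && filter_suitable {
        // Both sides stream forever and the filter supports pruning:
        // a symmetric hash join keeps the pipeline from breaking.
        Join::SymmetricHash
    } else {
        Join::Hash
    };
    // The join's output is unbounded if either input is unbounded.
    (join, left_unbounded || right_unbounded)
}

fn main() {
    assert_eq!(choose_join(true, true, true), (Join::SymmetricHash, true));
    // Only one unbounded side: the swap subrule's territory, not SHJ's.
    assert_eq!(choose_join(true, false, true), (Join::Hash, true));
    assert_eq!(choose_join(false, false, true), (Join::Hash, false));
    println!("ok");
}
```

Note that when exactly one side is unbounded, this subrule leaves the plan alone; the `hash_join_swap_subrule` below handles that case by swapping build/probe sides instead.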
 /// This subrule will swap build/probe sides of a hash join depending on whether its inputs
 /// may produce an infinite stream of records. The rule ensures that the left (build) side
 /// of the hash join always operates on an input stream that will produce a finite set of records.
@@ -119,12 +232,12 @@ impl PhysicalOptimizerRule for PipelineFixer {
 ///
 /// ```
 fn hash_join_swap_subrule(
-    input: &PipelineStatePropagator,
+    input: PipelineStatePropagator,
 ) -> Option<Result<PipelineStatePropagator>> {
-    let plan = input.plan.clone();
-    let children = &input.children_unbounded;
+    let plan = input.plan;
     if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
-        let (left_unbounded, right_unbounded) = (children[0], children[1]);
+        let ub_flags = input.children_unbounded;
+        let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
         let new_plan = if left_unbounded && !right_unbounded {
             if matches!(
                 *hash_join.join_type(),
@@ -140,12 +253,11 @@ fn hash_join_swap_subrule(
         } else {
             Ok(plan)
         };
-        let new_state = new_plan.map(|plan| PipelineStatePropagator {
+        Some(new_plan.map(|plan| PipelineStatePropagator {
             plan,
             unbounded: left_unbounded || right_unbounded,
-            children_unbounded: vec![left_unbounded, right_unbounded],
-        });
-        Some(new_state)
+            children_unbounded: ub_flags,
+        }))
     } else {
         None
     }
@@ -182,13 +294,46 @@ fn apply_subrules_and_check_finiteness_requirements(
     physical_optimizer_subrules: &Vec<Box<PipelineFixerSubrule>>,
 ) -> Result<Option<PipelineStatePropagator>> {
     for sub_rule in physical_optimizer_subrules {
-        if let Some(value) = sub_rule(&input).transpose()? {
+        if let Some(value) = sub_rule(input.clone()).transpose()? {
             input = value;
         }
     }
     check_finiteness_requirements(input)
 }
Review comment:

  Reviewer: Regarding the method

    fn apply_subrules_and_check_finiteness_requirements(
        mut input: PipelineStatePropagator,
        physical_optimizer_subrules: &Vec<Box<PipelineFixerSubrule>>,
    )

  I would suggest to change it to:

    fn apply_subrules_and_check_finiteness_requirements(
        input: PipelineStatePropagator,
        physical_optimizer_subrules: &[Box<PipelineFixerSubrule>],
    ) -> Result<Option<PipelineStatePropagator>> {
        let after_op =
            physical_optimizer_subrules
                .iter()
                .try_fold(input, |pipeline, sub_rule| {
                    if let Some(value) = sub_rule(&pipeline).transpose()? {
                        Result::<_, DataFusionError>::Ok(value)
                    } else {
                        Result::<_, DataFusionError>::Ok(pipeline)
                    }
                })?;
        check_finiteness_requirements(after_op)
    }

  Author: I tried it, but this creates an ownership problem in the closure
  (since …). BTW, when we were investigating …

  Reviewer: Sure, you can do all code refinement in a following PR.

  Author: Yes, we will be using Ballista. We haven't contributed much to
  Ballista yet as we are still focusing on fundamental streaming execution
  primitives, but will increase our focus on Ballista over time.

  Reviewer: Good to know. In the past, there was some discussion to enhance
  Ballista to support both BATCH/STREAMING execution models:
  https://docs.google.com/document/d/1OdAe078axk4qO0ozUxNqBMD4wKoBhzh9keMuLp_jerE/edit#
  And in the latest Flink release, they had implemented similar features
  (bubble execution model, hybrid shuffle, etc.). I think generally we can
  follow Flink's approach to make both DataFusion and Ballista support
  BATCH/STREAMING execution models. At the high level, we can have different
  models (BATCH vs STREAMING), and the user can specify the execution model.
  In the physical planning phase, we have …
  https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API

  Author: We have made significant progress in foundational streaming
  execution support on the DataFusion side over the last few months. For
  example, you can already mark sources as infinite/unbounded (check out the
  tests with FIFO files). Every … We will write a blog post about this soon;
  I will share it with you when it is done so you can get up to speed on what
  we've done already. I think it would also be good to have a meeting with
  everyone interested in this to discuss future plans and designs.
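The ownership issue raised in the thread above — each subrule consumes its `PipelineStatePropagator` by value after this PR, so a `try_fold` over a borrowed accumulator does not work without cloning — can be sketched with toy types (illustrative stand-ins, not the DataFusion API):

```rust
// Toy model of folding owned state through a list of by-value subrules:
// each rule either transforms the state (Some) or declines (None), and
// errors short-circuit via try_fold.
#[derive(Debug, Clone, PartialEq)]
struct State(i32);

type Subrule = dyn Fn(State) -> Option<Result<State, String>>;

fn apply_subrules(input: State, subrules: &[Box<Subrule>]) -> Result<State, String> {
    subrules.iter().try_fold(input, |state, rule| {
        // Clone before calling: the rule consumes its argument by value,
        // but we must keep the old state in case the rule declines (None).
        match rule(state.clone()) {
            Some(result) => result, // rule applied (or failed with Err)
            None => Ok(state),      // rule declined; keep state unchanged
        }
    })
}

fn main() {
    let rules: Vec<Box<Subrule>> = vec![
        Box::new(|s: State| Some(Ok(State(s.0 + 1)))),  // applies
        Box::new(|_| None),                             // declines
        Box::new(|s: State| Some(Ok(State(s.0 * 10)))), // applies
    ];
    let out = apply_subrules(State(1), &rules).unwrap();
    assert_eq!(out, State(20));
    println!("{:?}", out);
}
```

The clone is the price of by-value subrules; the `for` loop with `input.clone()` in the hunk above pays the same price, so the two forms are equivalent in cost and differ only in style.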
+#[cfg(test)]
+mod util_tests {
+    use crate::physical_optimizer::pipeline_fixer::check_support;
+    use datafusion_expr::Operator;
+    use datafusion_physical_expr::expressions::{BinaryExpr, Column, NegativeExpr};
+    use datafusion_physical_expr::PhysicalExpr;
+    use std::sync::Arc;
+
+    #[test]
+    fn check_expr_supported() {
+        let supported_expr = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("a", 0)),
+            Operator::Plus,
+            Arc::new(Column::new("a", 0)),
+        )) as Arc<dyn PhysicalExpr>;
+        assert!(check_support(&supported_expr));
+        let supported_expr_2 = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
+        assert!(check_support(&supported_expr_2));
+        let unsupported_expr = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("a", 0)),
+            Operator::Or,
+            Arc::new(Column::new("a", 0)),
+        )) as Arc<dyn PhysicalExpr>;
+        assert!(!check_support(&unsupported_expr));
+        let unsupported_expr_2 = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("a", 0)),
+            Operator::Or,
+            Arc::new(NegativeExpr::new(Arc::new(Column::new("a", 0)))),
+        )) as Arc<dyn PhysicalExpr>;
+        assert!(!check_support(&unsupported_expr_2));
+    }
+}
 #[cfg(test)]
 mod hash_join_tests {
     use super::*;

@@ -574,7 +719,7 @@ mod hash_join_tests {
             children_unbounded: vec![left_unbounded, right_unbounded],
         };
         let optimized_hash_join =
-            hash_join_swap_subrule(&initial_hash_join_state).unwrap()?;
+            hash_join_swap_subrule(initial_hash_join_state).unwrap()?;
         let optimized_join_plan = optimized_hash_join.plan;

         // If swap did happen
Review comment:

  Reviewer: Why does the enforcement happen before the pipeline fixer (and
  not, for example, also before JoinSelection)?

  Author: We haven't focused on optimal rule ordering yet. Right now, we only
  enable SHJ when joining two unbounded streams, so it has to happen at some
  point before PipelineFixer. As we mature the SHJ implementation, we will
  enable it even for normal tables when it is appropriate (i.e. yields
  performance gains). Within that context, we will revisit both rule internal
  logic and ordering.

  Author: If the filter expression is incapable of pruning both sides, bounded
  sources will not experience any performance boost since there is no sliding
  window effect. We plan to include a check for whether the filter expression
  supports range pruning on both sides in a future PR. Once this check is in
  place, we can evaluate the feasibility of applying this plan to bounded
  sources.

  Reviewer: I think the comment for PipelineFixer needs to be adjusted.