Support for Sliding Windows Joins with Symmetric Hash Join (SHJ) #5322

Merged: 49 commits, Mar 1, 2023
Changes from 45 commits
d456081
Prunable symmetric hash join implementation
metesynnada Jan 2, 2023
59c9acf
Merge branch 'master' into feature/prunable-symmetric-hash-join
metesynnada Jan 20, 2023
b156bc4
Minor changes after merge
metesynnada Jan 20, 2023
9adb7da
Filter mapping inside SymmetricHashJoin
metesynnada Jan 23, 2023
b9d50bd
Commenting on
metesynnada Jan 24, 2023
bbe2570
Merge branch 'apache:master' into feature/prunable-symmetric-hash-join
metesynnada Jan 24, 2023
0abf0d8
Minor changes after merge
metesynnada Jan 24, 2023
6a15fe7
Simplify interval arithmetic library code
ozankabak Jan 30, 2023
d2b6a66
Make the interval arithmetics library more robust
metesynnada Jan 30, 2023
fd4a51f
Merge branch 'feature/prunable-symmetric-hash-join' of https://github…
metesynnada Jan 30, 2023
1da529f
After merge corrections
metesynnada Jan 30, 2023
6921aaf
Simplifications to constraint propagation code
ozankabak Jan 31, 2023
582573e
Revamp some API's and enhance comments
metesynnada Jan 31, 2023
7638902
Resolve a propagation bug and make the propagation returns an opt. st…
metesynnada Feb 2, 2023
287f2f4
Refactor and simplify CP code, improve comments
ozankabak Feb 3, 2023
c8d7c21
Code deduplication between pipeline fixer and utils, also enhance com…
metesynnada Feb 4, 2023
08cf4f0
Refactor on input stream consumer on SymmetricHashJoin
metesynnada Feb 6, 2023
4b1b435
Merge branch 'feature/prunable-symmetric-hash-join' of https://github…
metesynnada Feb 6, 2023
05beacf
After merge resolution, before proto update
metesynnada Feb 6, 2023
f59018b
Revery unnecessary changes in some exprs
metesynnada Feb 6, 2023
f333d6a
Remove support indicators to interval library, rename module to use t…
ozankabak Feb 8, 2023
34c71e8
Simplify PipelineFixer, remove clones, improve comments
ozankabak Feb 8, 2023
8ea0400
Enhance the symmetric hash join code with reviews
metesynnada Feb 8, 2023
76440a2
Revamp according to reviews
metesynnada Feb 8, 2023
9a66872
Use a simple, stateless, one-liner DFS to check for IA support
ozankabak Feb 8, 2023
f9dfd77
Move test function to a test_utils module
ozankabak Feb 8, 2023
7e250df
Simplify DAG creation code
ozankabak Feb 9, 2023
f7c216f
Reducing code change
metesynnada Feb 9, 2023
2472446
Comment improvements and simplifications
ozankabak Feb 9, 2023
263eab0
Revamp SortedFilterExpr usage and enhance comments
metesynnada Feb 10, 2023
ec6e5ad
Update fifo.rs
metesynnada Feb 10, 2023
35c04a0
Remove unnecessary clones, improve comments and code structure
ozankabak Feb 11, 2023
0743eb6
Remove leaf searches from CP iterations, improve code
ozankabak Feb 12, 2023
a9a4912
Bug fix in cp_solver, revamp some comments
metesynnada Feb 13, 2023
52e143d
Update with correct testing
metesynnada Feb 13, 2023
5ec2eae
Test for future support on fuzzy matches between exprs
metesynnada Feb 14, 2023
175133b
Compute connected nodes in CP solver via a DFS, improve comments
ozankabak Feb 17, 2023
fc83386
Revamp OneSideHashJoin constructor and new unit test
metesynnada Feb 17, 2023
01c3434
Merge branch 'feature/prunable-symmetric-hash-join' of https://github…
metesynnada Feb 17, 2023
3bef26d
Update on concat_batches usage
metesynnada Feb 17, 2023
537bbe5
Revamping according to comments.
metesynnada Feb 27, 2023
70a9905
Simplifications, refactoring
ozankabak Feb 28, 2023
26ea524
Merge pull request #54 from synnada-ai/feature/join-improvements
metesynnada Feb 28, 2023
55cffd1
Merge branch 'feature/prunable-symmetric-hash-join' of https://github…
metesynnada Feb 28, 2023
adbc128
Minor fix
metesynnada Feb 28, 2023
f95e3a6
Fix typo in the new_zero function
ozankabak Feb 28, 2023
766da03
Merge branch 'main' into feature/prunable-symmetric-hash-join
ozankabak Feb 28, 2023
970b124
Merge branch 'main' into feature/prunable-symmetric-hash-join
ozankabak Feb 28, 2023
ff4cbf9
Merge branch 'main' into feature/prunable-symmetric-hash-join
ozankabak Mar 1, 2023
17 changes: 17 additions & 0 deletions datafusion-cli/Cargo.lock

(Generated file; diff not rendered by default.)

23 changes: 23 additions & 0 deletions datafusion/common/src/scalar.rs
@@ -1019,6 +1019,29 @@ impl ScalarValue {
Self::List(scalars, Box::new(Field::new("item", child_type, true)))
}

/// Create a zero value of the given type.
pub fn new_zero(datatype: &DataType) -> Result<ScalarValue> {
assert!(datatype.is_primitive());
Ok(match datatype {
DataType::Boolean => ScalarValue::Boolean(Some(false)),
DataType::Int8 => ScalarValue::Int8(Some(0)),
DataType::Int16 => ScalarValue::Int16(Some(0)),
DataType::Int32 => ScalarValue::Int32(Some(0)),
DataType::Int64 => ScalarValue::Int64(Some(0)),
DataType::UInt8 => ScalarValue::UInt8(Some(0)),
DataType::UInt16 => ScalarValue::UInt16(Some(0)),
DataType::UInt32 => ScalarValue::UInt32(Some(0)),
DataType::UInt64 => ScalarValue::UInt64(Some(0)),
DataType::Float32 => ScalarValue::UInt64(Some(0)),
Contributor: This should be float32 and float64?

Contributor: Right, that's a typo -- just sent a commit to fix it as well as the new merge conflicts.
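
For reference, the corrected arms (fixed later in commit f95e3a6, "Fix typo in the new_zero function") would presumably read:

DataType::Float32 => ScalarValue::Float32(Some(0.0)),
DataType::Float64 => ScalarValue::Float64(Some(0.0)),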

DataType::Float64 => ScalarValue::UInt64(Some(0)),
_ => {
return Err(DataFusionError::NotImplemented(format!(
"Can't create a zero scalar from data_type \"{datatype:?}\""
)));
}
})
}

/// Getter for the `DataType` of the value
pub fn get_datatype(&self) -> DataType {
match self {
Expand Down
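
For context, a minimal usage sketch of the new constructor (hypothetical snippet, not part of the diff):

use arrow::datatypes::DataType;
use datafusion_common::{Result, ScalarValue};

fn zeros() -> Result<()> {
    // Each supported primitive numeric type yields its typed zero value.
    assert_eq!(
        ScalarValue::new_zero(&DataType::Int32)?,
        ScalarValue::Int32(Some(0))
    );
    assert_eq!(
        ScalarValue::new_zero(&DataType::UInt64)?,
        ScalarValue::UInt64(Some(0))
    );
    Ok(())
}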
3 changes: 3 additions & 0 deletions datafusion/core/src/execution/context.rs
@@ -1584,6 +1584,9 @@ impl SessionState {
// repartitioning and local sorting steps to meet distribution and ordering requirements.
// Therefore, it should run before EnforceDistribution and EnforceSorting.
Arc::new(JoinSelection::new()),
// Enforce sort before PipelineFixer
Contributor: Why does the enforcement happen before PipelineFixer (and not, for example, also before JoinSelection)?

Contributor (@ozankabak, Feb 22, 2023): We haven't focused on optimal rule ordering yet. Right now, we only enable SHJ when joining two unbounded streams, so it has to happen at some point before PipelineFixer. As we mature the SHJ implementation, we will enable it even for normal tables when it is appropriate (i.e. yields performance gains). Within that context, we will revisit both rule internal logic and ordering.

Contributor (author): If the filter expression is incapable of pruning both sides, bounded sources will not experience any performance boost since there is no sliding-window effect. We plan to include a check for whether the filter expression supports range pruning on both sides in a future PR. Once this check is in place, we can evaluate the feasibility of applying this plan to bounded sources.

Contributor: I think the comment for PipelineFixer needs to be adjusted:

// If the query is processing infinite inputs, the PipelineFixer rule applies the
// necessary transformations to make the query runnable (if it is not already runnable).
// If the query can not be made runnable, the rule emits an error with a diagnostic message.
// Since the transformations it applies may alter output partitioning properties of operators
// (e.g. by swapping hash join sides), this rule runs before EnforceDistribution.

Arc::new(EnforceDistribution::new()),
Arc::new(EnforceSorting::new()),
// If the query is processing infinite inputs, the PipelineFixer rule applies the
// necessary transformations to make the query runnable (if it is not already runnable).
// If the query can not be made runnable, the rule emits an error with a diagnostic message.
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/pipeline_checker.rs
@@ -305,7 +305,7 @@ mod sql_tests {
FROM test
LIMIT 5".to_string(),
cases: vec![Arc::new(test1), Arc::new(test2)],
error_operator: "Window Error".to_string()
error_operator: "Sort Error".to_string()
};

case.run().await?;
Expand All @@ -328,7 +328,7 @@ mod sql_tests {
SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1
FROM test".to_string(),
cases: vec![Arc::new(test1), Arc::new(test2)],
error_operator: "Window Error".to_string()
error_operator: "Sort Error".to_string()
};
case.run().await?;
Ok(())
Expand Down
173 changes: 159 additions & 14 deletions datafusion/core/src/physical_optimizer/pipeline_fixer.rs
@@ -29,11 +29,19 @@ use crate::physical_optimizer::pipeline_checker::{
check_finiteness_requirements, PipelineStatePropagator,
};
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::joins::{HashJoinExec, PartitionMode};
use crate::physical_plan::joins::utils::JoinSide;
use crate::physical_plan::joins::{
convert_sort_expr_with_filter_schema, HashJoinExec, PartitionMode,
SymmetricHashJoinExec,
};
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::ExecutionPlan;
use datafusion_common::DataFusionError;
use datafusion_expr::logical_plan::JoinType;
use datafusion_physical_expr::expressions::{BinaryExpr, CastExpr, Column, Literal};
use datafusion_physical_expr::intervals::{is_datatype_supported, is_operator_supported};
use datafusion_physical_expr::PhysicalExpr;

use std::sync::Arc;

/// The [PipelineFixer] rule tries to modify a given plan so that it can
@@ -48,17 +56,24 @@ impl PipelineFixer {
Self {}
}
}
/// [PipelineFixer] subrules are functions of this type. Such functions take a
/// single [PipelineStatePropagator] argument, which stores state variables
/// indicating the unboundedness status of the current [ExecutionPlan] as
/// the [PipelineFixer] rule traverses the entire plan tree.
type PipelineFixerSubrule =
dyn Fn(&PipelineStatePropagator) -> Option<Result<PipelineStatePropagator>>;
dyn Fn(PipelineStatePropagator) -> Option<Result<PipelineStatePropagator>>;
Comment on lines 63 to +64
Contributor: Could you please add some comments for the type definition dyn Fn(PipelineStatePropagator) -> Option<Result<PipelineStatePropagator>>?
Contributor: Sure, will do.


impl PhysicalOptimizerRule for PipelineFixer {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let pipeline = PipelineStatePropagator::new(plan);
let physical_optimizer_subrules: Vec<Box<PipelineFixerSubrule>> =
vec![Box::new(hash_join_swap_subrule)];
let physical_optimizer_subrules: Vec<Box<PipelineFixerSubrule>> = vec![
Box::new(hash_join_convert_symmetric_subrule),
Box::new(hash_join_swap_subrule),
];
let state = pipeline.transform_up(&|p| {
apply_subrules_and_check_finiteness_requirements(
p,
@@ -77,6 +92,104 @@ impl PhysicalOptimizerRule for PipelineFixer {
}
}

/// Indicates whether interval arithmetic is supported for the given expression.
/// Currently, we do not support all [PhysicalExpr]s for interval calculations.
/// We do not support every type of [Operator] either. Over time, this check
/// will relax as more types of [PhysicalExpr]s and [Operator]s are supported.
/// Currently, [CastExpr], [BinaryExpr], [Column] and [Literal] are supported.
fn check_support(expr: &Arc<dyn PhysicalExpr>) -> bool {
let expr_any = expr.as_any();
let expr_supported = if let Some(binary_expr) = expr_any.downcast_ref::<BinaryExpr>()
{
is_operator_supported(binary_expr.op())
} else {
expr_any.is::<Column>() || expr_any.is::<Literal>() || expr_any.is::<CastExpr>()
};
expr_supported && expr.children().iter().all(check_support)
}

/// This function returns whether a given hash join is replaceable by a
/// symmetric hash join. Basically, the requirement is that involved
/// [PhysicalExpr]s, [Operator]s and data types need to be supported,
/// and order information must cover every column in the filter expression.
fn is_suitable_for_symmetric_hash_join(hash_join: &HashJoinExec) -> Result<bool> {
if let Some(filter) = hash_join.filter() {
let left = hash_join.left();
if let Some(left_ordering) = left.output_ordering() {
let right = hash_join.right();
if let Some(right_ordering) = right.output_ordering() {
let expr_supported = check_support(filter.expression());
let left_convertible = convert_sort_expr_with_filter_schema(
&JoinSide::Left,
filter,
&left.schema(),
&left_ordering[0],
)?
.is_some();
let right_convertible = convert_sort_expr_with_filter_schema(
&JoinSide::Right,
filter,
&right.schema(),
&right_ordering[0],
)?
.is_some();
let fields_supported = filter
.schema()
.fields()
.iter()
.all(|f| is_datatype_supported(f.data_type()));
return Ok(expr_supported
&& fields_supported
&& left_convertible
&& right_convertible);
}
}
}
Ok(false)
}

/// This subrule checks if one can replace a hash join with a symmetric hash
/// join so that the pipeline does not break due to the join operation in
/// question. If possible, it makes this replacement; otherwise, it has no
/// effect.
fn hash_join_convert_symmetric_subrule(
input: PipelineStatePropagator,
) -> Option<Result<PipelineStatePropagator>> {
let plan = input.plan;
if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
let ub_flags = input.children_unbounded;
let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
Contributor: I wonder why this PR limits the use of symmetric hash join to unbounded streams? It seems it could be used for any input where the sort conditions are reasonable, given:

  1. the very good numbers reported in https://synnada.notion.site/synnada/General-purpose-Stream-Joins-via-Pruning-Symmetric-Hash-Joins-2fe26d3127a241e294a0217b1f18603a
  2. the fact that this algorithm is general for any sorted stream (not just the unbounded ones)

Contributor (@ozankabak, Feb 22, 2023): As I mentioned in the rule-related comment, this limit is temporary. We will enable SHJ for other cases in one of the follow-on PRs 🚀

Contributor: Makes sense.

Contributor: Should we move the join selection logic to the JoinSelection rule?

Contributor (@mingmwang, Feb 28, 2023): It seems we currently cannot move the hash_join_convert_symmetric_subrule logic to the JoinSelection rule, but I think the hash_join_swap_subrule logic can be moved there. The original JoinSelection already has logic to swap join sides based on estimated stats/size, but it does not take bounded/unbounded inputs into consideration.

Contributor: We will think about this. I don't see a straightforward way right away, but I will talk to @metesynnada about it in detail. If this turns out to be possible, we can make a follow-on PR about it.

let new_plan = if left_unbounded && right_unbounded {
match is_suitable_for_symmetric_hash_join(hash_join) {
Ok(true) => SymmetricHashJoinExec::try_new(
hash_join.left().clone(),
hash_join.right().clone(),
hash_join
.on()
.iter()
.map(|(l, r)| (l.clone(), r.clone()))
.collect(),
hash_join.filter().unwrap().clone(),
hash_join.join_type(),
hash_join.null_equals_null(),
)
.map(|e| Arc::new(e) as _),
Ok(false) => Ok(plan),
Err(e) => return Some(Err(e)),
}
} else {
Ok(plan)
};
Some(new_plan.map(|plan| PipelineStatePropagator {
plan,
unbounded: left_unbounded || right_unbounded,
children_unbounded: ub_flags,
}))
} else {
None
}
}

/// This subrule will swap build/probe sides of a hash join depending on whether its inputs
/// may produce an infinite stream of records. The rule ensures that the left (build) side
/// of the hash join always operates on an input stream that will produce a finite set of records.
@@ -119,12 +232,12 @@ impl PhysicalOptimizerRule for PipelineFixer {
///
/// ```
fn hash_join_swap_subrule(
input: &PipelineStatePropagator,
input: PipelineStatePropagator,
) -> Option<Result<PipelineStatePropagator>> {
let plan = input.plan.clone();
let children = &input.children_unbounded;
let plan = input.plan;
if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
let (left_unbounded, right_unbounded) = (children[0], children[1]);
let ub_flags = input.children_unbounded;
let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
let new_plan = if left_unbounded && !right_unbounded {
if matches!(
*hash_join.join_type(),
@@ -140,12 +253,11 @@ fn hash_join_swap_subrule(
} else {
Ok(plan)
};
let new_state = new_plan.map(|plan| PipelineStatePropagator {
Some(new_plan.map(|plan| PipelineStatePropagator {
plan,
unbounded: left_unbounded || right_unbounded,
children_unbounded: vec![left_unbounded, right_unbounded],
});
Some(new_state)
children_unbounded: ub_flags,
}))
} else {
None
}
@@ -182,13 +294,46 @@ fn apply_subrules_and_check_finiteness_requirements(
physical_optimizer_subrules: &Vec<Box<PipelineFixerSubrule>>,
) -> Result<Option<PipelineStatePropagator>> {
for sub_rule in physical_optimizer_subrules {
if let Some(value) = sub_rule(&input).transpose()? {
if let Some(value) = sub_rule(input.clone()).transpose()? {
input = value;
}
}
check_finiteness_requirements(input)
}
Contributor: Regarding the method apply_subrules_and_check_finiteness_requirements(): I would suggest making the input immutable. If there are changes to the plan/struct, return the changed (new) plan/struct.

fn apply_subrules_and_check_finiteness_requirements(
    mut input: PipelineStatePropagator,
    physical_optimizer_subrules: &Vec<Box<PipelineFixerSubrule>>,
)

Suggested change:

fn apply_subrules_and_check_finiteness_requirements(
    input: PipelineStatePropagator,
    physical_optimizer_subrules: &[Box<PipelineFixerSubrule>],
) -> Result<Option<PipelineStatePropagator>> {
    let after_op =
        physical_optimizer_subrules
            .iter()
            .try_fold(input, |pipeline, sub_rule| {
                if let Some(value) = sub_rule(&pipeline).transpose()? {
                    Result::<_, DataFusionError>::Ok(value)
                } else {
                    Result::<_, DataFusionError>::Ok(pipeline)
                }
            })?;
    check_finiteness_requirements(after_op)
}

Contributor (@ozankabak, Feb 28, 2023): I tried it, but this creates an ownership problem in the closure (since sub_rule does not take a reference as its argument). We can make this work if we change the sub-rule return type so that it always returns a PipelineStatePropagator (by returning the argument unchanged when nothing is modified), and not an Option. Maybe we can explore this in a refactor PR.

BTW, when we were investigating clone calls, we came across a similar situation in some use cases involving transform_up/transform_down. Maybe we can consider all of these things within that refactor PR and discuss in detail. Will keep this in mind, thanks for pointing it out.
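
A minimal sketch of that hypothetical refactor direction (not part of this PR): if the sub-rule type took and returned the propagator by value, the loop could become a clone-free fold.

// Hypothetical signature: each sub-rule passes the propagator through
// unchanged when no rewrite applies, so no Option and no clone is needed.
type PipelineFixerSubrule =
    dyn Fn(PipelineStatePropagator) -> Result<PipelineStatePropagator>;

fn apply_subrules_and_check_finiteness_requirements(
    input: PipelineStatePropagator,
    physical_optimizer_subrules: &[Box<PipelineFixerSubrule>],
) -> Result<Option<PipelineStatePropagator>> {
    // Ownership threads through try_fold; errors short-circuit.
    let after_op = physical_optimizer_subrules
        .iter()
        .try_fold(input, |pipeline, sub_rule| sub_rule(pipeline))?;
    check_finiteness_requirements(after_op)
}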

Contributor: Sure, you can do all the code refinement in a following PR.
BTW, in your use case, do you use Ballista as the distributed engine?

Contributor: Yes, we will be using Ballista. We haven't contributed much to Ballista yet as we are still focusing on fundamental streaming execution primitives, but will increase our focus on Ballista over time.

Contributor (@mingmwang, Feb 28, 2023): Good to know. In the past, there was some discussion about enhancing Ballista to support both BATCH/STREAMING execution models:

https://docs.google.com/document/d/1OdAe078axk4qO0ozUxNqBMD4wKoBhzh9keMuLp_jerE/edit#
http://www.vldb.org/pvldb/vol11/p746-yin.pdf

I haven't worked on Ballista since last year, and there has been no progress in this area.

And in the latest Flink release, they have implemented similar features (bubble execution model, hybrid shuffle, etc.):
https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode

I think we can generally follow Flink's approach to make both DataFusion and Ballista support BATCH/STREAMING execution models. At a high level, we can have different models (BATCH vs. STREAMING), and the user can specify the execution model. In the physical planning phase, we would have a BatchPlanner and a StreamingPlanner; they can share some common rules, and the batch and streaming planners can have their own rules.
In the ExecutionPlan trait, we can have another trait to indicate that some operators are Source operators; the source operators can be BOUNDED or UNBOUNDED. BOUNDED or UNBOUNDED should be a property available to Source operators only.

https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API

Contributor (@ozankabak, Feb 28, 2023): We have made significant progress in foundational streaming execution support on the DataFusion side over the last few months. For example, you can already mark sources as infinite/unbounded (check out the tests with FIFO files). Every ExecutionPlan then propagates this information upwards, so we can adjust (or optimize) the plan intelligently depending on whether we are dealing with a stream.

We will write a blog post about this soon; I will share it with you when it is done so you can get up to speed on what we've done already. I think it would also be good to have a meeting with everyone interested in this to discuss future plans and designs.


#[cfg(test)]
mod util_tests {
use crate::physical_optimizer::pipeline_fixer::check_support;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Column, NegativeExpr};
use datafusion_physical_expr::PhysicalExpr;
use std::sync::Arc;

#[test]
fn check_expr_supported() {
let supported_expr = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Plus,
Arc::new(Column::new("a", 0)),
)) as Arc<dyn PhysicalExpr>;
assert!(check_support(&supported_expr));
let supported_expr_2 = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
assert!(check_support(&supported_expr_2));
let unsupported_expr = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Or,
Arc::new(Column::new("a", 0)),
)) as Arc<dyn PhysicalExpr>;
assert!(!check_support(&unsupported_expr));
let unsupported_expr_2 = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Or,
Arc::new(NegativeExpr::new(Arc::new(Column::new("a", 0)))),
)) as Arc<dyn PhysicalExpr>;
assert!(!check_support(&unsupported_expr_2));
}
}

#[cfg(test)]
mod hash_join_tests {
use super::*;
@@ -574,7 +719,7 @@ mod hash_join_tests {
children_unbounded: vec![left_unbounded, right_unbounded],
};
let optimized_hash_join =
hash_join_swap_subrule(&initial_hash_join_state).unwrap()?;
hash_join_swap_subrule(initial_hash_join_state).unwrap()?;
let optimized_join_plan = optimized_hash_join.plan;

// If swap did happen
Expand Down