Skip to content

Commit 219b556

Browse files
committed
reimplement eliminate_outer_join
1 parent 822022d commit 219b556

File tree

4 files changed

+105
-184
lines changed

4 files changed

+105
-184
lines changed

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -999,6 +999,12 @@ pub enum JoinType {
999999
RightAnti,
10001000
}
10011001

1002+
impl JoinType {
1003+
pub fn is_outer(self) -> bool {
1004+
self == JoinType::Left || self == JoinType::Right || self == JoinType::Full
1005+
}
1006+
}
1007+
10021008
impl Display for JoinType {
10031009
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
10041010
let join_type = match self {

datafusion/optimizer/src/reduce_outer_join.rs renamed to datafusion/optimizer/src/eliminate_outer_join.rs

Lines changed: 97 additions & 182 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,17 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! Optimizer rule to reduce left/right/full join to inner join if possible.
19-
use crate::{OptimizerConfig, OptimizerRule};
18+
//! Optimizer rule to convert left/right/full outer joins to inner joins where possible.
19+
use crate::{utils, OptimizerConfig, OptimizerRule};
2020
use datafusion_common::{Column, DFSchema, Result};
2121
use datafusion_expr::{
2222
expr::BinaryExpr,
23-
logical_plan::{Filter, Join, JoinType, LogicalPlan, Projection},
23+
logical_plan::{Join, JoinType, LogicalPlan},
2424
utils::from_plan,
2525
};
2626
use datafusion_expr::{Expr, Operator};
2727

2828
use datafusion_expr::expr::Cast;
29-
use std::collections::HashMap;
3029
use std::sync::Arc;
3130

3231
#[derive(Default)]
@@ -39,191 +38,107 @@ impl ReduceOuterJoin {
3938
}
4039
}
4140

41+
/// Attempt to convert outer joins to inner joins.
42+
/// For the query: select ... from a left join b on ... where b.xx = 100;
43+
/// if b.xx is null, then b.xx = 100 does not hold, so the filter removes those null rows.
44+
/// Therefore, there is no need to produce null rows in the output, and we can use
45+
/// an inner join instead of a left join.
46+
///
47+
/// Generally, an outer join can be converted to an inner join if the predicates in the
48+
/// WHERE clause return false whenever any of their inputs is null, and the columns of those
49+
/// predicates come from the nullable side of the outer join.
4250
impl OptimizerRule for ReduceOuterJoin {
4351
fn optimize(
4452
&self,
4553
plan: &LogicalPlan,
4654
optimizer_config: &mut OptimizerConfig,
4755
) -> Result<LogicalPlan> {
48-
let mut nonnullable_cols: Vec<Column> = vec![];
56+
match plan {
57+
LogicalPlan::Filter(filter) => match filter.input().as_ref() {
58+
LogicalPlan::Join(join) => {
59+
let mut nonnullable_cols: Vec<Column> = vec![];
60+
61+
extract_nonnullable_columns(
62+
filter.predicate(),
63+
&mut nonnullable_cols,
64+
join.left.schema(),
65+
join.right.schema(),
66+
true,
67+
)?;
4968

50-
reduce_outer_join(self, plan, &mut nonnullable_cols, optimizer_config)
69+
let new_join_type = if join.join_type.is_outer() {
70+
let mut left_nonnullable = false;
71+
let mut right_nonnullable = false;
72+
for col in nonnullable_cols.iter() {
73+
if join.left.schema().field_from_column(col).is_ok() {
74+
left_nonnullable = true;
75+
}
76+
if join.right.schema().field_from_column(col).is_ok() {
77+
right_nonnullable = true;
78+
}
79+
}
80+
eliminate_outer(
81+
join.join_type,
82+
left_nonnullable,
83+
right_nonnullable,
84+
)
85+
} else {
86+
join.join_type
87+
};
88+
89+
let new_join = LogicalPlan::Join(Join {
90+
left: Arc::new((*join.left).clone()),
91+
right: Arc::new((*join.right).clone()),
92+
join_type: new_join_type,
93+
join_constraint: join.join_constraint,
94+
on: join.on.clone(),
95+
filter: join.filter.clone(),
96+
schema: join.schema.clone(),
97+
null_equals_null: join.null_equals_null,
98+
});
99+
let new_plan = from_plan(plan, &plan.expressions(), &[new_join])?;
100+
utils::optimize_children(self, &new_plan, optimizer_config)
101+
}
102+
_ => utils::optimize_children(self, plan, optimizer_config),
103+
},
104+
_ => utils::optimize_children(self, plan, optimizer_config),
105+
}
51106
}
52107

53108
fn name(&self) -> &str {
54-
"reduce_outer_join"
109+
"eliminate_outer_join"
55110
}
56111
}
57112

58-
/// Attempt to reduce outer joins to inner joins.
59-
/// for query: select ... from a left join b on ... where b.xx = 100;
60-
/// if b.xx is null, and b.xx = 100 returns false, filterd those null rows.
61-
/// Therefore, there is no need to produce null rows for output, we can use
62-
/// inner join instead of left join.
63-
///
64-
/// Generally, an outer join can be reduced to inner join if quals from where
65-
/// return false while any inputs are null and columns of those quals are come from
66-
/// nullable side of outer join.
67-
fn reduce_outer_join(
68-
_optimizer: &ReduceOuterJoin,
69-
plan: &LogicalPlan,
70-
nonnullable_cols: &mut Vec<Column>,
71-
_optimizer_config: &OptimizerConfig,
72-
) -> Result<LogicalPlan> {
73-
match plan {
74-
LogicalPlan::Filter(filter) => match filter.input().as_ref() {
75-
LogicalPlan::Join(join) => {
76-
extract_nonnullable_columns(
77-
filter.predicate(),
78-
nonnullable_cols,
79-
join.left.schema(),
80-
join.right.schema(),
81-
true,
82-
)?;
83-
Ok(LogicalPlan::Filter(Filter::try_new(
84-
filter.predicate().clone(),
85-
Arc::new(reduce_outer_join(
86-
_optimizer,
87-
filter.input(),
88-
nonnullable_cols,
89-
_optimizer_config,
90-
)?),
91-
)?))
113+
pub fn eliminate_outer(
114+
join_type: JoinType,
115+
left_nonnullable: bool,
116+
right_nonnullable: bool,
117+
) -> JoinType {
118+
let mut new_join_type = join_type;
119+
match join_type {
120+
JoinType::Left => {
121+
if right_nonnullable {
122+
new_join_type = JoinType::Inner;
92123
}
93-
_ => Ok(LogicalPlan::Filter(Filter::try_new(
94-
filter.predicate().clone(),
95-
Arc::new(reduce_outer_join(
96-
_optimizer,
97-
filter.input(),
98-
nonnullable_cols,
99-
_optimizer_config,
100-
)?),
101-
)?)),
102-
},
103-
LogicalPlan::Join(join) => {
104-
let mut new_join_type = join.join_type;
105-
106-
if join.join_type == JoinType::Left
107-
|| join.join_type == JoinType::Right
108-
|| join.join_type == JoinType::Full
109-
{
110-
let mut left_nonnullable = false;
111-
let mut right_nonnullable = false;
112-
for col in nonnullable_cols.iter() {
113-
if join.left.schema().field_from_column(col).is_ok() {
114-
left_nonnullable = true;
115-
}
116-
if join.right.schema().field_from_column(col).is_ok() {
117-
right_nonnullable = true;
118-
}
119-
}
120-
121-
match join.join_type {
122-
JoinType::Left => {
123-
if right_nonnullable {
124-
new_join_type = JoinType::Inner;
125-
}
126-
}
127-
JoinType::Right => {
128-
if left_nonnullable {
129-
new_join_type = JoinType::Inner;
130-
}
131-
}
132-
JoinType::Full => {
133-
if left_nonnullable && right_nonnullable {
134-
new_join_type = JoinType::Inner;
135-
} else if left_nonnullable {
136-
new_join_type = JoinType::Left;
137-
} else if right_nonnullable {
138-
new_join_type = JoinType::Right;
139-
}
140-
}
141-
_ => {}
142-
};
143-
}
144-
145-
let left_plan = reduce_outer_join(
146-
_optimizer,
147-
&join.left,
148-
&mut nonnullable_cols.clone(),
149-
_optimizer_config,
150-
)?;
151-
let right_plan = reduce_outer_join(
152-
_optimizer,
153-
&join.right,
154-
&mut nonnullable_cols.clone(),
155-
_optimizer_config,
156-
)?;
157-
158-
Ok(LogicalPlan::Join(Join {
159-
left: Arc::new(left_plan),
160-
right: Arc::new(right_plan),
161-
join_type: new_join_type,
162-
join_constraint: join.join_constraint,
163-
on: join.on.clone(),
164-
filter: join.filter.clone(),
165-
schema: join.schema.clone(),
166-
null_equals_null: join.null_equals_null,
167-
}))
168124
}
169-
LogicalPlan::Projection(Projection {
170-
input,
171-
expr,
172-
schema,
173-
alias: _,
174-
}) => {
175-
let projection = schema
176-
.fields()
177-
.iter()
178-
.enumerate()
179-
.map(|(i, field)| {
180-
// strip alias, as they should not be part of filters
181-
let expr = match &expr[i] {
182-
Expr::Alias(expr, _) => expr.as_ref().clone(),
183-
expr => expr.clone(),
184-
};
185-
186-
(field.qualified_name(), expr)
187-
})
188-
.collect::<HashMap<_, _>>();
189-
190-
// re-write all Columns based on this projection
191-
for col in nonnullable_cols.iter_mut() {
192-
if let Some(Expr::Column(column)) = projection.get(&col.flat_name()) {
193-
*col = column.clone();
194-
}
125+
JoinType::Right => {
126+
if left_nonnullable {
127+
new_join_type = JoinType::Inner;
195128
}
196-
197-
// optimize inner
198-
let new_input = reduce_outer_join(
199-
_optimizer,
200-
input,
201-
nonnullable_cols,
202-
_optimizer_config,
203-
)?;
204-
205-
from_plan(plan, expr, &[new_input])
206129
}
207-
_ => {
208-
let expr = plan.expressions();
209-
210-
// apply the optimization to all inputs of the plan
211-
let inputs = plan.inputs();
212-
let new_inputs = inputs
213-
.iter()
214-
.map(|plan| {
215-
reduce_outer_join(
216-
_optimizer,
217-
plan,
218-
nonnullable_cols,
219-
_optimizer_config,
220-
)
221-
})
222-
.collect::<Result<Vec<_>>>()?;
223-
224-
from_plan(plan, &expr, &new_inputs)
130+
JoinType::Full => {
131+
if left_nonnullable && right_nonnullable {
132+
new_join_type = JoinType::Inner;
133+
} else if left_nonnullable {
134+
new_join_type = JoinType::Left;
135+
} else if right_nonnullable {
136+
new_join_type = JoinType::Right;
137+
}
225138
}
139+
_ => {}
226140
}
141+
new_join_type
227142
}
228143

229144
/// Recursively traverses expr, if expr returns false when
@@ -309,9 +224,9 @@ fn extract_nonnullable_columns(
309224
)?;
310225

311226
// for query: select *** from a left join b where b.c1 ... or b.c2 ...
312-
// this can be reduced to inner join.
227+
// this can be converted to an inner join.
313228
// for query: select *** from a left join b where a.c1 ... or b.c2 ...
314-
// this can not be reduced.
229+
// this cannot be converted.
315230
// If columns of relation exist in both sub exprs, any columns of this relation
316231
// can be added to non nullable columns.
317232
if !left_nonnullable_cols.is_empty() && !right_nonnullable_cols.is_empty()
@@ -387,11 +302,11 @@ mod tests {
387302
}
388303

389304
#[test]
390-
fn reduce_left_with_null() -> Result<()> {
305+
fn eliminate_left_with_null() -> Result<()> {
391306
let t1 = test_table_scan_with_name("t1")?;
392307
let t2 = test_table_scan_with_name("t2")?;
393308

394-
// could not reduce to inner join
309+
// cannot be converted to an inner join
395310
let plan = LogicalPlanBuilder::from(t1)
396311
.join(
397312
&t2,
@@ -412,11 +327,11 @@ mod tests {
412327
}
413328

414329
#[test]
415-
fn reduce_left_with_not_null() -> Result<()> {
330+
fn eliminate_left_with_not_null() -> Result<()> {
416331
let t1 = test_table_scan_with_name("t1")?;
417332
let t2 = test_table_scan_with_name("t2")?;
418333

419-
// reduce to inner join
334+
// eliminate to inner join
420335
let plan = LogicalPlanBuilder::from(t1)
421336
.join(
422337
&t2,
@@ -437,11 +352,11 @@ mod tests {
437352
}
438353

439354
#[test]
440-
fn reduce_right_with_or() -> Result<()> {
355+
fn eliminate_right_with_or() -> Result<()> {
441356
let t1 = test_table_scan_with_name("t1")?;
442357
let t2 = test_table_scan_with_name("t2")?;
443358

444-
// reduce to inner join
359+
// eliminate to inner join
445360
let plan = LogicalPlanBuilder::from(t1)
446361
.join(
447362
&t2,
@@ -466,11 +381,11 @@ mod tests {
466381
}
467382

468383
#[test]
469-
fn reduce_full_with_and() -> Result<()> {
384+
fn eliminate_full_with_and() -> Result<()> {
470385
let t1 = test_table_scan_with_name("t1")?;
471386
let t2 = test_table_scan_with_name("t2")?;
472387

473-
// reduce to inner join
388+
// eliminate to inner join
474389
let plan = LogicalPlanBuilder::from(t1)
475390
.join(
476391
&t2,
@@ -495,11 +410,11 @@ mod tests {
495410
}
496411

497412
#[test]
498-
fn reduce_full_with_type_cast() -> Result<()> {
413+
fn eliminate_full_with_type_cast() -> Result<()> {
499414
let t1 = test_table_scan_with_name("t1")?;
500415
let t2 = test_table_scan_with_name("t2")?;
501416

502-
// reduce to inner join
417+
// eliminate to inner join
503418
let plan = LogicalPlanBuilder::from(t1)
504419
.join(
505420
&t2,

0 commit comments

Comments
 (0)