Skip to content

Commit 0368f59

Browse files
Dandandan and alamb authored
Allow non-equijoin filters in join condition (#660)
* Allow non-equijoin filters in join condition
* Revert change to query
* Fix, only do for inner join
* Add test
* docs update
* Update test name
  Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
* Add negative test
  Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent fdf41ad commit 0368f59

File tree

2 files changed

+81
-24
lines changed

2 files changed

+81
-24
lines changed

datafusion/src/sql/planner.rs

Lines changed: 59 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -368,15 +368,34 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
368368
// parse ON expression
369369
let expr = self.sql_to_rex(sql_expr, &join_schema)?;
370370

371+
// expression that didn't match equi-join pattern
372+
let mut filter = vec![];
373+
371374
// extract join keys
372-
extract_join_keys(&expr, &mut keys)?;
375+
extract_join_keys(&expr, &mut keys, &mut filter);
373376

374377
let (left_keys, right_keys): (Vec<Column>, Vec<Column>) =
375378
keys.into_iter().unzip();
376379
// return the logical plan representing the join
377-
LogicalPlanBuilder::from(left)
378-
.join(right, join_type, left_keys, right_keys)?
380+
let join = LogicalPlanBuilder::from(left)
381+
.join(right, join_type, left_keys, right_keys)?;
382+
383+
if filter.is_empty() {
384+
join.build()
385+
} else if join_type == JoinType::Inner {
386+
join.filter(
387+
filter
388+
.iter()
389+
.skip(1)
390+
.fold(filter[0].clone(), |acc, e| acc.and(e.clone())),
391+
)?
379392
.build()
393+
} else {
394+
Err(DataFusionError::NotImplemented(format!(
395+
"Unsupported expressions in {:?} JOIN: {:?}",
396+
join_type, filter
397+
)))
398+
}
380399
}
381400
JoinConstraint::Using(idents) => {
382401
let keys: Vec<Column> = idents
@@ -1550,39 +1569,41 @@ fn remove_join_expressions(
15501569
}
15511570
}
15521571

1553-
/// Parse equijoin ON condition which could be a single Eq or multiple conjunctive Eqs
1572+
/// Extracts the equijoin ON condition, which can be a single Eq or multiple conjunctive Eqs.
1573+
/// Filters matching this pattern are added to `accum`
1574+
/// Filters that don't match this pattern are added to `accum_filter`
1575+
/// Examples:
15541576
///
1555-
/// Examples
1577+
/// foo = bar => accum=[(foo, bar)] accum_filter=[]
1578+
/// foo = bar AND bar = baz => accum=[(foo, bar), (bar, baz)] accum_filter=[]
1579+
/// foo = bar AND baz > 1 => accum=[(foo, bar)] accum_filter=[baz > 1]
15561580
///
1557-
/// foo = bar
1558-
/// foo = bar AND bar = baz AND ...
1559-
///
1560-
fn extract_join_keys(expr: &Expr, accum: &mut Vec<(Column, Column)>) -> Result<()> {
1581+
fn extract_join_keys(
1582+
expr: &Expr,
1583+
accum: &mut Vec<(Column, Column)>,
1584+
accum_filter: &mut Vec<Expr>,
1585+
) {
15611586
match expr {
15621587
Expr::BinaryExpr { left, op, right } => match op {
15631588
Operator::Eq => match (left.as_ref(), right.as_ref()) {
15641589
(Expr::Column(l), Expr::Column(r)) => {
15651590
accum.push((l.clone(), r.clone()));
1566-
Ok(())
15671591
}
1568-
other => Err(DataFusionError::SQL(ParserError(format!(
1569-
"Unsupported expression '{:?}' in JOIN condition",
1570-
other
1571-
)))),
1592+
_other => {
1593+
accum_filter.push(expr.clone());
1594+
}
15721595
},
15731596
Operator::And => {
1574-
extract_join_keys(left, accum)?;
1575-
extract_join_keys(right, accum)
1597+
extract_join_keys(left, accum, accum_filter);
1598+
extract_join_keys(right, accum, accum_filter);
1599+
}
1600+
_other => {
1601+
accum_filter.push(expr.clone());
15761602
}
1577-
other => Err(DataFusionError::SQL(ParserError(format!(
1578-
"Unsupported expression '{:?}' in JOIN condition",
1579-
other
1580-
)))),
15811603
},
1582-
other => Err(DataFusionError::SQL(ParserError(format!(
1583-
"Unsupported expression '{:?}' in JOIN condition",
1584-
other
1585-
)))),
1604+
_other => {
1605+
accum_filter.push(expr.clone());
1606+
}
15861607
}
15871608
}
15881609

@@ -2702,6 +2723,20 @@ mod tests {
27022723
quick_test(sql, expected);
27032724
}
27042725

2726+
#[test]
2727+
fn equijoin_unsupported_expression() {
2728+
let sql = "SELECT id, order_id \
2729+
FROM person \
2730+
JOIN orders \
2731+
ON id = customer_id AND order_id > 1 ";
2732+
let expected = "Projection: #person.id, #orders.order_id\
2733+
\n Filter: #orders.order_id Gt Int64(1)\
2734+
\n Join: #person.id = #orders.customer_id\
2735+
\n TableScan: person projection=None\
2736+
\n TableScan: orders projection=None";
2737+
quick_test(sql, expected);
2738+
}
2739+
27052740
#[test]
27062741
fn join_with_table_name() {
27072742
let sql = "SELECT id, order_id \

datafusion/tests/sql.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1687,6 +1687,28 @@ async fn equijoin() -> Result<()> {
16871687
Ok(())
16881688
}
16891689

1690+
#[tokio::test]
1691+
async fn equijoin_and_other_condition() -> Result<()> {
1692+
let mut ctx = create_join_context("t1_id", "t2_id")?;
1693+
let sql =
1694+
"SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id AND t2_name >= 'y' ORDER BY t1_id";
1695+
let actual = execute(&mut ctx, sql).await;
1696+
let expected = vec![vec!["11", "a", "z"], vec!["22", "b", "y"]];
1697+
assert_eq!(expected, actual);
1698+
Ok(())
1699+
}
1700+
1701+
#[tokio::test]
1702+
async fn equijoin_and_unsupported_condition() -> Result<()> {
1703+
let ctx = create_join_context("t1_id", "t2_id")?;
1704+
let sql =
1705+
"SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t2_name >= 'y' ORDER BY t1_id";
1706+
let res = ctx.create_logical_plan(sql);
1707+
assert!(res.is_err());
1708+
assert_eq!(format!("{}", res.unwrap_err()), "This feature is not implemented: Unsupported expressions in Left JOIN: [#t2.t2_name GtEq Utf8(\"y\")]");
1709+
Ok(())
1710+
}
1711+
16901712
#[tokio::test]
16911713
async fn left_join() -> Result<()> {
16921714
let mut ctx = create_join_context("t1_id", "t2_id")?;

0 commit comments

Comments
 (0)