Skip to content

Commit

Permalink
Return an error when a column does not exist in window function (#9202)
Browse files Browse the repository at this point in the history
* fix: return err instead of panic

Signed-off-by: Hoang Pham <a2k40pbc@gmail.com>

* fix: update error message

Signed-off-by: Hoang Pham <a2k40pbc@gmail.com>

* fix: use find_map instead of any

Signed-off-by: Hoang Pham <a2k40pbc@gmail.com>

* fix: cargo fmt to format the code

Signed-off-by: Hoang Pham <a2k40pbc@gmail.com>

* fix: update is_ordering_strict checking logic

Signed-off-by: Hoang Pham <a2k40pbc@gmail.com>

* test: add test_field_not_found_window_function

Signed-off-by: Hoang Pham <a2k40pbc@gmail.com>

* feat: simplify the logic of creating new WindowFrame

Signed-off-by: Hoang Pham <a2k40pbc@gmail.com>

* fix: simplify error handling code

Signed-off-by: Hoang Pham <a2k40pbc@gmail.com>

* add more test on window_function

Signed-off-by: Hoang Pham <a2k40pbc@gmail.com>

---------

Signed-off-by: Hoang Pham <a2k40pbc@gmail.com>
  • Loading branch information
PhVHoang authored Feb 14, 2024
1 parent 528fdfb commit 0444dad
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 8 deletions.
20 changes: 12 additions & 8 deletions datafusion/sql/src/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,18 +149,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
)?;

let func_deps = schema.functional_dependencies();
// Find whether ties are possible in the given ordering:
let is_ordering_strict = order_by.iter().any(|orderby_expr| {
// Find whether ties are possible in the given ordering
let is_ordering_strict = order_by.iter().find_map(|orderby_expr| {
if let Expr::Sort(sort_expr) = orderby_expr {
if let Expr::Column(col) = sort_expr.expr.as_ref() {
let idx = schema.index_of_column(col).unwrap();
return func_deps.iter().any(|dep| {
let idx = schema.index_of_column(col).ok()?;
return if func_deps.iter().any(|dep| {
dep.source_indices == vec![idx]
&& dep.mode == Dependency::Single
});
}) {
Some(true)
} else {
Some(false)
};
}
}
false
Some(false)
});

let window_frame = window
Expand All @@ -176,8 +180,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let window_frame = if let Some(window_frame) = window_frame {
regularize_window_order_by(&window_frame, &mut order_by)?;
window_frame
} else if is_ordering_strict {
WindowFrame::new(Some(true))
} else if let Some(is_ordering_strict) = is_ordering_strict {
WindowFrame::new(Some(is_ordering_strict))
} else {
WindowFrame::new((!order_by.is_empty()).then_some(false))
};
Expand Down
23 changes: 23 additions & 0 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4400,6 +4400,29 @@ fn test_multi_grouping_sets() {
quick_test(sql, expected);
}

#[test]
fn test_field_not_found_window_function() {
let order_by_sql = "SELECT count() OVER (order by a);";
let order_by_err = logical_plan(order_by_sql).expect_err("query should have failed");
assert_eq!(
"Schema error: No field named a.",
order_by_err.strip_backtrace()
);

let partition_by_sql = "SELECT count() OVER (PARTITION BY a);";
let partition_by_err =
logical_plan(partition_by_sql).expect_err("query should have failed");
assert_eq!(
"Schema error: No field named a.",
partition_by_err.strip_backtrace()
);

let qualified_sql =
"SELECT order_id, MAX(qty) OVER (PARTITION BY orders.order_id) from orders";
let expected = "Projection: orders.order_id, MAX(orders.qty) PARTITION BY [orders.order_id] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\n WindowAggr: windowExpr=[[MAX(orders.qty) PARTITION BY [orders.order_id] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\n TableScan: orders";
quick_test(qualified_sql, expected);
}

fn assert_field_not_found(err: DataFusionError, name: &str) {
match err {
DataFusionError::SchemaError { .. } => {
Expand Down

0 comments on commit 0444dad

Please sign in to comment.