Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions datafusion/sql/src/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,18 +149,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
)?;

let func_deps = schema.functional_dependencies();
// Find whether ties are possible in the given ordering:
let is_ordering_strict = order_by.iter().any(|orderby_expr| {
// Find whether ties are possible in the given ordering
let is_ordering_strict = order_by.iter().find_map(|orderby_expr| {
if let Expr::Sort(sort_expr) = orderby_expr {
if let Expr::Column(col) = sort_expr.expr.as_ref() {
let idx = schema.index_of_column(col).unwrap();
return func_deps.iter().any(|dep| {
let idx = schema.index_of_column(col).ok()?;
return if func_deps.iter().any(|dep| {
dep.source_indices == vec![idx]
&& dep.mode == Dependency::Single
});
}) {
Some(true)
} else {
Some(false)
};
}
}
false
Some(false)
});

let window_frame = window
Expand All @@ -176,8 +180,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let window_frame = if let Some(window_frame) = window_frame {
regularize_window_order_by(&window_frame, &mut order_by)?;
window_frame
} else if is_ordering_strict {
WindowFrame::new(Some(true))
} else if let Some(is_ordering_strict) = is_ordering_strict {
WindowFrame::new(Some(is_ordering_strict))
} else {
WindowFrame::new((!order_by.is_empty()).then_some(false))
};
Expand Down
23 changes: 23 additions & 0 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4343,6 +4343,29 @@ fn test_multi_grouping_sets() {
quick_test(sql, expected);
}

#[test]
fn test_field_not_found_window_function() {
let order_by_sql = "SELECT count() OVER (order by a);";
let order_by_err = logical_plan(order_by_sql).expect_err("query should have failed");
assert_eq!(
"Schema error: No field named a.",
order_by_err.strip_backtrace()
);

let partition_by_sql = "SELECT count() OVER (PARTITION BY a);";
let partition_by_err =
logical_plan(partition_by_sql).expect_err("query should have failed");
assert_eq!(
"Schema error: No field named a.",
partition_by_err.strip_backtrace()
);

let qualified_sql =
"SELECT order_id, MAX(qty) OVER (PARTITION BY orders.order_id) from orders";
let expected = "Projection: orders.order_id, MAX(orders.qty) PARTITION BY [orders.order_id] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\n WindowAggr: windowExpr=[[MAX(orders.qty) PARTITION BY [orders.order_id] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\n TableScan: orders";
quick_test(qualified_sql, expected);
}

fn assert_field_not_found(err: DataFusionError, name: &str) {
match err {
DataFusionError::SchemaError { .. } => {
Expand Down