Skip to content

Add TOP clause support #1060

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/core-executor/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use df_catalog::information_schema::session_params::SessionProperty;
use embucket_functions::semi_structured::variant::visitors::visit_all;
use embucket_functions::visitors::{
copy_into_identifiers, functions_rewriter, inline_aliases_in_query, json_element,
select_expr_aliases, table_result_scan,
select_expr_aliases, table_result_scan, top_limit,
unimplemented::functions_checker::visit as unimplemented_functions_checker,
};
use iceberg_rust::catalog::Catalog;
Expand Down Expand Up @@ -182,6 +182,7 @@ impl UserQuery {
if let DFStatement::Statement(value) = statement {
json_element::visit(value);
functions_rewriter::visit(value);
top_limit::visit(value);
unimplemented_functions_checker(value)
// Can't use context here since underlying Error require handling
.map_err(|e| {
Expand Down
1 change: 1 addition & 0 deletions crates/core-executor/src/tests/sql/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
mod pivot;
mod top;
mod unpivot;
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
---
source: crates/core-executor/src/tests/sql/commands/top.rs
description: "\"SELECT TOP 4 c1 FROM testtable ORDER BY c1\""
info: "Setup queries: CREATE OR REPLACE TABLE testtable (c1 STRING); INSERT INTO testtable (c1) VALUES ('1'), ('2'), ('3'), ('20'), ('19'), ('18'), ('1'), ('2'), ('3'), ('4'), (NULL), ('30'), (NULL)"
---
Ok(
[
"+----+",
"| c1 |",
"+----+",
"| 1 |",
"| 1 |",
"| 18 |",
"| 19 |",
"+----+",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
source: crates/core-executor/src/tests/sql/commands/top.rs
description: "\"WITH cte AS (SELECT TOP 2 c1 FROM testtable ORDER BY c1) SELECT * FROM cte ORDER BY c1\""
info: "Setup queries: CREATE OR REPLACE TABLE testtable (c1 STRING); INSERT INTO testtable (c1) VALUES ('1'), ('2'), ('3'), ('20'), ('19'), ('18'), ('1'), ('2'), ('3'), ('4'), (NULL), ('30'), (NULL)"
---
Ok(
[
"+----+",
"| c1 |",
"+----+",
"| 1 |",
"| 1 |",
"+----+",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
---
source: crates/core-executor/src/tests/sql/commands/top.rs
description: "\"SELECT c1 FROM (SELECT TOP 3 c1 FROM testtable ORDER BY c1) sub ORDER BY c1 DESC\""
info: "Setup queries: CREATE OR REPLACE TABLE testtable (c1 STRING); INSERT INTO testtable (c1) VALUES ('1'), ('2'), ('3'), ('20'), ('19'), ('18'), ('1'), ('2'), ('3'), ('4'), (NULL), ('30'), (NULL)"
---
Ok(
[
"+----+",
"| c1 |",
"+----+",
"| 18 |",
"| 1 |",
"| 1 |",
"+----+",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
source: crates/core-executor/src/tests/sql/commands/top.rs
description: "\"SELECT c1 FROM (SELECT TOP 1 c1 FROM testtable ORDER BY c1 ASC) UNION ALL (SELECT TOP 1 c1 FROM testtable ORDER BY c1 DESC) ORDER BY c1\""
info: "Setup queries: CREATE OR REPLACE TABLE testtable (c1 STRING); INSERT INTO testtable (c1) VALUES ('1'), ('2'), ('3'), ('20'), ('19'), ('18'), ('1'), ('2'), ('3'), ('4'), (NULL), ('30'), (NULL)"
---
Ok(
[
"+----+",
"| c1 |",
"+----+",
"| 1 |",
"| |",
"+----+",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
source: crates/core-executor/src/tests/sql/commands/top.rs
description: "\"SELECT TOP 2 c1 FROM (SELECT TOP 4 c1 FROM testtable ORDER BY c1) sub ORDER BY c1\""
info: "Setup queries: CREATE OR REPLACE TABLE testtable (c1 STRING); INSERT INTO testtable (c1) VALUES ('1'), ('2'), ('3'), ('20'), ('19'), ('18'), ('1'), ('2'), ('3'), ('4'), (NULL), ('30'), (NULL)"
---
Ok(
[
"+----+",
"| c1 |",
"+----+",
"| 1 |",
"| 1 |",
"+----+",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
source: crates/core-executor/src/tests/sql/commands/top.rs
description: "\"SELECT TOP 5 c1 FROM testtable ORDER BY c1\""
info: "Setup queries: CREATE OR REPLACE TABLE testtable (c1 STRING); INSERT INTO testtable (c1) VALUES ('1'), ('2'), ('3'), ('20'), ('19'), ('18'), ('1'), ('2'), ('3'), ('4'), (NULL), ('30'), (NULL)"
---
Ok(
[
"+----+",
"| c1 |",
"+----+",
"| 1 |",
"| 1 |",
"| 18 |",
"| 19 |",
"| 2 |",
"+----+",
],
)
48 changes: 48 additions & 0 deletions crates/core-executor/src/tests/sql/commands/top.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use crate::test_query;

// Shared fixture for every TOP test in this file: (re)creates `testtable` with a
// single STRING column and fills it with 13 rows that include duplicate values
// and two NULLs. Because `c1` is a STRING, the recorded snapshots order values
// as text (e.g. '18' sorts before '2').
const SETUP_QUERY: [&str; 2] = [
"CREATE OR REPLACE TABLE testtable (c1 STRING)",
"INSERT INTO testtable (c1) VALUES ('1'), ('2'), ('3'), ('20'), ('19'), ('18'), ('1'), ('2'), ('3'), ('4'), (NULL), ('30'), (NULL)",
];

// TOP on a top-level SELECT combined with ORDER BY.
test_query!(
top_basic,
"SELECT TOP 4 c1 FROM testtable ORDER BY c1",
setup_queries = [SETUP_QUERY[0], SETUP_QUERY[1]],
snapshot_path = "top"
);

// TOP inside a derived table (subquery in FROM); the outer query re-sorts descending.
test_query!(
top_in_subquery,
"SELECT c1 FROM (SELECT TOP 3 c1 FROM testtable ORDER BY c1) sub ORDER BY c1 DESC",
setup_queries = [SETUP_QUERY[0], SETUP_QUERY[1]],
snapshot_path = "top"
);

// TOP inside a CTE body.
test_query!(
top_in_cte,
"WITH cte AS (SELECT TOP 2 c1 FROM testtable ORDER BY c1) SELECT * FROM cte ORDER BY c1",
setup_queries = [SETUP_QUERY[0], SETUP_QUERY[1]],
snapshot_path = "top"
);

// TOP on both sides of a UNION ALL: the left side carries it in a derived table,
// the right side in a parenthesized query.
test_query!(
top_in_union,
"SELECT c1 FROM (SELECT TOP 1 c1 FROM testtable ORDER BY c1 ASC) UNION ALL (SELECT TOP 1 c1 FROM testtable ORDER BY c1 DESC) ORDER BY c1",
setup_queries = [SETUP_QUERY[0], SETUP_QUERY[1]],
snapshot_path = "top"
);

// NOTE(review): the test name says "no order", but the query does contain
// ORDER BY; as written it is `top_basic` with TOP 5 instead of TOP 4.
// Confirm whether an unordered TOP was meant to be covered here.
test_query!(
top_with_no_order,
"SELECT TOP 5 c1 FROM testtable ORDER BY c1",
setup_queries = [SETUP_QUERY[0], SETUP_QUERY[1]],
snapshot_path = "top"
);

// NOTE(review): the test name mentions LIMIT, but the query has no LIMIT clause;
// what it exercises is a TOP nested inside another TOP. Confirm whether a
// TOP + LIMIT combination was meant to be covered here.
test_query!(
top_with_limit,
"SELECT TOP 2 c1 FROM (SELECT TOP 4 c1 FROM testtable ORDER BY c1) sub ORDER BY c1",
setup_queries = [SETUP_QUERY[0], SETUP_QUERY[1]],
snapshot_path = "top"
);
2 changes: 2 additions & 0 deletions crates/embucket-functions/src/visitors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ pub mod json_element;
pub mod select_expr_aliases;
pub mod table_result_scan;
pub mod unimplemented;

pub mod top_limit;
70 changes: 70 additions & 0 deletions crates/embucket-functions/src/visitors/top_limit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use datafusion_expr::sqlparser::ast::VisitMut;
use datafusion_expr::sqlparser::ast::{
Expr, Query, SetExpr, Statement, TableFactor, TopQuantity, Value, VisitorMut,
};
use std::ops::ControlFlow;

/// AST visitor that removes a plain `TOP <n>` from a `SELECT` and stores the
/// quantity as the `LIMIT` of the query that owns that `SELECT`.
///
/// `TOP ... PERCENT` and `TOP ... WITH TIES` are left on the `SELECT` unchanged.
#[derive(Debug, Default)]
pub struct TopLimitVisitor;

impl TopLimitVisitor {
fn process_set_expr(&mut self, set_expr: &mut SetExpr, outer_limit: &mut Option<Expr>) {
match set_expr {
SetExpr::Select(select) => {
for table_with_joins in &mut select.from {
if let TableFactor::Derived { subquery, .. } = &mut table_with_joins.relation {
self.process_query(subquery);
}
}

if let Some(top) = select.top.take() {
if !top.percent && !top.with_ties {
if outer_limit.is_none() {
if let Some(expr) = top.quantity.map(|q| match q {
TopQuantity::Expr(expr) => expr,
TopQuantity::Constant(n) => Expr::Value(
Value::Number(n.to_string(), false).with_empty_span(),
),
}) {
*outer_limit = Some(expr);
}
}
} else {
select.top = Some(top);
}
}
}
SetExpr::Query(q) => self.process_query(q),
SetExpr::SetOperation { left, right, .. } => {
self.process_set_expr(left, outer_limit);
self.process_set_expr(right, outer_limit);
}
_ => {}
}
}

fn process_query(&mut self, query: &mut Query) {
if let Some(with) = query.with.as_mut() {
for cte in &mut with.cte_tables {
self.process_query(&mut cte.query);
}
}

self.process_set_expr(&mut query.body, &mut query.limit);
}
}

impl VisitorMut for TopLimitVisitor {
type Break = ();

fn pre_visit_statement(&mut self, stmt: &mut Statement) -> ControlFlow<Self::Break> {
if let Statement::Query(query) = stmt {
self.process_query(query);
}
ControlFlow::Continue(())
}
}

pub fn visit(stmt: &mut Statement) {
let _ = stmt.visit(&mut TopLimitVisitor);
}