Skip to content

Add result_scan visitor to replace table(result_scan.. #1052

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 4 additions & 61 deletions crates/core-executor/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use datafusion::execution::session_state::SessionState;
use datafusion::logical_expr::{LogicalPlan, TableSource};
use datafusion::prelude::CsvReadOptions;
use datafusion::scalar::ScalarValue;
use datafusion::sql::parser::{CreateExternalTable, DFParser, Statement as DFStatement};
use datafusion::sql::parser::{CreateExternalTable, Statement as DFStatement};
use datafusion::sql::resolve::resolve_table_references;
use datafusion::sql::sqlparser::ast::{
CreateTable as CreateTableStatement, Expr, Ident, ObjectName, Query, SchemaName, Statement,
Expand All @@ -43,8 +43,7 @@ use df_catalog::catalog::CachingCatalog;
use df_catalog::information_schema::session_params::SessionProperty;
use embucket_functions::semi_structured::variant::visitors::visit_all;
use embucket_functions::visitors::{
copy_into_identifiers, functions_rewriter, inline_aliases_in_query, json_element,
select_expr_aliases,
copy_into_identifiers, functions_rewriter, json_element, select_expr_aliases, table_function,
unimplemented::functions_checker::visit as unimplemented_functions_checker,
};
use iceberg_rust::catalog::Catalog;
Expand Down Expand Up @@ -184,7 +183,8 @@ impl UserQuery {
})?;
copy_into_identifiers::visit(value);
select_expr_aliases::visit(value);
inline_aliases_in_query::visit(value);
// inline_aliases_in_query::visit(value);
table_function::visit(value);
visit_all(value);
}
Ok(())
Expand Down Expand Up @@ -325,7 +325,6 @@ impl UserQuery {
}
Statement::Query(mut subquery) => {
self.update_qualify_in_query(subquery.as_mut());
Self::update_table_result_scan_in_query(subquery.as_mut());
self.traverse_and_update_query(subquery.as_mut()).await;
return Box::pin(self.execute_with_custom_plan(&subquery.to_string())).await;
}
Expand Down Expand Up @@ -1552,62 +1551,6 @@ impl UserQuery {
})
}

fn update_table_result_scan_in_query(query: &mut Query) {
// TODO: Add logic to get result_scan from the historical results
if let sqlparser::ast::SetExpr::Select(select) = query.body.as_mut() {
// Remove is_iceberg field since it is not supported by information_schema.tables
select.projection.retain(|field| {
if let SelectItem::UnnamedExpr(Expr::Identifier(ident)) = field {
ident.value.to_lowercase() != "is_iceberg"
} else {
true
}
});

// Replace result_scan with the select from information_schema.tables
for table_with_joins in &mut select.from {
if let TableFactor::TableFunction {
expr: Expr::Function(f),
alias,
} = &mut table_with_joins.relation
{
if f.name.to_string().to_lowercase() == "result_scan" {
let columns = [
"table_catalog as 'database_name'",
"table_schema as 'schema_name'",
"table_name as 'name'",
"case when table_type='BASE TABLE' then 'TABLE' else table_type end as 'kind'",
"null as 'comment'",
"case when table_type='BASE TABLE' then 'Y' else 'N' end as is_iceberg",
"'N' as 'is_dynamic'",
].join(", ");
let information_schema_query =
format!("SELECT {columns} FROM information_schema.tables");

match DFParser::parse_sql(information_schema_query.as_str()) {
Ok(mut statements) => {
if let Some(DFStatement::Statement(s)) = statements.pop_front() {
if let Statement::Query(subquery) = *s {
select.from = vec![TableWithJoins {
relation: TableFactor::Derived {
lateral: false,
alias: alias.clone(),
subquery,
},
joins: table_with_joins.joins.clone(),
}];
break;
}
}
}
Err(_) => return,
}
}
}
}
}
}

fn convert_batches_to_exprs(batches: Vec<RecordBatch>) -> Vec<sqlparser::ast::ExprWithAlias> {
let mut exprs = Vec::new();
for batch in batches {
Expand Down
35 changes: 34 additions & 1 deletion crates/embucket-functions/src/tests/visitors.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::visitors::{
functions_rewriter, inline_aliases_in_query, json_element, select_expr_aliases,
functions_rewriter, inline_aliases_in_query, json_element, select_expr_aliases, table_function,
};
use datafusion::prelude::SessionContext;
use datafusion::sql::parser::Statement as DFStatement;
Expand Down Expand Up @@ -174,3 +174,36 @@ fn test_inline_aliases_in_query() -> DFResult<()> {
}
Ok(())
}

#[test]
fn test_table_function_result_scan() -> DFResult<()> {
let state = SessionContext::new().state();
let cases = vec![
(
"SELECT * FROM table(RESULT_SCAN(LAST_QUERY_ID(-2)))",
"SELECT * FROM RESULT_SCAN(LAST_QUERY_ID(-2))",
),
(
"SELECT * FROM table(FUNC('1'))",
"SELECT * FROM TABLE(FUNC('1'))",
),
(
"SELECT c2 FROM TABLE(RESULT_SCAN('id')) WHERE c2 > 1",
"SELECT c2 FROM RESULT_SCAN('id') WHERE c2 > 1",
),
(
"select a.*, b.IS_ICEBERG as 'is_iceberg'
from table(result_scan(last_query_id(-1))) a left join test as b on a.t = b.t",
"SELECT a.*, b.IS_ICEBERG AS 'is_iceberg' FROM result_scan(last_query_id(-1)) AS a LEFT JOIN test AS b ON a.t = b.t",
),
];

for (input, expected) in cases {
let mut statement = state.sql_to_statement(input, "snowflake")?;
if let DFStatement::Statement(ref mut stmt) = statement {
table_function::visit(stmt);
}
assert_eq!(statement.to_string(), expected);
}
Ok(())
}
1 change: 1 addition & 0 deletions crates/embucket-functions/src/visitors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ pub mod functions_rewriter;
pub mod inline_aliases_in_query;
pub mod json_element;
pub mod select_expr_aliases;
pub mod table_function;
pub mod unimplemented;
74 changes: 74 additions & 0 deletions crates/embucket-functions/src/visitors/table_function.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use datafusion::logical_expr::sqlparser::ast::{Expr, TableFactor, VisitMut};
use datafusion::sql::sqlparser::ast::{
Function, FunctionArguments, Query, SetExpr, Statement, VisitorMut,
};
use std::ops::ControlFlow;

/// A SQL AST visitor that rewrites `TABLE(RESULT_SCAN(...))` table functions
/// into `RESULT_SCAN(...)` by removing the unnecessary `TABLE(...)` wrapper.
///
/// This transformation is useful because in many SQL dialects, especially Snowflake-like syntax,
/// queries such as:
///
/// ```sql
/// SELECT * FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) WHERE value > 1;
/// ```
///
/// are semantically equivalent to:
///
/// ```sql
/// SELECT * FROM RESULT_SCAN(LAST_QUERY_ID()) WHERE value > 1;
/// ```
///
/// However, the presence of the `TABLE(...)` wrapper can complicate query parsing
/// or downstream analysis in some tools, such as logical planners or optimizers.
/// This visitor simplifies the AST by stripping the redundant `TABLE(...)`
/// call when it wraps a single `RESULT_SCAN(...)` function call.
///
/// # How it works:
/// - It traverses SQL `Query` nodes in the AST.
/// - For each `FROM` clause entry that is a `TableFactor::TableFunction`, it checks whether the expression is:
/// - A function call named `TABLE`,
/// - With exactly one argument,
/// - And that argument is a function call named `RESULT_SCAN`.
/// - If all conditions are met, it replaces the outer `TABLE(...)` function expression
/// with the inner `RESULT_SCAN(...)` function directly.
///
/// This transformation is performed in-place using the `VisitorMut` trait.
#[derive(Debug, Default)]
pub struct TableFunctionVisitor {}

impl VisitorMut for TableFunctionVisitor {
type Break = ();

fn pre_visit_query(&mut self, query: &mut Query) -> ControlFlow<Self::Break> {
if let SetExpr::Select(select) = query.body.as_mut() {
for item in &mut select.from {
if let TableFactor::TableFunction {
expr:
Expr::Function(Function {
name,
args: FunctionArguments::List(args),
..
}),
alias,
} = &mut item.relation
{
if name.to_string().to_lowercase() == "result_scan" {
item.relation = TableFactor::Function {
name: name.clone(),
args: args.args.clone(),
alias: alias.clone(),
lateral: false,
};
}
}
}
}
ControlFlow::Continue(())
}
}

pub fn visit(stmt: &mut Statement) {
let _ = stmt.visit(&mut TableFunctionVisitor {});
}