Skip to content

Fix result scan JSON Parsing from ResultSet #1057

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions crates/core-executor/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ use df_catalog::catalog::CachingCatalog;
use df_catalog::information_schema::session_params::SessionProperty;
use embucket_functions::semi_structured::variant::visitors::visit_all;
use embucket_functions::visitors::{
copy_into_identifiers, functions_rewriter, json_element, select_expr_aliases, table_function,
unimplemented::functions_checker::visit as unimplemented_functions_checker,
copy_into_identifiers, functions_rewriter, json_element, select_expr_aliases,
table_result_scan, unimplemented::functions_checker::visit as unimplemented_functions_checker,
};
use iceberg_rust::catalog::Catalog;
use iceberg_rust::catalog::create::CreateTableBuilder;
Expand Down Expand Up @@ -184,7 +184,7 @@ impl UserQuery {
copy_into_identifiers::visit(value);
select_expr_aliases::visit(value);
// inline_aliases_in_query::visit(value);
table_function::visit(value);
table_result_scan::visit(value);
visit_all(value);
}
Ok(())
Expand Down Expand Up @@ -1284,6 +1284,21 @@ impl UserQuery {
})
}
}

/// Fully qualifies all table references in the provided SQL statement.
///
/// This function traverses the SQL statement and updates table references to their fully qualified
/// names (including catalog and schema), based on the current session context and catalog state.
///
/// - Table references that are part of Common Table Expressions (CTEs) are skipped and left as-is.
/// - Table functions (recognized by the session context) are also skipped and left unresolved.
/// - All other table references are resolved using `resolve_table_object_name` and updated in-place.
///
/// # Arguments
/// * `statement` - The SQL statement (`DFStatement`) to update.
///
/// # Errors
/// Returns an error if table resolution fails for any non-CTE, non-table-function reference.
pub fn update_statement_references(&self, statement: &mut DFStatement) -> ExecutionResult<()> {
let (_tables, ctes) = resolve_table_references(
statement,
Expand All @@ -1296,7 +1311,6 @@ impl UserQuery {
.enable_ident_normalization,
)
.context(super::error::DataFusionSnafu)?;

match statement {
DFStatement::Statement(stmt) => {
let cte_names: HashSet<String> = ctes
Expand All @@ -1305,7 +1319,12 @@ impl UserQuery {
.collect();

let _ = visit_relations_mut(stmt, |table_name: &mut ObjectName| {
if !cte_names.contains(&table_name.to_string()) {
let is_table_func = self
.session
.ctx
.table_function(table_name.to_string().as_str())
.is_ok();
if !cte_names.contains(&table_name.to_string()) && !is_table_func {
match self.resolve_table_object_name(table_name.0.clone()) {
Ok(resolved_name) => {
*table_name = ObjectName::from(resolved_name.0);
Expand Down
11 changes: 10 additions & 1 deletion crates/core-executor/src/tests/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::Arc;
#[allow(clippy::unwrap_used)]
#[tokio::test]
async fn test_update_all_table_names_visitor() {
let args: [(&str, &str); 8] = [
let args = vec![
("select * from foo", "SELECT * FROM embucket.new_schema.foo"),
(
"insert into foo (id) values (5)",
Expand Down Expand Up @@ -50,6 +50,15 @@ async fn test_update_all_table_names_visitor() {
"WITH sales_data AS (SELECT * FROM foo) SELECT * FROM sales_data",
"WITH sales_data AS (SELECT * FROM embucket.new_schema.foo) SELECT * FROM sales_data",
),
// Skip table functions
(
"select * from result_scan('1')",
"SELECT * FROM result_scan('1')",
),
(
"SELECT * from flatten('[1,77]','',false,false,'both')",
"SELECT * FROM flatten('[1,77]', '', false, false, 'both')",
),
];

let session = create_df_session().await;
Expand Down
2 changes: 1 addition & 1 deletion crates/core-history/src/entities/result_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub struct Column {
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Row(Vec<Value>);
pub struct Row(pub Vec<Value>);

impl Row {
#[must_use]
Expand Down
29 changes: 26 additions & 3 deletions crates/embucket-functions/src/table/result_scan.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::utils::block_in_new_runtime;
use core_history::result_set::ResultSet;
use core_history::{GetQueriesParams, HistoryStore, QueryRecord};
use datafusion::arrow;
use datafusion::arrow::array::RecordBatch;
Expand Down Expand Up @@ -79,14 +80,18 @@ impl ResultScanFunc {
DataFusionError::Execution(format!("No result data for query_id {query_id_parsed}"))
})?;

let mut buf_reader = BufReader::new(Cursor::new(&result_json));
// Deserialize ResultSet string
let result_set: ResultSet = serde_json::from_str(&result_json)
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let arrow_json = convert_resultset_to_arrow_json_lines(&result_set)?;
let mut buf_reader = BufReader::new(Cursor::new(&arrow_json));
let (inferred_schema, _) = infer_json_schema(&mut buf_reader, None)
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let schema_ref: SchemaRef = Arc::new(inferred_schema);

let json_reader = ReaderBuilder::new(schema_ref.clone())
.build(Cursor::new(&result_json))
.build(Cursor::new(&arrow_json))
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let batches = json_reader
Expand Down Expand Up @@ -140,3 +145,21 @@ fn get_query_by_index(queries: &[String], index: i64) -> Option<String> {
/// Builds a non-null `ScalarValue::Utf8` from any value convertible into a `String`.
fn utf8_val(val: impl Into<String>) -> ScalarValue {
    let text: String = val.into();
    ScalarValue::Utf8(Some(text))
}

/// Renders a [`ResultSet`] as newline-delimited JSON, one object per row.
///
/// Each row becomes a JSON object whose keys are the column names and whose
/// values are the row's cells, matched to columns by position. Every object is
/// followed by a `'\n'`, so an empty result set yields an empty string.
///
/// Columns and cells are paired with `zip`, so pairing stops at the shorter of
/// the two: surplus cells or surplus columns in a row are not emitted.
///
/// # Errors
/// Returns `DataFusionError::External` wrapping the `serde_json` error if a
/// row object cannot be serialized.
fn convert_resultset_to_arrow_json_lines(
    result_set: &ResultSet,
) -> Result<String, DataFusionError> {
    let columns = &result_set.columns;

    result_set
        .rows
        .iter()
        .try_fold(String::new(), |mut lines, row| {
            // Assemble the `{column_name: cell}` object for this row.
            let mut object = serde_json::Map::with_capacity(columns.len());
            object.extend(
                columns
                    .iter()
                    .zip(&row.0)
                    .map(|(column, cell)| (column.name.clone(), cell.clone())),
            );

            let encoded = serde_json::to_string(&object)
                .map_err(|e| DataFusionError::External(Box::new(e)))?;

            // One JSON document per line.
            lines.push_str(&encoded);
            lines.push('\n');
            Ok::<String, DataFusionError>(lines)
        })
}
7 changes: 5 additions & 2 deletions crates/embucket-functions/src/tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,11 @@ pub fn history_store_mock() -> Arc<dyn HistoryStore> {
mock.expect_get_query().returning(|id| {
let mut record = QueryRecord::new("query", None);
let buf = r#"
{"a": 1, "b": "2", "c": true}
{"a": 2.0, "b": "4", "c": false}
{
"columns": [{"name":"a","type":"text"},{"name":"b","type":"text"},{"name":"c","type":"text"}],
"rows": [[1,"2",true],[2.0,"4",false]],
"data_format": "arrow"
}
"#;
record.result = Some(buf.to_string());
if id == 500 {
Expand Down
5 changes: 3 additions & 2 deletions crates/embucket-functions/src/tests/visitors.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::visitors::{
functions_rewriter, inline_aliases_in_query, json_element, select_expr_aliases, table_function,
functions_rewriter, inline_aliases_in_query, json_element, select_expr_aliases,
table_result_scan,
};
use datafusion::prelude::SessionContext;
use datafusion::sql::parser::Statement as DFStatement;
Expand Down Expand Up @@ -201,7 +202,7 @@ fn test_table_function_result_scan() -> DFResult<()> {
for (input, expected) in cases {
let mut statement = state.sql_to_statement(input, "snowflake")?;
if let DFStatement::Statement(ref mut stmt) = statement {
table_function::visit(stmt);
table_result_scan::visit(stmt);
}
assert_eq!(statement.to_string(), expected);
}
Expand Down
2 changes: 1 addition & 1 deletion crates/embucket-functions/src/visitors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ pub mod functions_rewriter;
pub mod inline_aliases_in_query;
pub mod json_element;
pub mod select_expr_aliases;
pub mod table_function;
pub mod table_result_scan;
pub mod unimplemented;