From 01ec53975349f3a993d2a01ec369bdaabea60790 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Fri, 8 Nov 2024 20:45:18 -0600 Subject: [PATCH] auto query --- Trunk.toml | 3 + src/main.rs | 189 ++++++++++++++++++++++++++++------------------------ 2 files changed, 104 insertions(+), 88 deletions(-) create mode 100644 Trunk.toml diff --git a/Trunk.toml b/Trunk.toml new file mode 100644 index 0000000..0d083d5 --- /dev/null +++ b/Trunk.toml @@ -0,0 +1,3 @@ + +[build] +minify = "on_release" diff --git a/src/main.rs b/src/main.rs index 782c7c1..74187bd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -266,7 +266,7 @@ impl PartitionStream for DummyStreamPartition { fn App() -> impl IntoView { let default_url = "https://raw.githubusercontent.com/RobinL/iris_parquet/main/gridwatch/gridwatch_2023-01-08.parquet"; let (url, set_url) = create_signal(default_url.to_string()); - let (file_content, set_file_content) = create_signal(None); + let (file_content, set_file_content) = create_signal(None::); let (error_message, set_error_message) = create_signal(Option::::None); let (file_bytes, set_file_bytes) = create_signal(None::); let (sql_query, set_sql_query) = create_signal(String::new()); @@ -275,6 +275,98 @@ fn App() -> impl IntoView { let (logical_plan, set_logical_plan) = create_signal(None::); let (physical_plan, set_physical_plan) = create_signal(None::>); + let execute_query = move || { + let query = sql_query.get(); + let bytes_opt = file_bytes.get(); + let table_name = file_name.get(); + set_error_message.set(None); // Clear any previous error messages + + if query.trim().is_empty() { + set_error_message.set(Some("Please enter a SQL query.".into())); + return; + } + if let Some(bytes) = bytes_opt { + wasm_bindgen_futures::spawn_local(async move { + web_sys::console::log_1(&table_name.clone().into()); + let ctx = datafusion::prelude::SessionContext::new(); + + let schema = match file_content.get_untracked() { + Some(content) => content.schema.clone(), + None => { + set_error_message.set(Some("Failed to get file schema".into())); + return; + } + }; + + let streaming_table = + match datafusion::datasource::streaming::StreamingTable::try_new( + schema.clone(), + vec![Arc::new(DummyStreamPartition { + schema: schema.clone(), + bytes: bytes.clone(), + })], + ) { + Ok(table) => table, + Err(e) => { + set_error_message + .set(Some(format!("Failed to create streaming table: {}", e))); + return; + } + }; + + if let Err(e) = ctx.register_table(table_name.clone(), Arc::new(streaming_table)) { + set_error_message.set(Some(format!( + "Failed to register table '{}': {}", + table_name, e + ))); + return; + } + + let plan = match ctx.sql(&query).await { + Ok(plan) => plan, + Err(e) => { + set_error_message.set(Some(format!("SQL error: {}", e))); + return; + } + }; + + let (state, plan) = plan.into_parts(); + let plan = match state.optimize(&plan) { + Ok(plan) => plan, + Err(e) => { + set_error_message.set(Some(format!("Failed to optimize query: {}", e))); + return; + } + }; + + set_logical_plan.set(Some(plan.clone())); + + let physical_plan = match state.create_physical_plan(&plan).await { + Ok(plan) => plan, + Err(e) => { + set_error_message + .set(Some(format!("Failed to create execution plan: {}", e))); + return; + } + }; + + set_physical_plan.set(Some(physical_plan.clone())); + + match collect(physical_plan, ctx.task_ctx().clone()).await { + Ok(results) => { + set_error_message.set(None); // Clear error message on success + set_query_result.set(results); + } + Err(e) => { + set_error_message.set(Some(format!("Failed to execute query: {}", e))); + } + }; + }); + } else { + set_error_message.set(Some("No Parquet file loaded.".into())); + }; + }; + let on_file_select = move |ev: web_sys::Event| { let input: web_sys::HtmlInputElement = event_target(&ev); let files = input.files().unwrap(); @@ -296,6 +388,10 @@ fn App() -> impl IntoView { web_sys::console::log_1(&info.to_string().into()); file_content_setter.set(Some(info)); set_file_bytes.set(Some(bytes.clone())); + let default_query = + format!("select * from \"{}\" limit 10", file_name.get_untracked()); + set_sql_query.set(default_query); + execute_query(); } Err(_e) => { file_content_setter.set(None); @@ -326,7 +422,7 @@ fn App() -> impl IntoView { .strip_suffix(".parquet") .unwrap_or("uploaded") .to_string(); - set_file_name.set(table_name); + set_file_name.set(table_name.clone()); wasm_bindgen_futures::spawn_local(async move { let opts = web_sys::RequestInit::new(); @@ -372,6 +468,9 @@ fn App() -> impl IntoView { Ok(info) => { set_file_content.set(Some(info.clone())); set_file_bytes.set(Some(bytes.clone())); + let default_query = format!("select * from \"{}\" limit 10", table_name); + set_sql_query.set(default_query); + execute_query(); } Err(_) => { set_file_content.set(None); @@ -380,92 +479,6 @@ fn App() -> impl IntoView { }); }; - let execute_query = move || { - let query = sql_query.get(); - let bytes_opt = file_bytes.get(); - let table_name = file_name.get(); - set_error_message.set(None); // Clear any previous error messages - - if query.trim().is_empty() { - set_error_message.set(Some("Please enter a SQL query.".into())); - return; - } - if let Some(bytes) = bytes_opt { - wasm_bindgen_futures::spawn_local(async move { - web_sys::console::log_1(&table_name.clone().into()); - let ctx = datafusion::prelude::SessionContext::new(); - - let schema = match file_content.get_untracked() { - Some(content) => content.schema.clone(), - None => { - set_error_message.set(Some("Failed to get file schema".into())); - return; - } - }; - - let streaming_table = match datafusion::datasource::streaming::StreamingTable::try_new( - schema.clone(), - vec![Arc::new(DummyStreamPartition { - schema: schema.clone(), - bytes: bytes.clone(), - })], - ) { - Ok(table) => table, - Err(e) => { - set_error_message.set(Some(format!("Failed to create streaming table: {}", e))); - return; - } - }; - - if let Err(e) = ctx.register_table(table_name.clone(), Arc::new(streaming_table)) { - set_error_message.set(Some(format!("Failed to register table '{}': {}", table_name, e))); - return; - } - - let plan = match ctx.sql(&query).await { - Ok(plan) => plan, - Err(e) => { - set_error_message.set(Some(format!("SQL error: {}", e))); - return; - } - }; - - let (state, plan) = plan.into_parts(); - let plan = match state.optimize(&plan) { - Ok(plan) => plan, - Err(e) => { - set_error_message.set(Some(format!("Failed to optimize query: {}", e))); - return; - } - }; - - set_logical_plan.set(Some(plan.clone())); - - let physical_plan = match state.create_physical_plan(&plan).await { - Ok(plan) => plan, - Err(e) => { - set_error_message.set(Some(format!("Failed to create execution plan: {}", e))); - return; - } - }; - - set_physical_plan.set(Some(physical_plan.clone())); - - match collect(physical_plan, ctx.task_ctx().clone()).await { - Ok(results) => { - set_error_message.set(None); // Clear error message on success - set_query_result.set(results); - }, - Err(e) => { - set_error_message.set(Some(format!("Failed to execute query: {}", e))); - } - }; - }); - } else { - set_error_message.set(Some("No Parquet file loaded.".into())); - }; - }; - let run_query = move |_ev: web_sys::MouseEvent| { execute_query(); };