
Commit

auto query
XiangpengHao committed Nov 9, 2024 · 1 parent 99f93c7 · commit 01ec539
Showing 2 changed files with 104 additions and 88 deletions.
3 changes: 3 additions & 0 deletions Trunk.toml
@@ -0,0 +1,3 @@

[build]
minify = "on_release"
189 changes: 101 additions & 88 deletions src/main.rs
@@ -266,7 +266,7 @@ impl PartitionStream for DummyStreamPartition
fn App() -> impl IntoView {
let default_url = "https://raw.githubusercontent.com/RobinL/iris_parquet/main/gridwatch/gridwatch_2023-01-08.parquet";
let (url, set_url) = create_signal(default_url.to_string());
let (file_content, set_file_content) = create_signal(None);
let (file_content, set_file_content) = create_signal(None::<ParquetInfo>);
let (error_message, set_error_message) = create_signal(Option::<String>::None);
let (file_bytes, set_file_bytes) = create_signal(None::<Bytes>);
let (sql_query, set_sql_query) = create_signal(String::new());
@@ -275,6 +275,98 @@ fn App() -> impl IntoView {
let (logical_plan, set_logical_plan) = create_signal(None::<LogicalPlan>);
let (physical_plan, set_physical_plan) = create_signal(None::<Arc<dyn ExecutionPlan>>);

let execute_query = move || {
let query = sql_query.get();
let bytes_opt = file_bytes.get();
let table_name = file_name.get();
set_error_message.set(None); // Clear any previous error messages

if query.trim().is_empty() {
set_error_message.set(Some("Please enter a SQL query.".into()));
return;
}
if let Some(bytes) = bytes_opt {
wasm_bindgen_futures::spawn_local(async move {
web_sys::console::log_1(&table_name.clone().into());
let ctx = datafusion::prelude::SessionContext::new();

let schema = match file_content.get_untracked() {
Some(content) => content.schema.clone(),
None => {
set_error_message.set(Some("Failed to get file schema".into()));
return;
}
};

let streaming_table =
match datafusion::datasource::streaming::StreamingTable::try_new(
schema.clone(),
vec![Arc::new(DummyStreamPartition {
schema: schema.clone(),
bytes: bytes.clone(),
})],
) {
Ok(table) => table,
Err(e) => {
set_error_message
.set(Some(format!("Failed to create streaming table: {}", e)));
return;
}
};

if let Err(e) = ctx.register_table(table_name.clone(), Arc::new(streaming_table)) {
set_error_message.set(Some(format!(
"Failed to register table '{}': {}",
table_name, e
)));
return;
}

let plan = match ctx.sql(&query).await {
Ok(plan) => plan,
Err(e) => {
set_error_message.set(Some(format!("SQL error: {}", e)));
return;
}
};

let (state, plan) = plan.into_parts();
let plan = match state.optimize(&plan) {
Ok(plan) => plan,
Err(e) => {
set_error_message.set(Some(format!("Failed to optimize query: {}", e)));
return;
}
};

set_logical_plan.set(Some(plan.clone()));

let physical_plan = match state.create_physical_plan(&plan).await {
Ok(plan) => plan,
Err(e) => {
set_error_message
.set(Some(format!("Failed to create execution plan: {}", e)));
return;
}
};

set_physical_plan.set(Some(physical_plan.clone()));

match collect(physical_plan, ctx.task_ctx().clone()).await {
Ok(results) => {
set_error_message.set(None); // Clear error message on success
set_query_result.set(results);
}
Err(e) => {
set_error_message.set(Some(format!("Failed to execute query: {}", e)));
}
};
});
} else {
set_error_message.set(Some("No Parquet file loaded.".into()));
};
};

let on_file_select = move |ev: web_sys::Event| {
let input: web_sys::HtmlInputElement = event_target(&ev);
let files = input.files().unwrap();
@@ -296,6 +388,10 @@ fn App() -> impl IntoView {
web_sys::console::log_1(&info.to_string().into());
file_content_setter.set(Some(info));
set_file_bytes.set(Some(bytes.clone()));
let default_query =
format!("select * from \"{}\" limit 10", file_name.get_untracked());
set_sql_query.set(default_query);
execute_query();
}
Err(_e) => {
file_content_setter.set(None);
@@ -326,7 +422,7 @@ fn App() -> impl IntoView {
.strip_suffix(".parquet")
.unwrap_or("uploaded")
.to_string();
set_file_name.set(table_name);
set_file_name.set(table_name.clone());

wasm_bindgen_futures::spawn_local(async move {
let opts = web_sys::RequestInit::new();
@@ -372,6 +468,9 @@ fn App() -> impl IntoView {
Ok(info) => {
set_file_content.set(Some(info.clone()));
set_file_bytes.set(Some(bytes.clone()));
let default_query = format!("select * from \"{}\" limit 10", table_name);
set_sql_query.set(default_query);
execute_query();
}
Err(_) => {
set_file_content.set(None);
@@ -380,92 +479,6 @@ fn App() -> impl IntoView {
});
};

let execute_query = move || {
let query = sql_query.get();
let bytes_opt = file_bytes.get();
let table_name = file_name.get();
set_error_message.set(None); // Clear any previous error messages

if query.trim().is_empty() {
set_error_message.set(Some("Please enter a SQL query.".into()));
return;
}
if let Some(bytes) = bytes_opt {
wasm_bindgen_futures::spawn_local(async move {
web_sys::console::log_1(&table_name.clone().into());
let ctx = datafusion::prelude::SessionContext::new();

let schema = match file_content.get_untracked() {
Some(content) => content.schema.clone(),
None => {
set_error_message.set(Some("Failed to get file schema".into()));
return;
}
};

let streaming_table = match datafusion::datasource::streaming::StreamingTable::try_new(
schema.clone(),
vec![Arc::new(DummyStreamPartition {
schema: schema.clone(),
bytes: bytes.clone(),
})],
) {
Ok(table) => table,
Err(e) => {
set_error_message.set(Some(format!("Failed to create streaming table: {}", e)));
return;
}
};

if let Err(e) = ctx.register_table(table_name.clone(), Arc::new(streaming_table)) {
set_error_message.set(Some(format!("Failed to register table '{}': {}", table_name, e)));
return;
}

let plan = match ctx.sql(&query).await {
Ok(plan) => plan,
Err(e) => {
set_error_message.set(Some(format!("SQL error: {}", e)));
return;
}
};

let (state, plan) = plan.into_parts();
let plan = match state.optimize(&plan) {
Ok(plan) => plan,
Err(e) => {
set_error_message.set(Some(format!("Failed to optimize query: {}", e)));
return;
}
};

set_logical_plan.set(Some(plan.clone()));

let physical_plan = match state.create_physical_plan(&plan).await {
Ok(plan) => plan,
Err(e) => {
set_error_message.set(Some(format!("Failed to create execution plan: {}", e)));
return;
}
};

set_physical_plan.set(Some(physical_plan.clone()));

match collect(physical_plan, ctx.task_ctx().clone()).await {
Ok(results) => {
set_error_message.set(None); // Clear error message on success
set_query_result.set(results);
},
Err(e) => {
set_error_message.set(Some(format!("Failed to execute query: {}", e)));
}
};
});
} else {
set_error_message.set(Some("No Parquet file loaded.".into()));
};
};

let run_query = move |_ev: web_sys::MouseEvent| {
execute_query();
};
