Commit ae1934a: update

XiangpengHao committed Dec 24, 2024
1 parent 5b7d252
Showing 5 changed files with 107 additions and 114 deletions.

src/file_reader.rs (35 changes: 19 additions & 16 deletions)

@@ -1,4 +1,3 @@
-use bytes::Bytes;
 use leptos::prelude::*;
 use leptos::wasm_bindgen::{prelude::Closure, JsCast};
 use leptos_router::hooks::query_signal;
@@ -7,7 +6,7 @@ use object_store::{ObjectStore, PutPayload};
 use opendal::{services::Http, services::S3, Operator};
 use web_sys::{js_sys, Url};

-use crate::INMEMORY_STORE;
+use crate::{ParquetTable, INMEMORY_STORE};

 const S3_ENDPOINT_KEY: &str = "s3_endpoint";
 const S3_ACCESS_KEY_ID_KEY: &str = "s3_access_key_id";
@@ -34,24 +33,20 @@ fn save_to_storage(key: &str, value: &str) {
 }

 async fn update_file(
-    bytes: Bytes,
-    bytes_setter: WriteSignal<Option<Bytes>>,
-    file_name: &String,
-    file_name_setter: WriteSignal<String>,
+    parquet_table: ParquetTable,
+    parquet_table_setter: WriteSignal<Option<ParquetTable>>,
 ) {
     let object_store = &*INMEMORY_STORE;
-    let path = Path::parse(format!("{}.parquet", file_name)).unwrap();
-    let payload = PutPayload::from_bytes(bytes.clone());
+    let path = Path::parse(format!("{}.parquet", parquet_table.table_name)).unwrap();
+    let payload = PutPayload::from_bytes(parquet_table.bytes.clone());
     object_store.put(&path, payload).await.unwrap();
-    bytes_setter.set(Some(bytes));
-    file_name_setter.set(file_name.clone());
+    parquet_table_setter.set(Some(parquet_table));
 }

 #[component]
 pub fn FileReader(
     set_error_message: WriteSignal<Option<String>>,
-    set_file_bytes: WriteSignal<Option<Bytes>>,
-    set_file_name: WriteSignal<String>,
+    set_parquet_table: WriteSignal<Option<ParquetTable>>,
 ) -> impl IntoView {
     let (active_tab, set_active_tab) = query_signal::<String>("tab");
@@ -172,8 +167,9 @@ pub fn FileReader(
         let array_buffer = result.dyn_into::<js_sys::ArrayBuffer>().unwrap();
         let uint8_array = js_sys::Uint8Array::new(&array_buffer);
         let bytes = bytes::Bytes::from(uint8_array.to_vec());
+        let parquet_table = ParquetTable { bytes, table_name };
         leptos::task::spawn_local(async move {
-            update_file(bytes.clone(), set_file_bytes, &table_name, set_file_name).await;
+            update_file(parquet_table, set_parquet_table).await;
             set_is_folded.set(true);
         });
     }) as Box<dyn FnMut(_)>);
@@ -214,7 +210,11 @@ pub fn FileReader(

                 match op.read(&path).await {
                     Ok(bs) => {
-                        update_file(bs.to_bytes(), set_file_bytes, &table_name, set_file_name).await;
+                        let parquet_table = ParquetTable {
+                            bytes: bs.to_bytes(),
+                            table_name,
+                        };
+                        update_file(parquet_table, set_parquet_table).await;
                         set_is_folded.set(true);
                     }
                     Err(e) => {
@@ -260,8 +260,11 @@ pub fn FileReader(
             let operator = op.finish();
             match operator.read(&s3_file_path.get()).await {
                 Ok(bs) => {
-                    update_file(bs.to_bytes(), set_file_bytes, &file_name, set_file_name)
-                        .await;
+                    let parquet_table = ParquetTable {
+                        bytes: bs.to_bytes(),
+                        table_name: file_name,
+                    };
+                    update_file(parquet_table, set_parquet_table).await;
                     set_is_folded.set(true);
                 }
                 Err(e) => {
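
Aside: the shape of this file's refactor is the same in all three call sites. Instead of threading `bytes` and `file_name` (plus two setters) through `update_file`, the commit bundles them into a single `ParquetTable` and passes one setter, so both fields change in a single write. A minimal std-only sketch of the pattern; the `Vec<u8>` stand-in and the mutable `slot` are hypothetical simplifications of `bytes::Bytes` and the Leptos `WriteSignal`:

// Sketch: bundle values that must change together into one struct,
// so one write replaces two writes that could briefly disagree.
#[derive(Debug, Clone, PartialEq)]
struct ParquetTable {
    bytes: Vec<u8>, // stand-in for bytes::Bytes
    table_name: String,
}

// One value in, one write out: no torn state between "bytes set" and "name set".
fn update_file(table: ParquetTable, slot: &mut Option<ParquetTable>) {
    *slot = Some(table);
}

fn main() {
    let mut current: Option<ParquetTable> = None;
    let table = ParquetTable {
        bytes: b"PAR1".to_vec(),
        table_name: "uploaded".into(),
    };
    update_file(table, &mut current);
    assert_eq!(current.map(|t| t.table_name), Some("uploaded".to_string()));
}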

src/main.rs (103 changes: 49 additions & 54 deletions)

@@ -1,7 +1,6 @@
 mod schema;
 use datafusion::physical_plan::ExecutionPlan;
 use file_reader::{get_stored_value, FileReader};
-use futures::{future::BoxFuture, FutureExt};
 use leptos_router::{
     components::Router,
     hooks::{query_signal, use_query_map},
@@ -18,13 +17,13 @@ mod row_group_column;
 mod metadata;
 use metadata::MetadataSection;

-use std::{ops::Range, sync::Arc, sync::LazyLock};
+use std::{sync::Arc, sync::LazyLock};

 use arrow::datatypes::SchemaRef;
 use bytes::Bytes;
 use leptos::{logging, prelude::*};
 use parquet::{
-    arrow::{async_reader::AsyncFileReader, parquet_to_arrow_schema},
+    arrow::parquet_to_arrow_schema,
     errors::ParquetError,
     file::metadata::{ParquetMetaData, ParquetMetaDataReader},
 };
@@ -40,27 +39,28 @@ pub(crate) static INMEMORY_STORE: LazyLock<Arc<InMemory>> =

 #[derive(Debug, Clone, PartialEq)]
 pub(crate) struct ParquetReader {
-    bytes: Bytes,
+    parquet_table: ParquetTable,
     parquet_info: ParquetInfo,
 }

 impl ParquetReader {
-    pub fn new(bytes: Bytes) -> Result<Self> {
+    pub fn new(table: ParquetTable) -> Result<Self> {
         let mut footer = [0_u8; 8];
+        let bytes = &table.bytes;
         footer.copy_from_slice(&bytes[bytes.len() - 8..]);
         let metadata_len = ParquetMetaDataReader::decode_footer(&footer)?;

         let mut metadata_reader = ParquetMetaDataReader::new()
             .with_page_indexes(true)
             .with_column_indexes(true)
             .with_offset_indexes(true);
-        metadata_reader.try_parse(&bytes)?;
+        metadata_reader.try_parse(bytes)?;
         let metadata = metadata_reader.finish()?;

         let parquet_info = ParquetInfo::from_metadata(metadata, metadata_len as u64)?;

         Ok(Self {
-            bytes,
+            parquet_table: table,
             parquet_info,
         })
     }
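
For context on the footer handling that `new` keeps: a parquet file ends with an 8-byte footer, a little-endian u32 metadata length followed by the magic bytes `PAR1`, which is what `ParquetMetaDataReader::decode_footer` parses from the slice taken above. A std-only illustration (this function is a hypothetical stand-in, not the parquet crate's implementation):

// Illustrative decode of the 8-byte footer that ParquetReader::new
// slices off the end of the file: [len: u32 LE][b"PAR1"].
fn decode_footer(footer: &[u8; 8]) -> Result<usize, String> {
    if &footer[4..] != b"PAR1" {
        return Err("not a parquet file: bad magic".into());
    }
    let len = u32::from_le_bytes(footer[..4].try_into().unwrap());
    Ok(len as usize) // length of the thrift-encoded metadata before the footer
}

fn main() {
    let footer = [0x2A, 0x00, 0x00, 0x00, b'P', b'A', b'R', b'1'];
    assert_eq!(decode_footer(&footer).unwrap(), 42);
}
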
@@ -70,26 +70,11 @@ impl ParquetReader {
     }

     fn bytes(&self) -> &Bytes {
-        &self.bytes
-    }
-}
-
-impl AsyncFileReader for ParquetReader {
-    fn get_byte_ranges(
-        &mut self,
-        ranges: Vec<Range<usize>>,
-    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
-        let rt = ranges.iter().map(|r| self.bytes.slice(r.clone())).collect();
-        async move { Ok(rt) }.boxed()
-    }
-
-    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
-        let sliced = self.bytes.slice(range);
-        async move { Ok(sliced) }.boxed()
+        &self.parquet_table.bytes
     }

-    fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
-        async move { Ok(self.parquet_info.metadata.clone()) }.boxed()
+    fn table_name(&self) -> &str {
+        &self.parquet_table.table_name
     }
 }

@@ -215,74 +200,85 @@ async fn execute_query_async(
     Ok((results, physical_plan))
 }

+#[derive(Debug, Clone, PartialEq)]
+struct ParquetTable {
+    bytes: Bytes,
+    table_name: String,
+}
+
 #[component]
 fn App() -> impl IntoView {
     let (error_message, set_error_message) = signal(Option::<String>::None);
-    let (file_bytes, set_file_bytes) = signal(None::<Bytes>);
+    let (parquet_table, set_parquet_table) = signal(None::<ParquetTable>);
     let (user_input, set_user_input) = query_signal::<String>("query");

     let export_to = use_query_map().with(|map| map.get("export").map(|v| v.to_string()));

     let (sql_query, set_sql_query) = signal(String::new());
     let (query_result, set_query_result) = signal(Vec::<arrow::array::RecordBatch>::new());
-    let (file_name, set_file_name) = signal(String::from("uploaded"));
     let (physical_plan, set_physical_plan) = signal(None::<Arc<dyn ExecutionPlan>>);
     let (show_settings, set_show_settings) = signal(false);
+    let api_key = get_stored_value(ANTHROPIC_API_KEY, "");

     let parquet_reader = Memo::new(move |_| {
-        file_bytes
+        parquet_table
             .get()
-            .and_then(|bytes| ParquetReader::new(bytes.clone()).ok())
+            .and_then(|table| ParquetReader::new(table).ok())
     });

     Effect::watch(
         parquet_reader,
-        move |info, _, _| {
-            if let Some(info) = info {
-                match user_input.get() {
+        move |reader, old_reader, _| {
+            let Some(reader) = reader else { return };
+
+            match old_reader.flatten() {
+                Some(old_reader) => {
+                    if old_reader.table_name() != reader.table_name() {
+                        let default_query =
+                            format!("select * from \"{}\" limit 10", reader.table_name());
+                        set_user_input.set(Some(default_query));
+                    }
+                }
+                None => match user_input.get() {
                     Some(user_input) => {
-                        set_user_input.set(Some(user_input));
+                        // force update
+                        let new_input = format!("{} ", user_input);
+                        set_user_input.set(Some(new_input));
                     }
                     None => {
-                        logging::log!("{}", info.info().to_string());
+                        logging::log!("{}", reader.info().to_string());
                         let default_query =
-                            format!("select * from \"{}\" limit 10", file_name.get_untracked());
+                            format!("select * from \"{}\" limit 10", reader.table_name());
                         set_user_input.set(Some(default_query));
                     }
-                }
+                },
             }
         },
         true,
     );

     Effect::watch(
-        move || (user_input.get(), parquet_reader.get()),
-        move |(user_input, parquet), _, _| {
+        user_input,
+        move |user_input, _, _| {
             let Some(user_input_str) = user_input else {
                 return;
             };
-            if parquet.is_none() {
-                return;
-            }
-            set_user_input.set(Some(user_input_str.clone()));
-
             let user_input = user_input_str.clone();
+            let api_key = api_key.clone();
             leptos::task::spawn_local(async move {
-                let Some(parquet_info) = parquet_reader.get() else {
+                let Some(parquet_reader) = parquet_reader.get() else {
                     return;
                 };
-                let api_key = get_stored_value(ANTHROPIC_API_KEY, "");
                 let sql = match query_input::user_input_to_sql(
                     &user_input,
-                    &parquet_info.info().schema,
-                    &file_name(),
+                    &parquet_reader.info().schema,
+                    parquet_reader.table_name(),
                     &api_key,
                 )
                 .await
                 {
                     Ok(response) => response,
                     Err(e) => {
+                        logging::log!("{}", e);
                         set_error_message.set(Some(e));
                         return;
                     }
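
The first `Effect::watch` above now uses the previous value that Leptos hands the callback (`old_reader`) so the default query is regenerated only when the table name actually changes. A std-only sketch of that compare-with-previous decision (the function name and return convention are hypothetical, and the real code additionally consults the saved user input when there is no previous reader):

// Decide whether to reset the default SQL, given the current table name and
// the name seen on the watcher's previous run (None on the first run).
fn default_query_update(current: &str, previous: Option<&str>) -> Option<String> {
    match previous {
        // Same table as last time: leave the user's query alone.
        Some(prev) if prev == current => None,
        // First run, or a different table: regenerate the default query.
        _ => Some(format!("select * from \"{}\" limit 10", current)),
    }
}

fn main() {
    assert_eq!(default_query_update("trips", Some("trips")), None);
    assert_eq!(
        default_query_update("trips", Some("uploaded")).as_deref(),
        Some("select * from \"trips\" limit 10")
    );
}
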
@@ -297,17 +293,17 @@ fn App() -> impl IntoView {
     Effect::watch(
         sql_query,
         move |query, _, _| {
-            let bytes_opt = file_bytes.get();
-            let table_name = file_name.get();
+            let bytes_opt = parquet_table.get();
             set_error_message.set(None);

             if query.trim().is_empty() {
                 return;
             }

-            if let Some(_bytes) = bytes_opt {
+            if let Some(parquet_table) = bytes_opt {
                 let query = query.clone();
                 let export_to = export_to.clone();
+                let table_name = parquet_table.table_name;

                 leptos::task::spawn_local(async move {
                     match execute_query_async(query.clone(), table_name).await {
@@ -374,8 +370,7 @@ fn App() -> impl IntoView {
                 <div class="space-y-6">
                     <FileReader
                         set_error_message=set_error_message
-                        set_file_bytes=set_file_bytes
-                        set_file_name=set_file_name
+                        set_parquet_table=set_parquet_table
                     />

                     <div class="border-t border-gray-300 my-4"></div>
@@ -404,7 +399,7 @@ fn App() -> impl IntoView {
                 }}
                 <div class="mt-4">
                     {move || {
-                        file_bytes
+                        parquet_table
                             .get()
                             .map(|_| {
                                 match parquet_reader() {

src/metadata.rs (4 changes: 2 additions & 2 deletions)

@@ -109,8 +109,8 @@ pub fn MetadataSection(parquet_reader: super::ParquetReader) -> impl IntoView {
         Some(
             view! {
                 <div>
-                    <super::row_group_column::RowGroupColumn parquet_reader=parquet_reader.clone()
-                    />
+                    <super::row_group_column::RowGroupColumn parquet_reader=parquet_reader
+                        .clone() />
                 </div>
             },
         )

(diffs for the remaining 2 changed files did not load)