diff --git a/src/main.rs b/src/main.rs
index c52dbd9..215defc 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;
 
 use aws_config::BehaviorVersion;
 use aws_credential_types::provider::ProvideCredentials;
-use aws_sdk_glue::types::StorageDescriptor;
+use aws_sdk_glue::types::{StorageDescriptor, Table};
 use aws_sdk_glue::Client;
 use aws_types::SdkConfig;
 use clap::Parser;
@@ -219,6 +219,7 @@ async fn get_path_and_format(
             database_name, table_name
         ))
     })?;
+
     let sd = table.storage_descriptor().ok_or_else(|| {
         DataFusionError::Execution(format!(
             "Could not find storage descriptor for {}.{} in glue",
@@ -227,7 +228,7 @@ async fn get_path_and_format(
     })?;
 
     let location = lookup_storage_location(sd)?;
-    let format_arc = lookup_file_format(sd)?;
+    let format_arc = lookup_file_format(table.clone(), sd)?;
 
     Ok((location, format_arc))
 }
@@ -238,7 +239,7 @@ fn lookup_storage_location(sd: &StorageDescriptor) -> Result<String> {
     Ok(location.to_string())
 }
 
-fn lookup_file_format(sd: &StorageDescriptor) -> Result<Arc<dyn FileFormat>> {
+fn lookup_file_format(table: Table, sd: &StorageDescriptor) -> Result<Arc<dyn FileFormat>> {
     let empty_str = String::from("");
     let input_format = sd.input_format.as_ref().unwrap_or(&empty_str);
     let output_format = sd.output_format.as_ref().unwrap_or(&empty_str);
@@ -266,6 +267,21 @@ fn lookup_file_format(sd: &StorageDescriptor) -> Result<Arc<dyn FileFormat>> {
         None => HashMap::new(),
     };
 
+    let table_parameters = table.parameters.unwrap_or_default();
+    let table_type = table_parameters
+        .get("table_type")
+        .map(|x| x.as_str())
+        .unwrap_or_default();
+    // TODO: dispatch on `table_type`, which can be "delta" (Delta Lake)
+    // or "ICEBERG" (Apache Iceberg).
+
+    /* For Iceberg tables the Glue console shows:
+       Table format: Apache Iceberg
+       Input format: -
+       Output format: -
+       Serde serialization lib: -
+    */
+
     let item: (&str, &str, &str) = (input_format, output_format, serialization_library);
     let format_result: Result<Arc<dyn FileFormat>> = match item {
         (
@@ -273,6 +289,12 @@ fn lookup_file_format(sd: &StorageDescriptor) -> Result<Arc<dyn FileFormat>> {
             "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
             "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
         ) => Ok(Arc::new(ParquetFormat::default())),
+        (
+            // this storage descriptor is actually the Delta Lake format; its data files are Parquet
+            "org.apache.hadoop.mapred.SequenceFileInputFormat",
+            "org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat",
+            "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
+        ) => Ok(Arc::new(ParquetFormat::default())),
         (
             "org.apache.hadoop.mapred.TextInputFormat",
             "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
@@ -321,6 +343,7 @@ fn lookup_file_format(sd: &StorageDescriptor) -> Result<Arc<dyn FileFormat>> {
             input_format, output_format, sd
         ))),
     };
+
     let format = format_result?;
     Ok(format)
 }
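
Follow-up sketch, not part of the diff above: one way the currently unused `table_type` value could be consulted before falling back to the (input format, output format, serde) match. The helper name `lookup_open_table_format` and the datafusion import paths are assumptions, since the file's existing imports are not shown; Delta Lake is mapped to ParquetFormat because its data files are Parquet (this ignores the _delta_log), and Iceberg is surfaced as an explicit error.

// Sketch only: a possible pre-check on the Glue `table_type` parameter.
// `lookup_open_table_format` is a hypothetical helper, not part of the diff above.
use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::error::{DataFusionError, Result};

/// Returns Some(format) when the Glue `table_type` parameter identifies an open
/// table format, and None when the regular storage-descriptor match should run.
fn lookup_open_table_format(table_type: &str) -> Option<Result<Arc<dyn FileFormat>>> {
    match table_type.to_ascii_uppercase().as_str() {
        // Delta Lake data files are Parquet; listing them directly ignores the
        // transaction log, so this only sees the files under the table path.
        "DELTA" => Some(Ok(Arc::new(ParquetFormat::default()))),
        // Iceberg tables carry no usable input/output format in Glue, so fail
        // explicitly instead of falling through to the descriptor match.
        "ICEBERG" => Some(Err(DataFusionError::Execution(
            "Iceberg tables are not supported".to_string(),
        ))),
        _ => None,
    }
}

Inside lookup_file_format this would run right after table_type is computed, e.g. `if let Some(result) = lookup_open_table_format(table_type) { return result; }`, leaving the existing tuple match as the default path.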