feat: make changes such that a glue deltalake table can be loaded
timvw committed Mar 29, 2024
1 parent 0033212 commit 115b4b5
Showing 1 changed file with 26 additions and 3 deletions.
src/main.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;

use aws_config::BehaviorVersion;
use aws_credential_types::provider::ProvideCredentials;
-use aws_sdk_glue::types::StorageDescriptor;
+use aws_sdk_glue::types::{StorageDescriptor, Table};
use aws_sdk_glue::Client;
use aws_types::SdkConfig;
use clap::Parser;
@@ -219,6 +219,7 @@ async fn get_path_and_format(
database_name, table_name
))
})?;

let sd = table.storage_descriptor().ok_or_else(|| {
DataFusionError::Execution(format!(
"Could not find storage descriptor for {}.{} in glue",
@@ -227,7 +228,7 @@
})?;

let location = lookup_storage_location(sd)?;
-let format_arc = lookup_file_format(sd)?;
+let format_arc = lookup_file_format(table.clone(), sd)?;
Ok((location, format_arc))
}

@@ -238,7 +239,7 @@ fn lookup_storage_location(sd: &StorageDescriptor) -> Result<String> {
Ok(location.to_string())
}

-fn lookup_file_format(sd: &StorageDescriptor) -> Result<Arc<dyn FileFormat>> {
+fn lookup_file_format(table: Table, sd: &StorageDescriptor) -> Result<Arc<dyn FileFormat>> {
let empty_str = String::from("");
let input_format = sd.input_format.as_ref().unwrap_or(&empty_str);
let output_format = sd.output_format.as_ref().unwrap_or(&empty_str);
@@ -266,13 +267,34 @@ fn lookup_file_format(sd: &StorageDescriptor) -> Result<Arc<dyn FileFormat>> {
None => HashMap::new(),
};

+let table_parameters = table.parameters.unwrap_or_default();
+let table_type = table_parameters
+.get("table_type")
+.map(|x| x.as_str())
+.unwrap_or_default();
+// `table_type` can be "delta" (Delta Lake) or "ICEBERG" (Apache Iceberg);
+// it is not consumed yet, so CI flags it as an unused variable.

+/*
+Table format: Apache Iceberg
+Input format: -
+Output format: -
+Serde serialization lib: -
+*/

let item: (&str, &str, &str) = (input_format, output_format, serialization_library);
let format_result: Result<Arc<dyn FileFormat>> = match item {
(
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
) => Ok(Arc::new(ParquetFormat::default())),
+(
+// Glue reports this Hive placeholder triple for a Delta Lake table;
+// its data files are Parquet, so reuse the Parquet reader.
+"org.apache.hadoop.mapred.SequenceFileInputFormat",
+"org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat",
+"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
+) => Ok(Arc::new(ParquetFormat::default())),
(
"org.apache.hadoop.mapred.TextInputFormat",
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
@@ -321,6 +343,7 @@ fn lookup_file_format(sd: &StorageDescriptor) -> Result<Arc<dyn FileFormat>> {
input_format, output_format, sd
))),
};

let format = format_result?;
Ok(format)
}
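
For reference, a minimal self-contained sketch of the triple matching introduced above (illustration only, not code from this commit; the `Detected` enum and `detect` function are hypothetical names): a Delta Lake table registered in Glue advertises Hive placeholder formats, and its data files are Parquet, so both triples resolve to a Parquet reader.

#[derive(Debug, PartialEq)]
enum Detected {
    Parquet,
    Unknown,
}

fn detect(input: &str, output: &str, serde: &str) -> Detected {
    match (input, output, serde) {
        // Native Parquet table registered in Glue.
        (
            "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
            "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
            "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
        )
        // Delta Lake placeholder triple: the data files are Parquet.
        | (
            "org.apache.hadoop.mapred.SequenceFileInputFormat",
            "org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat",
            "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
        ) => Detected::Parquet,
        _ => Detected::Unknown,
    }
}

fn main() {
    let delta = detect(
        "org.apache.hadoop.mapred.SequenceFileInputFormat",
        "org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat",
        "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
    );
    assert_eq!(delta, Detected::Parquet);
    println!("delta table maps to {:?}", delta);
}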
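And a sketch of where the currently unused `table_type` parameter could go (an assumption, not part of the commit; `TableKind` and `classify` are hypothetical): per the in-code comments, Glue sets this table parameter to "delta" for Delta Lake and "ICEBERG" for Iceberg, so a follow-up could branch on it before falling back to the input/output/serde matching.

use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum TableKind {
    Delta,
    Iceberg,
    // Anything else: fall back to the input/output/serde matching.
    Hive,
}

fn classify(parameters: &HashMap<String, String>) -> TableKind {
    // Glue exposes table parameters as a string-to-string map.
    match parameters.get("table_type").map(|s| s.as_str()) {
        Some("delta") => TableKind::Delta,
        Some("ICEBERG") => TableKind::Iceberg,
        _ => TableKind::Hive,
    }
}

fn main() {
    let mut params = HashMap::new();
    params.insert("table_type".to_string(), "ICEBERG".to_string());
    assert_eq!(classify(&params), TableKind::Iceberg);
}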
