Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make changes such that a glue deltalake table can be loaded #90

Merged
merged 3 commits on Mar 29, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;

use aws_config::BehaviorVersion;
use aws_credential_types::provider::ProvideCredentials;
use aws_sdk_glue::types::StorageDescriptor;
use aws_sdk_glue::types::{StorageDescriptor, Table};
use aws_sdk_glue::Client;
use aws_types::SdkConfig;
use clap::Parser;
Expand Down Expand Up @@ -219,6 +219,7 @@ async fn get_path_and_format(
database_name, table_name
))
})?;

let sd = table.storage_descriptor().ok_or_else(|| {
DataFusionError::Execution(format!(
"Could not find storage descriptor for {}.{} in glue",
Expand All @@ -227,7 +228,7 @@ async fn get_path_and_format(
})?;

let location = lookup_storage_location(sd)?;
let format_arc = lookup_file_format(sd)?;
let format_arc = lookup_file_format(table.clone(), sd)?;
Ok((location, format_arc))
}

Expand All @@ -238,7 +239,7 @@ fn lookup_storage_location(sd: &StorageDescriptor) -> Result<String> {
Ok(location.to_string())
}

fn lookup_file_format(sd: &StorageDescriptor) -> Result<Arc<dyn FileFormat>> {
fn lookup_file_format(table: Table, sd: &StorageDescriptor) -> Result<Arc<dyn FileFormat>> {
let empty_str = String::from("");
let input_format = sd.input_format.as_ref().unwrap_or(&empty_str);
let output_format = sd.output_format.as_ref().unwrap_or(&empty_str);
Expand Down Expand Up @@ -266,13 +267,35 @@ fn lookup_file_format(sd: &StorageDescriptor) -> Result<Arc<dyn FileFormat>> {
None => HashMap::new(),
};

let table_parameters = table.parameters.unwrap_or_default();
let _table_type = table_parameters
.get("table_type")
.map(|x| x.as_str())
.unwrap_or_default();

// this can be delta...
// or ICEBERG...

/*
Table format: Apache Iceberg
Input format: -
Output format: -
Serde serialization lib: -
*/

let item: (&str, &str, &str) = (input_format, output_format, serialization_library);
let format_result: Result<Arc<dyn FileFormat>> = match item {
(
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
) => Ok(Arc::new(ParquetFormat::default())),
(
// actually this is Deltalake format...
"org.apache.hadoop.mapred.SequenceFileInputFormat",
"org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat",
"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
) => Ok(Arc::new(ParquetFormat::default())),
(
"org.apache.hadoop.mapred.TextInputFormat",
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
Expand Down Expand Up @@ -321,6 +344,7 @@ fn lookup_file_format(sd: &StorageDescriptor) -> Result<Arc<dyn FileFormat>> {
input_format, output_format, sd
))),
};

let format = format_result?;
Ok(format)
}
Expand Down
Loading