Skip to content

Commit

Permalink
fix(iceberg): change iceberg rest catalog default implementation to j…
Browse files Browse the repository at this point in the history
…ava (#19357)
  • Loading branch information
chenzl25 authored Nov 12, 2024
1 parent 98aa20b commit f0ece45
Showing 1 changed file with 14 additions and 6 deletions.
20 changes: 14 additions & 6 deletions src/connector/src/connector_common/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ mod v1 {

let catalog_type = self.catalog_type().to_string();

iceberg_configs.insert(CATALOG_TYPE.to_string(), catalog_type.clone());
iceberg_configs.insert(CATALOG_NAME.to_string(), self.catalog_name());

match catalog_type.as_str() {
Expand All @@ -267,14 +266,16 @@ mod v1 {
format!("iceberg.catalog.{}.warehouse", self.catalog_name()),
self.warehouse_path.clone(),
);
iceberg_configs.insert(CATALOG_TYPE.to_string(), "storage".into());
}
"rest" => {
"rest_rust" => {
let uri = self
.catalog_uri
.clone()
.with_context(|| "`catalog.uri` must be set in rest catalog".to_string())?;
iceberg_configs
.insert(format!("iceberg.catalog.{}.uri", self.catalog_name()), uri);
iceberg_configs.insert(CATALOG_TYPE.to_string(), "rest".into());
}
_ => {
bail!(
Expand Down Expand Up @@ -351,15 +352,16 @@ mod v1 {
java_catalog_props: &HashMap<String, String>,
) -> ConnectorResult<CatalogRef> {
match self.catalog_type() {
"storage" | "rest" => {
"storage" | "rest_rust" => {
let iceberg_configs = self.build_iceberg_configs()?;
let catalog = load_catalog(&iceberg_configs).await?;
Ok(catalog)
}
catalog_type
if catalog_type == "hive"
|| catalog_type == "jdbc"
|| catalog_type == "glue" =>
|| catalog_type == "glue"
|| catalog_type == "rest" =>
{
// Create java catalog
let (base_catalog_config, java_catalog_props) =
Expand All @@ -368,6 +370,7 @@ mod v1 {
"hive" => "org.apache.iceberg.hive.HiveCatalog",
"jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
"glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
"rest" => "org.apache.iceberg.rest.RESTCatalog",
_ => unreachable!(),
};

Expand Down Expand Up @@ -444,7 +447,7 @@ mod v2 {
let catalog = storage_catalog::StorageCatalog::new(config)?;
Ok(Arc::new(catalog))
}
"rest" => {
"rest_rust" => {
let mut iceberg_configs = HashMap::new();
if let Some(region) = &self.region {
iceberg_configs.insert(S3_REGION.to_string(), region.clone().to_string());
Expand Down Expand Up @@ -512,13 +515,18 @@ mod v2 {
let catalog = iceberg_catalog_glue::GlueCatalog::new(config).await?;
Ok(Arc::new(catalog))
}
catalog_type if catalog_type == "hive" || catalog_type == "jdbc" => {
catalog_type
if catalog_type == "hive"
|| catalog_type == "jdbc"
|| catalog_type == "rest" =>
{
// Create java catalog
let (base_catalog_config, java_catalog_props) =
self.build_jni_catalog_configs(java_catalog_props)?;
let catalog_impl = match catalog_type {
"hive" => "org.apache.iceberg.hive.HiveCatalog",
"jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
"rest" => "org.apache.iceberg.rest.RESTCatalog",
_ => unreachable!(),
};

Expand Down

0 comments on commit f0ece45

Please sign in to comment.