Skip to content

feat(query): support create/drop table under iceberg catalog #17619

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1490,6 +1490,7 @@ impl SchemaApiTestSuite {
if_exists: false,
tenant: tenant.clone(),
db_id: *db_id,
db_name: "db".to_string(),
table_name: table_name.to_string(),
tb_id: table_id,
engine: "FUSE".to_string(),
Expand Down Expand Up @@ -1811,6 +1812,7 @@ impl SchemaApiTestSuite {
if_exists: false,
tenant: tenant.clone(),
db_id: *db_id,
db_name: "db1".to_string(),
table_name: tbl_name.to_string(),
tb_id,
engine: "FUSE".to_string(),
Expand Down Expand Up @@ -1868,6 +1870,7 @@ impl SchemaApiTestSuite {
db_id: *db_id,
table_name: tbl_name.to_string(),
tb_id,
db_name: "db3".to_string(),
engine: "FUSE".to_string(),
session_id: "".to_string(),
};
Expand All @@ -1889,6 +1892,7 @@ impl SchemaApiTestSuite {
db_id: *db_id,
table_name: tbl_name.to_string(),
tb_id,
db_name: "db3".to_string(),
engine: "FUSE".to_string(),
session_id: "".to_string(),
};
Expand Down Expand Up @@ -4304,6 +4308,7 @@ impl SchemaApiTestSuite {
if_exists: false,
tenant: req.name_ident.tenant.clone(),
db_id: *db2_id,
db_name: "db2".to_string(),
table_name: req.name_ident.table_name.clone(),
tb_id: resp.table_id,
engine: "FUSE".to_string(),
Expand All @@ -4326,6 +4331,7 @@ impl SchemaApiTestSuite {
if_exists: false,
tenant: req.name_ident.tenant.clone(),
db_id: *db2_id,
db_name: "db2".to_string(),
table_name: req.name_ident.table_name.clone(),
tb_id: resp.table_id,
engine: "FUSE".to_string(),
Expand Down Expand Up @@ -4400,6 +4406,7 @@ impl SchemaApiTestSuite {
if_exists: false,
tenant: req.name_ident.tenant.clone(),
db_id: *db3_id,
db_name: "db3".to_string(),
table_name: req.name_ident.table_name.clone(),
tb_id: resp.table_id,
engine: "FUSE".to_string(),
Expand All @@ -4423,6 +4430,7 @@ impl SchemaApiTestSuite {
if_exists: false,
tenant: req.name_ident.tenant.clone(),
db_id: *db3_id,
db_name: "db3".to_string(),
table_name: req.name_ident.table_name.clone(),
tb_id: resp.table_id,
engine: "FUSE".to_string(),
Expand Down Expand Up @@ -4597,6 +4605,7 @@ impl SchemaApiTestSuite {
tenant: req.name_ident.tenant.clone(),
db_id,
table_name: req.name_ident.table_name.clone(),
db_name: db.to_string(),
tb_id: resp.table_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
Expand Down Expand Up @@ -4840,6 +4849,7 @@ impl SchemaApiTestSuite {
tenant: tbl_name_ident.tenant.clone(),
db_id: old_db.database_id.db_id,
table_name: tbl_name_ident.table_name.clone(),
db_name: db_name.to_string(),
tb_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
Expand Down Expand Up @@ -4885,6 +4895,7 @@ impl SchemaApiTestSuite {
tenant: tenant.clone(),
db_id: old_db.database_id.db_id,
table_name: tbl_name.to_string(),
db_name: db_name.to_string(),
tb_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
Expand Down Expand Up @@ -4936,6 +4947,7 @@ impl SchemaApiTestSuite {
tenant: tenant.clone(),
db_id: old_db.database_id.db_id,
table_name: tbl_name.to_string(),
db_name: tbl_name.to_string(),
tb_id: tb_info.ident.table_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
Expand Down Expand Up @@ -5023,6 +5035,7 @@ impl SchemaApiTestSuite {
tenant: tenant.clone(),
db_id: cur_db.database_id.db_id,
table_name: tbl_name.to_string(),
db_name: db_name.to_string(),
tb_id: new_tb_info.ident.table_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
Expand Down Expand Up @@ -8214,6 +8227,7 @@ where MT: SchemaApi + kvapi::AsKVApi<Error = MetaError>
if_exists: false,
db_id: self.db_id,
tb_id: self.table_id,
db_name: "".to_string(),
engine: "FUSE".to_string(),
session_id: "".to_string(),
};
Expand Down
2 changes: 2 additions & 0 deletions src/meta/app/src/schema/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,8 @@ pub struct DropTableByIdReq {

pub db_id: MetaId,

pub db_name: String,

pub engine: String,

pub session_id: String,
Expand Down
1 change: 1 addition & 0 deletions src/meta/binaries/metabench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ async fn benchmark_table(client: &Arc<ClientHandle>, prefix: u64, client_num: u6
if_exists: false,
tenant: tenant(),
db_id,
db_name: db_name().to_string(),
table_name: table_name(),
tb_id: t.ident.table_id,
engine: "FUSE".to_string(),
Expand Down
1 change: 1 addition & 0 deletions src/query/ee/src/stream/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ impl StreamHandler for RealStreamHandler {
table_name: stream_name.clone(),
tb_id: table.get_id(),
db_id: db.get_db_info().database_id.db_id,
db_name: db.name().to_string(),
engine: engine.to_string(),
session_id: "".to_string(),
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ impl Interpreter for DropTableInterpreter {
table_name: tbl_name.to_string(),
tb_id: tbl.get_table_info().ident.table_id,
db_id: db.get_db_info().database_id.db_id,
db_name: db.name().to_string(),
engine: tbl.engine().to_string(),
session_id: tbl
.options()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ impl Interpreter for DropViewInterpreter {
table_name: self.plan.view_name.clone(),
tb_id: table.get_id(),
db_id: db.get_db_info().database_id.db_id,
db_name: db.name().to_string(),
engine: table.engine().to_string(),
session_id: table
.options()
Expand Down
1 change: 1 addition & 0 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1773,6 +1773,7 @@ impl TableContext for QueryContext {
tb_id: table.get_table_info().ident.table_id,
table_name: table_name.to_string(),
db_id: db.get_db_info().database_id.db_id,
db_name: db.name().to_string(),
engine: table.engine().to_string(),
session_id: table
.options()
Expand Down
1 change: 1 addition & 0 deletions src/query/service/tests/it/catalogs/database_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ async fn test_catalogs_table() -> Result<()> {
table_name: "test_table".to_string(),
tb_id: tbl.get_table_info().ident.table_id,
db_id: db.get_db_info().database_id.db_id,
db_name: db.name().to_string(),
engine: tbl.engine().to_string(),
session_id: "".to_string(),
})
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ license = "Apache-2.0"
publish = false

[dependencies]
arrow-schema = { workspace = true }
async-backtrace = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
Expand Down
14 changes: 9 additions & 5 deletions src/query/storages/iceberg/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,13 +471,17 @@ impl Catalog for IcebergCatalog {
}

#[async_backtrace::framed]
async fn create_table(&self, _req: CreateTableReq) -> Result<CreateTableReply> {
unimplemented!()
async fn create_table(&self, req: CreateTableReq) -> Result<CreateTableReply> {
let db = self
.get_database(&req.name_ident.tenant, &req.name_ident.db_name)
.await?;
db.create_table(req).await
}

#[async_backtrace::framed]
async fn drop_table_by_id(&self, _req: DropTableByIdReq) -> Result<DropTableReply> {
unimplemented!()
async fn drop_table_by_id(&self, req: DropTableByIdReq) -> Result<DropTableReply> {
let db = self.get_database(&req.tenant, &req.db_name).await?;
db.drop_table_by_id(req).await
}

#[async_backtrace::framed]
Expand Down Expand Up @@ -664,7 +668,7 @@ impl Catalog for IcebergCatalog {

// Get table engines
fn get_table_engines(&self) -> Vec<StorageDescription> {
    // The iceberg catalog exposes no Databend storage engines; return an
    // empty list rather than panicking with `unimplemented!()`, since this
    // is queried on generic code paths (e.g. SHOW ENGINES).
    vec![]
}

async fn create_sequence(&self, _req: CreateSequenceReq) -> Result<CreateSequenceReply> {
Expand Down
141 changes: 141 additions & 0 deletions src/query/storages/iceberg/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,38 @@

//! Wrapping of the parent directory containing iceberg tables

use std::collections::HashMap;
use std::sync::Arc;

use arrow_schema::Field;
use arrow_schema::Schema;
use async_trait::async_trait;
use databend_common_catalog::database::Database;
use databend_common_catalog::table::Table;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::TableSchema;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_app::schema::CreateTableReply;
use databend_common_meta_app::schema::CreateTableReq;
use databend_common_meta_app::schema::DatabaseId;
use databend_common_meta_app::schema::DatabaseInfo;
use databend_common_meta_app::schema::DatabaseMeta;
use databend_common_meta_app::schema::DropTableByIdReq;
use databend_common_meta_app::schema::DropTableReply;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_types::seq_value::SeqV;
use iceberg::arrow::arrow_schema_to_schema;
use iceberg::spec::Schema as IcebergSchema;
use iceberg::TableCreation;
use iceberg::TableIdent;

use crate::table::IcebergTable;
use crate::IcebergCatalog;

const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id";

#[derive(Clone, Debug)]
pub struct IcebergDatabase {
ctl: IcebergCatalog,
Expand Down Expand Up @@ -116,4 +131,130 @@ impl Database for IcebergDatabase {
.map(|table_name| table_name.name.to_string())
.collect())
}

/// Create a table under this iceberg database, e.g.
/// `CREATE TABLE iceberg_catalog.db.t (col_name type);`
///
/// Iceberg tables are not tracked by the meta service, so all ids in the
/// returned [`CreateTableReply`] are dummy zeros.
#[async_backtrace::framed]
async fn create_table(&self, req: CreateTableReq) -> Result<CreateTableReply> {
    let table_name = TableIdent::new(self.ident.clone(), req.table_name().to_string());

    match req.create_option {
        // Plain CREATE: no pre-check; the underlying iceberg catalog will
        // surface a conflict error if the table already exists.
        CreateOption::Create => {}
        CreateOption::CreateIfNotExists => {
            // Best-effort existence check: if the check itself fails we fall
            // through and let the actual create call surface the error.
            if let Ok(true) = self.ctl.iceberg_catalog().table_exists(&table_name).await {
                // Table already exists and creation is skipped, so report
                // `new_table: false` — no new table was created.
                return Ok(CreateTableReply {
                    table_id: 0,
                    table_id_seq: None,
                    db_id: 0,
                    new_table: false,
                    spec_vec: None,
                    prev_table_id: None,
                    orphan_table_name: None,
                });
            }
        }
        CreateOption::CreateOrReplace => {
            // Drop any existing table first; `if_exists: true` makes the
            // drop a no-op when the table is absent.
            self.drop_table_by_id(DropTableByIdReq {
                if_exists: true,
                tenant: req.tenant().clone(),
                tb_id: 0,
                table_name: req.table_name().to_string(),
                db_id: 0,
                db_name: req.db_name().to_string(),
                engine: req.table_meta.engine.to_string(),
                session_id: "".to_string(),
            })
            .await?;
        }
    }

    let table_create_option = TableCreation::builder()
        .name(req.table_name().to_string())
        .properties(HashMap::new())
        .schema(convert_table_schema(
            req.table_meta.schema.as_ref(),
            req.db_name(),
            req.table_name(),
        )?)
        .build();

    let _ = self
        .ctl
        .iceberg_catalog()
        .create_table(&self.ident, table_create_option)
        .await
        .map_err(|err| {
            ErrorCode::Internal(format!(
                "Iceberg create table {}.{} failed: {err:?}",
                req.db_name(),
                req.table_name()
            ))
        })?;
    Ok(CreateTableReply {
        table_id: 0,
        table_id_seq: None,
        db_id: 0,
        new_table: true,
        spec_vec: None,
        prev_table_id: None,
        orphan_table_name: None,
    })
}

/// Drop a table from this iceberg database by name (the meta-service ids in
/// the request are ignored; iceberg tables are addressed by name).
#[async_backtrace::framed]
async fn drop_table_by_id(&self, req: DropTableByIdReq) -> Result<DropTableReply> {
    let ident = TableIdent::new(self.ident.clone(), req.table_name.clone());

    match self.ctl.iceberg_catalog().drop_table(&ident).await {
        Ok(_) => Ok(DropTableReply {}),
        // With `if_exists` the drop is best-effort: swallow the failure.
        Err(_) if req.if_exists => Ok(DropTableReply {}),
        Err(err) => Err(ErrorCode::Internal(format!(
            "Iceberg drop table {}.{} failed: {err:?}",
            req.db_name, req.table_name
        ))),
    }
}
}

fn convert_table_schema(
schema: &TableSchema,
db_name: &str,
table_name: &str,
) -> Result<IcebergSchema> {
let mut fields = vec![];
for f in schema.fields() {
let mut metadata = HashMap::new();
metadata.insert(
PARQUET_FIELD_ID_META_KEY.to_string(),
f.column_id.to_string(),
);
fields.push(Field::from(f).with_metadata(metadata));
}
let metadata = schema
.metadata
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();

let arrow_schema = Schema::new(fields).with_metadata(metadata);
let schema = arrow_schema_to_schema(&arrow_schema).map_err(|err| {
ErrorCode::Internal(format!(
"Iceberg create table {}.{} failed: {err:?}",
db_name, table_name
))
})?;

Ok(schema) // Return the converted Iceberg schema
}
Loading