Skip to content

Commit db73d2d

Browse files
committed
feat: add catalog API
1 parent 1f69c4a commit db73d2d

File tree

3 files changed

+351
-1
lines changed

3 files changed

+351
-1
lines changed

crates/paimon/src/catalog/mod.rs

Lines changed: 319 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,319 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashMap;
19+
use std::fmt;
20+
use std::hash::Hash;
21+
22+
use async_trait::async_trait;
23+
use chrono::Duration;
24+
25+
use crate::error::Result;
26+
use crate::io::FileIO;
27+
use crate::spec::{RowType, SchemaChange, TableSchema};
28+
29+
/// This interface is responsible for reading and writing metadata such as database/table from a paimon catalog.
30+
///
31+
/// Impl References: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java#L42>
32+
#[async_trait]
33+
pub trait Catalog: Send + Sync {
34+
fn default_database_info(&self) -> &str {
35+
"default"
36+
}
37+
38+
fn default_system_table_splitter_info(&self) -> &str {
39+
"$"
40+
}
41+
42+
fn default_system_database_name_info(&self) -> &str {
43+
"sys"
44+
}
45+
46+
/// Returns the warehouse root path containing all database directories in this catalog.
47+
fn warehouse(&self) -> &str;
48+
49+
/// Returns the catalog options.
50+
fn options(&self) -> &HashMap<String, String>;
51+
52+
/// Returns the FileIO instance.
53+
fn file_io(&self) -> &FileIO;
54+
55+
/// Lists all databases in this catalog.
56+
async fn list_databases(&self) -> Result<Vec<String>>;
57+
58+
/// Checks if a database exists in this catalog.
59+
async fn database_exists(&self, database_name: &str) -> Result<bool>;
60+
61+
/// Creates a new database.
62+
async fn create_database(
63+
&self,
64+
name: &str,
65+
ignore_if_exists: bool,
66+
properties: Option<HashMap<String, String>>,
67+
) -> Result<()>;
68+
69+
/// Loads database properties.
70+
async fn load_database_properties(&self, name: &str) -> Result<HashMap<String, String>>;
71+
72+
/// Drops a database.
73+
async fn drop_database(
74+
&self,
75+
name: &str,
76+
ignore_if_not_exists: bool,
77+
cascade: bool,
78+
) -> Result<()>;
79+
80+
/// Returns a Table instance for the specified identifier.
81+
async fn get_table(&self, identifier: &Identifier) -> Result<Box<dyn Table>>;
82+
83+
/// Lists all tables in the specified database.
84+
async fn list_tables(&self, database_name: &str) -> Result<Vec<String>>;
85+
86+
/// Checks if a table exists.
87+
async fn table_exists(&self, identifier: &Identifier) -> Result<bool> {
88+
match self.get_table(identifier).await {
89+
Ok(_) => Ok(true),
90+
Err(e) => match e {
91+
crate::error::Error::TableNotExist { .. } => Ok(false),
92+
_ => Err(e),
93+
},
94+
}
95+
}
96+
97+
/// Drops a table.
98+
async fn drop_table(&self, identifier: &Identifier, ignore_if_not_exists: bool) -> Result<()>;
99+
100+
/// Creates a new table.
101+
async fn create_table(
102+
&self,
103+
identifier: &Identifier,
104+
schema: TableSchema,
105+
ignore_if_exists: bool,
106+
) -> Result<()>;
107+
108+
/// Renames a table.
109+
async fn rename_table(
110+
&self,
111+
from_table: &Identifier,
112+
to_table: &Identifier,
113+
ignore_if_not_exists: bool,
114+
) -> Result<()>;
115+
116+
/// Alters an existing table.
117+
async fn alter_table(
118+
&self,
119+
identifier: &Identifier,
120+
changes: Vec<SchemaChange>,
121+
ignore_if_not_exists: bool,
122+
) -> Result<()>;
123+
124+
/// Drops a partition from the specified table.
125+
async fn drop_partition(
126+
&self,
127+
identifier: &Identifier,
128+
partitions: &HashMap<String, String>,
129+
) -> Result<()>;
130+
131+
/// Returns whether this catalog is case-sensitive.
132+
fn case_sensitive(&self) -> bool {
133+
true
134+
}
135+
}
136+
137+
/// Identifies an object in a catalog.
138+
///
139+
/// Impl References: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java#L35>
140+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
141+
pub struct Identifier {
142+
database: String,
143+
table: String,
144+
}
145+
146+
impl Identifier {
147+
pub const UNKNOWN_DATABASE: &'static str = "unknown";
148+
149+
/// Create a new identifier.
150+
pub fn new(database: String, table: String) -> Self {
151+
Self { database, table }
152+
}
153+
154+
/// Get the table name.
155+
pub fn database_name(&self) -> &str {
156+
&self.database
157+
}
158+
159+
/// Get the table name.
160+
pub fn object_name(&self) -> &str {
161+
&self.table
162+
}
163+
164+
/// Get the full name of the identifier.
165+
pub fn full_name(&self) -> String {
166+
if self.database == Self::UNKNOWN_DATABASE {
167+
self.table.clone()
168+
} else {
169+
format!("{}.{}", self.database, self.table)
170+
}
171+
}
172+
173+
/// Get the full name of the identifier with a specified character.
174+
pub fn escaped_full_name(&self) -> String {
175+
self.escaped_full_name_with_char('`')
176+
}
177+
178+
/// Get the full name of the identifier with a specified character.
179+
pub fn escaped_full_name_with_char(&self, escape_char: char) -> String {
180+
format!(
181+
"{0}{1}{0}.{0}{2}{0}",
182+
escape_char, self.database, self.table
183+
)
184+
}
185+
186+
/// Create a new identifier.
187+
pub fn create(db: &str, table: &str) -> Self {
188+
Self::new(db.to_string(), table.to_string())
189+
}
190+
}
191+
192+
impl fmt::Display for Identifier {
193+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194+
write!(f, "{}", self.full_name())
195+
}
196+
}
197+
198+
/// A table provides basic abstraction for a table type and table scan, and table read.
199+
///
200+
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/table/Table.java#L41>
201+
pub trait Table {
202+
// ================== Table Metadata =====================
203+
204+
/// A name to identify this table.
205+
fn name(&self) -> &str;
206+
207+
/// Returns the row type of this table.
208+
fn row_type(&self) -> &RowType;
209+
210+
/// Partition keys of this table.
211+
fn partition_keys(&self) -> Vec<String>;
212+
213+
/// Primary keys of this table.
214+
fn primary_keys(&self) -> Vec<String>;
215+
216+
/// Options of this table.
217+
fn options(&self) -> HashMap<String, String>;
218+
219+
/// Optional comment of this table.
220+
fn comment(&self) -> Option<&String>;
221+
222+
// ================= Table Operations ====================
223+
224+
/// Copy this table with adding dynamic options.
225+
fn copy(&self, dynamic_options: HashMap<String, String>) -> Box<dyn Table>;
226+
227+
/// Rollback table's state to a specific snapshot.
228+
fn rollback_to(&mut self, snapshot_id: u64);
229+
230+
/// Create a tag from given snapshot.
231+
fn create_tag(&mut self, tag_name: &str, from_snapshot_id: u64);
232+
233+
fn create_tag_with_retention(
234+
&mut self,
235+
tag_name: &str,
236+
from_snapshot_id: u64,
237+
time_retained: Duration,
238+
);
239+
240+
/// Create a tag from the latest snapshot.
241+
fn create_tag_from_latest(&mut self, tag_name: &str);
242+
243+
fn create_tag_from_latest_with_retention(&mut self, tag_name: &str, time_retained: Duration);
244+
245+
/// Delete a tag by name.
246+
fn delete_tag(&mut self, tag_name: &str);
247+
248+
/// Rollback table's state to a specific tag.
249+
fn rollback_to_tag(&mut self, tag_name: &str);
250+
251+
/// Create an empty branch.
252+
fn create_branch(&mut self, branch_name: &str);
253+
254+
/// Create a branch from given snapshot.
255+
fn create_branch_from_snapshot(&mut self, branch_name: &str, snapshot_id: u64);
256+
257+
/// Create a branch from given tag.
258+
fn create_branch_from_tag(&mut self, branch_name: &str, tag_name: &str);
259+
260+
/// Delete a branch by branchName.
261+
fn delete_branch(&mut self, branch_name: &str);
262+
}
263+
#[cfg(test)]
264+
mod catalog_tests {
265+
use super::*;
266+
267+
#[tokio::test]
268+
async fn test_full_name_identifier() {
269+
let database_name = "trade".to_string();
270+
let table_name = "dwv_xxxxx".to_string();
271+
let my_sign = "`".to_string();
272+
273+
let identifier = Identifier {
274+
database: database_name.clone(),
275+
table: table_name.clone(),
276+
};
277+
278+
assert_eq!(identifier.database_name(), database_name);
279+
assert_eq!(identifier.object_name(), table_name);
280+
assert_eq!(
281+
identifier.full_name(),
282+
format!("{}.{}", database_name.clone(), table_name.clone())
283+
);
284+
assert_eq!(
285+
identifier.escaped_full_name(),
286+
format!(
287+
"{0}{1}{0}.{0}{2}{0}",
288+
my_sign,
289+
database_name.clone(),
290+
table_name.clone()
291+
)
292+
);
293+
}
294+
295+
#[tokio::test]
296+
async fn test_unkown_name_identifier() {
297+
let database_name = "unknown".to_string();
298+
let table_name = "dwv_xxxxx".to_string();
299+
let my_sign = "`".to_string();
300+
301+
let identifier = Identifier {
302+
database: database_name.to_string(),
303+
table: table_name.clone(),
304+
};
305+
306+
assert_eq!(identifier.database_name(), database_name);
307+
assert_eq!(identifier.object_name(), table_name);
308+
assert_eq!(identifier.full_name(), table_name.clone());
309+
assert_eq!(
310+
identifier.escaped_full_name(),
311+
format!(
312+
"{0}{1}{0}.{0}{2}{0}",
313+
my_sign,
314+
database_name.clone(),
315+
table_name.clone()
316+
)
317+
);
318+
}
319+
}

crates/paimon/src/error.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use crate::catalog::Identifier;
1819
use snafu::prelude::*;
19-
2020
/// Result type used in paimon.
2121
pub type Result<T, E = Error> = std::result::Result<T, E>;
2222

@@ -65,6 +65,36 @@ pub enum Error {
6565
display("Paimon hitting invalid file index format: {}", message)
6666
)]
6767
FileIndexFormatInvalid { message: String },
68+
69+
#[snafu(display("Database {} is not empty.", database))]
70+
DatabaseNotEmpty { database: String },
71+
72+
#[snafu(display("Database {} already exists.", database))]
73+
DatabaseAlreadyExist { database: String },
74+
75+
#[snafu(display("Database {} does not exist.", database))]
76+
DatabaseNotExist { database: String },
77+
78+
#[snafu(display("Can't do operation on system database."))]
79+
ProcessSystemDatabase,
80+
81+
#[snafu(display("Table {} already exists.", identifier.full_name()))]
82+
TableAlreadyExist { identifier: Identifier },
83+
84+
#[snafu(display("Table {} does not exist.", identifier.full_name()))]
85+
TableNotExist { identifier: Identifier },
86+
87+
#[snafu(display("Partition {} do not exist in the table {}.", identifier.full_name(), partitions))]
88+
PartitionNotExist {
89+
identifier: Identifier,
90+
partitions: String,
91+
},
92+
93+
#[snafu(display("Column {} already exists.", column_name))]
94+
ColumnAlreadyExist { column_name: String },
95+
96+
#[snafu(display("Column {} does not exist.", column_name))]
97+
ColumnNotExist { column_name: String },
6898
}
6999

70100
impl From<opendal::Error> for Error {

crates/paimon/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,4 @@ pub use error::Result;
2222
pub mod file_index;
2323
pub mod io;
2424
pub mod spec;
25+
pub mod catalog;

0 commit comments

Comments
 (0)