Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: prefer usage of metadata and protocol fields #1935

Merged
merged 6 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ fn get_prune_stats(table: &DeltaTable, column: &Column, get_max: bool) -> Option
}

let data_type = field.data_type().try_into().ok()?;
let partition_columns = &table.get_metadata().ok()?.partition_columns;
let partition_columns = &table.metadata().ok()?.partition_columns;

let values = table.get_state().files().iter().map(|add| {
if partition_columns.contains(&column.name) {
Expand Down Expand Up @@ -310,7 +310,7 @@ impl PruningStatistics for DeltaTable {
///
/// Note: the returned array must contain `num_containers()` rows.
fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
let partition_columns = &self.get_metadata().ok()?.partition_columns;
let partition_columns = &self.metadata().ok()?.partition_columns;

let values = self.get_state().files().iter().map(|add| {
if let Ok(Some(statistics)) = add.get_stats() {
Expand Down Expand Up @@ -1602,6 +1602,7 @@ mod tests {
tags: None,
base_row_id: None,
default_row_commit_version: None,
clustering_provider: None,
};
let schema = ArrowSchema::new(vec![
Field::new("year", ArrowDataType::Int64, true),
Expand Down
1 change: 1 addition & 0 deletions crates/deltalake-core/src/kernel/actions/schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ lazy_static! {
deletion_vector_field(),
StructField::new("baseRowId", DataType::long(), true),
StructField::new("defaultRowCommitVersion", DataType::long(), true),
StructField::new("clusteringProvider", DataType::string(), true),
]))),
true,
);
Expand Down
3 changes: 3 additions & 0 deletions crates/deltalake-core/src/kernel/actions/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,9 @@ pub struct Add {
/// First commit version in which an add action with the same path was committed to the table.
pub default_row_commit_version: Option<i64>,

/// The name of the clustering implementation
pub clustering_provider: Option<String>,

// TODO remove migration filds added to not do too many business logic changes in one PR
/// Partition values stored in raw parquet struct format. In this struct, the column names
/// correspond to the partition columns and the values are stored in their corresponding data
Expand Down
52 changes: 27 additions & 25 deletions crates/deltalake-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ pub fn crate_version() -> &'static str {

#[cfg(test)]
mod tests {
use itertools::Itertools;

use super::*;
use crate::table::PeekCommit;
use std::collections::HashMap;
Expand All @@ -204,10 +206,10 @@ mod tests {
async fn read_delta_2_0_table_without_version() {
let table = crate::open_table("./tests/data/delta-0.2.0").await.unwrap();
assert_eq!(table.version(), 3);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(table.protocol().min_writer_version, 2);
assert_eq!(table.protocol().min_reader_version, 1);
assert_eq!(
table.get_files(),
table.get_files_iter().collect_vec(),
vec![
Path::from("part-00000-cb6b150b-30b8-4662-ad28-ff32ddab96d2-c000.snappy.parquet"),
Path::from("part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet"),
Expand Down Expand Up @@ -241,8 +243,8 @@ mod tests {
table_to_update.update().await.unwrap();

assert_eq!(
table_newest_version.get_files(),
table_to_update.get_files()
table_newest_version.get_files_iter().collect_vec(),
table_to_update.get_files_iter().collect_vec()
);
}
#[tokio::test]
Expand All @@ -251,10 +253,10 @@ mod tests {
.await
.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(table.protocol().min_writer_version, 2);
assert_eq!(table.protocol().min_reader_version, 1);
assert_eq!(
table.get_files(),
table.get_files_iter().collect_vec(),
vec![
Path::from("part-00000-b44fcdb0-8b06-4f3a-8606-f8311a96f6dc-c000.snappy.parquet"),
Path::from("part-00001-185eca06-e017-4dea-ae49-fc48b973e37e-c000.snappy.parquet"),
Expand All @@ -265,10 +267,10 @@ mod tests {
.await
.unwrap();
assert_eq!(table.version(), 2);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(table.protocol().min_writer_version, 2);
assert_eq!(table.protocol().min_reader_version, 1);
assert_eq!(
table.get_files(),
table.get_files_iter().collect_vec(),
vec![
Path::from("part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet"),
Path::from("part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet"),
Expand All @@ -279,10 +281,10 @@ mod tests {
.await
.unwrap();
assert_eq!(table.version(), 3);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(table.protocol().min_writer_version, 2);
assert_eq!(table.protocol().min_reader_version, 1);
assert_eq!(
table.get_files(),
table.get_files_iter().collect_vec(),
vec![
Path::from("part-00000-cb6b150b-30b8-4662-ad28-ff32ddab96d2-c000.snappy.parquet"),
Path::from("part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet"),
Expand All @@ -295,10 +297,10 @@ mod tests {
async fn read_delta_8_0_table_without_version() {
let table = crate::open_table("./tests/data/delta-0.8.0").await.unwrap();
assert_eq!(table.version(), 1);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(table.protocol().min_writer_version, 2);
assert_eq!(table.protocol().min_reader_version, 1);
assert_eq!(
table.get_files(),
table.get_files_iter().collect_vec(),
vec![
Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"),
Path::from("part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet")
Expand Down Expand Up @@ -341,21 +343,21 @@ mod tests {
async fn read_delta_8_0_table_with_load_version() {
let mut table = crate::open_table("./tests/data/delta-0.8.0").await.unwrap();
assert_eq!(table.version(), 1);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(table.protocol().min_writer_version, 2);
assert_eq!(table.protocol().min_reader_version, 1);
assert_eq!(
table.get_files(),
table.get_files_iter().collect_vec(),
vec![
Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"),
Path::from("part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet"),
]
);
table.load_version(0).await.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(table.protocol().min_writer_version, 2);
assert_eq!(table.protocol().min_reader_version, 1);
assert_eq!(
table.get_files(),
table.get_files_iter().collect_vec(),
vec![
Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"),
Path::from("part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"),
Expand Down Expand Up @@ -483,7 +485,7 @@ mod tests {
.unwrap();

assert_eq!(
table.get_files(),
table.get_files_iter().collect_vec(),
vec![
Path::parse(
"x=A%2FA/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet"
Expand Down Expand Up @@ -683,7 +685,7 @@ mod tests {
.unwrap();
assert_eq!(table.version(), 2);
assert_eq!(
table.get_files(),
table.get_files_iter().collect_vec(),
vec![Path::from(
"part-00000-7444aec4-710a-4a4c-8abe-3323499043e9.c000.snappy.parquet"
),]
Expand Down
3 changes: 2 additions & 1 deletion crates/deltalake-core/src/operations/convert_to_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ mod tests {
storage::config::StorageOptions,
Path,
};
use itertools::Itertools;
use pretty_assertions::assert_eq;
use std::fs;
use tempfile::tempdir;
Expand Down Expand Up @@ -501,7 +502,7 @@ mod tests {
"Testing location: {test_data_from:?}"
);

let mut files = table.get_files();
let mut files = table.get_files_iter().collect_vec();
files.sort();
assert_eq!(
files, expected_paths,
Expand Down
22 changes: 11 additions & 11 deletions crates/deltalake-core/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ mod tests {
.await
.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(table.get_metadata().unwrap().schema, table_schema)
assert_eq!(table.get_schema().unwrap(), &table_schema)
}

#[tokio::test]
Expand All @@ -362,7 +362,7 @@ mod tests {
.await
.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(table.get_metadata().unwrap().schema, table_schema)
assert_eq!(table.get_schema().unwrap(), &table_schema)
}

#[tokio::test]
Expand Down Expand Up @@ -391,14 +391,14 @@ mod tests {
.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(
table.get_min_reader_version(),
table.protocol().min_reader_version,
PROTOCOL.default_reader_version()
);
assert_eq!(
table.get_min_writer_version(),
table.protocol().min_writer_version,
PROTOCOL.default_writer_version()
);
assert_eq!(table.schema().unwrap(), &schema);
assert_eq!(table.get_schema().unwrap(), &schema);

// check we can overwrite default settings via adding actions
let protocol = Protocol {
Expand All @@ -413,8 +413,8 @@ mod tests {
.with_actions(vec![Action::Protocol(protocol)])
.await
.unwrap();
assert_eq!(table.get_min_reader_version(), 0);
assert_eq!(table.get_min_writer_version(), 0);
assert_eq!(table.protocol().min_reader_version, 0);
assert_eq!(table.protocol().min_writer_version, 0);

let table = CreateBuilder::new()
.with_location("memory://")
Expand All @@ -423,7 +423,7 @@ mod tests {
.await
.unwrap();
let append = table
.get_metadata()
.metadata()
.unwrap()
.configuration
.get(DeltaConfigKey::AppendOnly.as_ref())
Expand All @@ -445,7 +445,7 @@ mod tests {
.await
.unwrap();
assert_eq!(table.version(), 0);
let first_id = table.get_metadata().unwrap().id.clone();
let first_id = table.metadata().unwrap().id.clone();

let log_store = table.log_store;

Expand All @@ -464,7 +464,7 @@ mod tests {
.with_save_mode(SaveMode::Ignore)
.await
.unwrap();
assert_eq!(table.get_metadata().unwrap().id, first_id);
assert_eq!(table.metadata().unwrap().id, first_id);

// Check table is overwritten
let table = CreateBuilder::new()
Expand All @@ -473,6 +473,6 @@ mod tests {
.with_save_mode(SaveMode::Overwrite)
.await
.unwrap();
assert_ne!(table.get_metadata().unwrap().id, first_id)
assert_ne!(table.metadata().unwrap().id, first_id)
}
}
12 changes: 6 additions & 6 deletions crates/deltalake-core/src/operations/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,27 +207,27 @@ async fn execute(
let mut actions = vec![];
let protocol = if protocol_downgrade_allowed {
Protocol {
min_reader_version: table.get_min_reader_version(),
min_writer_version: table.get_min_writer_version(),
min_reader_version: table.protocol().min_reader_version,
min_writer_version: table.protocol().min_writer_version,
writer_features: if snapshot.protocol().min_writer_version < 7 {
None
} else {
table.get_writer_features().cloned()
table.protocol().writer_features.clone()
},
reader_features: if snapshot.protocol().min_reader_version < 3 {
None
} else {
table.get_reader_features().cloned()
table.protocol().reader_features.clone()
},
}
} else {
Protocol {
min_reader_version: max(
table.get_min_reader_version(),
table.protocol().min_reader_version,
snapshot.protocol().min_reader_version,
),
min_writer_version: max(
table.get_min_writer_version(),
table.protocol().min_writer_version,
snapshot.protocol().min_writer_version,
),
writer_features: snapshot.protocol().writer_features.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub fn create_add_action(
default_row_commit_version: None,
tags: None,
deletion_vector: None,
clustering_provider: None,
})
}

Expand Down
4 changes: 2 additions & 2 deletions crates/deltalake-core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ mod tests {
.await
.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(table.get_metadata().unwrap().schema, table_schema);
assert_eq!(table.get_schema().unwrap(), &table_schema);
let res = create_checkpoint_for(0, table.get_state(), table.log_store.as_ref()).await;
assert!(res.is_ok());

Expand Down Expand Up @@ -548,7 +548,7 @@ mod tests {
.await
.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(table.get_metadata().unwrap().schema, table_schema);
assert_eq!(table.get_schema().unwrap(), &table_schema);
match create_checkpoint_for(1, table.get_state(), table.log_store.as_ref()).await {
Ok(_) => {
/*
Expand Down
2 changes: 2 additions & 0 deletions crates/deltalake-core/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,7 @@ mod tests {
modification_time: 0,
base_row_id: None,
default_row_commit_version: None,
clustering_provider: None,
};

let stats = action.get_stats().unwrap().unwrap();
Expand Down Expand Up @@ -796,6 +797,7 @@ mod tests {
modification_time: 0,
base_row_id: None,
default_row_commit_version: None,
clustering_provider: None,
};

let stats = action.get_stats().unwrap().unwrap();
Expand Down
1 change: 1 addition & 0 deletions crates/deltalake-core/src/protocol/parquet_read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ impl Add {
base_row_id: None,
default_row_commit_version: None,
tags: None,
clustering_provider: None,
};

for (i, (name, _)) in record.get_column_iter().enumerate() {
Expand Down
1 change: 1 addition & 0 deletions crates/deltalake-core/src/storage/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ mod tests {
deletion_vector: None,
partition_values_parsed: None,
stats_parsed: None,
clustering_provider: None,
};

let meta: ObjectMeta = (&add).try_into().unwrap();
Expand Down
Loading