Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update table config to contain new config keys #2127

Merged
merged 1 commit into from
Jan 27, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 45 additions & 3 deletions crates/deltalake-core/src/table/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ pub enum DeltaConfigKey {
/// statistics beyond this number, even when such statistics exist).
DataSkippingNumIndexedCols,

/// A comma-separated list of column names on which Delta Lake collects statistics to enhance
/// data skipping functionality. This property takes precedence over
/// [DataSkippingNumIndexedCols](Self::DataSkippingNumIndexedCols).
DataSkippingStatsColumns,

/// The shortest duration for Delta Lake to keep logically deleted data files before deleting
/// them physically. This is to prevent failures in stale readers after compactions or partition overwrites.
///
Expand All @@ -61,6 +66,9 @@ pub enum DeltaConfigKey {
/// true to enable change data feed.
EnableChangeDataFeed,

/// true to enable deletion vectors and predictive I/O for updates.
EnableDeletionVectors,

/// The degree to which a transaction must be isolated from modifications made by concurrent transactions.
///
/// Valid values are `Serializable` and `WriteSerializable`.
Expand Down Expand Up @@ -120,8 +128,10 @@ impl AsRef<str> for DeltaConfigKey {
Self::CheckpointPolicy => "delta.checkpointPolicy",
Self::ColumnMappingMode => "delta.columnMapping.mode",
Self::DataSkippingNumIndexedCols => "delta.dataSkippingNumIndexedCols",
Self::DataSkippingStatsColumns => "delta.dataSkippingStatsColumns",
Self::DeletedFileRetentionDuration => "delta.deletedFileRetentionDuration",
Self::EnableChangeDataFeed => "delta.enableChangeDataFeed",
Self::EnableDeletionVectors => "delta.enableDeletionVectors",
Self::IsolationLevel => "delta.isolationLevel",
Self::LogRetentionDuration => "delta.logRetentionDuration",
Self::EnableExpiredLogCleanup => "delta.enableExpiredLogCleanup",
Expand Down Expand Up @@ -150,10 +160,12 @@ impl FromStr for DeltaConfigKey {
"delta.checkpointPolicy" => Ok(Self::CheckpointPolicy),
"delta.columnMapping.mode" => Ok(Self::ColumnMappingMode),
"delta.dataSkippingNumIndexedCols" => Ok(Self::DataSkippingNumIndexedCols),
"delta.dataSkippingStatsColumns" => Ok(Self::DataSkippingStatsColumns),
"delta.deletedFileRetentionDuration" | "deletedFileRetentionDuration" => {
Ok(Self::DeletedFileRetentionDuration)
}
"delta.enableChangeDataFeed" => Ok(Self::EnableChangeDataFeed),
"delta.enableDeletionVectors" => Ok(Self::EnableDeletionVectors),
"delta.isolationLevel" => Ok(Self::IsolationLevel),
"delta.logRetentionDuration" | "logRetentionDuration" => Ok(Self::LogRetentionDuration),
"delta.enableExpiredLogCleanup" | "enableExpiredLogCleanup" => {
Expand All @@ -180,9 +192,9 @@ pub enum DeltaConfigError {
}

macro_rules! table_config {
($(($key:expr, $name:ident, $ret:ty, $default:literal),)*) => {
($(($docs:literal, $key:expr, $name:ident, $ret:ty, $default:literal),)*) => {
$(
/// read property $key
#[doc = $docs]
pub fn $name(&self) -> $ret {
self.0
.get($key.as_ref())
Expand All @@ -198,45 +210,67 @@ pub struct TableConfig<'a>(pub(crate) &'a HashMap<String, Option<String>>);

impl<'a> TableConfig<'a> {
table_config!(
(DeltaConfigKey::AppendOnly, append_only, bool, false),
(
"true for this Delta table to be append-only",
DeltaConfigKey::AppendOnly,
append_only,
bool,
false
),
(
"true for Delta Lake to write file statistics in checkpoints in JSON format for the stats column.",
DeltaConfigKey::CheckpointWriteStatsAsJson,
write_stats_as_json,
bool,
true
),
(
"true for Delta Lake to write file statistics to checkpoints in struct format",
DeltaConfigKey::CheckpointWriteStatsAsStruct,
write_stats_as_struct,
bool,
false
),
(
"The target file size in bytes or higher units for file tuning",
DeltaConfigKey::TargetFileSize,
target_file_size,
i64,
// Databricks / spark defaults to 104857600 (bytes) or 100mb
104857600
),
(
"true to enable change data feed.",
DeltaConfigKey::EnableChangeDataFeed,
enable_change_data_feed,
bool,
false
),
(
    // Fix: method name had a stray '0' ("enable_deletio0n_vectors"), which would
    // generate a misspelled public accessor via the table_config! macro.
    "true to enable deletion vectors and predictive I/O for updates.",
    DeltaConfigKey::EnableDeletionVectors,
    enable_deletion_vectors,
    bool,
    // in databricks the default is dependent on the workspace settings and runtime version
    // https://learn.microsoft.com/en-us/azure/databricks/administration-guide/workspace-settings/deletion-vectors
    false
),
(
"The number of columns for Delta Lake to collect statistics about for data skipping.",
DeltaConfigKey::DataSkippingNumIndexedCols,
num_indexed_cols,
i32,
32
),
(
"whether to cleanup expired logs",
DeltaConfigKey::EnableExpiredLogCleanup,
enable_expired_log_cleanup,
bool,
true
),
(
"Interval (number of commits) after which a new checkpoint should be created",
DeltaConfigKey::CheckpointInterval,
checkpoint_interval,
i32,
Expand Down Expand Up @@ -318,6 +352,14 @@ impl<'a> TableConfig<'a> {
})
.collect()
}

/// Column names on which Delta Lake collects statistics to enhance data skipping functionality.
/// This property takes precedence over [num_indexed_cols](Self::num_indexed_cols).
pub fn stats_columns(&self) -> Option<Vec<&str>> {
    // Look the raw config value up by its canonical key; the stored value is
    // Option<String>, so both a missing key and an explicit None yield None.
    let key = DeltaConfigKey::DataSkippingStatsColumns;
    match self.0.get(key.as_ref()) {
        Some(Some(value)) => Some(value.split(',').collect()),
        _ => None,
    }
}
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
Expand Down
Loading