Skip to content

Commit c0ff963

Browse files
authored
Merge branch 'main' into fix/improve-caching-in-github-action
2 parents c507611 + 18f949e commit c0ff963

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+432
-328
lines changed

crates/aws/src/logstore/dynamodb_logstore.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ impl S3DynamoDbLogStore {
148148
),
149149
}
150150
}
151-
unreachable!("for loop yields Ok or Err in body when retyr = MAX_REPAIR_RETRIES")
151+
unreachable!("for loop yields Ok or Err in body when retry = MAX_REPAIR_RETRIES")
152152
}
153153

154154
fn map_retry_result(

crates/benchmarks/src/bin/merge.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ async fn benchmark_merge_tpcds(
199199
let table = DeltaTableBuilder::from_uri(table_url)?.load().await?;
200200

201201
let provider = DeltaTableProvider::try_new(
202-
table.snapshot()?.clone(),
202+
table.snapshot()?.snapshot().clone(),
203203
table.log_store(),
204204
DeltaScanConfig {
205205
file_column_name: Some("file_path".to_string()),

crates/core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ tokio = { workspace = true, features = [
6868
] }
6969

7070
# caching
71-
foyer = { version = "0.17.2", optional = true, features = ["serde"] }
71+
foyer = { version = "0.20.0", optional = true, features = ["serde"] }
7272
tempfile = { version = "3.19.1", optional = true }
7373

7474
# other deps (these should be organized and pulled into workspace.dependencies as necessary)

crates/core/src/delta_datafusion/expr.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,7 @@ macro_rules! format_option {
478478
}};
479479
}
480480

481-
/// Epoch days from ce calander until 1970-01-01
481+
/// Epoch days from ce calendar until 1970-01-01
482482
pub const EPOCH_DAYS_FROM_CE: i32 = 719_163;
483483

484484
struct ScalarValueFormat<'a> {

crates/core/src/delta_datafusion/mod.rs

Lines changed: 58 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ pub(crate) fn register_store(store: LogStoreRef, env: Arc<RuntimeEnv>) {
309309
/// columns must appear at the end of the schema. This is to align with how partition are handled
310310
/// at the physical level
311311
pub(crate) fn df_logical_schema(
312-
snapshot: &DeltaTableState,
312+
snapshot: &EagerSnapshot,
313313
file_column_name: &Option<String>,
314314
schema: Option<ArrowSchemaRef>,
315315
) -> DeltaResult<SchemaRef> {
@@ -576,13 +576,13 @@ impl DeltaDataChecker {
576576
}
577577

578578
/// Create a new DeltaDataChecker
579-
pub fn new(snapshot: &DeltaTableState) -> Self {
579+
pub fn new(snapshot: &EagerSnapshot) -> Self {
580580
let invariants = snapshot.schema().get_invariants().unwrap_or_default();
581581
let generated_columns = snapshot
582582
.schema()
583583
.get_generated_columns()
584584
.unwrap_or_default();
585-
let constraints = snapshot.table_config().get_constraints();
585+
let constraints = snapshot.table_properties().get_constraints();
586586
let non_nullable_columns = snapshot
587587
.schema()
588588
.fields()
@@ -937,16 +937,17 @@ fn join_batches_with_add_actions(
937937

938938
/// Determine which files contain a record that satisfies the predicate
939939
pub(crate) async fn find_files_scan(
940-
snapshot: &DeltaTableState,
940+
snapshot: &EagerSnapshot,
941941
log_store: LogStoreRef,
942942
state: &SessionState,
943943
expression: Expr,
944944
) -> DeltaResult<Vec<Add>> {
945945
let candidate_map: HashMap<String, Add> = snapshot
946-
.file_actions_iter(&log_store)
947-
.map_ok(|add| (add.path.clone(), add.to_owned()))
948-
.try_collect()
949-
.await?;
946+
.log_data()
947+
.iter()
948+
.map(|f| f.add_action())
949+
.map(|add| (add.path.clone(), add.to_owned()))
950+
.collect();
950951

951952
let scan_config = DeltaScanConfigBuilder {
952953
include_file_column: true,
@@ -997,10 +998,14 @@ pub(crate) async fn find_files_scan(
997998

998999
pub(crate) async fn scan_memory_table(
9991000
log_store: &dyn LogStore,
1000-
snapshot: &DeltaTableState,
1001+
snapshot: &EagerSnapshot,
10011002
predicate: &Expr,
10021003
) -> DeltaResult<Vec<Add>> {
1003-
let actions = snapshot.file_actions(log_store).await?;
1004+
let actions = snapshot
1005+
.log_data()
1006+
.iter()
1007+
.map(|f| f.add_action())
1008+
.collect_vec();
10041009

10051010
let batch = snapshot.add_actions_table(true)?;
10061011
let mut arrays = Vec::new();
@@ -1052,7 +1057,7 @@ pub(crate) async fn scan_memory_table(
10521057

10531058
/// Finds files in a snapshot that match the provided predicate.
10541059
pub async fn find_files(
1055-
snapshot: &DeltaTableState,
1060+
snapshot: &EagerSnapshot,
10561061
log_store: LogStoreRef,
10571062
state: &SessionState,
10581063
predicate: Option<Expr>,
@@ -1088,7 +1093,7 @@ pub async fn find_files(
10881093
}
10891094
}
10901095
None => Ok(FindFiles {
1091-
candidates: snapshot.file_actions(&log_store).await?,
1096+
candidates: snapshot.log_data().iter().map(|f| f.add_action()).collect(),
10921097
partition_scan: true,
10931098
}),
10941099
}
@@ -1192,7 +1197,7 @@ impl From<DeltaColumn> for Column {
11921197
}
11931198
}
11941199

1195-
/// Create a column, resuing the existing datafusion column
1200+
/// Create a column, reusing the existing datafusion column
11961201
impl From<Column> for DeltaColumn {
11971202
fn from(c: Column) -> Self {
11981203
DeltaColumn { inner: c }
@@ -1517,13 +1522,16 @@ mod tests {
15171522
let table = crate::open_table(table_url).await.unwrap();
15181523
let config = DeltaScanConfigBuilder::new()
15191524
.with_file_column_name(&"file_source")
1520-
.build(table.snapshot().unwrap())
1525+
.build(table.snapshot().unwrap().snapshot())
15211526
.unwrap();
15221527

15231528
let log_store = table.log_store();
1524-
let provider =
1525-
DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log_store, config)
1526-
.unwrap();
1529+
let provider = DeltaTableProvider::try_new(
1530+
table.snapshot().unwrap().snapshot().clone(),
1531+
log_store,
1532+
config,
1533+
)
1534+
.unwrap();
15271535
let ctx = SessionContext::new();
15281536
ctx.register_table("test", Arc::new(provider)).unwrap();
15291537

@@ -1581,13 +1589,16 @@ mod tests {
15811589
.unwrap();
15821590

15831591
let config = DeltaScanConfigBuilder::new()
1584-
.build(table.snapshot().unwrap())
1592+
.build(table.snapshot().unwrap().snapshot())
15851593
.unwrap();
15861594

15871595
let log_store = table.log_store();
1588-
let provider =
1589-
DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log_store, config)
1590-
.unwrap();
1596+
let provider = DeltaTableProvider::try_new(
1597+
table.snapshot().unwrap().snapshot().clone(),
1598+
log_store,
1599+
config,
1600+
)
1601+
.unwrap();
15911602
let logical_schema = provider.schema();
15921603
let ctx = SessionContext::new();
15931604
ctx.register_table("test", Arc::new(provider)).unwrap();
@@ -1645,12 +1656,13 @@ mod tests {
16451656
.unwrap();
16461657

16471658
let config = DeltaScanConfigBuilder::new()
1648-
.build(table.snapshot().unwrap())
1659+
.build(table.snapshot().unwrap().snapshot())
16491660
.unwrap();
16501661
let log = table.log_store();
16511662

16521663
let provider =
1653-
DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
1664+
DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
1665+
.unwrap();
16541666
let ctx: SessionContext = DeltaSessionContext::default().into();
16551667
ctx.register_table("test", Arc::new(provider)).unwrap();
16561668

@@ -1737,12 +1749,13 @@ mod tests {
17371749
.unwrap();
17381750

17391751
let config = DeltaScanConfigBuilder::new()
1740-
.build(table.snapshot().unwrap())
1752+
.build(table.snapshot().unwrap().snapshot())
17411753
.unwrap();
17421754
let log = table.log_store();
17431755

17441756
let provider =
1745-
DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
1757+
DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
1758+
.unwrap();
17461759
let ctx: SessionContext = DeltaSessionContext::default().into();
17471760
ctx.register_table("test", Arc::new(provider)).unwrap();
17481761

@@ -1793,12 +1806,13 @@ mod tests {
17931806
.unwrap();
17941807

17951808
let config = DeltaScanConfigBuilder::new()
1796-
.build(table.snapshot().unwrap())
1809+
.build(table.snapshot().unwrap().snapshot())
17971810
.unwrap();
17981811
let log = table.log_store();
17991812

18001813
let provider =
1801-
DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
1814+
DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
1815+
.unwrap();
18021816

18031817
let mut cfg = SessionConfig::default();
18041818
cfg.options_mut().execution.parquet.pushdown_filters = true;
@@ -1889,12 +1903,13 @@ mod tests {
18891903
.unwrap();
18901904

18911905
let config = DeltaScanConfigBuilder::new()
1892-
.build(table.snapshot().unwrap())
1906+
.build(table.snapshot().unwrap().snapshot())
18931907
.unwrap();
18941908
let log = table.log_store();
18951909

18961910
let provider =
1897-
DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
1911+
DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
1912+
.unwrap();
18981913
let ctx: SessionContext = DeltaSessionContext::default().into();
18991914
ctx.register_table("test", Arc::new(provider)).unwrap();
19001915

@@ -1972,11 +1987,15 @@ mod tests {
19721987

19731988
let ctx = SessionContext::new();
19741989
let state = ctx.state();
1975-
let scan = DeltaScanBuilder::new(table.snapshot().unwrap(), table.log_store(), &state)
1976-
.with_filter(Some(col("a").eq(lit("s"))))
1977-
.build()
1978-
.await
1979-
.unwrap();
1990+
let scan = DeltaScanBuilder::new(
1991+
table.snapshot().unwrap().snapshot(),
1992+
table.log_store(),
1993+
&state,
1994+
)
1995+
.with_filter(Some(col("a").eq(lit("s"))))
1996+
.build()
1997+
.await
1998+
.unwrap();
19801999

19812000
let mut visitor = ParquetVisitor::default();
19822001
visit_execution_plan(&scan, &mut visitor).unwrap();
@@ -1997,12 +2016,12 @@ mod tests {
19972016
let snapshot = table.snapshot().unwrap();
19982017
let ctx = SessionContext::new();
19992018
let state = ctx.state();
2000-
let scan = DeltaScanBuilder::new(snapshot, table.log_store(), &state)
2019+
let scan = DeltaScanBuilder::new(snapshot.snapshot(), table.log_store(), &state)
20012020
.with_filter(Some(col("a").eq(lit("s"))))
20022021
.with_scan_config(
20032022
DeltaScanConfigBuilder::new()
20042023
.with_parquet_pushdown(false)
2005-
.build(snapshot)
2024+
.build(snapshot.snapshot())
20062025
.unwrap(),
20072026
)
20082027
.build()
@@ -2032,7 +2051,7 @@ mod tests {
20322051
let ctx = SessionContext::new_with_config(config);
20332052
let state = ctx.state();
20342053

2035-
let scan = DeltaScanBuilder::new(snapshot, table.log_store(), &state)
2054+
let scan = DeltaScanBuilder::new(snapshot.snapshot(), table.log_store(), &state)
20362055
.build()
20372056
.await
20382057
.unwrap();
@@ -2151,7 +2170,7 @@ mod tests {
21512170
.unwrap();
21522171

21532172
let config = DeltaScanConfigBuilder::new()
2154-
.build(table.snapshot().unwrap())
2173+
.build(table.snapshot().unwrap().snapshot())
21552174
.unwrap();
21562175

21572176
let (object_store, mut operations) =
@@ -2164,7 +2183,7 @@ mod tests {
21642183
table.log_store().config().clone(),
21652184
);
21662185
let provider = DeltaTableProvider::try_new(
2167-
table.snapshot().unwrap().clone(),
2186+
table.snapshot().unwrap().snapshot().clone(),
21682187
Arc::new(log_store),
21692188
config,
21702189
)

crates/core/src/delta_datafusion/schema_adapter.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::sync::Arc;
44
use crate::kernel::schema::cast_record_batch;
55
use arrow_array::RecordBatch;
66
use arrow_schema::{Schema, SchemaRef};
7-
use datafusion::common::{not_impl_err, ColumnStatistics};
7+
use datafusion::common::{not_impl_err, ColumnStatistics, Result};
88
use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
99

1010
/// A Schema Adapter Factory which provides casting record batches from parquet to meet
@@ -39,10 +39,7 @@ impl SchemaAdapter for DeltaSchemaAdapter {
3939
Some(file_schema.fields.find(field.name())?.0)
4040
}
4141

42-
fn map_schema(
43-
&self,
44-
file_schema: &Schema,
45-
) -> datafusion::common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
42+
fn map_schema(&self, file_schema: &Schema) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
4643
let mut projection = Vec::with_capacity(file_schema.fields().len());
4744

4845
for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
@@ -71,15 +68,15 @@ pub(crate) struct SchemaMapping {
7168
}
7269

7370
impl SchemaMapper for SchemaMapping {
74-
fn map_batch(&self, batch: RecordBatch) -> datafusion::common::Result<RecordBatch> {
71+
fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
7572
let record_batch = cast_record_batch(&batch, self.projected_schema.clone(), false, true)?;
7673
Ok(record_batch)
7774
}
7875

7976
fn map_column_statistics(
8077
&self,
8178
_file_col_statistics: &[ColumnStatistics],
82-
) -> datafusion::common::Result<Vec<ColumnStatistics>> {
79+
) -> Result<Vec<ColumnStatistics>> {
8380
not_impl_err!("Mapping column statistics is not implemented for DeltaSchemaAdapter")
8481
}
8582
}

0 commit comments

Comments
 (0)