Skip to content

Commit 1b629c6

Browse files
committed
feat: predicate pushdown in parquet scan
Signed-off-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
1 parent 283ef2b commit 1b629c6

File tree

7 files changed

+36
-28
lines changed

7 files changed

+36
-28
lines changed

crates/core/src/delta_datafusion/cdf/scan.rs

+12-6
Original file line numberDiff line numberDiff line change
@@ -71,14 +71,20 @@ impl TableProvider for DeltaCdfTableProvider {
7171
limit: Option<usize>,
7272
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
7373
let session_state = session_state_from_session(session)?;
74-
let mut plan = self.cdf_builder.build(session_state).await?;
74+
let schema: DFSchema = self.schema().try_into()?;
7575

76-
let df_schema: DFSchema = plan.schema().try_into()?;
76+
let mut plan = if let Some(filter_expr) = conjunction(filters.iter().cloned()) {
77+
let physical_expr = session.create_physical_expr(filter_expr, &schema)?;
78+
let plan = self
79+
.cdf_builder
80+
.build(session_state, Some(&physical_expr))
81+
.await?;
82+
Arc::new(FilterExec::try_new(physical_expr, plan)?)
83+
} else {
84+
self.cdf_builder.build(session_state, None).await?
85+
};
7786

78-
if let Some(filter_expr) = conjunction(filters.iter().cloned()) {
79-
let physical_expr = session.create_physical_expr(filter_expr, &df_schema)?;
80-
plan = Arc::new(FilterExec::try_new(physical_expr, plan)?);
81-
}
87+
let df_schema: DFSchema = plan.schema().try_into()?;
8288

8389
if let Some(projection) = projection {
8490
let current_projection = (0..plan.schema().fields().len()).collect::<Vec<usize>>();

crates/core/src/operations/delete.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -975,7 +975,7 @@ mod tests {
975975
let table = DeltaOps(table)
976976
.load_cdf()
977977
.with_starting_version(0)
978-
.build(&ctx.state())
978+
.build(&ctx.state(), None)
979979
.await
980980
.expect("Failed to load CDF");
981981

@@ -1059,7 +1059,7 @@ mod tests {
10591059
let table = DeltaOps(table)
10601060
.load_cdf()
10611061
.with_starting_version(0)
1062-
.build(&ctx.state())
1062+
.build(&ctx.state(), None)
10631063
.await
10641064
.expect("Failed to load CDF");
10651065

crates/core/src/operations/load_cdf.rs

+15-14
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ impl CdfLoadBuilder {
323323
pub(crate) async fn build(
324324
&self,
325325
session_sate: &SessionState,
326+
filters: Option<&Arc<dyn PhysicalExpr>>,
326327
) -> DeltaResult<Arc<dyn ExecutionPlan>> {
327328
let (cdc, add, remove) = self.determine_files_to_read().await?;
328329
register_store(self.log_store.clone(), session_sate.runtime_env().clone());
@@ -388,7 +389,7 @@ impl CdfLoadBuilder {
388389
table_partition_cols: cdc_partition_cols,
389390
output_ordering: vec![],
390391
},
391-
None,
392+
filters,
392393
)
393394
.await?;
394395

@@ -406,7 +407,7 @@ impl CdfLoadBuilder {
406407
table_partition_cols: add_remove_partition_cols.clone(),
407408
output_ordering: vec![],
408409
},
409-
None,
410+
filters,
410411
)
411412
.await?;
412413

@@ -424,7 +425,7 @@ impl CdfLoadBuilder {
424425
table_partition_cols: add_remove_partition_cols,
425426
output_ordering: vec![],
426427
},
427-
None,
428+
filters,
428429
)
429430
.await?;
430431

@@ -502,7 +503,7 @@ pub(crate) mod tests {
502503
.await?
503504
.load_cdf()
504505
.with_starting_version(0)
505-
.build(&ctx.state())
506+
.build(&ctx.state(), None)
506507
.await?;
507508

508509
let batches = collect_batches(
@@ -553,7 +554,7 @@ pub(crate) mod tests {
553554
.load_cdf()
554555
.with_starting_version(0)
555556
.with_ending_timestamp(starting_timestamp.and_utc())
556-
.build(&ctx.state())
557+
.build(&ctx.state(), None)
557558
.await
558559
.unwrap();
559560

@@ -599,7 +600,7 @@ pub(crate) mod tests {
599600
.await?
600601
.load_cdf()
601602
.with_starting_version(0)
602-
.build(&ctx.state())
603+
.build(&ctx.state(), None)
603604
.await?;
604605

605606
let batches = collect_batches(
@@ -652,7 +653,7 @@ pub(crate) mod tests {
652653
.load_cdf()
653654
.with_starting_version(4)
654655
.with_ending_version(1)
655-
.build(&ctx.state())
656+
.build(&ctx.state(), None)
656657
.await;
657658

658659
assert!(table.is_err());
@@ -671,7 +672,7 @@ pub(crate) mod tests {
671672
.await?
672673
.load_cdf()
673674
.with_starting_version(5)
674-
.build(&ctx.state())
675+
.build(&ctx.state(), None)
675676
.await;
676677

677678
assert!(table.is_err());
@@ -691,7 +692,7 @@ pub(crate) mod tests {
691692
.load_cdf()
692693
.with_starting_version(5)
693694
.with_allow_out_of_range()
694-
.build(&ctx.state())
695+
.build(&ctx.state(), None)
695696
.await?;
696697

697698
let batches = collect_batches(
@@ -714,7 +715,7 @@ pub(crate) mod tests {
714715
.await?
715716
.load_cdf()
716717
.with_starting_timestamp(ending_timestamp.and_utc())
717-
.build(&ctx.state())
718+
.build(&ctx.state(), None)
718719
.await;
719720

720721
assert!(table.is_err());
@@ -735,7 +736,7 @@ pub(crate) mod tests {
735736
.load_cdf()
736737
.with_starting_timestamp(ending_timestamp.and_utc())
737738
.with_allow_out_of_range()
738-
.build(&ctx.state())
739+
.build(&ctx.state(), None)
739740
.await?;
740741

741742
let batches = collect_batches(
@@ -757,7 +758,7 @@ pub(crate) mod tests {
757758
.await?
758759
.load_cdf()
759760
.with_starting_version(0)
760-
.build(&ctx.state())
761+
.build(&ctx.state(), None)
761762
.await;
762763

763764
assert!(table.is_err());
@@ -777,7 +778,7 @@ pub(crate) mod tests {
777778
.await?
778779
.load_cdf()
779780
.with_starting_timestamp(ending_timestamp.and_utc())
780-
.build(&ctx.state())
781+
.build(&ctx.state(), None)
781782
.await?;
782783

783784
let batches = collect_batches(
@@ -868,7 +869,7 @@ pub(crate) mod tests {
868869
let cdf_scan = DeltaOps(table.clone())
869870
.load_cdf()
870871
.with_starting_version(0)
871-
.build(&ctx.state())
872+
.build(&ctx.state(), None)
872873
.await
873874
.expect("Failed to load CDF");
874875

crates/core/src/operations/merge/mod.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -4081,7 +4081,7 @@ mod tests {
40814081
let table = DeltaOps(table)
40824082
.load_cdf()
40834083
.with_starting_version(0)
4084-
.build(&ctx.state())
4084+
.build(&ctx.state(), None)
40854085
.await
40864086
.expect("Failed to load CDF");
40874087

@@ -4198,7 +4198,7 @@ mod tests {
41984198
let table = DeltaOps(table)
41994199
.load_cdf()
42004200
.with_starting_version(0)
4201-
.build(&ctx.state())
4201+
.build(&ctx.state(), None)
42024202
.await
42034203
.expect("Failed to load CDF");
42044204

@@ -4286,7 +4286,7 @@ mod tests {
42864286
let table = DeltaOps(table)
42874287
.load_cdf()
42884288
.with_starting_version(0)
4289-
.build(&ctx.state())
4289+
.build(&ctx.state(), None)
42904290
.await
42914291
.expect("Failed to load CDF");
42924292

crates/core/src/operations/update.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1275,7 +1275,7 @@ mod tests {
12751275
let table = DeltaOps(table)
12761276
.load_cdf()
12771277
.with_starting_version(0)
1278-
.build(&ctx.state())
1278+
.build(&ctx.state(), None)
12791279
.await
12801280
.expect("Failed to load CDF");
12811281

@@ -1365,7 +1365,7 @@ mod tests {
13651365
let table = DeltaOps(table)
13661366
.load_cdf()
13671367
.with_starting_version(0)
1368-
.build(&ctx.state())
1368+
.build(&ctx.state(), None)
13691369
.await
13701370
.expect("Failed to load CDF");
13711371

crates/core/src/operations/write.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -2314,7 +2314,7 @@ mod tests {
23142314
let cdf_scan = DeltaOps(table.clone())
23152315
.load_cdf()
23162316
.with_starting_version(0)
2317-
.build(&ctx.state())
2317+
.build(&ctx.state(), None)
23182318
.await
23192319
.expect("Failed to load CDF");
23202320

python/tests/test_cdf.py

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ def test_read_cdf_partitioned_with_predicate():
1818
assert len(values) == 1
1919
assert values[0] == date(2023, 12, 25)
2020

21+
2122
def test_read_cdf_partitioned():
2223
dt = DeltaTable("../crates/test/tests/data/cdf-table/")
2324
b = dt.load_cdf(0, 3).read_all().to_pydict()

0 commit comments

Comments
 (0)