Closed
Description
Environment
Delta-rs version: 0.6
Binding: Rust
Environment:
- Cloud provider: Azure
- OS: macOS
- Other: M1
Bug
What happened:
When filters are passed directly to the DeltaTable scan implementation of the DataFusion TableProvider, files are not pruned correctly based on their statistics. Files that do NOT match the predicate are passed to the parquet plan.
What you expected to happen:
Only files that may match the predicate are passed to the parquet plan.
How to reproduce it:
Run the following snippet against the attached table (data.zip):
    let table = Box::new(open_table("./data/table").await?);
    let ctx = SessionContext::new();
    let filters = [
        col("n").eq(lit(1_i64)),
    ];
    let plan = table.scan(&ctx.state(), None, &filters, None).await?;
    let children = plan.children();
    let parquet_plan = children
        .first()
        .unwrap()
        .as_any()
        .downcast_ref::<ParquetExec>()
        .unwrap();
    let file_scan = parquet_plan.base_config();
    for f in file_scan.file_groups.iter().flatten() {
        println!("{:?}", f.object_meta.location);
    }
Record 1 is in partition 0, which is not present in the file scan, although all the other files, which do not match the predicate, are present.
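To make the expected behaviour concrete, here is a minimal, self-contained sketch of statistics-based pruning for an equality predicate such as n = 1. It does not use delta-rs or DataFusion: `FileStats`, `may_contain`, `files_to_scan`, and the sample paths and min/max values are all hypothetical, chosen only to illustrate that a file can be skipped only when the value lies outside its [min, max] range.

```rust
/// Hypothetical per-file statistics for column `n` (not a delta-rs type).
#[derive(Debug)]
struct FileStats {
    path: &'static str,
    min_n: i64,
    max_n: i64,
}

/// Returns true when the file MAY contain rows with `n == value`.
fn may_contain(stats: &FileStats, value: i64) -> bool {
    stats.min_n <= value && value <= stats.max_n
}

/// Keeps only the paths of files that may match `n == value`.
fn files_to_scan(files: &[FileStats], value: i64) -> Vec<&'static str> {
    files
        .iter()
        .filter(|f| may_contain(f, value))
        .map(|f| f.path)
        .collect()
}

fn main() {
    // Made-up statistics, for illustration only.
    let files = [
        FileStats { path: "part-0.parquet", min_n: 1, max_n: 1 },
        FileStats { path: "part-1.parquet", min_n: 2, max_n: 2 },
        FileStats { path: "part-2.parquet", min_n: 3, max_n: 5 },
    ];
    println!("{:?}", files_to_scan(&files, 1));
}
```

With these made-up statistics, only the first file can hold n = 1, so it is the only one that should reach the parquet plan. The behaviour reported here is the opposite.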
More details:
The pruning logic at https://github.com/delta-io/delta-rs/blob/main/rust/src/delta_datafusion.rs#L354 should be updated to the following:
    let pruning_predicate = PruningPredicate::try_new(predicate, schema.clone())?;
    let files_to_prune = pruning_predicate.prune(self)?;
    self.get_state()
        .files()
        .iter()
        .zip(files_to_prune.into_iter())
        .for_each(|(action, keep_file)| {
            if keep_file {
                let part = partitioned_file_from_action(action, &schema);
                file_groups
                    .entry(part.partition_values.clone())
                    .or_default()
                    .push(part);
            }
        });
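One detail worth keeping in mind when reading the proposed fix: DataFusion's `PruningPredicate::prune` returns one boolean per container, where `true` means the container may contain matching rows and must be kept. The sketch below reproduces only the zip-and-group step using standard-library types. `AddFile`, `group_kept_files`, and the sample values are hypothetical stand-ins, not delta-rs APIs.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a Delta `add` action (not the delta-rs type).
#[derive(Debug, Clone, PartialEq)]
struct AddFile {
    path: String,
    partition_values: Vec<String>,
}

/// Groups the files whose mask entry is `true` by their partition values.
/// `keep_mask[i] == true` means file `i` may contain matching rows.
fn group_kept_files(
    files: &[AddFile],
    keep_mask: &[bool],
) -> HashMap<Vec<String>, Vec<AddFile>> {
    let mut file_groups: HashMap<Vec<String>, Vec<AddFile>> = HashMap::new();
    for (file, keep) in files.iter().zip(keep_mask.iter()) {
        if *keep {
            file_groups
                .entry(file.partition_values.clone())
                .or_default()
                .push(file.clone());
        }
    }
    file_groups
}

fn main() {
    let files = vec![
        AddFile { path: "p=0/a.parquet".into(), partition_values: vec!["0".into()] },
        AddFile { path: "p=1/b.parquet".into(), partition_values: vec!["1".into()] },
    ];
    // Made-up mask: only the first file may match the predicate.
    let groups = group_kept_files(&files, &[true, false]);
    for (partition, kept) in &groups {
        println!("{:?} -> {:?}", partition, kept);
    }
}
```

Inverting the mask at this step (keeping files whose entry is `false`) would produce exactly the symptom described above: the matching file is dropped and the non-matching ones are scanned.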