145 changes: 143 additions & 2 deletions datafusion/datasource-parquet/src/opener.rs
@@ -118,6 +118,10 @@ pub(super) struct ParquetOpener {
pub max_predicate_cache_size: Option<usize>,
/// Whether to read row groups in reverse order
pub reverse_row_groups: bool,
/// When `true`, row ordering must be preserved — `prune_by_limit` must not
/// discard partially-matched row groups because they may contain rows that
/// sort before fully-matched groups.
pub preserve_order: bool,
}

/// Represents a prepared access plan with optional row selection
@@ -262,6 +266,7 @@ impl FileOpener for ParquetOpener {
let enable_bloom_filter = self.enable_bloom_filter;
let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
let limit = self.limit;
let preserve_order = self.preserve_order;

let predicate_creation_errors = MetricBuilder::new(&self.metrics)
.global_counter("num_predicate_creation_errors");
@@ -523,8 +528,11 @@ impl FileOpener for ParquetOpener {
.add_matched(n_remaining_row_groups);
}

// Prune by limit
if let Some(limit) = limit {
// Prune by limit: only safe when order does not matter.
// With preserve_order=true, partially-matched row groups may
// contain rows that sort before fully-matched groups, so
// discarding them would return incorrect results.
if let (Some(limit), false) = (limit, preserve_order) {
row_groups.prune_by_limit(limit, rg_metadata, &file_metrics);
}

@@ -1076,6 +1084,7 @@ mod test {
coerce_int96: Option<arrow::datatypes::TimeUnit>,
max_predicate_cache_size: Option<usize>,
reverse_row_groups: bool,
preserve_order: bool,
}

impl ParquetOpenerBuilder {
@@ -1101,6 +1110,7 @@
coerce_int96: None,
max_predicate_cache_size: None,
reverse_row_groups: false,
preserve_order: false,
}
}

@@ -1158,6 +1168,18 @@
self
}

/// Set the `preserve_order` flag. When `true`, `prune_by_limit` is disabled.
fn with_preserve_order(mut self, enable: bool) -> Self {
self.preserve_order = enable;
self
}

/// Set the limit.
fn with_limit(mut self, limit: Option<usize>) -> Self {
self.limit = limit;
self
}

/// Build the ParquetOpener instance.
///
/// # Panics
@@ -1208,6 +1230,7 @@
encryption_factory: None,
max_predicate_cache_size: self.max_predicate_cache_size,
reverse_row_groups: self.reverse_row_groups,
preserve_order: self.preserve_order,
}
}
}
@@ -2196,5 +2219,123 @@ mod test {
"Output field should be nullable"
);
}

/// Regression test: `prune_by_limit` must be disabled when
/// `preserve_order = true` to avoid skipping partially-matched
/// row groups that contain rows sorting before fully-matched groups.
///
/// Layout (3 rows/RG, sorted `[a ASC]`):
/// RG0: a=[1,2,3] — partially matched for `a > 2`
/// RG1: a=[4,5,6] — fully matched for `a > 2`
///
/// `WHERE a > 2 LIMIT 1` should return `a=3` from RG0.
/// Without preserve_order, prune_by_limit discards RG0 and
/// returns `a=4` from RG1.
#[tokio::test]
async fn test_preserve_order_prevents_limit_pruning() {
use arrow::array::Int32Array;
use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
use object_store::memory::InMemory;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

// Write parquet with 2 RGs of 3 rows each into memory buffer
let mut buf = Vec::new();
let props = WriterProperties::builder()
.set_max_row_group_size(3)
.build();
{
let mut writer =
ArrowWriter::try_new(&mut buf, schema.clone(), Some(props)).unwrap();
// RG0: partially matched for a > 2
writer
.write(
&RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap(),
)
.unwrap();
// RG1: fully matched for a > 2
writer
.write(
&RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from(vec![4, 5, 6]))],
)
.unwrap(),
)
.unwrap();
writer.close().unwrap();
}

let file_size = buf.len() as u64;
let store = Arc::new(InMemory::new());
let path = object_store::path::Path::from("test.parquet");
store.put(&path, buf.into()).await.unwrap();

// Predicate: a > 2
let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Gt,
Arc::new(Literal::new(ScalarValue::Int32(Some(2)))),
));

// Run query with preserve_order=false: prune_by_limit may skip RG0
let opener_unordered = ParquetOpenerBuilder::new()
.with_store(store.clone())
.with_schema(schema.clone())
.with_limit(Some(1))
.with_pushdown_filters(true)
.with_predicate(predicate.clone())
.with_row_group_stats_pruning(true)
.with_preserve_order(false)
.build();

let file = PartitionedFile::new(path.clone(), file_size);
let mut stream = opener_unordered.open(file).unwrap().await.unwrap();
let batch = stream.next().await.unwrap().unwrap();
let col = batch
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
assert_eq!(
col.value(0),
4,
"Without preserve_order, prune_by_limit skips partially-matched RG0"
);

// Run query with preserve_order=true: RG0 must be kept
let opener_ordered = ParquetOpenerBuilder::new()
.with_store(store)
.with_schema(schema)
.with_limit(Some(1))
.with_pushdown_filters(true)
.with_predicate(predicate)
.with_row_group_stats_pruning(true)
.with_preserve_order(true)
.build();

let file = PartitionedFile::new(path, file_size);
let mut stream = opener_ordered.open(file).unwrap().await.unwrap();
let batch = stream.next().await.unwrap().unwrap();
let col = batch
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
assert_eq!(
col.value(0),
3,
"With preserve_order, partially-matched RG0 is scanned first"
);
}
}
}
1 change: 1 addition & 0 deletions datafusion/datasource-parquet/src/source.rs
@@ -567,6 +567,7 @@ impl FileSource for ParquetSource {
encryption_factory: self.get_encryption_factory_with_config(),
max_predicate_cache_size: self.max_predicate_cache_size(),
reverse_row_groups: self.reverse_row_groups,
preserve_order: !base_config.output_ordering.is_empty(),
});
Ok(opener)
}
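
For context beyond the diff: a minimal, hedged sketch of the query shape the new guard protects, assuming a file written like the regression test above (`sorted.parquet` with row groups `a = [1, 2, 3]` and `[4, 5, 6]`, sorted ascending). The table name, path, and DDL are illustrative, not part of this PR, and whether the scan actually reaches `prune_by_limit` also depends on session configuration such as parquet filter pushdown. Without the guard, this query could return `a = 4`; with it, the partially-matched first row group is still scanned and `a = 3` is returned.

```rust
// Illustrative only: drives the preserve_order path from SQL.
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Declaring WITH ORDER (a ASC) gives the scan a non-empty
    // output_ordering, which source.rs now maps to preserve_order = true.
    ctx.sql(
        "CREATE EXTERNAL TABLE t (a INT NOT NULL) \
         STORED AS PARQUET \
         WITH ORDER (a ASC) \
         LOCATION 'sorted.parquet'",
    )
    .await?;

    // With the guard, prune_by_limit is skipped for ordered scans, so the
    // partially-matched first row group is read and the query yields a = 3.
    let batches = ctx
        .sql("SELECT a FROM t WHERE a > 2 ORDER BY a ASC LIMIT 1")
        .await?
        .collect()
        .await?;
    println!("{batches:?}");
    Ok(())
}
```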