Skip to content
Closed
8 changes: 8 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,14 @@ config_namespace! {
/// aggregation ratio check and trying to switch to skipping aggregation mode
pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000

/// Maximum number of groups to accumulate in partial aggregation
/// before emitting intermediate state and resetting the hash table.
/// This keeps the hash table small enough to fit in CPU cache,
/// improving performance for high-cardinality GROUP BY queries.
/// A value of 0 disables early emission. Only applies to Partial
/// aggregation mode with unordered input.
pub partial_aggregation_group_count_emit_threshold: usize, default = 100000

/// Should DataFusion use row number estimates at the input to decide
/// whether increasing parallelism is beneficial or not. By default,
/// only exact row numbers (not estimates) are used for this decision.
Expand Down
326 changes: 326 additions & 0 deletions datafusion/datasource-parquet/src/access_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
// under the License.

use crate::sort::reverse_row_selection;
use arrow::datatypes::Schema;
use datafusion_common::{Result, assert_eq_or_internal_err};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use log::debug;
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};

Expand Down Expand Up @@ -377,6 +382,106 @@ impl PreparedAccessPlan {
})
}

/// Reorder row groups by their min statistics for the given sort order.
///
/// This helps TopK queries find optimal values first. For ASC sort,
/// row groups with the smallest min values come first. For DESC sort,
/// row groups with the largest min values come first.
///
/// Gracefully skips reordering when:
/// - There is a row_selection (too complex to remap)
/// - 0 or 1 row groups (nothing to reorder)
/// - Sort expression is not a simple column reference
/// - Statistics are unavailable
pub(crate) fn reorder_by_statistics(
mut self,
sort_order: &LexOrdering,
file_metadata: &ParquetMetaData,
arrow_schema: &Schema,
) -> Result<Self> {
// Skip if row_selection present (too complex to remap)
if self.row_selection.is_some() {
debug!("Skipping RG reorder: row_selection present");
return Ok(self);
}

// Nothing to reorder
if self.row_group_indexes.len() <= 1 {
return Ok(self);
}

// Get the first sort expression
// LexOrdering is guaranteed non-empty, so first() returns &PhysicalSortExpr
let first_sort_expr = sort_order.first();

// Extract column name from sort expression
let column: &Column = match first_sort_expr.expr.as_any().downcast_ref::<Column>()
{
Some(col) => col,
None => {
debug!("Skipping RG reorder: sort expr is not a simple column");
return Ok(self);
}
};

let descending = first_sort_expr.options.descending;

// Build statistics converter for this column
let converter = match StatisticsConverter::try_new(
column.name(),
arrow_schema,
file_metadata.file_metadata().schema_descr(),
) {
Ok(c) => c,
Err(e) => {
debug!("Skipping RG reorder: cannot create stats converter: {e}");
return Ok(self);
}
};

// Get min values for the selected row groups
let rg_metadata: Vec<&RowGroupMetaData> = self
.row_group_indexes
.iter()
.map(|&idx| file_metadata.row_group(idx))
.collect();

let min_values = match converter.row_group_mins(rg_metadata.iter().copied()) {
Ok(vals) => vals,
Err(e) => {
debug!("Skipping RG reorder: cannot get min values: {e}");
return Ok(self);
}
};

// Sort indices by min values
let sort_options = arrow::compute::SortOptions {
descending,
nulls_first: first_sort_expr.options.nulls_first,
};
let sorted_indices = match arrow::compute::sort_to_indices(
&min_values,
Some(sort_options),
None,
) {
Ok(indices) => indices,
Err(e) => {
debug!("Skipping RG reorder: sort failed: {e}");
return Ok(self);
}
};

// Apply the reordering
let original_indexes = self.row_group_indexes.clone();
self.row_group_indexes = sorted_indices
.values()
.iter()
.map(|&i| original_indexes[i as usize])
.collect();

Ok(self)
}

/// Reverse the access plan for reverse scanning
pub(crate) fn reverse(mut self, file_metadata: &ParquetMetaData) -> Result<Self> {
// Get the row group indexes before reversing
Expand Down Expand Up @@ -614,4 +719,225 @@ mod test {
.unwrap();
Arc::new(SchemaDescriptor::new(Arc::new(schema)))
}

// ---- reorder_by_statistics tests ----

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use parquet::basic::Type as PhysicalType;
use parquet::file::metadata::FileMetaData;
use parquet::file::statistics::Statistics;
use parquet::schema::types::Type as SchemaType;

/// Create ParquetMetaData with row groups that have Int32 min/max stats
fn make_metadata_with_stats(min_max_pairs: &[(i32, i32)]) -> ParquetMetaData {
let field = SchemaType::primitive_type_builder("id", PhysicalType::INT32)
.build()
.unwrap();
let schema = SchemaType::group_type_builder("schema")
.with_fields(vec![Arc::new(field)])
.build()
.unwrap();
let schema_descr = Arc::new(SchemaDescriptor::new(Arc::new(schema)));

let row_groups: Vec<RowGroupMetaData> = min_max_pairs
.iter()
.map(|(min, max)| {
let stats =
Statistics::int32(Some(*min), Some(*max), None, Some(100), false);
let column = ColumnChunkMetaData::builder(schema_descr.column(0))
.set_num_values(100)
.set_statistics(stats)
.build()
.unwrap();
RowGroupMetaData::builder(schema_descr.clone())
.set_num_rows(100)
.set_column_metadata(vec![column])
.build()
.unwrap()
})
.collect();

let file_meta = FileMetaData::new(
1,
min_max_pairs.len() as i64 * 100,
None,
None,
schema_descr,
None,
);
ParquetMetaData::new(file_meta, row_groups)
}

fn make_sort_order_asc() -> LexOrdering {
LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new(Column::new(
"id", 0,
)))])
.unwrap()
}

fn make_sort_order_desc() -> LexOrdering {
use arrow::compute::SortOptions;
LexOrdering::new(vec![PhysicalSortExpr::new(
Arc::new(Column::new("id", 0)),
SortOptions {
descending: true,
nulls_first: false,
},
)])
.unwrap()
}

fn make_arrow_schema() -> Schema {
Schema::new(vec![Field::new("id", DataType::Int32, false)])
}

#[test]
fn test_reorder_by_statistics_asc() {
// RGs in wrong order: [50-99, 200-299, 1-30]
let metadata = make_metadata_with_stats(&[(50, 99), (200, 299), (1, 30)]);
let schema = make_arrow_schema();
let sort_order = make_sort_order_asc();

let plan = PreparedAccessPlan::new(vec![0, 1, 2], None).unwrap();
let plan = plan
.reorder_by_statistics(&sort_order, &metadata, &schema)
.unwrap();

// Should be reordered: RG2(1-30), RG0(50-99), RG1(200-299)
assert_eq!(plan.row_group_indexes, vec![2, 0, 1]);
}

#[test]
fn test_reorder_by_statistics_desc() {
// RGs: [50-99, 200-299, 1-30]
let metadata = make_metadata_with_stats(&[(50, 99), (200, 299), (1, 30)]);
let schema = make_arrow_schema();
let sort_order = make_sort_order_desc();

let plan = PreparedAccessPlan::new(vec![0, 1, 2], None).unwrap();
let plan = plan
.reorder_by_statistics(&sort_order, &metadata, &schema)
.unwrap();

// DESC: largest min first: RG1(200-299), RG0(50-99), RG2(1-30)
assert_eq!(plan.row_group_indexes, vec![1, 0, 2]);
}

#[test]
fn test_reorder_by_statistics_single_rg() {
let metadata = make_metadata_with_stats(&[(1, 100)]);
let schema = make_arrow_schema();
let sort_order = make_sort_order_asc();

let plan = PreparedAccessPlan::new(vec![0], None).unwrap();
let plan = plan
.reorder_by_statistics(&sort_order, &metadata, &schema)
.unwrap();

// Single RG, no reorder
assert_eq!(plan.row_group_indexes, vec![0]);
}

#[test]
fn test_reorder_by_statistics_with_skipped_rgs() {
// 4 RGs but only 0, 2, 3 are selected (RG1 was pruned)
let metadata =
make_metadata_with_stats(&[(300, 400), (100, 200), (1, 50), (50, 99)]);
let schema = make_arrow_schema();
let sort_order = make_sort_order_asc();

let plan = PreparedAccessPlan::new(vec![0, 2, 3], None).unwrap();
let plan = plan
.reorder_by_statistics(&sort_order, &metadata, &schema)
.unwrap();

// Reorder selected RGs by min: RG2(1-50), RG3(50-99), RG0(300-400)
assert_eq!(plan.row_group_indexes, vec![2, 3, 0]);
}

#[test]
fn test_reorder_by_statistics_skips_with_row_selection() {
let metadata = make_metadata_with_stats(&[(50, 99), (1, 30)]);
let schema = make_arrow_schema();
let sort_order = make_sort_order_asc();

let selection = RowSelection::from(vec![
RowSelector::select(50),
RowSelector::skip(50),
RowSelector::select(100),
]);

let plan = PreparedAccessPlan::new(vec![0, 1], Some(selection)).unwrap();
let plan = plan
.reorder_by_statistics(&sort_order, &metadata, &schema)
.unwrap();

// Should NOT reorder because row_selection is present
assert_eq!(plan.row_group_indexes, vec![0, 1]);
}

#[test]
fn test_reorder_by_statistics_already_sorted() {
// Already in correct ASC order
let metadata = make_metadata_with_stats(&[(1, 30), (50, 99), (200, 299)]);
let schema = make_arrow_schema();
let sort_order = make_sort_order_asc();

let plan = PreparedAccessPlan::new(vec![0, 1, 2], None).unwrap();
let plan = plan
.reorder_by_statistics(&sort_order, &metadata, &schema)
.unwrap();

// Already sorted, order preserved
assert_eq!(plan.row_group_indexes, vec![0, 1, 2]);
}

#[test]
fn test_reorder_by_statistics_skips_non_column_expr() {
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::BinaryExpr;

let metadata = make_metadata_with_stats(&[(50, 99), (1, 30)]);
let schema = make_arrow_schema();

// Sort expression is a binary expression (id + 1), not a simple column
let expr = Arc::new(BinaryExpr::new(
Arc::new(Column::new("id", 0)),
Operator::Plus,
Arc::new(datafusion_physical_expr::expressions::Literal::new(
datafusion_common::ScalarValue::Int32(Some(1)),
)),
));
let sort_order =
LexOrdering::new(vec![PhysicalSortExpr::new_default(expr)]).unwrap();

let plan = PreparedAccessPlan::new(vec![0, 1], None).unwrap();
let plan = plan
.reorder_by_statistics(&sort_order, &metadata, &schema)
.unwrap();

// Should NOT reorder because sort expr is not a simple column
assert_eq!(plan.row_group_indexes, vec![0, 1]);
}

#[test]
fn test_reorder_by_statistics_skips_missing_column() {
let metadata = make_metadata_with_stats(&[(50, 99), (1, 30)]);
// Schema has "id" but sort order references "nonexistent"
let schema = make_arrow_schema();
let sort_order = LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new(
Column::new("nonexistent", 99),
))])
.unwrap();

let plan = PreparedAccessPlan::new(vec![0, 1], None).unwrap();
let plan = plan
.reorder_by_statistics(&sort_order, &metadata, &schema)
.unwrap();

// Should NOT reorder because column not found in schema
assert_eq!(plan.row_group_indexes, vec![0, 1]);
}
}
Loading
Loading