Optimize TopK with threshold filter ~1.4x speedup #15697

Open · wants to merge 27 commits into main

Commits (27)

7b3a0a2  Optimize topk with filter (Dandandan, Apr 13, 2025)
75acd07  add sort_tpch_limit bench (Dandandan, Apr 13, 2025)
32f11dc  early return (Dandandan, Apr 13, 2025)
52583a1  Merge (Dandandan, Apr 13, 2025)
351663b  Clippy (Dandandan, Apr 13, 2025)
eeb8ce4  Respect lexicographical ordering, only apply first filter (Dandandan, Apr 13, 2025)
f0290c4  Respect lexicographical ordering (Dandandan, Apr 13, 2025)
67aa03a  Respect lexicographical ordering, only apply first filter (Dandandan, Apr 13, 2025)
559b789  Respect lexicographical ordering, only apply first filter (Dandandan, Apr 13, 2025)
4a24e75  Simplify and add link (Dandandan, Apr 13, 2025)
5d42ee7  Still run early completion (Dandandan, Apr 13, 2025)
7003aed  Keep null values (Dandandan, Apr 14, 2025)
1610f78  Keep null values (Dandandan, Apr 14, 2025)
b046a73  Update datafusion/physical-plan/src/topk/mod.rs (Dandandan, Apr 14, 2025)
fe6fc48  Clippy (Dandandan, Apr 14, 2025)
f457ce8  Refactor (Dandandan, Apr 14, 2025)
40dc1d9  Ignore null threshold (Dandandan, Apr 14, 2025)
8d1bfe3  Update datafusion/physical-plan/src/topk/mod.rs (Dandandan, Apr 15, 2025)
f735f64  Fix (Dandandan, Apr 15, 2025)
923089f  Minor improvements (Dandandan, Apr 15, 2025)
6fb5b59  Merge remote-tracking branch 'upstream/main' into improve_topk (Dandandan, Apr 15, 2025)
9a99cce  Fix (Dandandan, Apr 18, 2025)
ea480b7  Fix (Dandandan, Apr 18, 2025)
3865960  Fix (Dandandan, Apr 18, 2025)
375679e  Merge remote-tracking branch 'upstream/main' into improve_topk (Dandandan, Apr 18, 2025)
2425b93  Add scalarvalue api (Dandandan, Apr 18, 2025)
63eca0b  Only update if heap updated (Dandandan, Apr 18, 2025)

benchmarks/bench.sh (17 additions, 0 deletions)

@@ -212,6 +212,10 @@ main() {
# same data as for tpch
data_tpch "1"
;;
sort_tpch_limit)
Contributor: I'm not sure what the best name would be, but I feel it would be useful for discoverability to have topk in it. tpch_topk? sort_tpch_topk?

Contributor: Can we please also add a description of this benchmark in https://github.com/apache/datafusion/tree/main/benchmarks#benchmarks ?

# same data as for tpch
data_tpch "1"
;;
*)
echo "Error: unknown benchmark '$BENCHMARK' for data generation"
usage
@@ -251,6 +255,7 @@ main() {
run_cancellation
run_parquet
run_sort
run_sort_tpch_limit
run_clickbench_1
run_clickbench_partitioned
run_clickbench_extended
@@ -320,6 +325,9 @@ main() {
sort_tpch)
run_sort_tpch
;;
sort_tpch_limit)
run_sort_tpch_limit
;;
*)
echo "Error: unknown benchmark '$BENCHMARK' for run"
usage
@@ -918,6 +926,15 @@ run_sort_tpch() {
$CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}"
}

# Runs the sort tpch integration benchmark with limit
run_sort_tpch_limit() {
TPCH_DIR="${DATA_DIR}/tpch_sf1"
RESULTS_FILE="${RESULTS_DIR}/sort_tpch_limit.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running sort tpch benchmark..."

$CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" --limit 100
}
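
With this in place, the new benchmark follows the existing driver pattern: `./bench.sh data sort_tpch_limit` generates the same SF1 TPC-H data as `sort_tpch`, and `./bench.sh run sort_tpch_limit` runs the sort queries with `--limit 100` (so they plan as TopK rather than full sorts), writing results to `sort_tpch_limit.json`.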

compare_benchmarks() {
BASE_RESULTS_DIR="${SCRIPT_DIR}/results"

datafusion/physical-plan/src/topk/mod.rs (142 additions, 19 deletions)

@@ -18,19 +18,22 @@
//! TopK: Combination of Sort / LIMIT

use arrow::{
compute::interleave_record_batch,
array::{BooleanArray, Scalar},
compute::{interleave_record_batch, is_null, or, FilterBuilder},
row::{RowConverter, Rows, SortField},
};
use std::mem::size_of;
use arrow_ord::cmp::{gt, gt_eq, lt, lt_eq};
use datafusion_expr::ColumnarValue;
use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
use std::{mem::size_of, sync::RwLock};

use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder};
use crate::spill::get_record_batch_memory_size;
use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
use arrow::array::{ArrayRef, RecordBatch};
use arrow::datatypes::SchemaRef;
use datafusion_common::{internal_datafusion_err, HashMap, ScalarValue};
use datafusion_common::Result;
use datafusion_common::{internal_datafusion_err, HashMap};
use datafusion_execution::{
memory_pool::{MemoryConsumer, MemoryReservation},
runtime_env::RuntimeEnv,
@@ -117,6 +120,8 @@ pub struct TopK {
/// to be greater (by byte order, after row conversion) than the top K,
/// which means the top K won't change and the computation can be finished early.
pub(crate) finished: bool,

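/// Values of the current heap maximum, one per sort expression
/// (`None` while unknown); kept behind a shared lock so they can be
/// read while the operator is running.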
thresholds: Arc<RwLock<Vec<Option<ScalarValue>>>>,
}

// Guesstimate for memory allocation: estimated number of bytes used per row in the RowConverter
@@ -170,7 +175,7 @@ impl TopK {
build_sort_fields(&common_sort_prefix, &schema)?;
Some(RowConverter::new(input_sort_fields)?)
};

let num_exprs = expr.len();
Ok(Self {
schema: Arc::clone(&schema),
metrics: TopKMetrics::new(metrics, partition_id),
@@ -183,6 +188,7 @@
common_sort_prefix_converter: prefix_row_converter,
common_sort_prefix: Arc::from(common_sort_prefix),
finished: false,
thresholds: Arc::new(RwLock::new(vec![None; num_exprs])),
})
}

@@ -193,7 +199,7 @@
let baseline = self.metrics.baseline.clone();
let _timer = baseline.elapsed_compute().timer();

let sort_keys: Vec<ArrayRef> = self
let mut sort_keys: Vec<ArrayRef> = self
.expr
.iter()
.map(|expr| {
@@ -202,26 +208,117 @@
})
.collect::<Result<Vec<_>>>()?;

// Selected indices in the input batch.
// Some indices may be pre-filtered if they exceed the heap’s current max value.

let mut selected_rows = None;

let threshold0 = self
.thresholds
.read()
.expect("Read lock should succeed")[0].clone();

// If the heap doesn't have k elements yet, we can't create thresholds
if let Some(threshold0) = threshold0 {
// skip filtering if threshold is null
if !threshold0.is_null() {
// Convert the stored threshold to a single-element array wrapped as a `Scalar` for the comparison kernels
let threshold = Scalar::new(threshold0.to_array_of_size(1)?);

// Build a filter from the first sort key only; later keys can't be used without respecting lexicographic ordering (see the TODO below)
let is_multi_col = self.expr.len() > 1;

let mut filter = match (is_multi_col, self.expr[0].options.descending) {
(true, true) => BooleanArray::new(
gt_eq(&sort_keys[0], &threshold)?.values().clone(),
None,
),
(true, false) => BooleanArray::new(
lt_eq(&sort_keys[0], &threshold)?.values().clone(),
None,
),
(false, true) => BooleanArray::new(
gt(&sort_keys[0], &threshold)?.values().clone(),
None,
),
(false, false) => BooleanArray::new(
lt(&sort_keys[0], &threshold)?.values().clone(),
None,
),
};
if sort_keys[0].is_nullable() {
// Keep any null values
// TODO it is possible to optimize this based on the current threshold value
// and the nulls first/last option and the number of following sort keys
filter = or(&filter, &is_null(&sort_keys[0])?)?;
}
if filter.true_count() == 0 {
// No rows in this batch can enter the top K, so skip it entirely
// Early completion is still possible, as the last row might be strictly greater
self.attempt_early_completion(&batch)?;

return Ok(());
}

let filter_predicate = FilterBuilder::new(&filter);
let filter_predicate = if sort_keys.len() > 1 {
// Precompute the filter's selection once, since it will be applied to every sort key
filter_predicate.optimize().build()
} else {
filter_predicate.build()
};
selected_rows = Some(filter);

sort_keys = sort_keys
.iter()
.map(|key| filter_predicate.filter(key).map_err(|x| x.into()))
.collect::<Result<Vec<_>>>()?;
}
}
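
A minimal standalone sketch of the pruning step above, assuming a single ascending `Int32` sort key (the array contents and the threshold value are illustrative):

```rust
use arrow::array::{BooleanArray, Int32Array, Scalar};
use arrow::compute::{is_null, or};
use arrow_ord::cmp::lt;

fn main() -> Result<(), arrow::error::ArrowError> {
    let keys = Int32Array::from(vec![Some(5), None, Some(42), Some(7)]);
    // Current heap maximum: with a single ascending key, only rows
    // strictly below it can still enter the top K, so `lt` is used.
    let threshold = Scalar::new(Int32Array::from(vec![10]));

    // Drop the comparison's null mask (as the code above does), then re-add
    // null keys explicitly: the threshold cannot order them, and where they
    // sort depends on the NULLS FIRST/LAST option.
    let cmp = lt(&keys, &threshold)?;
    let mask = BooleanArray::new(cmp.values().clone(), None);
    let mask = or(&mask, &is_null(&keys)?)?;

    assert_eq!(mask.true_count(), 3); // 5, NULL and 7 survive; 42 is pruned
    Ok(())
}
```

Batches whose mask has a `true_count()` of zero are skipped wholesale, which is where most of the speedup comes from.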

// reuse existing `Rows` to avoid reallocations
let rows = &mut self.scratch_rows;
rows.clear();
self.row_converter.append(rows, &sort_keys)?;

// TODO make this algorithmically better?:
// Idea: filter out rows >= self.heap.max() early (before passing to `RowConverter`)
// this avoids some work and also might be better vectorizable.
let mut batch_entry = self.heap.register_batch(batch.clone());
for (index, row) in rows.iter().enumerate() {
match self.heap.max() {
// heap has k items, and the new row is greater than the
// current max in the heap ==> it is not a new topk
Some(max_row) if row.as_ref() >= max_row.row() => {}
// don't yet have k items or new item is lower than the currently k low values
None | Some(_) => {
self.heap.add(&mut batch_entry, row, index);
self.metrics.row_replacements.add(1);
}

let replacements = match selected_rows {
Some(filter) => {
self.find_new_topk_items(filter.values().set_indices(), &mut batch_entry)
}
None => self.find_new_topk_items(0..sort_keys[0].len(), &mut batch_entry),
};

self.metrics.row_replacements.add(replacements);

if replacements > 0 {
// Extract threshold values for each sort expression
// TODO: create a filter for each key that respects lexical ordering
// in the form of col0 < threshold0 || col0 == threshold0 && (col1 < threshold1 || ...)
// This could use BinaryExpr to benefit from short circuiting and early evaluation
// https://github.com/apache/datafusion/issues/15698
// Extract the value for this column from the max row
let thresholds: Vec<_> = self
.expr
.iter()
.map(|expr| {
let value = expr
.expr
.evaluate(&batch_entry.batch.slice(self.heap.max().unwrap().index, 1))?;
Ok(Some(match value {
ColumnarValue::Array(array) => {
ScalarValue::try_from_array(&array, 0)?
}
ColumnarValue::Scalar(scalar_value) => scalar_value,
}))
})
.collect::<Result<_>>()?;
self.thresholds
.write()
.expect("Write lock should succeed")
.clone_from(&thresholds);
}
self.heap.insert_batch_entry(batch_entry);

Expand All @@ -235,10 +332,31 @@ impl TopK {
// subsequent batches are guaranteed to be greater (by byte order, after row conversion) than the top K,
// which means the top K won't change and the computation can be finished early.
self.attempt_early_completion(&batch)?;

Ok(())
}
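
The threshold update above collapses a single-row expression result into a `ScalarValue`, whichever `ColumnarValue` variant comes back. A minimal sketch of just that conversion (the `Int32` input is illustrative):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::ColumnarValue;

// Normalize a ColumnarValue produced on a one-row batch into a ScalarValue.
fn to_threshold(value: ColumnarValue) -> Result<ScalarValue> {
    Ok(match value {
        ColumnarValue::Array(array) => ScalarValue::try_from_array(&array, 0)?,
        ColumnarValue::Scalar(scalar_value) => scalar_value,
    })
}

fn main() -> Result<()> {
    let array: ArrayRef = Arc::new(Int32Array::from(vec![42]));
    assert_eq!(
        to_threshold(ColumnarValue::Array(array))?,
        ScalarValue::Int32(Some(42))
    );
    Ok(())
}
```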

fn find_new_topk_items(
&mut self,
items: impl Iterator<Item = usize>,
batch_entry: &mut RecordBatchEntry,
) -> usize {
let mut replacements = 0;
let rows = &mut self.scratch_rows;
for (index, row) in items.zip(rows.iter()) {
match self.heap.max() {
// heap has k items, and the new row is greater than the
// current max in the heap ==> it is not a new topk
Some(max_row) if row.as_ref() >= max_row.row() => {}
// don't yet have k items or new item is lower than the currently k low values
None | Some(_) => {
self.heap.add(batch_entry, row, index);
replacements += 1;
}
}
}
replacements
}

/// If input ordering shares a common sort prefix with the TopK, and if the TopK's heap is full,
/// check if the computation can be finished early.
/// This is the case if the last row of the current batch is strictly greater than the max row in the heap,
@@ -328,6 +446,7 @@ impl TopK {
common_sort_prefix_converter: _,
common_sort_prefix: _,
finished: _,
thresholds: _,
} = self;
let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop

@@ -360,6 +479,10 @@ impl TopK {
+ self.scratch_rows.size()
+ self.heap.size()
}

pub fn thresholds(&self) -> &Arc<RwLock<Vec<Option<ScalarValue>>>> {
&self.thresholds
}
}
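
The new `thresholds()` accessor exposes this shared state to callers. A hypothetical consumer (the `snapshot` helper below is illustrative, not part of the PR) would copy the values under a short-lived read lock rather than hold the lock while using them:

```rust
use std::sync::{Arc, RwLock};

use datafusion_common::ScalarValue;

// Take a point-in-time copy of the thresholds so the lock is released
// before the values are used, e.g. to build downstream filters.
fn snapshot(
    thresholds: &Arc<RwLock<Vec<Option<ScalarValue>>>>,
) -> Vec<Option<ScalarValue>> {
    thresholds
        .read()
        .expect("Read lock should succeed")
        .clone()
}
```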

struct TopKMetrics {