Skip to content

Commit 923089f

Browse files
committed
Minor improvements
1 parent f735f64 commit 923089f

File tree

1 file changed

+9
-19
lines changed
  • datafusion/physical-plan/src/topk

1 file changed

+9
-19
lines changed

datafusion/physical-plan/src/topk/mod.rs

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,7 @@ impl TopK {
274274

275275
let filter_predicate = FilterBuilder::new(&filter);
276276
let filter_predicate = if sort_keys.len() > 1 {
277+
// Optimize filter when it has multiple sort keys
277278
filter_predicate.optimize().build()
278279
} else {
279280
filter_predicate.build()
@@ -294,24 +295,12 @@ impl TopK {
294295

295296
let mut batch_entry = self.heap.register_batch(batch.clone());
296297

297-
let mut replacements = 0;
298-
299-
match selected_rows {
298+
let replacements = match selected_rows {
300299
Some(filter) => {
301-
self.find_new_topk_items(
302-
filter.values().set_indices(),
303-
&mut batch_entry,
304-
&mut replacements,
305-
);
300+
self.find_new_topk_items(filter.values().set_indices(), &mut batch_entry)
306301
}
307-
None => {
308-
self.find_new_topk_items(
309-
0..sort_keys[0].len(),
310-
&mut batch_entry,
311-
&mut replacements,
312-
);
313-
}
314-
}
302+
None => self.find_new_topk_items(0..sort_keys[0].len(), &mut batch_entry),
303+
};
315304

316305
self.metrics.row_replacements.add(replacements);
317306
self.heap.insert_batch_entry(batch_entry);
@@ -333,8 +322,8 @@ impl TopK {
333322
&mut self,
334323
items: impl Iterator<Item = usize>,
335324
batch_entry: &mut RecordBatchEntry,
336-
replacements: &mut usize,
337-
) {
325+
) -> usize {
326+
let mut replacements = 0;
338327
let rows = &mut self.scratch_rows;
339328
for (index, row) in items.zip(rows.iter()) {
340329
match self.heap.max() {
@@ -344,10 +333,11 @@ impl TopK {
344333
// don't yet have k items or new item is lower than the currently k low values
345334
None | Some(_) => {
346335
self.heap.add(batch_entry, row, index);
347-
*replacements += 1;
336+
replacements += 1;
348337
}
349338
}
350339
}
340+
replacements
351341
}
352342

353343
/// If input ordering shares a common sort prefix with the TopK, and if the TopK's heap is full,

0 commit comments

Comments
 (0)