Perf: Optimize in memory sort #15380

Open · wants to merge 21 commits into base: main
103 changes: 94 additions & 9 deletions datafusion/physical-plan/src/sorts/sort.rs
@@ -45,9 +45,11 @@ use crate::{
};

use arrow::array::{
Array, RecordBatch, RecordBatchOptions, StringViewArray, UInt32Array,
Array, ArrayRef, RecordBatch, RecordBatchOptions, StringViewArray, UInt32Array,
};
use arrow::compute::{
concat, interleave_record_batch, lexsort_to_indices, take_arrays, SortColumn,
};
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn};
use arrow::datatypes::{DataType, SchemaRef};
use arrow::row::{RowConverter, Rows, SortField};
use datafusion_common::{
@@ -674,16 +676,27 @@ impl ExternalSorter {
return self.sort_batch_stream(batch, metrics, reservation);
}

// If less than sort_in_place_threshold_bytes, concatenate and sort in place
// If less than sort_in_place_threshold_bytes, we sort in memory.
if self.reservation.size() < self.sort_in_place_threshold_bytes {
// Concatenate memory batches together and sort
let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
let interleave_indices = self.build_sorted_indices(
self.in_mem_batches.as_slice(),
Arc::clone(&self.expr),
)?;

let batches: Vec<&RecordBatch> = self.in_mem_batches.iter().collect();
let sorted_batch = interleave_record_batch(&batches, &interleave_indices)?;

self.in_mem_batches.clear();
self.reservation
.try_resize(get_reserved_byte_for_record_batch(&batch))
.try_resize(get_reserved_byte_for_record_batch(&sorted_batch))
.map_err(Self::err_with_oom_context)?;
let reservation = self.reservation.take();
return self.sort_batch_stream(batch, metrics, reservation);

metrics.record_output(sorted_batch.num_rows());

return Ok(Box::pin(RecordBatchStreamAdapter::new(
Arc::clone(&self.schema),
futures::stream::once(async { Ok(sorted_batch) }),
)) as SendableRecordBatchStream);
}
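
A note on the gate above: this fast path only triggers while the accumulated reservation stays below `sort_in_place_threshold_bytes`. For anyone benchmarking the change, a hypothetical way to widen that window, assuming the documented `datafusion.execution.sort_in_place_threshold_bytes` key and the `SessionConfig::set_u64` builder, could look like this:

```rust
// Sketch only: raise the in-place sort threshold so more workloads take the
// interleave-based fast path. The config key and builder call are assumptions
// based on DataFusion's documented execution options, not part of this PR.
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let config = SessionConfig::new()
        // 2 MiB instead of the default threshold.
        .set_u64(
            "datafusion.execution.sort_in_place_threshold_bytes",
            2 * 1024 * 1024,
        );
    let ctx = SessionContext::new_with_config(config);

    // Any ORDER BY whose in-memory batches fit under the threshold should now
    // be sorted via the in-place path exercised by this change.
    let df = ctx
        .sql("SELECT column1 FROM (VALUES (3), (1), (2)) ORDER BY column1")
        .await?;
    df.show().await?;
    Ok(())
}
```
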

let streams = std::mem::take(&mut self.in_mem_batches)
Expand Down Expand Up @@ -711,6 +724,78 @@ impl ExternalSorter {
.build()
}

fn build_sorted_indices(
&self,
current_batches: &[RecordBatch],
expr: Arc<[PhysicalSortExpr]>,
) -> Result<Vec<(usize, usize)>> {
// ===== Phase 1: Build global sort columns for each sort expression =====
// For each sort expression, evaluate and collect the corresponding sort column from each in-memory batch
// Here, `self.expr` is a list of sort expressions, each providing `evaluate_to_sort_column()`,
// which returns an ArrayRef (in `.values`) and sort options (`options`)

// ```text
// For example, columns_by_expr might look like:
// ├── expr_0 ──┬── ArrayRef_0_0 (from batch_0)
// │ ├── ArrayRef_0_1 (from batch_1)
// │ └── ArrayRef_0_2 (from batch_2)
// ├── expr_1 ──┬── ArrayRef_1_0 (from batch_0)
// │ ├── ArrayRef_1_1 (from batch_1)
// │ └── ArrayRef_1_2 (from batch_2)
// ```
let mut columns_by_expr: Vec<Vec<ArrayRef>> = expr
.iter()
.map(|_| Vec::with_capacity(current_batches.len()))
.collect();

for batch in current_batches {
for (i, e) in expr.iter().enumerate() {
let col = e.evaluate_to_sort_column(batch)?.values;
columns_by_expr[i].push(col);
}
}

// For each sort expression, concatenate arrays from all batches into one global array
let mut sort_columns = Vec::with_capacity(expr.len());
for (arrays, e) in columns_by_expr.into_iter().zip(expr.iter()) {
let array = concat(
&arrays
.iter()
.map(|a| a.as_ref())
.collect::<Vec<&dyn Array>>(),
)?;
sort_columns.push(SortColumn {
values: array,
options: e.options.into(),
});
}

// ===== Phase 2: Compute global sorted indices =====
// Use `lexsort_to_indices` to get global row indices in sorted order (as if all batches were concatenated)
let indices = if !is_multi_column_with_lists(&sort_columns) {
lexsort_to_indices(&sort_columns, None)?
} else {
lexsort_to_indices_multi_columns(sort_columns, None)?
};

// ===== Phase 3: Prepare indices for interleaving =====
let batch_indices: Vec<(usize, usize)> = current_batches
.iter()
.enumerate()
.flat_map(|(batch_id, batch)| {
(0..batch.num_rows()).map(move |i| (batch_id, i))
})
.collect();

let interleave_indices: Vec<(usize, usize)> = indices
.values()
.iter()
.map(|x| batch_indices[*x as usize])
.collect();

Ok(interleave_indices)
}
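
For reviewers who want to poke at the kernel flow above in isolation, here is a minimal standalone sketch of the same idea, built only from the public arrow kernels this change imports (`concat`, `lexsort_to_indices`, `interleave_record_batch`). The schema, values, and names are illustrative, not taken from this PR:

```rust
// Standalone sketch: sort across multiple batches without concatenating them.
// Only the sort key column is materialized contiguously; the output rows are
// gathered straight from the original batches via `interleave_record_batch`.
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, RecordBatch};
use arrow::compute::{concat, interleave_record_batch, lexsort_to_indices, SortColumn};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let b0 = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![3, 1])) as ArrayRef],
    )?;
    let b1 = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![2, 0])) as ArrayRef],
    )?;
    let batches = [&b0, &b1];

    // Phase 1: concatenate only the sort key column(s) into one global array.
    let key = concat(&[batches[0].column(0).as_ref(), batches[1].column(0).as_ref()])?;
    let sort_columns = vec![SortColumn { values: key, options: None }];

    // Phase 2: global row order, as indices into the virtual concatenation.
    let indices = lexsort_to_indices(&sort_columns, None)?;

    // Phase 3: map each global index back to a (batch_index, row_index) pair.
    let flat: Vec<(usize, usize)> = batches
        .iter()
        .enumerate()
        .flat_map(|(batch_idx, b)| (0..b.num_rows()).map(move |row| (batch_idx, row)))
        .collect();
    let interleave_indices: Vec<(usize, usize)> =
        indices.values().iter().map(|&i| flat[i as usize]).collect();

    // Gather the sorted output directly from the original batches.
    let sorted = interleave_record_batch(&batches, &interleave_indices)?;
    assert_eq!(sorted.num_rows(), 4);
    Ok(())
}
```

Compared to the previous `concat_batches` + sort path, only the sort key columns are copied up front; the payload columns are copied once, directly into the sorted output.
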

/// Sorts a single `RecordBatch` into a single stream.
///
/// `reservation` accounts for the memory used by this batch and
Expand Down Expand Up @@ -1382,7 +1467,7 @@ mod tests {
use crate::test::TestMemoryExec;

use arrow::array::*;
use arrow::compute::SortOptions;
use arrow::compute::{concat_batches, SortOptions};
use arrow::datatypes::*;
use datafusion_common::cast::as_primitive_array;
use datafusion_common::test_util::batches_to_string;