Skip to content

Commit 6063bc5

Browse files
committed
Address comments
1 parent 66ef75a commit 6063bc5

File tree

1 file changed

+18
-3
lines changed
  • datafusion/physical-plan/src/sorts

1 file changed

+18
-3
lines changed

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use std::fmt;
2424
use std::fmt::{Debug, Formatter};
2525
use std::sync::Arc;
2626

27-
use crate::common::spawn_buffered;
2827
use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
2928
use crate::expressions::PhysicalSortExpr;
3029
use crate::limit::LimitStream;
@@ -687,23 +686,35 @@ impl ExternalSorter {
687686
let mut current_batches = Vec::new();
688687
let mut current_size = 0;
689688

690-
for batch in std::mem::take(&mut self.in_mem_batches) {
689+
// Drain in_mem_batches using pop() to release memory earlier.
690+
// This avoids holding onto the entire vector during iteration.
691+
// Note:
692+
// Currently `sort_in_place_threshold_bytes` determines when accumulated batches are merged; in the future this could be made more dynamic.
693+
while let Some(batch) = self.in_mem_batches.pop() {
691694
let batch_size = get_reserved_byte_for_record_batch(&batch);
695+
696+
// If adding this batch would exceed the memory threshold, merge current_batches.
692697
if current_size + batch_size > self.sort_in_place_threshold_bytes
693698
&& !current_batches.is_empty()
694699
{
700+
// Merge accumulated batches into one.
695701
let merged = concat_batches(&self.schema, &current_batches)?;
696702
current_batches.clear();
703+
704+
// Update memory reservation.
697705
self.reservation.try_shrink(current_size)?;
698706
let merged_size = get_reserved_byte_for_record_batch(&merged);
699707
self.reservation.try_grow(merged_size)?;
708+
700709
merged_batches.push(merged);
701710
current_size = 0;
702711
}
712+
703713
current_batches.push(batch);
704714
current_size += batch_size;
705715
}
706716

717+
// Merge any remaining batches after the loop.
707718
if !current_batches.is_empty() {
708719
let merged = concat_batches(&self.schema, &current_batches)?;
709720
self.reservation.try_shrink(current_size)?;
@@ -712,15 +723,19 @@ impl ExternalSorter {
712723
merged_batches.push(merged);
713724
}
714725

726+
// Create sorted streams directly without using spawn_buffered.
727+
// This allows for sorting to happen inline and enables earlier batch drop.
715728
let streams = merged_batches
716729
.into_iter()
717730
.map(|batch| {
718731
let metrics = self.metrics.baseline.intermediate();
719732
let reservation = self
720733
.reservation
721734
.split(get_reserved_byte_for_record_batch(&batch));
735+
736+
// Sort the batch inline.
722737
let input = self.sort_batch_stream(batch, metrics, reservation)?;
723-
Ok(spawn_buffered(input, 1))
738+
Ok(input)
724739
})
725740
.collect::<Result<_>>()?;
726741

0 commit comments

Comments
 (0)