Skip to content

Commit f7405ee

Browse files
committed
bug: remove busy-wait while sort is ongoing (#16321)
1 parent 1daa5ed commit f7405ee

File tree

1 file changed

+32
-23
lines changed
  • datafusion/physical-plan/src/sorts

1 file changed

+32
-23
lines changed

datafusion/physical-plan/src/sorts/merge.rs

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
//! Merge that deals with an arbitrary size of streaming inputs.
1919
//! This is an order-preserving merge.
2020
21-
use std::collections::VecDeque;
2221
use std::pin::Pin;
2322
use std::sync::Arc;
2423
use std::task::{ready, Context, Poll};
@@ -143,11 +142,8 @@ pub(crate) struct SortPreservingMergeStream<C: CursorValues> {
143142
/// number of rows produced
144143
produced: usize,
145144

146-
/// This queue contains partition indices in order. When a partition is polled and returns `Poll::Ready`,
147-
/// it is removed from the vector. If a partition returns `Poll::Pending`, it is moved to the end of the
148-
/// vector to ensure the next iteration starts with a different partition, preventing the same partition
149-
/// from being continuously polled.
150-
uninitiated_partitions: VecDeque<usize>,
145+
/// This vector contains the indices of the partitions that have not started emitting yet.
146+
uninitiated_partitions: Vec<usize>,
151147
}
152148

153149
impl<C: CursorValues> SortPreservingMergeStream<C> {
@@ -216,36 +212,49 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
216212
// Once all partitions have set their corresponding cursors for the loser tree,
217213
// we skip the following block. Until then, this function may be called multiple
218214
// times and can return Poll::Pending if any partition returns Poll::Pending.
215+
219216
if self.loser_tree.is_empty() {
220-
while let Some(&partition_idx) = self.uninitiated_partitions.front() {
217+
// Manual indexing since we're iterating over the vector and shrinking it in the loop
218+
let mut idx = 0;
219+
while idx < self.uninitiated_partitions.len() {
220+
// unwrap is safe since we just checked the index
221+
let &partition_idx = self.uninitiated_partitions.get(idx).unwrap();
221222
match self.maybe_poll_stream(cx, partition_idx) {
222223
Poll::Ready(Err(e)) => {
223224
self.aborted = true;
224225
return Poll::Ready(Some(Err(e)));
225226
}
226227
Poll::Pending => {
227-
// If a partition returns Poll::Pending, to avoid continuously polling it
228-
// and potentially increasing upstream buffer sizes, we move it to the
229-
// back of the polling queue.
230-
self.uninitiated_partitions.rotate_left(1);
231-
232-
// This function could remain in a pending state, so we manually wake it here.
233-
// However, this approach can be investigated further to find a more natural way
234-
// to avoid disrupting the runtime scheduler.
235-
cx.waker().wake_by_ref();
236-
return Poll::Pending;
228+
// The polled stream is pending which means we're already set up to
229+
// be woken when necessary
230+
// Try the next stream
231+
idx += 1;
237232
}
238233
_ => {
239-
// If the polling result is Poll::Ready(Some(batch)) or Poll::Ready(None),
240-
// we remove this partition from the queue so it is not polled again.
241-
self.uninitiated_partitions.pop_front();
234+
// The polled stream is ready
235+
// Remove it from uninitiated_partitions
236+
// Don't bump idx here, since a new element will have taken its
237+
// place which we'll try in the next loop iteration
238+
self.uninitiated_partitions.remove(idx);
242239
}
243240
}
244241
}
245242

246-
// Claim the memory for the uninitiated partitions
247-
self.uninitiated_partitions.shrink_to_fit();
248-
self.init_loser_tree();
243+
if self.uninitiated_partitions.is_empty() {
244+
// If there are no more uninitiated partitions, set up the loser tree and continue
245+
// to the next phase.
246+
247+
// Claim the memory for the uninitiated partitions
248+
self.uninitiated_partitions.shrink_to_fit();
249+
self.init_loser_tree();
250+
} else {
251+
// There are still uninitiated partitions so return pending.
252+
// We only get here if we've polled all uninitiated streams and at least one of them
253+
// returned pending itself. That means we will be woken as soon as one of the
254+
// streams would like to be polled again.
255+
// There is no need to reschedule ourselves eagerly.
256+
return Poll::Pending;
257+
}
249258
}
250259

251260
// NB timer records time taken on drop, so there are no

0 commit comments

Comments
 (0)