Skip to content

Commit 8bbadaf

Browse files
committed
Address comments
1 parent 64fc038 commit 8bbadaf

File tree

2 files changed

+46
-16
lines changed

2 files changed

+46
-16
lines changed

datafusion/physical-plan/src/coalesce/mod.rs

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,17 @@ pub struct LimitedBatchCoalescer {
3535
finished: bool,
3636
}
3737

38+
/// Status returned by [`LimitedBatchCoalescer::push_batch`]
39+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
40+
pub enum PushBatchStatus {
41+
/// The limit has **not** been reached, and more batches can be pushed
42+
Continue,
43+
/// The limit **has** been reached after processing this batch.
44+
/// The caller should call [`LimitedBatchCoalescer::finish`]
45+
/// to flush any buffered rows and stop pushing more batches.
46+
LimitReached,
47+
}
48+
3849
impl LimitedBatchCoalescer {
3950
/// Create a new `LimitedBatchCoalescer`
4051
///
@@ -61,11 +72,21 @@ impl LimitedBatchCoalescer {
6172
self.inner.schema()
6273
}
6374

64-
/// Push next batch, and returns [`true`] indicating if the limit is hit
75+
/// Pushes the next [`RecordBatch`] into the coalescer and returns its status.
76+
///
77+
/// # Arguments
78+
/// * `batch` - The [`RecordBatch`] to append.
79+
///
80+
/// # Returns
81+
/// * [`PushBatchStatus::Continue`] - More batches can still be pushed.
82+
/// * [`PushBatchStatus::LimitReached`] - The row limit was reached after processing
83+
/// this batch. The caller should call [`Self::finish`] before retrieving the
84+
/// remaining buffered batches.
6585
///
66-
/// If the limit is reached, the caller must call [`Self::finish()`] to
67-
/// complete the buffered results as a batch and finish the query.
68-
pub fn push_batch(&mut self, batch: RecordBatch) -> Result<bool> {
86+
/// # Errors
87+
/// Returns an error if called after [`Self::finish`] or if the internal push
88+
/// operation fails.
89+
pub fn push_batch(&mut self, batch: RecordBatch) -> Result<PushBatchStatus> {
6990
if self.finished {
7091
return internal_err!(
7192
"LimitedBatchCoalescer: cannot push batch after finish"
@@ -76,7 +97,7 @@ impl LimitedBatchCoalescer {
7697
if let Some(fetch) = self.fetch {
7798
// limit previously reached
7899
if self.total_rows >= fetch {
79-
return Ok(true);
100+
return Ok(PushBatchStatus::LimitReached);
80101
}
81102

82103
// limit now reached
@@ -88,14 +109,14 @@ impl LimitedBatchCoalescer {
88109
let batch_head = batch.slice(0, remaining_rows);
89110
self.total_rows += batch_head.num_rows();
90111
self.inner.push_batch(batch_head)?;
91-
return Ok(true);
112+
return Ok(PushBatchStatus::LimitReached);
92113
}
93114
}
94115

116+
// Limit not reached, push the entire batch
95117
self.total_rows += batch.num_rows();
96118
self.inner.push_batch(batch)?;
97-
98-
Ok(false) // not at limit
119+
Ok(PushBatchStatus::Continue)
99120
}
100121

101122
/// Return true if there is no data buffered
@@ -276,9 +297,13 @@ mod tests {
276297

277298
let mut output_batches = vec![];
278299
for batch in input_batches {
279-
if coalescer.push_batch(batch).unwrap() {
280-
// at limit, finish the coalescer
281-
break;
300+
match coalescer.push_batch(batch).unwrap() {
301+
PushBatchStatus::Continue => {
302+
// continue pushing batches
303+
}
304+
PushBatchStatus::LimitReached => {
305+
break;
306+
}
282307
}
283308
}
284309
coalescer.finish().unwrap();

datafusion/physical-plan/src/coalesce_batches.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use datafusion_common::Result;
3434
use datafusion_execution::TaskContext;
3535
use datafusion_physical_expr::PhysicalExpr;
3636

37-
use crate::coalesce::LimitedBatchCoalescer;
37+
use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
3838
use crate::execution_plan::CardinalityEffect;
3939
use crate::filter_pushdown::{
4040
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
@@ -302,10 +302,15 @@ impl CoalesceBatchesStream {
302302
self.coalescer.finish()?;
303303
}
304304
Some(Ok(batch)) => {
305-
if self.coalescer.push_batch(batch)? {
306-
// limit was reached, so stop early
307-
self.completed = true;
308-
self.coalescer.finish()?;
305+
match self.coalescer.push_batch(batch)? {
306+
PushBatchStatus::Continue => {
307+
// Keep pushing more batches
308+
}
309+
PushBatchStatus::LimitReached => {
310+
// limit was reached, so stop early
311+
self.completed = true;
312+
self.coalescer.finish()?;
313+
}
309314
}
310315
}
311316
// Error case

0 commit comments

Comments
 (0)