File tree Expand file tree Collapse file tree 1 file changed +10
-0
lines changed
datafusion/physical-plan/src/aggregates Expand file tree Collapse file tree 1 file changed +10
-0
lines changed Original file line number Diff line number Diff line change @@ -179,13 +179,15 @@ impl AggregateStream {
179179pub struct YieldStream {
180180 inner : SendableRecordBatchStream ,
181181 batches_processed : usize ,
182+ buffer : Option < Result < RecordBatch > > ,
182183}
183184
184185impl YieldStream {
185186 pub fn new ( inner : SendableRecordBatchStream ) -> Self {
186187 Self {
187188 inner,
188189 batches_processed : 0 ,
190+ buffer : None ,
189191 }
190192 }
191193}
@@ -201,11 +203,19 @@ impl Stream for YieldStream {
201203 const YIELD_BATCHES : usize = 64 ;
202204 let this = & mut * self ;
203205
206+ if let Some ( batch) = this. buffer . take ( ) {
207+ return Poll :: Ready ( Some ( batch) ) ;
208+ }
209+
204210 match this. inner . poll_next_unpin ( cx) {
205211 Poll :: Ready ( Some ( Ok ( batch) ) ) => {
206212 this. batches_processed += 1 ;
207213 if this. batches_processed >= YIELD_BATCHES {
208214 this. batches_processed = 0 ;
215+ // We need to buffer the batch when we return Poll::Pending,
216+ // so that we can return it on the next poll.
217+ // Otherwise, the next poll will miss the batch and return None.
218+ this. buffer = Some ( Ok ( batch) ) ;
209219 cx. waker ( ) . wake_by_ref ( ) ;
210220 Poll :: Pending
211221 } else {
You can’t perform that action at this time.
0 commit comments