Skip to content

Commit 645e0c8

Browse files
authored
Merge pull request #4 from yjshen/merge_fuzz
skip empty batch while inserting
2 parents e7b6d7a + 67c6a83 commit 645e0c8

File tree

1 file changed

+41
-28
lines changed

1 file changed

+41
-28
lines changed

datafusion/src/physical_plan/sorts/sort_preserving_merge.rs

Lines changed: 41 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -410,40 +410,53 @@ impl SortPreservingMergeStream {
410410
// Cursor is not finished - don't need a new RecordBatch yet
411411
return Poll::Ready(Ok(()));
412412
}
413-
let mut streams = self.streams.streams.lock().unwrap();
413+
let mut empty_batch = false;
414+
{
415+
let mut streams = self.streams.streams.lock().unwrap();
414416

415-
let stream = &mut streams[idx];
416-
if stream.is_terminated() {
417-
return Poll::Ready(Ok(()));
418-
}
419-
420-
// Fetch a new input record and create a cursor from it
421-
match futures::ready!(stream.poll_next_unpin(cx)) {
422-
None => return Poll::Ready(Ok(())),
423-
Some(Err(e)) => {
424-
return Poll::Ready(Err(e));
417+
let stream = &mut streams[idx];
418+
if stream.is_terminated() {
419+
return Poll::Ready(Ok(()));
425420
}
426-
Some(Ok(batch)) => {
427-
let cursor = match SortKeyCursor::new(
428-
idx,
429-
self.next_batch_id, // assign this batch an ID
430-
&batch,
431-
&self.column_expressions,
432-
self.sort_options.clone(),
433-
) {
434-
Ok(cursor) => cursor,
435-
Err(e) => {
436-
return Poll::Ready(Err(ArrowError::ExternalError(Box::new(e))));
421+
422+
// Fetch a new input record and create a cursor from it
423+
match futures::ready!(stream.poll_next_unpin(cx)) {
424+
None => return Poll::Ready(Ok(())),
425+
Some(Err(e)) => {
426+
return Poll::Ready(Err(e));
427+
}
428+
Some(Ok(batch)) => {
429+
if batch.num_rows() > 0 {
430+
let cursor = match SortKeyCursor::new(
431+
idx,
432+
self.next_batch_id, // assign this batch an ID
433+
&batch,
434+
&self.column_expressions,
435+
self.sort_options.clone(),
436+
) {
437+
Ok(cursor) => cursor,
438+
Err(e) => {
439+
return Poll::Ready(Err(ArrowError::ExternalError(
440+
Box::new(e),
441+
)));
442+
}
443+
};
444+
self.next_batch_id += 1;
445+
self.min_heap.push(cursor);
446+
self.cursor_finished[idx] = false;
447+
self.batches[idx].push_back(batch)
448+
} else {
449+
empty_batch = true;
437450
}
438-
};
439-
self.next_batch_id += 1;
440-
self.min_heap.push(cursor);
441-
self.cursor_finished[idx] = false;
442-
self.batches[idx].push_back(batch)
451+
}
443452
}
444453
}
445454

446-
Poll::Ready(Ok(()))
455+
if empty_batch {
456+
self.maybe_poll_stream(cx, idx)
457+
} else {
458+
Poll::Ready(Ok(()))
459+
}
447460
}
448461

449462
/// Drains the in_progress row indexes, and builds a new RecordBatch from them

0 commit comments

Comments
 (0)