Skip to content

Commit b5c0c60

Browse files
mustafasrepo authored and appletreeisyellow committed
Not fail when window input is empty record batch (apache#8466)
1 parent 665c068 commit b5c0c60

File tree

3 files changed

+17
-8
lines changed

3 files changed

+17
-8
lines changed

datafusion/common/src/utils.rs

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,7 @@ use arrow::compute;
2525
use arrow::compute::{partition, SortColumn, SortOptions};
2626
use arrow::datatypes::{Field, SchemaRef, UInt32Type};
2727
use arrow::record_batch::RecordBatch;
28-
use arrow_array::{Array, LargeListArray, ListArray};
28+
use arrow_array::{Array, LargeListArray, ListArray, RecordBatchOptions};
2929
use arrow_schema::DataType;
3030
use sqlparser::ast::Ident;
3131
use sqlparser::dialect::GenericDialect;
@@ -90,8 +90,12 @@ pub fn get_record_batch_at_indices(
9090
indices: &PrimitiveArray<UInt32Type>,
9191
) -> Result<RecordBatch> {
9292
let new_columns = get_arrayref_at_indices(record_batch.columns(), indices)?;
93-
RecordBatch::try_new(record_batch.schema(), new_columns)
94-
.map_err(DataFusionError::ArrowError)
93+
RecordBatch::try_new_with_options(
94+
record_batch.schema(),
95+
new_columns,
96+
&RecordBatchOptions::new().with_row_count(Some(indices.len())),
97+
)
98+
.map_err(DataFusionError::ArrowError)
9599
}
96100

97101
/// This function compares two tuples depending on the given sort options.

datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -40,7 +40,7 @@ use crate::{
4040
};
4141

4242
use arrow::{
43-
array::{Array, ArrayRef, UInt32Builder},
43+
array::{Array, ArrayRef, RecordBatchOptions, UInt32Builder},
4444
compute::{concat, concat_batches, sort_to_indices},
4545
datatypes::{Schema, SchemaBuilder, SchemaRef},
4646
record_batch::RecordBatch,
@@ -1026,8 +1026,11 @@ impl BoundedWindowAggStream {
10261026
.iter()
10271027
.map(|elem| elem.slice(n_out, n_to_keep))
10281028
.collect::<Vec<_>>();
1029-
self.input_buffer =
1030-
RecordBatch::try_new(self.input_buffer.schema(), batch_to_keep)?;
1029+
self.input_buffer = RecordBatch::try_new_with_options(
1030+
self.input_buffer.schema(),
1031+
batch_to_keep,
1032+
&RecordBatchOptions::new().with_row_count(Some(n_to_keep)),
1033+
)?;
10311034
Ok(())
10321035
}
10331036

datafusion/sqllogictest/test_files/window.slt

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -3771,10 +3771,12 @@ select a,
37713771
1 1
37723772
2 1
37733773

3774-
# TODO: this works in Postgres which returns [1, 1].
3775-
query error DataFusion error: Arrow error: Invalid argument error: must either specify a row count or at least one column
3774+
query I
37763775
select rank() over (RANGE between UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) rnk
37773776
from (select 1 a union select 2 a) q;
3777+
----
3778+
1
3779+
1
37783780

37793781
query II
37803782
select a,

0 commit comments

Comments
 (0)