Skip to content

Commit ee57340

Browse files
author
Jiayu Liu
committed
use array ref
1 parent c82d100 commit ee57340

File tree

3 files changed

+23
-18
lines changed

3 files changed

+23
-18
lines changed

datafusion/src/physical_plan/expressions/row_number.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::physical_plan::{
2222
window_functions::BuiltInWindowFunctionExpr, PhysicalExpr, WindowAccumulator,
2323
};
2424
use crate::scalar::ScalarValue;
25-
use arrow::array::ArrayRef;
25+
use arrow::array::{ArrayRef, UInt64Array};
2626
use arrow::datatypes::{DataType, Field};
2727
use std::any::Any;
2828
use std::sync::Arc;
@@ -89,13 +89,11 @@ impl WindowAccumulator for RowNumberAccumulator {
8989
&mut self,
9090
num_rows: usize,
9191
_values: &[ArrayRef],
92-
) -> Result<Option<Vec<ScalarValue>>> {
92+
) -> Result<Option<ArrayRef>> {
9393
let new_row_number = self.row_number + (num_rows as u64);
94-
let result = (self.row_number..new_row_number)
95-
.map(|i| ScalarValue::UInt64(Some(i)))
96-
.collect();
94+
let result = UInt64Array::from_iter_values(self.row_number..new_row_number);
9795
self.row_number = new_row_number;
98-
Ok(Some(result))
96+
Ok(Some(Arc::new(result)))
9997
}
10098

10199
fn evaluate(&self) -> Result<Option<ScalarValue>> {

datafusion/src/physical_plan/mod.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -539,12 +539,12 @@ pub trait WindowAccumulator: Send + Sync + Debug {
539539
&mut self,
540540
num_rows: usize,
541541
values: &[ArrayRef],
542-
) -> Result<Option<Vec<ScalarValue>>> {
542+
) -> Result<Option<ArrayRef>> {
543543
if values.is_empty() {
544544
return Ok(None);
545545
};
546546
// transpose columnar to row based so that we can apply window
547-
let result: Vec<Option<ScalarValue>> = (0..num_rows)
547+
let result: Option<Vec<ScalarValue>> = (0..num_rows)
548548
.map(|index| {
549549
let v = values
550550
.iter()
@@ -553,8 +553,15 @@ pub trait WindowAccumulator: Send + Sync + Debug {
553553
self.scan(&v)
554554
})
555555
.into_iter()
556-
.collect::<Result<Vec<Option<ScalarValue>>>>()?;
557-
let result: Option<Vec<ScalarValue>> = result.into_iter().collect();
556+
.into_iter()
557+
.collect::<Result<Option<Vec<_>>>>()?;
558+
559+
let result: Option<ArrayRef> = if let Some(arr) = result {
560+
Some(ScalarValue::iter_to_array(&arr)?)
561+
} else {
562+
None
563+
};
564+
558565
Ok(result)
559566
}
560567

datafusion/src/physical_plan/windows.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use crate::physical_plan::{
2929
use crate::scalar::ScalarValue;
3030
use arrow::compute::concat;
3131
use arrow::{
32-
array::ArrayRef,
32+
array::{Array, ArrayRef},
3333
datatypes::{Field, Schema, SchemaRef},
3434
error::{ArrowError, Result as ArrowResult},
3535
record_batch::RecordBatch,
@@ -307,7 +307,7 @@ fn window_aggregate_batch(
307307
batch: &RecordBatch,
308308
window_accumulators: &mut [WindowAccumulatorItem],
309309
expressions: &[Vec<Arc<dyn PhysicalExpr>>],
310-
) -> Result<Vec<Option<Vec<ScalarValue>>>> {
310+
) -> Result<Vec<Option<ArrayRef>>> {
311311
// 1.1 iterate accumulators and respective expressions together
312312
// 1.2 evaluate expressions
313313
// 1.3 update / merge window accumulators with the expressions' values
@@ -364,7 +364,7 @@ async fn compute_window_aggregate(
364364
let expressions = Arc::new(expressions);
365365

366366
// TODO each element shall have some size hint
367-
let mut accumulator: Vec<Vec<ScalarValue>> =
367+
let mut accumulator: Vec<Vec<ArrayRef>> =
368368
iter::repeat(vec![]).take(window_expr.len()).collect();
369369

370370
let mut original_batches: Vec<RecordBatch> = vec![];
@@ -382,7 +382,7 @@ async fn compute_window_aggregate(
382382
accumulator.iter_mut().zip(batch_aggregated).for_each(
383383
|(acc_for_window, window_batch)| {
384384
if let Some(data) = window_batch {
385-
acc_for_window.extend(data);
385+
acc_for_window.push(data);
386386
}
387387
},
388388
);
@@ -400,13 +400,13 @@ async fn compute_window_aggregate(
400400
scalar_value.to_array_of_size(total_num_rows)
401401
}
402402
(acc, None) if !acc.is_empty() => {
403-
return Err(DataFusionError::NotImplemented(
404-
"built in window function not yet implemented".to_owned(),
405-
))
403+
let converted: Vec<&dyn Array> =
404+
acc.iter().map(|arc| arc.as_ref()).collect::<Vec<_>>();
405+
concat(&converted)?
406406
}
407407
_ => {
408408
return Err(DataFusionError::Execution(
409-
"invalid window function behavior".to_owned(),
409+
"Invalid window function behavior".to_owned(),
410410
))
411411
}
412412
};

0 commit comments

Comments
 (0)