Skip to content

Commit 23ded9f

Browse files
author
Jiayu Liu
committed
Squashed commit of the following:
commit 7fb3640 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 16:38:25 2021 +0800 row number done commit 1723926 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 16:05:50 2021 +0800 add row number commit bf5b8a5 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 15:04:49 2021 +0800 save commit d2ce852 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 14:53:05 2021 +0800 add streams commit 0a861a7 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 20 22:28:34 2021 +0800 save stream commit a9121af Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 20 22:01:51 2021 +0800 update unit test commit 2af2a27 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 14:25:12 2021 +0800 fix unit test commit bb57c76 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 14:23:34 2021 +0800 use upper case commit 5d96e52 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 14:16:16 2021 +0800 fix unit test commit 1ecae8f Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 12:27:26 2021 +0800 fix unit test commit bc2271d Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 10:04:29 2021 +0800 fix error commit 880b94f Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 08:24:00 2021 +0800 fix unit test commit 4e792e1 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 08:05:17 2021 +0800 fix test commit c36c04a Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 00:07:54 2021 +0800 add more tests commit f5e64de Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 20 23:41:36 2021 +0800 update commit a1eae86 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 20 23:36:15 2021 +0800 enrich unit test commit 0d2a214 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 20 23:25:43 2021 +0800 adding filter by todo commit 8b486d5 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 20 23:17:22 2021 +0800 adding more built-in functions commit abf08cd Author: Jiayu Liu <Jimexist@users.noreply.github.com> Date: Thu May 20 22:36:27 2021 +0800 Update datafusion/src/physical_plan/window_functions.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> commit 0cbca53 Author: Jiayu Liu <Jimexist@users.noreply.github.com> Date: Thu May 20 22:34:57 2021 +0800 Update datafusion/src/physical_plan/window_functions.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> commit 831c069 Author: Jiayu Liu <Jimexist@users.noreply.github.com> Date: Thu May 20 22:34:04 2021 +0800 Update datafusion/src/logical_plan/builder.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> commit f70c739 Author: Jiayu Liu <Jimexist@users.noreply.github.com> Date: Thu May 20 22:33:04 2021 +0800 Update datafusion/src/logical_plan/builder.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> commit 3ee87aa Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Wed May 19 22:55:08 2021 +0800 fix unit test commit 5c4d92d Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Wed May 19 22:48:26 2021 +0800 fix clippy commit a0b7526 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Wed May 19 22:46:38 2021 +0800 fix unused imports commit 1d3b076 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 13 18:51:14 2021 +0800 add window expr
1 parent 68ad990 commit 23ded9f

File tree

10 files changed

+627
-67
lines changed

10 files changed

+627
-67
lines changed

datafusion/src/execution/context.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1268,6 +1268,13 @@ mod tests {
12681268
Ok(())
12691269
}
12701270

1271+
#[tokio::test]
1272+
async fn window() -> Result<()> {
1273+
let results = execute("SELECT c1, MAX(c2) OVER () FROM test", 4).await?;
1274+
assert_eq!(results.len(), 1);
1275+
Ok(())
1276+
}
1277+
12711278
#[tokio::test]
12721279
async fn aggregate() -> Result<()> {
12731280
let results = execute("SELECT SUM(c1), SUM(c2) FROM test", 4).await?;

datafusion/src/physical_plan/expressions/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ mod min_max;
4141
mod negative;
4242
mod not;
4343
mod nullif;
44+
mod row_number;
4445
mod sum;
4546
mod try_cast;
4647

@@ -58,6 +59,7 @@ pub use min_max::{Max, Min};
5859
pub use negative::{negative, NegativeExpr};
5960
pub use not::{not, NotExpr};
6061
pub use nullif::{nullif_func, SUPPORTED_NULLIF_TYPES};
62+
pub use row_number::RowNumber;
6163
pub use sum::{sum_return_type, Sum};
6264
pub use try_cast::{try_cast, TryCastExpr};
6365
/// returns the name of the state
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Defines physical expression for `row_number` that can evaluated at runtime during query execution
19+
20+
use crate::error::Result;
21+
use crate::physical_plan::{
22+
window_functions::BuiltInWindowFunctionExpr, PhysicalExpr, WindowAccumulator,
23+
};
24+
use crate::scalar::ScalarValue;
25+
use arrow::array::{ArrayRef, UInt64Array};
26+
use arrow::datatypes::{DataType, Field};
27+
use std::any::Any;
28+
use std::sync::Arc;
29+
30+
/// row_number expression
31+
#[derive(Debug)]
32+
pub struct RowNumber {
33+
name: String,
34+
}
35+
36+
impl RowNumber {
37+
/// Create a new ROW_NUMBER function
38+
pub fn new(name: String) -> Self {
39+
Self { name }
40+
}
41+
}
42+
43+
impl BuiltInWindowFunctionExpr for RowNumber {
44+
/// Return a reference to Any that can be used for downcasting
45+
fn as_any(&self) -> &dyn Any {
46+
self
47+
}
48+
49+
fn field(&self) -> Result<Field> {
50+
let nullable = false;
51+
let data_type = DataType::UInt64;
52+
Ok(Field::new(&self.name, data_type, nullable))
53+
}
54+
55+
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
56+
vec![]
57+
}
58+
59+
fn name(&self) -> &str {
60+
&self.name
61+
}
62+
63+
fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
64+
Ok(Box::new(RowNumberAccumulator::new()))
65+
}
66+
}
67+
68+
#[derive(Debug)]
69+
struct RowNumberAccumulator {
70+
row_number: u64,
71+
}
72+
73+
impl RowNumberAccumulator {
74+
/// new count accumulator
75+
pub fn new() -> Self {
76+
// row number is 1 based
77+
Self { row_number: 1 }
78+
}
79+
}
80+
81+
impl WindowAccumulator for RowNumberAccumulator {
82+
fn scan(&mut self, _values: &[ScalarValue]) -> Result<Option<ScalarValue>> {
83+
let result = Some(ScalarValue::UInt64(Some(self.row_number)));
84+
self.row_number += 1;
85+
Ok(result)
86+
}
87+
88+
fn scan_batch(
89+
&mut self,
90+
num_rows: usize,
91+
_values: &[ArrayRef],
92+
) -> Result<Option<ArrayRef>> {
93+
let new_row_number = self.row_number + (num_rows as u64);
94+
let result = UInt64Array::from_iter_values(self.row_number..new_row_number);
95+
self.row_number = new_row_number;
96+
Ok(Some(Arc::new(result)))
97+
}
98+
99+
fn evaluate(&self) -> Result<Option<ScalarValue>> {
100+
Ok(None)
101+
}
102+
}

datafusion/src/physical_plan/hash_aggregate.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -712,7 +712,7 @@ impl GroupedHashAggregateStream {
712712
tx.send(result)
713713
});
714714

715-
GroupedHashAggregateStream {
715+
Self {
716716
schema,
717717
output: rx,
718718
finished: false,
@@ -825,7 +825,8 @@ fn aggregate_expressions(
825825
}
826826

827827
pin_project! {
828-
struct HashAggregateStream {
828+
/// stream struct for hash aggregation
829+
pub struct HashAggregateStream {
829830
schema: SchemaRef,
830831
#[pin]
831832
output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
@@ -878,7 +879,7 @@ impl HashAggregateStream {
878879
tx.send(result)
879880
});
880881

881-
HashAggregateStream {
882+
Self {
882883
schema,
883884
output: rx,
884885
finished: false,

datafusion/src/physical_plan/mod.rs

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,10 +457,22 @@ pub trait WindowExpr: Send + Sync + Debug {
457457
fn name(&self) -> &str {
458458
"WindowExpr: default name"
459459
}
460+
461+
/// the accumulator used to accumulate values from the expressions.
462+
/// the accumulator expects the same number of arguments as `expressions` and must
463+
/// return states with the same description as `state_fields`
464+
fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>>;
465+
466+
/// expressions that are passed to the WindowAccumulator.
467+
/// Functions which take a single input argument, such as `sum`, return a single [`Expr`],
468+
/// others (e.g. `cov`) return many.
469+
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
460470
}
461471

462472
/// An accumulator represents a stateful object that lives throughout the evaluation of multiple rows and
463-
/// generically accumulates values. An accumulator knows how to:
473+
/// generically accumulates values.
474+
///
475+
/// An accumulator knows how to:
464476
/// * update its state from inputs via `update`
465477
/// * convert its internal state to a vector of scalar values
466478
/// * update its state from multiple accumulators' states via `merge`
@@ -509,6 +521,51 @@ pub trait Accumulator: Send + Sync + Debug {
509521
fn evaluate(&self) -> Result<ScalarValue>;
510522
}
511523

524+
/// A window accumulator represents a stateful object that lives throughout the evaluation of multiple
525+
/// rows and generically accumulates values.
526+
///
527+
/// An accumulator knows how to:
528+
/// * update its state from inputs via `update`
529+
/// * convert its internal state to a vector of scalar values
530+
/// * update its state from multiple accumulators' states via `merge`
531+
/// * compute the final value from its internal state via `evaluate`
532+
pub trait WindowAccumulator: Send + Sync + Debug {
533+
/// scans the accumulator's state from a vector of scalars, similar to Accumulator it also
534+
/// optionally generates values.
535+
fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>>;
536+
537+
/// scans the accumulator's state from a vector of arrays.
538+
fn scan_batch(
539+
&mut self,
540+
num_rows: usize,
541+
values: &[ArrayRef],
542+
) -> Result<Option<ArrayRef>> {
543+
if values.is_empty() {
544+
return Ok(None);
545+
};
546+
// transpose columnar to row based so that we can apply window
547+
let result = (0..num_rows)
548+
.map(|index| {
549+
let v = values
550+
.iter()
551+
.map(|array| ScalarValue::try_from_array(array, index))
552+
.collect::<Result<Vec<_>>>()?;
553+
self.scan(&v)
554+
})
555+
.into_iter()
556+
.collect::<Result<Option<Vec<_>>>>()?;
557+
558+
if let Some(arr) = result {
559+
Ok(Some(ScalarValue::iter_to_array(&arr)?))
560+
} else {
561+
Ok(None)
562+
}
563+
}
564+
565+
/// returns its value based on its current state.
566+
fn evaluate(&self) -> Result<Option<ScalarValue>>;
567+
}
568+
512569
pub mod aggregates;
513570
pub mod array_expressions;
514571
pub mod coalesce_batches;

datafusion/src/physical_plan/planner.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,10 @@ impl DefaultPhysicalPlanner {
147147
// Initially need to perform the aggregate and then merge the partitions
148148
let input_exec = self.create_initial_plan(input, ctx_state)?;
149149
let input_schema = input_exec.schema();
150-
let physical_input_schema = input_exec.as_ref().schema();
150+
151151
let logical_input_schema = input.as_ref().schema();
152+
let physical_input_schema = input_exec.as_ref().schema();
153+
152154
let window_expr = window_expr
153155
.iter()
154156
.map(|e| {

datafusion/src/physical_plan/sort.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ fn sort_batches(
227227
}
228228

229229
pin_project! {
230+
/// stream for sort plan
230231
struct SortStream {
231232
#[pin]
232233
output: futures::channel::oneshot::Receiver<ArrowResult<Option<RecordBatch>>>,

datafusion/src/physical_plan/window_functions.rs

Lines changed: 75 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@
2020
//!
2121
//! see also https://www.postgresql.org/docs/current/functions-window.html
2222
23+
use crate::arrow::datatypes::Field;
2324
use crate::error::{DataFusionError, Result};
2425
use crate::physical_plan::{
2526
aggregates, aggregates::AggregateFunction, functions::Signature,
26-
type_coercion::data_types,
27+
type_coercion::data_types, PhysicalExpr, WindowAccumulator,
2728
};
2829
use arrow::datatypes::DataType;
30+
use std::any::Any;
31+
use std::sync::Arc;
2932
use std::{fmt, str::FromStr};
3033

3134
/// WindowFunction
@@ -143,52 +146,92 @@ impl FromStr for BuiltInWindowFunction {
143146

144147
/// Returns the datatype of the window function
145148
pub fn return_type(fun: &WindowFunction, arg_types: &[DataType]) -> Result<DataType> {
149+
match fun {
150+
WindowFunction::AggregateFunction(fun) => aggregates::return_type(fun, arg_types),
151+
WindowFunction::BuiltInWindowFunction(fun) => {
152+
return_type_for_built_in(fun, arg_types)
153+
}
154+
}
155+
}
156+
157+
/// Returns the datatype of the built-in window function
158+
pub(super) fn return_type_for_built_in(
159+
fun: &BuiltInWindowFunction,
160+
arg_types: &[DataType],
161+
) -> Result<DataType> {
146162
// Note that this function *must* return the same type that the respective physical expression returns
147163
// or the execution panics.
148164

149165
// verify that this is a valid set of data types for this function
150-
data_types(arg_types, &signature(fun))?;
166+
data_types(arg_types, &signature_for_built_in(fun))?;
151167

152168
match fun {
153-
WindowFunction::AggregateFunction(fun) => aggregates::return_type(fun, arg_types),
154-
WindowFunction::BuiltInWindowFunction(fun) => match fun {
155-
BuiltInWindowFunction::RowNumber
156-
| BuiltInWindowFunction::Rank
157-
| BuiltInWindowFunction::DenseRank => Ok(DataType::UInt64),
158-
BuiltInWindowFunction::PercentRank | BuiltInWindowFunction::CumeDist => {
159-
Ok(DataType::Float64)
160-
}
161-
BuiltInWindowFunction::Ntile => Ok(DataType::UInt32),
162-
BuiltInWindowFunction::Lag
163-
| BuiltInWindowFunction::Lead
164-
| BuiltInWindowFunction::FirstValue
165-
| BuiltInWindowFunction::LastValue
166-
| BuiltInWindowFunction::NthValue => Ok(arg_types[0].clone()),
167-
},
169+
BuiltInWindowFunction::RowNumber
170+
| BuiltInWindowFunction::Rank
171+
| BuiltInWindowFunction::DenseRank => Ok(DataType::UInt64),
172+
BuiltInWindowFunction::PercentRank | BuiltInWindowFunction::CumeDist => {
173+
Ok(DataType::Float64)
174+
}
175+
BuiltInWindowFunction::Ntile => Ok(DataType::UInt32),
176+
BuiltInWindowFunction::Lag
177+
| BuiltInWindowFunction::Lead
178+
| BuiltInWindowFunction::FirstValue
179+
| BuiltInWindowFunction::LastValue
180+
| BuiltInWindowFunction::NthValue => Ok(arg_types[0].clone()),
168181
}
169182
}
170183

171184
/// the signatures supported by the function `fun`.
172-
fn signature(fun: &WindowFunction) -> Signature {
173-
// note: the physical expression must accept the type returned by this function or the execution panics.
185+
pub fn signature(fun: &WindowFunction) -> Signature {
174186
match fun {
175187
WindowFunction::AggregateFunction(fun) => aggregates::signature(fun),
176-
WindowFunction::BuiltInWindowFunction(fun) => match fun {
177-
BuiltInWindowFunction::RowNumber
178-
| BuiltInWindowFunction::Rank
179-
| BuiltInWindowFunction::DenseRank
180-
| BuiltInWindowFunction::PercentRank
181-
| BuiltInWindowFunction::CumeDist => Signature::Any(0),
182-
BuiltInWindowFunction::Lag
183-
| BuiltInWindowFunction::Lead
184-
| BuiltInWindowFunction::FirstValue
185-
| BuiltInWindowFunction::LastValue => Signature::Any(1),
186-
BuiltInWindowFunction::Ntile => Signature::Exact(vec![DataType::UInt64]),
187-
BuiltInWindowFunction::NthValue => Signature::Any(2),
188-
},
188+
WindowFunction::BuiltInWindowFunction(fun) => signature_for_built_in(fun),
189+
}
190+
}
191+
192+
/// the signatures supported by the built-in window function `fun`.
193+
pub(super) fn signature_for_built_in(fun: &BuiltInWindowFunction) -> Signature {
194+
// note: the physical expression must accept the type returned by this function or the execution panics.
195+
match fun {
196+
BuiltInWindowFunction::RowNumber
197+
| BuiltInWindowFunction::Rank
198+
| BuiltInWindowFunction::DenseRank
199+
| BuiltInWindowFunction::PercentRank
200+
| BuiltInWindowFunction::CumeDist => Signature::Any(0),
201+
BuiltInWindowFunction::Lag
202+
| BuiltInWindowFunction::Lead
203+
| BuiltInWindowFunction::FirstValue
204+
| BuiltInWindowFunction::LastValue => Signature::Any(1),
205+
BuiltInWindowFunction::Ntile => Signature::Exact(vec![DataType::UInt64]),
206+
BuiltInWindowFunction::NthValue => Signature::Any(2),
189207
}
190208
}
191209

210+
/// A window expression that is a built-in window function
211+
pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
212+
/// Returns the aggregate expression as [`Any`](std::any::Any) so that it can be
213+
/// downcast to a specific implementation.
214+
fn as_any(&self) -> &dyn Any;
215+
216+
/// the field of the final result of this aggregation.
217+
fn field(&self) -> Result<Field>;
218+
219+
/// expressions that are passed to the Accumulator.
220+
/// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
221+
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
222+
223+
/// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default
224+
/// implementation returns placeholder text.
225+
fn name(&self) -> &str {
226+
"BuiltInWindowFunctionExpr: default name"
227+
}
228+
229+
/// the accumulator used to accumulate values from the expressions.
230+
/// the accumulator expects the same number of arguments as `expressions` and must
231+
/// return states with the same description as `state_fields`
232+
fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>>;
233+
}
234+
192235
#[cfg(test)]
193236
mod tests {
194237
use super::*;

0 commit comments

Comments
 (0)