Skip to content

Commit d567705

Browse files
author
Jiayu Liu
committed
Squashed commit of the following:
commit 7fb3640 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 16:38:25 2021 +0800 row number done commit 1723926 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 16:05:50 2021 +0800 add row number commit bf5b8a5 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 15:04:49 2021 +0800 save commit d2ce852 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 14:53:05 2021 +0800 add streams commit 0a861a7 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 20 22:28:34 2021 +0800 save stream commit a9121af Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 20 22:01:51 2021 +0800 update unit test commit 2af2a27 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 14:25:12 2021 +0800 fix unit test commit bb57c76 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 14:23:34 2021 +0800 use upper case commit 5d96e52 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 14:16:16 2021 +0800 fix unit test commit 1ecae8f Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 12:27:26 2021 +0800 fix unit test commit bc2271d Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 10:04:29 2021 +0800 fix error commit 880b94f Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 08:24:00 2021 +0800 fix unit test commit 4e792e1 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 08:05:17 2021 +0800 fix test commit c36c04a Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 00:07:54 2021 +0800 add more tests commit f5e64de Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 20 23:41:36 2021 +0800 update commit a1eae86 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 20 23:36:15 2021 +0800 enrich unit test commit 0d2a214 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 20 23:25:43 2021 +0800 adding filter by todo commit 8b486d5 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 20 23:17:22 2021 +0800 adding more built-in functions commit abf08cd Author: Jiayu Liu 
<Jimexist@users.noreply.github.com> Date: Thu May 20 22:36:27 2021 +0800 Update datafusion/src/physical_plan/window_functions.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> commit 0cbca53 Author: Jiayu Liu <Jimexist@users.noreply.github.com> Date: Thu May 20 22:34:57 2021 +0800 Update datafusion/src/physical_plan/window_functions.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> commit 831c069 Author: Jiayu Liu <Jimexist@users.noreply.github.com> Date: Thu May 20 22:34:04 2021 +0800 Update datafusion/src/logical_plan/builder.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> commit f70c739 Author: Jiayu Liu <Jimexist@users.noreply.github.com> Date: Thu May 20 22:33:04 2021 +0800 Update datafusion/src/logical_plan/builder.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> commit 3ee87aa Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Wed May 19 22:55:08 2021 +0800 fix unit test commit 5c4d92d Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Wed May 19 22:48:26 2021 +0800 fix clippy commit a0b7526 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Wed May 19 22:46:38 2021 +0800 fix unused imports commit 1d3b076 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 13 18:51:14 2021 +0800 add window expr
1 parent 68ad990 commit d567705

File tree

11 files changed

+876
-65
lines changed

11 files changed

+876
-65
lines changed

datafusion/src/execution/context.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1268,6 +1268,13 @@ mod tests {
12681268
Ok(())
12691269
}
12701270

1271+
#[tokio::test]
1272+
async fn window() -> Result<()> {
1273+
let results = execute("SELECT c1, MAX(c2) OVER () FROM test", 4).await?;
1274+
assert_eq!(results.len(), 1);
1275+
Ok(())
1276+
}
1277+
12711278
#[tokio::test]
12721279
async fn aggregate() -> Result<()> {
12731280
let results = execute("SELECT SUM(c1), SUM(c2) FROM test", 4).await?;

datafusion/src/physical_plan/expressions/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ mod literal;
4040
mod min_max;
4141
mod negative;
4242
mod not;
43+
mod nth_value;
4344
mod nullif;
45+
mod row_number;
4446
mod sum;
4547
mod try_cast;
4648

@@ -57,7 +59,9 @@ pub use literal::{lit, Literal};
5759
pub use min_max::{Max, Min};
5860
pub use negative::{negative, NegativeExpr};
5961
pub use not::{not, NotExpr};
62+
pub use nth_value::{FirstValue, LastValue, NthValue};
6063
pub use nullif::{nullif_func, SUPPORTED_NULLIF_TYPES};
64+
pub use row_number::RowNumber;
6165
pub use sum::{sum_return_type, Sum};
6266
pub use try_cast::{try_cast, TryCastExpr};
6367
/// returns the name of the state
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Defines the `first_value`, `last_value` and `nth_value` window function expressions that can be evaluated at runtime during query execution
19+
20+
use crate::error::{DataFusionError, Result};
21+
use crate::physical_plan::{BuiltInWindowFunctionExpr, PhysicalExpr, WindowAccumulator};
22+
use crate::scalar::ScalarValue;
23+
use arrow::datatypes::{DataType, Field};
24+
use std::any::Any;
25+
use std::convert::TryFrom;
26+
use std::sync::Arc;
27+
28+
/// first_value expression
29+
#[derive(Debug)]
30+
pub struct FirstValue {
31+
name: String,
32+
data_type: DataType,
33+
expr: Arc<dyn PhysicalExpr>,
34+
}
35+
36+
impl FirstValue {
37+
/// Create a new FIRST_VALUE window aggregate function
38+
pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
39+
Self {
40+
name,
41+
data_type,
42+
expr,
43+
}
44+
}
45+
}
46+
47+
impl BuiltInWindowFunctionExpr for FirstValue {
48+
/// Return a reference to Any that can be used for downcasting
49+
fn as_any(&self) -> &dyn Any {
50+
self
51+
}
52+
53+
fn field(&self) -> Result<Field> {
54+
let nullable = true;
55+
Ok(Field::new(&self.name, self.data_type.clone(), nullable))
56+
}
57+
58+
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
59+
vec![self.expr.clone()]
60+
}
61+
62+
fn name(&self) -> &str {
63+
&self.name
64+
}
65+
66+
fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
67+
Ok(Box::new(NthValueAccumulator::try_new(
68+
1,
69+
self.data_type.clone(),
70+
)?))
71+
}
72+
}
73+
74+
// sql values start with 1, so we can use 0 to indicate the special last value behavior
75+
const SPECIAL_SIZE_VALUE_FOR_LAST: u32 = 0;
76+
77+
/// last_value expression
78+
#[derive(Debug)]
79+
pub struct LastValue {
80+
name: String,
81+
data_type: DataType,
82+
expr: Arc<dyn PhysicalExpr>,
83+
}
84+
85+
impl LastValue {
86+
/// Create a new LAST_VALUE window aggregate function
87+
pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
88+
Self {
89+
name,
90+
data_type,
91+
expr,
92+
}
93+
}
94+
}
95+
96+
impl BuiltInWindowFunctionExpr for LastValue {
97+
/// Return a reference to Any that can be used for downcasting
98+
fn as_any(&self) -> &dyn Any {
99+
self
100+
}
101+
102+
fn field(&self) -> Result<Field> {
103+
let nullable = true;
104+
Ok(Field::new(&self.name, self.data_type.clone(), nullable))
105+
}
106+
107+
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
108+
vec![self.expr.clone()]
109+
}
110+
111+
fn name(&self) -> &str {
112+
&self.name
113+
}
114+
115+
fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
116+
Ok(Box::new(NthValueAccumulator::try_new(
117+
SPECIAL_SIZE_VALUE_FOR_LAST,
118+
self.data_type.clone(),
119+
)?))
120+
}
121+
}
122+
123+
/// nth_value expression
124+
#[derive(Debug)]
125+
pub struct NthValue {
126+
name: String,
127+
n: u32,
128+
data_type: DataType,
129+
expr: Arc<dyn PhysicalExpr>,
130+
}
131+
132+
impl NthValue {
133+
/// Create a new NTH_VALUE window aggregate function
134+
pub fn try_new(
135+
expr: Arc<dyn PhysicalExpr>,
136+
name: String,
137+
n: u32,
138+
data_type: DataType,
139+
) -> Result<Self> {
140+
if n == SPECIAL_SIZE_VALUE_FOR_LAST {
141+
Err(DataFusionError::Execution(
142+
"nth_value expect n to be > 0".to_owned(),
143+
))
144+
} else {
145+
Ok(Self {
146+
name,
147+
n,
148+
data_type,
149+
expr,
150+
})
151+
}
152+
}
153+
}
154+
155+
impl BuiltInWindowFunctionExpr for NthValue {
156+
/// Return a reference to Any that can be used for downcasting
157+
fn as_any(&self) -> &dyn Any {
158+
self
159+
}
160+
161+
fn field(&self) -> Result<Field> {
162+
let nullable = true;
163+
Ok(Field::new(&self.name, self.data_type.clone(), nullable))
164+
}
165+
166+
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
167+
vec![self.expr.clone()]
168+
}
169+
170+
fn name(&self) -> &str {
171+
&self.name
172+
}
173+
174+
fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
175+
Ok(Box::new(NthValueAccumulator::try_new(
176+
self.n,
177+
self.data_type.clone(),
178+
)?))
179+
}
180+
}
181+
182+
#[derive(Debug)]
183+
struct NthValueAccumulator {
184+
// `n` is the 1-based position of the target row for nth_value. The accumulator is reused for
185+
// last_value, where n == 0 specifically means "last". It is also valid for n to be larger than
186+
// the number of input rows, in which case the value stays null.
187+
n: u32,
188+
offset: u32,
189+
value: ScalarValue,
190+
}
191+
192+
impl NthValueAccumulator {
193+
/// Create a new nth_value accumulator with offset 0 and a null value of `data_type`
194+
pub fn try_new(n: u32, data_type: DataType) -> Result<Self> {
195+
Ok(Self {
196+
n,
197+
offset: 0,
198+
// null value of that data_type by default
199+
value: ScalarValue::try_from(&data_type)?,
200+
})
201+
}
202+
}
203+
204+
impl WindowAccumulator for NthValueAccumulator {
205+
fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>> {
206+
if self.n == SPECIAL_SIZE_VALUE_FOR_LAST {
207+
// for last_value function
208+
self.value = values[0].clone();
209+
} else if self.offset < self.n {
210+
self.offset += 1;
211+
if self.offset == self.n {
212+
self.value = values[0].clone();
213+
}
214+
}
215+
Ok(None)
216+
}
217+
218+
fn evaluate(&self) -> Result<Option<ScalarValue>> {
219+
Ok(Some(self.value.clone()))
220+
}
221+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Defines the `row_number` window function expression that can be evaluated at runtime during query execution
19+
20+
use crate::error::Result;
21+
use crate::physical_plan::{BuiltInWindowFunctionExpr, PhysicalExpr, WindowAccumulator};
22+
use crate::scalar::ScalarValue;
23+
use arrow::array::ArrayRef;
24+
use arrow::datatypes::{DataType, Field};
25+
use std::any::Any;
26+
use std::sync::Arc;
27+
28+
/// row_number expression
29+
#[derive(Debug)]
30+
pub struct RowNumber {
31+
name: String,
32+
}
33+
34+
impl RowNumber {
35+
/// Create a new ROW_NUMBER window function
36+
pub fn new(name: String) -> Self {
37+
Self { name }
38+
}
39+
}
40+
41+
impl BuiltInWindowFunctionExpr for RowNumber {
42+
/// Return a reference to Any that can be used for downcasting
43+
fn as_any(&self) -> &dyn Any {
44+
self
45+
}
46+
47+
fn field(&self) -> Result<Field> {
48+
let nullable = false;
49+
let data_type = DataType::UInt64;
50+
Ok(Field::new(&self.name, data_type, nullable))
51+
}
52+
53+
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
54+
vec![]
55+
}
56+
57+
fn name(&self) -> &str {
58+
&self.name
59+
}
60+
61+
fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
62+
Ok(Box::new(RowNumberAccumulator::new()))
63+
}
64+
}
65+
66+
#[derive(Debug)]
67+
struct RowNumberAccumulator {
68+
row_number: u64,
69+
}
70+
71+
impl RowNumberAccumulator {
72+
/// Create a new row number accumulator whose first emitted row number is 1
73+
pub fn new() -> Self {
74+
// row number is 1 based
75+
Self { row_number: 1 }
76+
}
77+
}
78+
79+
impl WindowAccumulator for RowNumberAccumulator {
80+
fn scan(&mut self, _values: &[ScalarValue]) -> Result<Option<ScalarValue>> {
81+
let result = Some(ScalarValue::UInt64(Some(self.row_number)));
82+
self.row_number += 1;
83+
Ok(result)
84+
}
85+
86+
fn scan_batch(
87+
&mut self,
88+
num_rows: usize,
89+
_values: &[ArrayRef],
90+
) -> Result<Option<Vec<ScalarValue>>> {
91+
let new_row_number = self.row_number + (num_rows as u64);
92+
let result = (self.row_number..new_row_number)
93+
.map(|i| ScalarValue::UInt64(Some(i)))
94+
.collect();
95+
self.row_number = new_row_number;
96+
Ok(Some(result))
97+
}
98+
99+
fn evaluate(&self) -> Result<Option<ScalarValue>> {
100+
Ok(None)
101+
}
102+
}

datafusion/src/physical_plan/hash_aggregate.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -712,7 +712,7 @@ impl GroupedHashAggregateStream {
712712
tx.send(result)
713713
});
714714

715-
GroupedHashAggregateStream {
715+
Self {
716716
schema,
717717
output: rx,
718718
finished: false,
@@ -825,7 +825,8 @@ fn aggregate_expressions(
825825
}
826826

827827
pin_project! {
828-
struct HashAggregateStream {
828+
/// stream struct for hash aggregation
829+
pub struct HashAggregateStream {
829830
schema: SchemaRef,
830831
#[pin]
831832
output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
@@ -878,7 +879,7 @@ impl HashAggregateStream {
878879
tx.send(result)
879880
});
880881

881-
HashAggregateStream {
882+
Self {
882883
schema,
883884
output: rx,
884885
finished: false,

0 commit comments

Comments
 (0)