Skip to content

Commit d1f53ce

Browse files
author
Jiayu Liu
committed
Squashed commit of the following:
commit 7fb3640 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 16:38:25 2021 +0800 row number done commit 1723926 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 16:05:50 2021 +0800 add row number commit bf5b8a5 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 15:04:49 2021 +0800 save commit d2ce852 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 14:53:05 2021 +0800 add streams commit 0a861a7 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 20 22:28:34 2021 +0800 save stream commit a9121af Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 20 22:01:51 2021 +0800 update unit test commit 2af2a27 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 14:25:12 2021 +0800 fix unit test commit bb57c76 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 14:23:34 2021 +0800 use upper case commit 5d96e52 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 14:16:16 2021 +0800 fix unit test commit 1ecae8f Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 12:27:26 2021 +0800 fix unit test commit bc2271d Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 10:04:29 2021 +0800 fix error commit 880b94f Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 08:24:00 2021 +0800 fix unit test commit 4e792e1 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 08:05:17 2021 +0800 fix test commit c36c04a Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Fri May 21 00:07:54 2021 +0800 add more tests commit f5e64de Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 20 23:41:36 2021 +0800 update commit a1eae86 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 20 23:36:15 2021 +0800 enrich unit test commit 0d2a214 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 20 23:25:43 2021 +0800 adding filter by todo commit 8b486d5 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 20 23:17:22 2021 +0800 adding more built-in functions commit abf08cd Author: Jiayu Liu 
<Jimexist@users.noreply.github.com> Date: Thu May 20 22:36:27 2021 +0800 Update datafusion/src/physical_plan/window_functions.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> commit 0cbca53 Author: Jiayu Liu <Jimexist@users.noreply.github.com> Date: Thu May 20 22:34:57 2021 +0800 Update datafusion/src/physical_plan/window_functions.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> commit 831c069 Author: Jiayu Liu <Jimexist@users.noreply.github.com> Date: Thu May 20 22:34:04 2021 +0800 Update datafusion/src/logical_plan/builder.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> commit f70c739 Author: Jiayu Liu <Jimexist@users.noreply.github.com> Date: Thu May 20 22:33:04 2021 +0800 Update datafusion/src/logical_plan/builder.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> commit 3ee87aa Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Wed May 19 22:55:08 2021 +0800 fix unit test commit 5c4d92d Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Wed May 19 22:48:26 2021 +0800 fix clippy commit a0b7526 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Wed May 19 22:46:38 2021 +0800 fix unused imports commit 1d3b076 Author: Jiayu Liu <jiayu.liu@airbnb.com> Date: Thu May 13 18:51:14 2021 +0800 add window expr
1 parent ad3d9c1 commit d1f53ce

File tree

5 files changed

+372
-52
lines changed

5 files changed

+372
-52
lines changed

datafusion/src/physical_plan/expressions/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ mod literal;
4040
mod min_max;
4141
mod negative;
4242
mod not;
43+
mod nth_value;
4344
mod nullif;
4445
mod row_number;
4546
mod sum;
@@ -58,6 +59,7 @@ pub use literal::{lit, Literal};
5859
pub use min_max::{Max, Min};
5960
pub use negative::{negative, NegativeExpr};
6061
pub use not::{not, NotExpr};
62+
pub use nth_value::{FirstValue, LastValue, NthValue};
6163
pub use nullif::{nullif_func, SUPPORTED_NULLIF_TYPES};
6264
pub use row_number::RowNumber;
6365
pub use sum::{sum_return_type, Sum};
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Defines physical expressions that can be evaluated at runtime during query execution
19+
20+
use crate::error::{DataFusionError, Result};
21+
use crate::physical_plan::{
22+
window_functions::BuiltInWindowFunctionExpr, PhysicalExpr, WindowAccumulator,
23+
};
24+
use crate::scalar::ScalarValue;
25+
use arrow::datatypes::{DataType, Field};
26+
use std::any::Any;
27+
use std::convert::TryFrom;
28+
use std::sync::Arc;
29+
30+
/// first_value expression
31+
#[derive(Debug)]
32+
pub struct FirstValue {
33+
name: String,
34+
data_type: DataType,
35+
expr: Arc<dyn PhysicalExpr>,
36+
}
37+
38+
impl FirstValue {
39+
/// Create a new FIRST_VALUE window aggregate function
40+
pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
41+
Self {
42+
name,
43+
data_type,
44+
expr,
45+
}
46+
}
47+
}
48+
49+
impl BuiltInWindowFunctionExpr for FirstValue {
50+
/// Return a reference to Any that can be used for downcasting
51+
fn as_any(&self) -> &dyn Any {
52+
self
53+
}
54+
55+
fn field(&self) -> Result<Field> {
56+
let nullable = true;
57+
Ok(Field::new(&self.name, self.data_type.clone(), nullable))
58+
}
59+
60+
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
61+
vec![self.expr.clone()]
62+
}
63+
64+
fn name(&self) -> &str {
65+
&self.name
66+
}
67+
68+
fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
69+
Ok(Box::new(NthValueAccumulator::try_new(
70+
1,
71+
self.data_type.clone(),
72+
)?))
73+
}
74+
}
75+
76+
// SQL `nth_value` positions start at 1, so 0 is free to serve as a sentinel that
// selects the special "last value" behavior
77+
const SPECIAL_SIZE_VALUE_FOR_LAST: u32 = 0;
78+
79+
/// last_value expression
80+
#[derive(Debug)]
81+
pub struct LastValue {
82+
name: String,
83+
data_type: DataType,
84+
expr: Arc<dyn PhysicalExpr>,
85+
}
86+
87+
impl LastValue {
88+
/// Create a new FIRST_VALUE window aggregate function
89+
pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
90+
Self {
91+
name,
92+
data_type,
93+
expr,
94+
}
95+
}
96+
}
97+
98+
impl BuiltInWindowFunctionExpr for LastValue {
99+
/// Return a reference to Any that can be used for downcasting
100+
fn as_any(&self) -> &dyn Any {
101+
self
102+
}
103+
104+
fn field(&self) -> Result<Field> {
105+
let nullable = true;
106+
Ok(Field::new(&self.name, self.data_type.clone(), nullable))
107+
}
108+
109+
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
110+
vec![self.expr.clone()]
111+
}
112+
113+
fn name(&self) -> &str {
114+
&self.name
115+
}
116+
117+
fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
118+
Ok(Box::new(NthValueAccumulator::try_new(
119+
SPECIAL_SIZE_VALUE_FOR_LAST,
120+
self.data_type.clone(),
121+
)?))
122+
}
123+
}
124+
125+
/// nth_value expression
126+
#[derive(Debug)]
127+
pub struct NthValue {
128+
name: String,
129+
n: u32,
130+
data_type: DataType,
131+
expr: Arc<dyn PhysicalExpr>,
132+
}
133+
134+
impl NthValue {
135+
/// Create a new NTH_VALUE window aggregate function
136+
pub fn try_new(
137+
expr: Arc<dyn PhysicalExpr>,
138+
name: String,
139+
n: u32,
140+
data_type: DataType,
141+
) -> Result<Self> {
142+
if n == SPECIAL_SIZE_VALUE_FOR_LAST {
143+
Err(DataFusionError::Execution(
144+
"nth_value expect n to be > 0".to_owned(),
145+
))
146+
} else {
147+
Ok(Self {
148+
name,
149+
n,
150+
data_type,
151+
expr,
152+
})
153+
}
154+
}
155+
}
156+
157+
impl BuiltInWindowFunctionExpr for NthValue {
158+
/// Return a reference to Any that can be used for downcasting
159+
fn as_any(&self) -> &dyn Any {
160+
self
161+
}
162+
163+
fn field(&self) -> Result<Field> {
164+
let nullable = true;
165+
Ok(Field::new(&self.name, self.data_type.clone(), nullable))
166+
}
167+
168+
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
169+
vec![self.expr.clone()]
170+
}
171+
172+
fn name(&self) -> &str {
173+
&self.name
174+
}
175+
176+
fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
177+
Ok(Box::new(NthValueAccumulator::try_new(
178+
self.n,
179+
self.data_type.clone(),
180+
)?))
181+
}
182+
}
183+
184+
#[derive(Debug)]
185+
struct NthValueAccumulator {
186+
// n the target nth_value, however we'll reuse it for last_value acc, so when n == 0 it specifically
187+
// means last; also note that it is totally valid for n to be larger than the number of rows input
188+
// in which case all the values shall be null
189+
n: u32,
190+
offset: u32,
191+
value: ScalarValue,
192+
}
193+
194+
impl NthValueAccumulator {
195+
/// new count accumulator
196+
pub fn try_new(n: u32, data_type: DataType) -> Result<Self> {
197+
Ok(Self {
198+
n,
199+
offset: 0,
200+
// null value of that data_type by default
201+
value: ScalarValue::try_from(&data_type)?,
202+
})
203+
}
204+
}
205+
206+
impl WindowAccumulator for NthValueAccumulator {
207+
fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>> {
208+
if self.n == SPECIAL_SIZE_VALUE_FOR_LAST {
209+
// for last_value function
210+
self.value = values[0].clone();
211+
} else if self.offset < self.n {
212+
self.offset += 1;
213+
if self.offset == self.n {
214+
self.value = values[0].clone();
215+
}
216+
}
217+
Ok(None)
218+
}
219+
220+
fn evaluate(&self) -> Result<Option<ScalarValue>> {
221+
Ok(Some(self.value.clone()))
222+
}
223+
}

0 commit comments

Comments
 (0)