Skip to content

Commit 8b486d5

Browse files
author
Jiayu Liu
committed
adding more built-in functions
1 parent abf08cd commit 8b486d5

File tree

5 files changed

+99
-19
lines changed

5 files changed

+99
-19
lines changed

ballista/rust/core/proto/ballista.proto

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,14 @@ enum BuiltInWindowFunction {
157157
ROW_NUMBER = 0;
158158
RANK = 1;
159159
DENSE_RANK = 2;
160-
LAG = 3;
161-
LEAD = 4;
162-
FIRST_VALUE = 5;
163-
LAST_VALUE = 6;
160+
PERCENT_RANK = 3;
161+
CUME_DIST = 4;
162+
NTILE = 5;
163+
LAG = 6;
164+
LEAD = 7;
165+
FIRST_VALUE = 8;
166+
LAST_VALUE = 9;
167+
NTH_VALUE = 10;
164168
}
165169

166170
message WindowExprNode {

ballista/rust/core/src/serde/logical_plan/from_proto.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1356,6 +1356,9 @@ impl From<protobuf::BuiltInWindowFunction> for BuiltInWindowFunction {
13561356
BuiltInWindowFunction::RowNumber
13571357
}
13581358
protobuf::BuiltInWindowFunction::Rank => BuiltInWindowFunction::Rank,
1359+
protobuf::BuiltInWindowFunction::PercentRank => {
1360+
BuiltInWindowFunction::PercentRank
1361+
}
13591362
protobuf::BuiltInWindowFunction::DenseRank => {
13601363
BuiltInWindowFunction::DenseRank
13611364
}
@@ -1364,6 +1367,9 @@ impl From<protobuf::BuiltInWindowFunction> for BuiltInWindowFunction {
13641367
protobuf::BuiltInWindowFunction::FirstValue => {
13651368
BuiltInWindowFunction::FirstValue
13661369
}
1370+
protobuf::BuiltInWindowFunction::CumeDist => BuiltInWindowFunction::CumeDist,
1371+
protobuf::BuiltInWindowFunction::Ntile => BuiltInWindowFunction::Ntile,
1372+
protobuf::BuiltInWindowFunction::NthValue => BuiltInWindowFunction::NthValue,
13671373
protobuf::BuiltInWindowFunction::LastValue => {
13681374
BuiltInWindowFunction::LastValue
13691375
}

ballista/rust/core/src/serde/logical_plan/to_proto.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1255,6 +1255,10 @@ impl From<&BuiltInWindowFunction> for protobuf::BuiltInWindowFunction {
12551255
match value {
12561256
BuiltInWindowFunction::FirstValue => Self::FirstValue,
12571257
BuiltInWindowFunction::LastValue => Self::LastValue,
1258+
BuiltInWindowFunction::NthValue => Self::NthValue,
1259+
BuiltInWindowFunction::Ntile => Self::Ntile,
1260+
BuiltInWindowFunction::CumeDist => Self::CumeDist,
1261+
BuiltInWindowFunction::PercentRank => Self::PercentRank,
12581262
BuiltInWindowFunction::RowNumber => Self::RowNumber,
12591263
BuiltInWindowFunction::Rank => Self::Rank,
12601264
BuiltInWindowFunction::Lag => Self::Lag,

datafusion/src/logical_plan/builder.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -289,18 +289,30 @@ impl LogicalPlanBuilder {
289289
}
290290

291291
/// Apply a window
292-
///
293-
/// NOTE: this feature is under development and this API will be changing
292+
///
293+
/// NOTE: this feature is under development and this API will be changing
294+
///
295+
/// - https://github.com/apache/arrow-datafusion/issues/359 basic structure
296+
/// - https://github.com/apache/arrow-datafusion/issues/298 empty over clause
297+
/// - https://github.com/apache/arrow-datafusion/issues/299 with partition clause
298+
/// - https://github.com/apache/arrow-datafusion/issues/360 with order by
299+
/// - https://github.com/apache/arrow-datafusion/issues/361 with window frame
294300
pub fn window(
295301
&self,
296302
window_expr: impl IntoIterator<Item = Expr>,
297-
// filter: impl IntoIterator<Item = Expr>,
303+
// FIXME: implement next
304+
// filter_by_expr: impl IntoIterator<Item = Expr>,
305+
// FIXME: implement next
298306
// partition_by_expr: impl IntoIterator<Item = Expr>,
307+
// FIXME: implement next
299308
// order_by_expr: impl IntoIterator<Item = Expr>,
309+
// FIXME: implement next
300310
// window_frame: Option<WindowFrame>,
301311
) -> Result<Self> {
302312
let window_expr = window_expr.into_iter().collect::<Vec<Expr>>();
313+
// FIXME: implement next
303314
// let partition_by_expr = partition_by_expr.into_iter().collect::<Vec<Expr>>();
315+
// FIXME: implement next
304316
// let order_by_expr = order_by_expr.into_iter().collect::<Vec<Expr>>();
305317
let all_expr = window_expr.iter();
306318
validate_unique_names("Windows", all_expr.clone(), self.plan.schema())?;

datafusion/src/physical_plan/window_functions.rs

Lines changed: 66 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -72,33 +72,51 @@ impl fmt::Display for WindowFunction {
7272
/// An aggregate function that is part of a built-in window function
7373
#[derive(Debug, Clone, PartialEq, Eq)]
7474
pub enum BuiltInWindowFunction {
75-
/// row number
75+
/// number of the current row within its partition, counting from 1
7676
RowNumber,
77-
/// rank
77+
/// rank of the current row with gaps; same as row_number of its first peer
7878
Rank,
79-
/// dense rank
79+
/// rank of the current row without gaps; this function counts peer groups
8080
DenseRank,
81-
/// lag
81+
/// relative rank of the current row: (rank - 1) / (total rows - 1)
82+
PercentRank,
83+
/// relative rank of the current row: (number of rows preceding or peer with current row) / (total rows)
84+
CumeDist,
85+
/// integer ranging from 1 to the argument value, dividing the partition as equally as possible
86+
Ntile,
87+
/// returns value evaluated at the row that is offset rows before the current row within the partition;
88+
/// if there is no such row, instead return default (which must be of the same type as value).
89+
/// Both offset and default are evaluated with respect to the current row.
90+
/// If omitted, offset defaults to 1 and default to null
8291
Lag,
83-
/// lead
92+
/// returns value evaluated at the row that is offset rows after the current row within the partition;
93+
/// if there is no such row, instead return default (which must be of the same type as value).
94+
/// Both offset and default are evaluated with respect to the current row.
95+
/// If omitted, offset defaults to 1 and default to null
8496
Lead,
85-
/// first value
97+
/// returns value evaluated at the row that is the first row of the window frame
8698
FirstValue,
87-
/// last value
99+
/// returns value evaluated at the row that is the last row of the window frame
88100
LastValue,
101+
/// returns value evaluated at the row that is the nth row of the window frame (counting from 1); null if no such row
102+
NthValue,
89103
}
90104

91105
impl FromStr for BuiltInWindowFunction {
92106
type Err = DataFusionError;
93107
fn from_str(name: &str) -> Result<BuiltInWindowFunction> {
94-
Ok(match name {
108+
Ok(match name.to_lowercase().as_str() {
95109
"row_number" => BuiltInWindowFunction::RowNumber,
96110
"rank" => BuiltInWindowFunction::Rank,
97111
"dense_rank" => BuiltInWindowFunction::DenseRank,
98-
"first_value" => BuiltInWindowFunction::FirstValue,
99-
"last_value" => BuiltInWindowFunction::LastValue,
112+
"percent_rank" => BuiltInWindowFunction::PercentRank,
113+
"cume_dist" => BuiltInWindowFunction::CumeDist,
114+
"ntile" => BuiltInWindowFunction::Ntile,
100115
"lag" => BuiltInWindowFunction::Lag,
101116
"lead" => BuiltInWindowFunction::Lead,
117+
"first_value" => BuiltInWindowFunction::FirstValue,
118+
"last_value" => BuiltInWindowFunction::LastValue,
119+
"nth_value" => BuiltInWindowFunction::NthValue,
102120
_ => {
103121
return Err(DataFusionError::Plan(format!(
104122
"There is no built-in window function named {}",
@@ -123,10 +141,15 @@ pub fn return_type(fun: &WindowFunction, arg_types: &[DataType]) -> Result<DataT
123141
BuiltInWindowFunction::RowNumber
124142
| BuiltInWindowFunction::Rank
125143
| BuiltInWindowFunction::DenseRank => Ok(DataType::UInt64),
144+
BuiltInWindowFunction::PercentRank | BuiltInWindowFunction::CumeDist => {
145+
Ok(DataType::Float64)
146+
}
147+
BuiltInWindowFunction::Ntile => Ok(DataType::UInt32),
126148
BuiltInWindowFunction::Lag
127149
| BuiltInWindowFunction::Lead
128150
| BuiltInWindowFunction::FirstValue
129-
| BuiltInWindowFunction::LastValue => Ok(arg_types[0].clone()),
151+
| BuiltInWindowFunction::LastValue
152+
| BuiltInWindowFunction::NthValue => Ok(arg_types[0].clone()),
130153
},
131154
}
132155
}
@@ -139,11 +162,42 @@ fn signature(fun: &WindowFunction) -> Signature {
139162
WindowFunction::BuiltInWindowFunction(fun) => match fun {
140163
BuiltInWindowFunction::RowNumber
141164
| BuiltInWindowFunction::Rank
142-
| BuiltInWindowFunction::DenseRank => Signature::Any(0),
165+
| BuiltInWindowFunction::DenseRank
166+
| BuiltInWindowFunction::PercentRank
167+
| BuiltInWindowFunction::CumeDist => Signature::Any(0),
143168
BuiltInWindowFunction::Lag
144169
| BuiltInWindowFunction::Lead
145170
| BuiltInWindowFunction::FirstValue
146171
| BuiltInWindowFunction::LastValue => Signature::Any(1),
172+
BuiltInWindowFunction::Ntile => Signature::Exact(vec![DataType::UInt64]),
173+
BuiltInWindowFunction::NthValue => Signature::Any(2),
147174
},
148175
}
149176
}
177+
178+
#[cfg(test)]
179+
mod tests {
180+
use super::*;
181+
use arrow::datatypes::{DataType, Field};
182+
183+
#[test]
184+
fn test_window_function_from_str() -> Result<()> {
185+
assert_eq!(
186+
WindowFunction::from_str("max")?,
187+
WindowFunction::AggregateFunction(AggregateFunction::Max)
188+
);
189+
assert_eq!(
190+
WindowFunction::from_str("min")?,
191+
WindowFunction::AggregateFunction(AggregateFunction::Min)
192+
);
193+
assert_eq!(
194+
WindowFunction::from_str("avg")?,
195+
WindowFunction::AggregateFunction(AggregateFunction::Avg)
196+
);
197+
assert_eq!(
198+
WindowFunction::from_str("cume_dist")?,
199+
WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::CumeDist)
200+
);
201+
Ok(())
202+
}
203+
}

0 commit comments

Comments
 (0)