Skip to content

Commit 7fb3640

Browse files
author
Jiayu Liu
committed
row number done
1 parent 1723926 commit 7fb3640

File tree

5 files changed

+94
-45
lines changed

5 files changed

+94
-45
lines changed

datafusion/src/physical_plan/expressions/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ mod min_max;
4141
mod negative;
4242
mod not;
4343
mod nullif;
44+
mod row_number;
4445
mod sum;
4546
mod try_cast;
4647

@@ -58,6 +59,7 @@ pub use min_max::{Max, Min};
5859
pub use negative::{negative, NegativeExpr};
5960
pub use not::{not, NotExpr};
6061
pub use nullif::{nullif_func, SUPPORTED_NULLIF_TYPES};
62+
pub use row_number::RowNumber;
6163
pub use sum::{sum_return_type, Sum};
6264
pub use try_cast::{try_cast, TryCastExpr};
6365
/// returns the name of the state

datafusion/src/physical_plan/expressions/row_number.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ use std::convert::TryFrom;
2222
use std::sync::Arc;
2323

2424
use crate::error::{DataFusionError, Result};
25-
use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
25+
use crate::physical_plan::{
26+
Accumulator, AggregateExpr, BuiltInWindowFunctionExpr, PhysicalExpr,
27+
};
2628
use crate::scalar::ScalarValue;
2729
use arrow::compute;
2830
use arrow::datatypes::{DataType, TimeUnit};
@@ -36,15 +38,16 @@ use arrow::{
3638
datatypes::Field,
3739
};
3840

41+
/// row_number expression
42+
#[derive(Debug)]
3943
pub struct RowNumber {
4044
name: String,
41-
expr: Arc<dyn PhysicalSortExpr>,
4245
}
4346

4447
impl RowNumber {
4548
/// Create a new MAX aggregate function
46-
pub fn new(expr: Arc<dyn PhysicalExpr>, name: String) -> Self {
47-
Self { name, expr }
49+
pub fn new(name: String) -> Self {
50+
Self { name }
4851
}
4952
}
5053

@@ -61,7 +64,7 @@ impl BuiltInWindowFunctionExpr for RowNumber {
6164
}
6265

6366
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
64-
vec![self.expr.clone()]
67+
vec![]
6568
}
6669

6770
fn name(&self) -> &str {

datafusion/src/physical_plan/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,10 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + Debug {
468468
/// the field of the final result of this aggregation.
469469
fn field(&self) -> Result<Field>;
470470

471+
/// expressions that are passed to the Accumulator.
472+
/// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
473+
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
474+
471475
/// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default
472476
/// implementation returns placeholder text.
473477
fn name(&self) -> &str {

datafusion/src/physical_plan/window_functions.rs

Lines changed: 46 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -143,49 +143,64 @@ impl FromStr for BuiltInWindowFunction {
143143

144144
/// Returns the datatype of the window function
145145
pub fn return_type(fun: &WindowFunction, arg_types: &[DataType]) -> Result<DataType> {
146+
match fun {
147+
WindowFunction::AggregateFunction(fun) => aggregates::return_type(fun, arg_types),
148+
WindowFunction::BuiltInWindowFunction(fun) => {
149+
return_type_for_built_in(fun, arg_types)
150+
}
151+
}
152+
}
153+
154+
/// Returns the datatype of the built-in window function
155+
pub(super) fn return_type_for_built_in(
156+
fun: &BuiltInWindowFunction,
157+
arg_types: &[DataType],
158+
) -> Result<DataType> {
146159
// Note that this function *must* return the same type that the respective physical expression returns
147160
// or the execution panics.
148161

149162
// verify that this is a valid set of data types for this function
150-
data_types(arg_types, &signature(fun))?;
163+
data_types(arg_types, &signature_for_built_in(fun))?;
151164

152165
match fun {
153-
WindowFunction::AggregateFunction(fun) => aggregates::return_type(fun, arg_types),
154-
WindowFunction::BuiltInWindowFunction(fun) => match fun {
155-
BuiltInWindowFunction::RowNumber
156-
| BuiltInWindowFunction::Rank
157-
| BuiltInWindowFunction::DenseRank => Ok(DataType::UInt64),
158-
BuiltInWindowFunction::PercentRank | BuiltInWindowFunction::CumeDist => {
159-
Ok(DataType::Float64)
160-
}
161-
BuiltInWindowFunction::Ntile => Ok(DataType::UInt32),
162-
BuiltInWindowFunction::Lag
163-
| BuiltInWindowFunction::Lead
164-
| BuiltInWindowFunction::FirstValue
165-
| BuiltInWindowFunction::LastValue
166-
| BuiltInWindowFunction::NthValue => Ok(arg_types[0].clone()),
167-
},
166+
BuiltInWindowFunction::RowNumber
167+
| BuiltInWindowFunction::Rank
168+
| BuiltInWindowFunction::DenseRank => Ok(DataType::UInt64),
169+
BuiltInWindowFunction::PercentRank | BuiltInWindowFunction::CumeDist => {
170+
Ok(DataType::Float64)
171+
}
172+
BuiltInWindowFunction::Ntile => Ok(DataType::UInt32),
173+
BuiltInWindowFunction::Lag
174+
| BuiltInWindowFunction::Lead
175+
| BuiltInWindowFunction::FirstValue
176+
| BuiltInWindowFunction::LastValue
177+
| BuiltInWindowFunction::NthValue => Ok(arg_types[0].clone()),
168178
}
169179
}
170180

171181
/// the signatures supported by the function `fun`.
172-
fn signature(fun: &WindowFunction) -> Signature {
173-
// note: the physical expression must accept the type returned by this function or the execution panics.
182+
pub fn signature(fun: &WindowFunction) -> Signature {
174183
match fun {
175184
WindowFunction::AggregateFunction(fun) => aggregates::signature(fun),
176-
WindowFunction::BuiltInWindowFunction(fun) => match fun {
177-
BuiltInWindowFunction::RowNumber
178-
| BuiltInWindowFunction::Rank
179-
| BuiltInWindowFunction::DenseRank
180-
| BuiltInWindowFunction::PercentRank
181-
| BuiltInWindowFunction::CumeDist => Signature::Any(0),
182-
BuiltInWindowFunction::Lag
183-
| BuiltInWindowFunction::Lead
184-
| BuiltInWindowFunction::FirstValue
185-
| BuiltInWindowFunction::LastValue => Signature::Any(1),
186-
BuiltInWindowFunction::Ntile => Signature::Exact(vec![DataType::UInt64]),
187-
BuiltInWindowFunction::NthValue => Signature::Any(2),
188-
},
185+
WindowFunction::BuiltInWindowFunction(fun) => signature_for_built_in(fun),
186+
}
187+
}
188+
189+
/// the signatures supported by the built-in window function `fun`.
190+
pub(super) fn signature_for_built_in(fun: &BuiltInWindowFunction) -> Signature {
191+
// note: the physical expression must accept the type returned by this function or the execution panics.
192+
match fun {
193+
BuiltInWindowFunction::RowNumber
194+
| BuiltInWindowFunction::Rank
195+
| BuiltInWindowFunction::DenseRank
196+
| BuiltInWindowFunction::PercentRank
197+
| BuiltInWindowFunction::CumeDist => Signature::Any(0),
198+
BuiltInWindowFunction::Lag
199+
| BuiltInWindowFunction::Lead
200+
| BuiltInWindowFunction::FirstValue
201+
| BuiltInWindowFunction::LastValue => Signature::Any(1),
202+
BuiltInWindowFunction::Ntile => Signature::Exact(vec![DataType::UInt64]),
203+
BuiltInWindowFunction::NthValue => Signature::Any(2),
189204
}
190205
}
191206

datafusion/src/physical_plan/windows.rs

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,15 @@
1919
2020
use crate::error::{DataFusionError, Result};
2121
use crate::physical_plan::{
22-
aggregates, expressions::Column, window_functions::WindowFunction, AggregateExpr,
23-
BuiltInWindowFunctionExpr, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
24-
RecordBatchStream, SendableRecordBatchStream, WindowExpr,
22+
aggregates,
23+
expressions::RowNumber,
24+
type_coercion::coerce,
25+
window_functions::{
26+
return_type_for_built_in, signature_for_built_in, BuiltInWindowFunction,
27+
WindowFunction,
28+
},
29+
AggregateExpr, BuiltInWindowFunctionExpr, Distribution, ExecutionPlan, Partitioning,
30+
PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, WindowExpr,
2531
};
2632
use arrow::{
2733
array::{Array, UInt32Builder},
@@ -69,12 +75,31 @@ pub fn create_window_expr(
6975
name,
7076
)?,
7177
})),
72-
WindowFunction::BuiltInWindowFunction(fun) => {
73-
Err(DataFusionError::NotImplemented(format!(
74-
"window function with {:?} not implemented",
75-
fun
76-
)))
77-
}
78+
WindowFunction::BuiltInWindowFunction(fun) => Ok(Arc::new(BuiltInWindowExpr {
79+
window: create_built_in_window_expr(fun, args, input_schema, name)?,
80+
})),
81+
}
82+
}
83+
84+
fn create_built_in_window_expr(
85+
fun: &BuiltInWindowFunction,
86+
args: &[Arc<dyn PhysicalExpr>],
87+
input_schema: &Schema,
88+
name: String,
89+
) -> Result<Arc<dyn BuiltInWindowFunctionExpr>> {
90+
// let arg_types = args
91+
// .iter()
92+
// .map(|e| e.data_type(input_schema))
93+
// .collect::<Result<Vec<_>>>()?;
94+
95+
// let return_type = return_type_for_built_in(&fun, &arg_types)?;
96+
97+
match fun {
98+
BuiltInWindowFunction::RowNumber => Ok(Arc::new(RowNumber::new(name))),
99+
_ => Err(DataFusionError::NotImplemented(format!(
100+
"window function with {:?} not implemented",
101+
fun
102+
))),
78103
}
79104
}
80105

0 commit comments

Comments
 (0)