Skip to content

Commit 1723926

Browse files
author
Jiayu Liu
committed
add row number
1 parent bf5b8a5 commit 1723926

File tree

3 files changed

+106
-3
lines changed

3 files changed

+106
-3
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Defines physical expressions that can be evaluated at runtime during query execution
19+
20+
use std::any::Any;
21+
use std::convert::TryFrom;
22+
use std::sync::Arc;
23+
24+
use crate::error::{DataFusionError, Result};
25+
use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
26+
use crate::scalar::ScalarValue;
27+
use arrow::compute;
28+
use arrow::datatypes::{DataType, TimeUnit};
29+
use arrow::{
30+
array::{
31+
ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
32+
LargeStringArray, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
33+
TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
34+
UInt64Array, UInt8Array,
35+
},
36+
datatypes::Field,
37+
};
38+
39+
pub struct RowNumber {
40+
name: String,
41+
expr: Arc<dyn PhysicalSortExpr>,
42+
}
43+
44+
impl RowNumber {
45+
/// Create a new MAX aggregate function
46+
pub fn new(expr: Arc<dyn PhysicalExpr>, name: String) -> Self {
47+
Self { name, expr }
48+
}
49+
}
50+
51+
impl BuiltInWindowFunctionExpr for RowNumber {
52+
/// Return a reference to Any that can be used for downcasting
53+
fn as_any(&self) -> &dyn Any {
54+
self
55+
}
56+
57+
fn field(&self) -> Result<Field> {
58+
let nullable = false;
59+
let data_type = DataType::UInt64;
60+
Ok(Field::new(&self.name, data_type, nullable))
61+
}
62+
63+
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
64+
vec![self.expr.clone()]
65+
}
66+
67+
fn name(&self) -> &str {
68+
&self.name
69+
}
70+
}

datafusion/src/physical_plan/mod.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,22 @@ pub trait WindowExpr: Send + Sync + Debug {
459459
}
460460
}
461461

462+
/// A window expression that is a built-in window function
463+
pub trait BuiltInWindowFunctionExpr: Send + Sync + Debug {
464+
/// Returns the aggregate expression as [`Any`](std::any::Any) so that it can be
465+
/// downcast to a specific implementation.
466+
fn as_any(&self) -> &dyn Any;
467+
468+
/// the field of the final result of this aggregation.
469+
fn field(&self) -> Result<Field>;
470+
471+
/// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default
472+
/// implementation returns placeholder text.
473+
fn name(&self) -> &str {
474+
"BuiltInWindowFunctionExpr: default name"
475+
}
476+
}
477+
462478
/// An accumulator represents a stateful object that lives throughout the evaluation of multiple rows and
463479
/// generically accumulates values. An accumulator knows how to:
464480
/// * update its state from inputs via `update`

datafusion/src/physical_plan/windows.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
use crate::error::{DataFusionError, Result};
2121
use crate::physical_plan::{
2222
aggregates, expressions::Column, window_functions::WindowFunction, AggregateExpr,
23-
Distribution, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
24-
SendableRecordBatchStream, WindowExpr,
23+
BuiltInWindowFunctionExpr, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
24+
RecordBatchStream, SendableRecordBatchStream, WindowExpr,
2525
};
2626
use arrow::{
2727
array::{Array, UInt32Builder},
@@ -80,7 +80,24 @@ pub fn create_window_expr(
8080

8181
/// A window expr that takes the form of a built in window function
8282
#[derive(Debug)]
83-
pub struct BuiltInWindowExpr {}
83+
pub struct BuiltInWindowExpr {
84+
window: Arc<dyn BuiltInWindowFunctionExpr>,
85+
}
86+
87+
impl WindowExpr for BuiltInWindowExpr {
88+
/// Return a reference to Any that can be used for downcasting
89+
fn as_any(&self) -> &dyn Any {
90+
self
91+
}
92+
93+
fn name(&self) -> &str {
94+
&self.window.name()
95+
}
96+
97+
fn field(&self) -> Result<Field> {
98+
self.window.field()
99+
}
100+
}
84101

85102
/// A window expr that takes the form of an aggregate function
86103
#[derive(Debug)]

0 commit comments

Comments
 (0)