Skip to content

Commit 4083048

Browse files
author
Jiayu Liu
committed
intake more comments
1 parent e412208 commit 4083048

File tree

4 files changed

+19
-16
lines changed

4 files changed

+19
-16
lines changed

datafusion/src/execution/context.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1275,7 +1275,9 @@ mod tests {
12751275
4,
12761276
)
12771277
.await?;
1278-
// result in one batch
1278+
// result in one batch, although e.g. having 2 batches do not change
1279+
// result semantics, having a len=1 assertion upfront keeps surprises
1280+
// at bay
12791281
assert_eq!(results.len(), 1);
12801282

12811283
let expected = vec![

datafusion/src/physical_plan/expressions/row_number.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,12 @@ use std::sync::Arc;
2929

3030
/// row_number expression
3131
#[derive(Debug)]
32-
pub struct RowNumber {
33-
name: String,
34-
}
32+
pub struct RowNumber {}
3533

3634
impl RowNumber {
3735
/// Create a new ROW_NUMBER function
38-
pub fn new(name: String) -> Self {
39-
Self { name }
36+
pub fn new() -> Self {
37+
Self {}
4038
}
4139
}
4240

@@ -49,15 +47,15 @@ impl BuiltInWindowFunctionExpr for RowNumber {
4947
fn field(&self) -> Result<Field> {
5048
let nullable = false;
5149
let data_type = DataType::UInt64;
52-
Ok(Field::new(&self.name, data_type, nullable))
50+
Ok(Field::new(&self.name(), data_type, nullable))
5351
}
5452

5553
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
5654
vec![]
5755
}
5856

5957
fn name(&self) -> &str {
60-
&self.name
58+
"row_number"
6159
}
6260

6361
fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
@@ -71,7 +69,7 @@ struct RowNumberAccumulator {
7169
}
7270

7371
impl RowNumberAccumulator {
74-
/// new count accumulator
72+
/// new row_number accumulator
7573
pub fn new() -> Self {
7674
// row number is 1 based
7775
Self { row_number: 1 }
@@ -91,6 +89,8 @@ impl WindowAccumulator for RowNumberAccumulator {
9189
_values: &[ArrayRef],
9290
) -> Result<Option<ArrayRef>> {
9391
let new_row_number = self.row_number + (num_rows as u64);
92+
// TODO: probably would be nice to have a (optimized) kernel for this at some point to
93+
// generate an array like this.
9494
let result = UInt64Array::from_iter_values(self.row_number..new_row_number);
9595
self.row_number = new_row_number;
9696
Ok(Some(Arc::new(result)))
@@ -116,7 +116,7 @@ mod tests {
116116
let schema = Schema::new(vec![Field::new("arr", DataType::Boolean, false)]);
117117
let batch = RecordBatch::try_new(Arc::new(schema), vec![arr])?;
118118

119-
let row_number = Arc::new(RowNumber::new("row_number".to_owned()));
119+
let row_number = Arc::new(RowNumber::new());
120120

121121
let mut acc = row_number.create_accumulator()?;
122122
let expr = row_number.expressions();
@@ -147,7 +147,7 @@ mod tests {
147147
let schema = Schema::new(vec![Field::new("arr", DataType::Boolean, false)]);
148148
let batch = RecordBatch::try_new(Arc::new(schema), vec![arr])?;
149149

150-
let row_number = Arc::new(RowNumber::new("row_number".to_owned()));
150+
let row_number = Arc::new(RowNumber::new());
151151

152152
let mut acc = row_number.create_accumulator()?;
153153
let expr = row_number.expressions();

datafusion/src/physical_plan/windows.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,10 @@ fn create_built_in_window_expr(
8484
fun: &BuiltInWindowFunction,
8585
_args: &[Arc<dyn PhysicalExpr>],
8686
_input_schema: &Schema,
87-
name: String,
87+
_name: String,
8888
) -> Result<Arc<dyn BuiltInWindowFunctionExpr>> {
8989
match fun {
90-
BuiltInWindowFunction::RowNumber => Ok(Arc::new(RowNumber::new(name))),
90+
BuiltInWindowFunction::RowNumber => Ok(Arc::new(RowNumber::new())),
9191
_ => Err(DataFusionError::NotImplemented(format!(
9292
"Window function with {:?} not yet implemented",
9393
fun
@@ -330,8 +330,7 @@ fn window_aggregate_batch(
330330
.collect::<Result<Vec<_>>>()
331331
}
332332

333-
/// returns a vector of ArrayRefs, where each entry corresponds to either the
334-
/// final value (mode = Final) or states (mode = Partial)
333+
/// returns a vector of ArrayRefs, where each entry corresponds to one window expr
335334
fn finalize_window_aggregation(
336335
window_accumulators: &[WindowAccumulatorItem],
337336
) -> Result<Vec<Option<ScalarValue>>> {

datafusion/tests/sql.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -809,7 +809,9 @@ async fn csv_query_window_with_empty_over() -> Result<()> {
809809
count(c3) over (), \
810810
max(c3) over (), \
811811
min(c3) over () \
812-
from aggregate_test_100 limit 5";
812+
from aggregate_test_100 \
813+
order by c1 \
814+
limit 5";
813815
let actual = execute(&mut ctx, sql).await;
814816
let expected = vec![
815817
vec!["2", "1", "781", "7.81", "100", "125", "-117"],

0 commit comments

Comments
 (0)