Skip to content

Commit bddd426

Browse files
author
Jiayu Liu
committed
field rename
1 parent a6d18ef commit bddd426

File tree

8 files changed

+72
-35
lines changed

8 files changed

+72
-35
lines changed

ballista/rust/core/proto/ballista.proto

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,8 +312,8 @@ message AggregateNode {
312312

313313
message WindowNode {
314314
LogicalPlanNode input = 1;
315-
repeated LogicalExprNode group_expr = 2;
316-
repeated LogicalExprNode aggr_expr = 3;
315+
repeated LogicalExprNode partition_by_expr = 2;
316+
repeated LogicalExprNode order_by_expr = 3;
317317
}
318318

319319
enum JoinType {

ballista/rust/core/src/serde/logical_plan/from_proto.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,18 +78,18 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
7878
}
7979
LogicalPlanType::Window(window) => {
8080
let input: LogicalPlan = convert_box_required!(window.input)?;
81-
let group_expr = window
82-
.group_expr
81+
let partition_by_expr = window
82+
.partition_by_expr
8383
.iter()
8484
.map(|expr| expr.try_into())
8585
.collect::<Result<Vec<_>, _>>()?;
86-
let aggr_expr = window
87-
.aggr_expr
86+
let order_by_expr = window
87+
.order_by_expr
8888
.iter()
8989
.map(|expr| expr.try_into())
9090
.collect::<Result<Vec<_>, _>>()?;
9191
LogicalPlanBuilder::from(&input)
92-
.window(group_expr, aggr_expr)?
92+
.window(partition_by_expr, order_by_expr)?
9393
.build()
9494
.map_err(|e| e.into())
9595
}

ballista/rust/core/src/serde/logical_plan/to_proto.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -775,20 +775,20 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
775775
}
776776
LogicalPlan::Window {
777777
input,
778-
group_expr,
779-
aggr_expr,
778+
partition_by_expr,
779+
order_by_expr,
780780
..
781781
} => {
782782
let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?;
783783
Ok(protobuf::LogicalPlanNode {
784784
logical_plan_type: Some(LogicalPlanType::Window(Box::new(
785785
protobuf::WindowNode {
786786
input: Some(Box::new(input)),
787-
group_expr: group_expr
787+
partition_by_expr: partition_by_expr
788788
.iter()
789789
.map(|expr| expr.try_into())
790790
.collect::<Result<Vec<_>, BallistaError>>()?,
791-
aggr_expr: aggr_expr
791+
order_by_expr: order_by_expr
792792
.iter()
793793
.map(|expr| expr.try_into())
794794
.collect::<Result<Vec<_>, BallistaError>>()?,

datafusion/src/logical_plan/builder.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -289,12 +289,29 @@ impl LogicalPlanBuilder {
289289
}))
290290
}
291291

292+
/// Apply a window: partition by the `partition_by_expr` expressions
293+
/// and calculating the window spec.
292294
pub fn window(
293295
&self,
294-
group_expr: impl IntoIterator<Item = Expr>,
295-
aggr_expr: impl IntoIterator<Item = Expr>,
296+
partition_by_expr: impl IntoIterator<Item = Expr>,
297+
order_by_expr: impl IntoIterator<Item = Expr>,
296298
) -> Result<Self> {
297-
unimplemented!()
299+
let partition_by_expr = partition_by_expr.into_iter().collect::<Vec<Expr>>();
300+
let order_by_expr = order_by_expr.into_iter().collect::<Vec<Expr>>();
301+
302+
let all_expr = partition_by_expr.iter().chain(order_by_expr.iter());
303+
304+
validate_unique_names("Windows", all_expr.clone(), self.plan.schema())?;
305+
306+
let window_schema =
307+
DFSchema::new(exprlist_to_fields(all_expr, self.plan.schema())?)?;
308+
309+
Ok(Self::from(&LogicalPlan::Window {
310+
input: Arc::new(self.plan.clone()),
311+
partition_by_expr,
312+
order_by_expr,
313+
schema: DFSchemaRef::new(window_schema),
314+
}))
298315
}
299316

300317
/// Apply an aggregate: grouping on the `group_expr` expressions

datafusion/src/logical_plan/plan.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,10 @@ pub enum LogicalPlan {
8787
Window {
8888
/// The incoming logical plan
8989
input: Arc<LogicalPlan>,
90-
/// Grouping expressions
91-
group_expr: Vec<Expr>,
92-
/// Aggregate expressions
93-
aggr_expr: Vec<Expr>,
90+
/// Partition by expressions
91+
partition_by_expr: Vec<Expr>,
92+
/// Order by expressions
93+
order_by_expr: Vec<Expr>,
9494
/// The schema description of the aggregate output
9595
schema: DFSchemaRef,
9696
},
@@ -302,11 +302,15 @@ impl LogicalPlan {
302302
_ => vec![],
303303
},
304304
LogicalPlan::Window {
305-
group_expr,
306-
aggr_expr,
305+
partition_by_expr,
306+
order_by_expr,
307307
..
308+
} => {
309+
let mut result = partition_by_expr.clone();
310+
result.extend(order_by_expr.clone());
311+
result
308312
}
309-
| LogicalPlan::Aggregate {
313+
LogicalPlan::Aggregate {
310314
group_expr,
311315
aggr_expr,
312316
..
@@ -688,13 +692,13 @@ impl LogicalPlan {
688692
..
689693
} => write!(f, "Filter: {:?}", expr),
690694
LogicalPlan::Window {
691-
ref group_expr,
692-
ref aggr_expr,
695+
ref partition_by_expr,
696+
ref order_by_expr,
693697
..
694698
} => write!(
695699
f,
696-
"Window: groupBy=[{:?}], aggr=[{:?}]",
697-
group_expr, aggr_expr
700+
"Window: partitionBy=[{:?}], orderBy=[{:?}]",
701+
partition_by_expr, order_by_expr
698702
),
699703
LogicalPlan::Aggregate {
700704
ref group_expr,

datafusion/src/optimizer/projection_push_down.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -196,11 +196,22 @@ fn optimize_plan(
196196
LogicalPlan::Window {
197197
schema,
198198
input,
199-
group_expr,
200-
aggr_expr,
199+
partition_by_expr,
200+
order_by_expr,
201201
..
202-
}
203-
| LogicalPlan::Aggregate {
202+
} => Ok(LogicalPlan::Window {
203+
partition_by_expr: partition_by_expr.clone(),
204+
order_by_expr: order_by_expr.clone(),
205+
input: Arc::new(optimize_plan(
206+
optimizer,
207+
&input,
208+
&new_required_columns,
209+
true,
210+
execution_props,
211+
)?),
212+
schema: schema.clone(),
213+
}),
214+
LogicalPlan::Aggregate {
204215
schema,
205216
input,
206217
group_expr,

datafusion/src/optimizer/utils.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -190,10 +190,13 @@ pub fn from_plan(
190190
}),
191191
},
192192
LogicalPlan::Window {
193-
group_expr, schema, ..
193+
partition_by_expr,
194+
order_by_expr,
195+
schema,
196+
..
194197
} => Ok(LogicalPlan::Window {
195-
group_expr: expr[0..group_expr.len()].to_vec(),
196-
aggr_expr: expr[group_expr.len()..].to_vec(),
198+
partition_by_expr: expr[0..partition_by_expr.len()].to_vec(),
199+
order_by_expr: expr[order_by_expr.len()..].to_vec(),
197200
input: Arc::new(inputs[0].clone()),
198201
schema: schema.clone(),
199202
}),

datafusion/src/physical_plan/planner.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,13 @@ impl DefaultPhysicalPlanner {
141141
} => source.scan(projection, batch_size, filters, *limit),
142142
LogicalPlan::Window {
143143
input,
144-
group_expr,
145-
aggr_expr,
144+
partition_by_expr,
145+
order_by_expr,
146146
..
147+
} => {
148+
unimplemented!()
147149
}
148-
| LogicalPlan::Aggregate {
150+
LogicalPlan::Aggregate {
149151
input,
150152
group_expr,
151153
aggr_expr,

0 commit comments

Comments
 (0)