Skip to content

Commit 5fcdb8f

Browse files
author
Jiayu Liu
committed
adding window spec
1 parent 49f0694 commit 5fcdb8f

File tree

8 files changed

+156
-19
lines changed

8 files changed

+156
-19
lines changed

ballista/rust/core/proto/ballista.proto

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,42 @@ message WindowNode {
314314
LogicalPlanNode input = 1;
315315
repeated LogicalExprNode partition_by_expr = 2;
316316
repeated LogicalExprNode order_by_expr = 3;
317+
// "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 (see https://github.com/danburkert/prost/issues/430)
318+
// this syntax is ugly but is binary compatible with the "optional" keyword (see https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3)
319+
oneof window_frame {
320+
WindowFrame frame = 4;
321+
}
322+
}
323+
324+
enum WindowFrameUnits {
325+
ROWS = 0;
326+
RANGE = 1;
327+
GROUPS = 2;
328+
}
329+
330+
message WindowFrame {
331+
WindowFrameUnits window_frame_units = 1;
332+
WindowFrameBound start_bound = 2;
333+
// "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 (see https://github.com/danburkert/prost/issues/430)
334+
// this syntax is ugly but is binary compatible with the "optional" keyword (see https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3)
335+
oneof end_bound {
336+
WindowFrameBound bound = 3;
337+
}
338+
}
339+
340+
enum WindowFrameBoundType {
341+
CURRENT_ROW = 0;
342+
PRECEDING = 1;
343+
FOLLOWING = 2;
344+
}
345+
346+
message WindowFrameBound {
347+
WindowFrameBoundType window_frame_bound_type = 1;
348+
// "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 (see https://github.com/danburkert/prost/issues/430)
349+
// this syntax is ugly but is binary compatible with the "optional" keyword (see https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3)
350+
oneof bound_value {
351+
uint64 value = 2;
352+
}
317353
}
318354

319355
enum JoinType {

ballista/rust/core/src/serde/logical_plan/from_proto.rs

Lines changed: 73 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@
1717

1818
//! Serde code to convert from protocol buffers to Rust data structures.
1919
20+
use crate::error::BallistaError;
21+
use crate::serde::{proto_error, protobuf};
22+
use crate::{convert_box_required, convert_required};
23+
use sqlparser::ast::{WindowFrame, WindowFrameBound, WindowFrameUnits};
2024
use std::{
2125
convert::{From, TryInto},
2226
unimplemented,
2327
};
2428

25-
use crate::error::BallistaError;
26-
use crate::serde::{proto_error, protobuf};
27-
use crate::{convert_box_required, convert_required};
28-
2929
use arrow::datatypes::{DataType, Field, Schema};
3030
use datafusion::logical_plan::{
3131
abs, acos, asin, atan, ceil, cos, exp, floor, ln, log10, log2, round, signum, sin,
@@ -88,8 +88,10 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
8888
.iter()
8989
.map(|expr| expr.try_into())
9090
.collect::<Result<Vec<_>, _>>()?;
91+
// FIXME: the window frame carried by the protobuf message is not parsed yet; it is always replaced with None here
92+
let window_frame = None;
9193
LogicalPlanBuilder::from(&input)
92-
.window(partition_by_expr, order_by_expr)?
94+
.window(partition_by_expr, order_by_expr, window_frame)?
9395
.build()
9496
.map_err(|e| e.into())
9597
}
@@ -1260,6 +1262,72 @@ fn parse_optional_expr(
12601262
}
12611263
}
12621264

1265+
impl From<protobuf::WindowFrameUnits> for WindowFrameUnits {
1266+
fn from(units: protobuf::WindowFrameUnits) -> Self {
1267+
match units {
1268+
protobuf::WindowFrameUnits::Rows => WindowFrameUnits::Rows,
1269+
protobuf::WindowFrameUnits::Range => WindowFrameUnits::Range,
1270+
protobuf::WindowFrameUnits::Groups => WindowFrameUnits::Groups,
1271+
}
1272+
}
1273+
}
1274+
1275+
impl TryFrom<protobuf::WindowFrameBound> for WindowFrameBound {
1276+
type Error = BallistaError;
1277+
1278+
fn try_from(bound: protobuf::WindowFrameBound) -> Result<Self, Self::Error> {
1279+
let bound_type = protobuf::WindowFrameBoundType::from_i32(bound.window_frame_bound_type).ok_or_else(|| {
1280+
proto_error(format!(
1281+
"Received a WindowFrameBound message with unknown WindowFrameBoundType {}",
1282+
bound.window_frame_bound_type
1283+
))
1284+
})?.into();
1285+
match bound_type {
1286+
protobuf::WindowFrameBoundType::CurrentRow => {
1287+
Ok(WindowFrameBound::CurrentRow)
1288+
}
1289+
protobuf::WindowFrameBoundType::Preceding => {
1290+
// FIXME: implement bound value parsing; until then the PRECEDING offset is hard-coded to 1 and bound_value is ignored
1291+
Ok(WindowFrameBound::Preceding(Some(1)))
1292+
}
1293+
protobuf::WindowFrameBoundType::Following => {
1294+
// FIXME: implement bound value parsing; until then the FOLLOWING offset is hard-coded to 1 and bound_value is ignored
1295+
Ok(WindowFrameBound::Following(Some(1)))
1296+
}
1297+
}
1298+
}
1299+
}
1300+
1301+
impl TryFrom<protobuf::WindowFrame> for WindowFrame {
1302+
type Error = BallistaError;
1303+
1304+
fn try_from(window: protobuf::WindowFrame) -> Result<Self, Self::Error> {
1305+
let units = protobuf::WindowFrameUnits::from_i32(window.window_frame_units)
1306+
.ok_or_else(|| {
1307+
proto_error(format!(
1308+
"Received a WindowFrame message with unknown WindowFrameUnits {}",
1309+
window.window_frame_units
1310+
))
1311+
})?
1312+
.into();
1313+
let start_bound = window
1314+
.start_bound
1315+
.ok_or_else(|| {
1316+
proto_error(
1317+
"Received a WindowFrame message with no start_bound".to_owned(),
1318+
)
1319+
})?
1320+
.try_into()?;
1321+
// FIXME: parse the end bound from the message; until then end_bound is always None
1322+
let end_bound = None;
1323+
Ok(WindowFrame {
1324+
units,
1325+
start_bound,
1326+
end_bound,
1327+
})
1328+
}
1329+
}
1330+
12631331
impl From<protobuf::AggregateFunction> for AggregateFunction {
12641332
fn from(aggr_function: protobuf::AggregateFunction) -> Self {
12651333
match aggr_function {

ballista/rust/core/src/serde/logical_plan/to_proto.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ use std::{
2626

2727
use crate::datasource::DfTableAdapter;
2828
use crate::serde::{protobuf, BallistaError};
29-
3029
use arrow::datatypes::{DataType, Schema};
3130
use datafusion::datasource::CsvFile;
3231
use datafusion::logical_plan::{Expr, JoinType, LogicalPlan};
@@ -37,6 +36,7 @@ use protobuf::{
3736
arrow_type, logical_expr_node::ExprType, scalar_type, DateUnit, Field,
3837
PrimitiveScalarType, ScalarListValue, ScalarType,
3938
};
39+
use sqlparser::ast::{WindowFrame, WindowFrameBound, WindowFrameUnits};
4040

4141
use super::super::proto_error;
4242
use datafusion::physical_plan::functions::BuiltinScalarFunction;
@@ -777,9 +777,12 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
777777
input,
778778
partition_by_expr,
779779
order_by_expr,
780+
window_frame,
780781
..
781782
} => {
782783
let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?;
784+
// FIXME: implement serialization of window_frame; until then the plan's frame is shadowed and always written as None
785+
let window_frame = None;
783786
Ok(protobuf::LogicalPlanNode {
784787
logical_plan_type: Some(LogicalPlanType::Window(Box::new(
785788
protobuf::WindowNode {
@@ -792,6 +795,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
792795
.iter()
793796
.map(|expr| expr.try_into())
794797
.collect::<Result<Vec<_>, BallistaError>>()?,
798+
window_frame,
795799
},
796800
))),
797801
})
@@ -1252,6 +1256,32 @@ impl From<&BuiltInWindowFunction> for protobuf::BuiltInWindowFunction {
12521256
}
12531257
}
12541258

1259+
impl From<WindowFrameUnits> for protobuf::WindowFrameUnits {
1260+
fn from(units: WindowFrameUnits) -> Self {
1261+
match units {
1262+
WindowFrameUnits::Rows => protobuf::WindowFrameUnits::Rows,
1263+
WindowFrameUnits::Range => protobuf::WindowFrameUnits::Range,
1264+
WindowFrameUnits::Groups => protobuf::WindowFrameUnits::Groups,
1265+
}
1266+
}
1267+
}
1268+
1269+
impl TryFrom<WindowFrameBound> for protobuf::WindowFrameBound {
1270+
type Error = BallistaError;
1271+
1272+
fn try_from(bound: WindowFrameBound) -> Result<Self, Self::Error> {
1273+
unimplemented!("not implemented")
1274+
}
1275+
}
1276+
1277+
impl TryFrom<WindowFrame> for protobuf::WindowFrame {
1278+
type Error = BallistaError;
1279+
1280+
fn try_from(window: WindowFrame) -> Result<Self, Self::Error> {
1281+
unimplemented!("not implemented")
1282+
}
1283+
}
1284+
12551285
impl TryFrom<&arrow::datatypes::DataType> for protobuf::ScalarType {
12561286
type Error = BallistaError;
12571287
fn try_from(value: &arrow::datatypes::DataType) -> Result<Self, Self::Error> {

datafusion/src/logical_plan/builder.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,18 @@ use arrow::{
2424
record_batch::RecordBatch,
2525
};
2626

27+
use super::dfschema::ToDFSchema;
28+
use super::{
29+
col, exprlist_to_fields, Expr, JoinType, LogicalPlan, PlanType, StringifiedPlan,
30+
};
2731
use crate::datasource::TableProvider;
2832
use crate::error::{DataFusionError, Result};
33+
use crate::logical_plan::{DFField, DFSchema, DFSchemaRef, Partitioning};
2934
use crate::{
3035
datasource::{empty::EmptyTable, parquet::ParquetTable, CsvFile, MemTable},
3136
prelude::CsvReadOptions,
3237
};
33-
34-
use super::dfschema::ToDFSchema;
35-
use super::{
36-
col, exprlist_to_fields, Expr, JoinType, LogicalPlan, PlanType, StringifiedPlan,
37-
};
38-
use crate::logical_plan::{DFField, DFSchema, DFSchemaRef, Partitioning};
38+
use sqlparser::ast::WindowFrame;
3939
use std::collections::HashSet;
4040

4141
/// Builder for logical plans
@@ -295,21 +295,19 @@ impl LogicalPlanBuilder {
295295
&self,
296296
partition_by_expr: impl IntoIterator<Item = Expr>,
297297
order_by_expr: impl IntoIterator<Item = Expr>,
298+
window_frame: Option<WindowFrame>,
298299
) -> Result<Self> {
299300
let partition_by_expr = partition_by_expr.into_iter().collect::<Vec<Expr>>();
300301
let order_by_expr = order_by_expr.into_iter().collect::<Vec<Expr>>();
301-
302302
let all_expr = partition_by_expr.iter().chain(order_by_expr.iter());
303-
304303
validate_unique_names("Windows", all_expr.clone(), self.plan.schema())?;
305-
306304
let window_schema =
307305
DFSchema::new(exprlist_to_fields(all_expr, self.plan.schema())?)?;
308-
309306
Ok(Self::from(&LogicalPlan::Window {
310307
input: Arc::new(self.plan.clone()),
311308
partition_by_expr,
312309
order_by_expr,
310+
window_frame,
313311
schema: DFSchemaRef::new(window_schema),
314312
}))
315313
}

datafusion/src/logical_plan/expr.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,6 @@ pub enum Expr {
197197
fun: windows::WindowFunction,
198198
/// List of expressions to feed to the functions as arguments
199199
args: Vec<Expr>,
200-
// TODO windowexec
201200
},
202201
/// aggregate function
203202
AggregateUDF {

datafusion/src/logical_plan/plan.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ use std::{
2323
sync::Arc,
2424
};
2525

26-
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
27-
2826
use crate::datasource::TableProvider;
2927
use crate::sql::parser::FileType;
28+
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
29+
use sqlparser::ast::WindowFrame;
3030

3131
use super::expr::Expr;
3232
use super::extension::UserDefinedLogicalNode;
@@ -91,6 +91,8 @@ pub enum LogicalPlan {
9191
partition_by_expr: Vec<Expr>,
9292
/// Order by expressions
9393
order_by_expr: Vec<Expr>,
94+
/// Window Frame
95+
window_frame: Option<WindowFrame>,
9496
/// The schema description of the window output
9597
schema: DFSchemaRef,
9698
},

datafusion/src/optimizer/projection_push_down.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ fn optimize_plan(
198198
input,
199199
partition_by_expr,
200200
order_by_expr,
201+
window_frame,
201202
..
202203
} => {
203204
let new_schema = DFSchema::new(
@@ -211,6 +212,7 @@ fn optimize_plan(
211212
Ok(LogicalPlan::Window {
212213
partition_by_expr: partition_by_expr.clone(),
213214
order_by_expr: order_by_expr.clone(),
215+
window_frame: window_frame.clone(),
214216
input: Arc::new(optimize_plan(
215217
optimizer,
216218
&input,

datafusion/src/optimizer/utils.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,11 +192,13 @@ pub fn from_plan(
192192
LogicalPlan::Window {
193193
partition_by_expr,
194194
order_by_expr,
195+
window_frame,
195196
schema,
196197
..
197198
} => Ok(LogicalPlan::Window {
198199
partition_by_expr: expr[0..partition_by_expr.len()].to_vec(),
199200
order_by_expr: expr[order_by_expr.len()..].to_vec(),
201+
window_frame: window_frame.clone(),
200202
input: Arc::new(inputs[0].clone()),
201203
schema: schema.clone(),
202204
}),

0 commit comments

Comments
 (0)