-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: serialize user-defined window functions to proto #13421
Changes from 10 commits
603a8a0
aca8085
7d0176a
e579ec0
46f4212
880b3f2
30538fb
46a00e8
496151a
74e547b
9caec41
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,14 +19,14 @@ use std::sync::Arc; | |
|
||
#[cfg(feature = "parquet")] | ||
use datafusion::datasource::file_format::parquet::ParquetSink; | ||
use datafusion::physical_expr::window::SlidingAggregateWindowExpr; | ||
use datafusion::physical_expr::window::{BuiltInWindowExpr, SlidingAggregateWindowExpr}; | ||
use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr}; | ||
use datafusion::physical_plan::expressions::{ | ||
BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr, | ||
Literal, NegativeExpr, NotExpr, TryCastExpr, | ||
}; | ||
use datafusion::physical_plan::udaf::AggregateFunctionExpr; | ||
use datafusion::physical_plan::windows::PlainAggregateWindowExpr; | ||
use datafusion::physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr}; | ||
use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr}; | ||
use datafusion::{ | ||
datasource::{ | ||
|
@@ -68,7 +68,7 @@ pub fn serialize_physical_aggr_expr( | |
ordering_req, | ||
distinct: aggr_expr.is_distinct(), | ||
ignore_nulls: aggr_expr.ignore_nulls(), | ||
fun_definition: (!buf.is_empty()).then_some(buf) | ||
fun_definition: (!buf.is_empty()).then_some(buf), | ||
}, | ||
)), | ||
}) | ||
|
@@ -120,6 +120,25 @@ pub fn serialize_physical_window_expr( | |
window_frame, | ||
codec, | ||
)? | ||
} else if let Some(built_in_window_expr) = expr.downcast_ref::<BuiltInWindowExpr>() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The naming of these structures is wild (the physical exprs aren't actually BuiltInWindowExprs) but that is an issue to fix for another day |
||
if let Some(expr) = built_in_window_expr | ||
.get_built_in_func_expr() | ||
.as_any() | ||
.downcast_ref::<WindowUDFExpr>() | ||
{ | ||
let mut buf = Vec::new(); | ||
codec.try_encode_udwf(expr.fun(), &mut buf)?; | ||
( | ||
physical_window_expr_node::WindowFunction::UserDefinedWindowFunction( | ||
expr.fun().name().to_string(), | ||
), | ||
(!buf.is_empty()).then_some(buf), | ||
) | ||
} else { | ||
return not_impl_err!( | ||
"User-defined window function not supported: {window_expr:?}" | ||
); | ||
} | ||
} else { | ||
return not_impl_err!("WindowExpr not supported: {window_expr:?}"); | ||
}; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tested by
roundtrip_physical_plan::roundtrip_udwf_extension_codec
.