Skip to content

Commit f5df767

Browse files
committed
Reduce size of Expr
1 parent 2797cf7 commit f5df767

File tree

22 files changed

+229
-172
lines changed

22 files changed

+229
-172
lines changed

datafusion-examples/examples/advanced_udwf.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ impl WindowUDFImpl for SimplifySmoothItUdf {
190190
/// default implementation will not be called (left as `todo!()`)
191191
fn simplify(&self) -> Option<WindowFunctionSimplification> {
192192
let simplify = |window_function: WindowFunction, _: &dyn SimplifyInfo| {
193-
Ok(Expr::WindowFunction(WindowFunction {
193+
Ok(Expr::from(WindowFunction {
194194
fun: WindowFunctionDefinition::AggregateUDF(avg_udaf()),
195195
args: window_function.args,
196196
partition_by: window_function.partition_by,

datafusion/core/src/physical_planner.rs

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -587,19 +587,17 @@ impl DefaultPhysicalPlanner {
587587
};
588588

589589
let get_sort_keys = |expr: &Expr| match expr {
590-
Expr::WindowFunction(WindowFunction {
591-
ref partition_by,
592-
ref order_by,
593-
..
594-
}) => generate_sort_key(partition_by, order_by),
590+
Expr::WindowFunction(window_function) => generate_sort_key(
591+
window_function.partition_by(),
592+
window_function.order_by(),
593+
),
595594
Expr::Alias(Alias { expr, .. }) => {
596595
// Convert &Box<T> to &T
597596
match &**expr {
598-
Expr::WindowFunction(WindowFunction {
599-
ref partition_by,
600-
ref order_by,
601-
..
602-
}) => generate_sort_key(partition_by, order_by),
597+
Expr::WindowFunction(window_function) => generate_sort_key(
598+
window_function.partition_by(),
599+
window_function.order_by(),
600+
),
603601
_ => unreachable!(),
604602
}
605603
}
@@ -1520,14 +1518,15 @@ pub fn create_window_expr_with_name(
15201518
let name = name.into();
15211519
let physical_schema: &Schema = &logical_schema.into();
15221520
match e {
1523-
Expr::WindowFunction(WindowFunction {
1524-
fun,
1525-
args,
1526-
partition_by,
1527-
order_by,
1528-
window_frame,
1529-
null_treatment,
1530-
}) => {
1521+
Expr::WindowFunction(window_function) => {
1522+
let WindowFunction {
1523+
fun,
1524+
args,
1525+
partition_by,
1526+
order_by,
1527+
window_frame,
1528+
null_treatment,
1529+
} = window_function.as_ref();
15311530
let physical_args =
15321531
create_physical_exprs(args, logical_schema, execution_props)?;
15331532
let partition_by =

datafusion/core/tests/dataframe/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -888,7 +888,7 @@ async fn window_using_aggregates() -> Result<()> {
888888
vec![col("c3")],
889889
);
890890

891-
Expr::WindowFunction(w)
891+
Expr::from(w)
892892
.null_treatment(NullTreatment::IgnoreNulls)
893893
.order_by(vec![col("c2").sort(true, true), col("c3").sort(true, true)])
894894
.window_frame(WindowFrame::new_bounds(
@@ -2519,7 +2519,7 @@ async fn test_count_wildcard_on_window() -> Result<()> {
25192519
let df_results = ctx
25202520
.table("t1")
25212521
.await?
2522-
.select(vec![Expr::WindowFunction(WindowFunction::new(
2522+
.select(vec![Expr::from(WindowFunction::new(
25232523
WindowFunctionDefinition::AggregateUDF(count_udaf()),
25242524
vec![wildcard()],
25252525
))

datafusion/expr/src/expr.rs

Lines changed: 75 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use crate::utils::expr_to_columns;
2929
use crate::Volatility;
3030
use crate::{udaf, ExprSchemable, Operator, Signature, WindowFrame, WindowUDF};
3131

32+
use crate::function::WindowFunctionSimplification;
3233
use arrow::datatypes::{DataType, FieldRef};
3334
use datafusion_common::cse::{HashNode, NormalizeEq, Normalizeable};
3435
use datafusion_common::tree_node::{
@@ -297,7 +298,7 @@ pub enum Expr {
297298
/// [`ExprFunctionExt`]: crate::expr_fn::ExprFunctionExt
298299
AggregateFunction(AggregateFunction),
299300
/// Represents the call of a window function with arguments.
300-
WindowFunction(WindowFunction),
301+
WindowFunction(Box<WindowFunction>), // Boxed as it is large (272 bytes)
301302
/// Returns whether the list contains the expr value.
302303
InList(InList),
303304
/// EXISTS subquery
@@ -341,6 +342,13 @@ impl From<Column> for Expr {
341342
}
342343
}
343344

345+
/// Create an [`Expr`] from a [`WindowFunction`]
346+
impl From<WindowFunction> for Expr {
347+
fn from(value: WindowFunction) -> Self {
348+
Expr::WindowFunction(Box::new(value))
349+
}
350+
}
351+
344352
/// Create an [`Expr`] from an optional qualifier and a [`FieldRef`]. This is
345353
/// useful for creating [`Expr`] from a [`DFSchema`].
346354
///
@@ -774,6 +782,16 @@ impl WindowFunctionDefinition {
774782
WindowFunctionDefinition::AggregateUDF(fun) => fun.name(),
775783
}
776784
}
785+
786+
/// Return the the inner window simplification function, if any
787+
///
788+
/// See [`WindowFunctionSimplification`] for more information
789+
pub fn simplify(&self) -> Option<WindowFunctionSimplification> {
790+
match self {
791+
WindowFunctionDefinition::AggregateUDF(_) => None,
792+
WindowFunctionDefinition::WindowUDF(udwf) => udwf.simplify(),
793+
}
794+
}
777795
}
778796

779797
impl Display for WindowFunctionDefinition {
@@ -838,6 +856,23 @@ impl WindowFunction {
838856
null_treatment: None,
839857
}
840858
}
859+
860+
/// return the partition by expressions
861+
pub fn partition_by(&self) -> &Vec<Expr> {
862+
&self.partition_by
863+
}
864+
865+
/// return the order by expressions
866+
pub fn order_by(&self) -> &Vec<Sort> {
867+
&self.order_by
868+
}
869+
870+
/// Return the the inner window simplification function, if any
871+
///
872+
/// See [`WindowFunctionSimplification`] for more information
873+
pub fn simplify(&self) -> Option<WindowFunctionSimplification> {
874+
self.fun.simplify()
875+
}
841876
}
842877

843878
/// EXISTS expression
@@ -1893,24 +1928,24 @@ impl NormalizeEq for Expr {
18931928
_ => false,
18941929
}
18951930
}
1896-
(
1897-
Expr::WindowFunction(WindowFunction {
1931+
(Expr::WindowFunction(left), Expr::WindowFunction(right)) => {
1932+
let WindowFunction {
18981933
fun: self_fun,
18991934
args: self_args,
19001935
partition_by: self_partition_by,
19011936
order_by: self_order_by,
19021937
window_frame: self_window_frame,
19031938
null_treatment: self_null_treatment,
1904-
}),
1905-
Expr::WindowFunction(WindowFunction {
1939+
} = left.as_ref();
1940+
let WindowFunction {
19061941
fun: other_fun,
19071942
args: other_args,
19081943
partition_by: other_partition_by,
19091944
order_by: other_order_by,
19101945
window_frame: other_window_frame,
19111946
null_treatment: other_null_treatment,
1912-
}),
1913-
) => {
1947+
} = right.as_ref();
1948+
19141949
self_fun.name() == other_fun.name()
19151950
&& self_window_frame == other_window_frame
19161951
&& self_null_treatment == other_null_treatment
@@ -2150,14 +2185,15 @@ impl HashNode for Expr {
21502185
distinct.hash(state);
21512186
null_treatment.hash(state);
21522187
}
2153-
Expr::WindowFunction(WindowFunction {
2154-
fun,
2155-
args: _args,
2156-
partition_by: _partition_by,
2157-
order_by: _order_by,
2158-
window_frame,
2159-
null_treatment,
2160-
}) => {
2188+
Expr::WindowFunction(window_func) => {
2189+
let WindowFunction {
2190+
fun,
2191+
args: _args,
2192+
partition_by: _partition_by,
2193+
order_by: _order_by,
2194+
window_frame,
2195+
null_treatment,
2196+
} = window_func.as_ref();
21612197
fun.hash(state);
21622198
window_frame.hash(state);
21632199
null_treatment.hash(state);
@@ -2458,14 +2494,15 @@ impl Display for SchemaDisplay<'_> {
24582494

24592495
Ok(())
24602496
}
2461-
Expr::WindowFunction(WindowFunction {
2462-
fun,
2463-
args,
2464-
partition_by,
2465-
order_by,
2466-
window_frame,
2467-
null_treatment,
2468-
}) => {
2497+
Expr::WindowFunction(window_func) => {
2498+
let WindowFunction {
2499+
fun,
2500+
args,
2501+
partition_by,
2502+
order_by,
2503+
window_frame,
2504+
null_treatment,
2505+
} = window_func.as_ref();
24692506
write!(
24702507
f,
24712508
"{}({})",
@@ -2612,14 +2649,16 @@ impl Display for Expr {
26122649
// Expr::ScalarFunction(ScalarFunction { func, args }) => {
26132650
// write!(f, "{}", func.display_name(args).unwrap())
26142651
// }
2615-
Expr::WindowFunction(WindowFunction {
2616-
fun,
2617-
args,
2618-
partition_by,
2619-
order_by,
2620-
window_frame,
2621-
null_treatment,
2622-
}) => {
2652+
Expr::WindowFunction(window_func) => {
2653+
let WindowFunction {
2654+
fun,
2655+
args,
2656+
partition_by,
2657+
order_by,
2658+
window_frame,
2659+
null_treatment,
2660+
} = window_func.as_ref();
2661+
26232662
fmt_function(f, &fun.to_string(), false, args, true)?;
26242663

26252664
if let Some(nt) = null_treatment {
@@ -3076,6 +3115,10 @@ mod test {
30763115
// If this test fails when you change `Expr`, please try
30773116
// `Box`ing the fields to make `Expr` smaller
30783117
// See https://github.com/apache/datafusion/issues/14256 for details
3079-
assert_eq!(size_of::<Expr>(), 272);
3118+
assert_eq!(size_of::<Expr>(), 112);
3119+
assert_eq!(size_of::<ScalarValue>(), 64);
3120+
assert_eq!(size_of::<DataType>(), 24); // 3 ptrs
3121+
assert_eq!(size_of::<Vec<Expr>>(), 24);
3122+
assert_eq!(size_of::<Arc<Expr>>(), 8);
30803123
}
30813124
}

datafusion/expr/src/expr_fn.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -843,7 +843,7 @@ impl ExprFuncBuilder {
843843
udwf.window_frame =
844844
window_frame.unwrap_or(WindowFrame::new(has_order_by));
845845
udwf.null_treatment = null_treatment;
846-
Expr::WindowFunction(udwf)
846+
Expr::from(udwf)
847847
}
848848
};
849849

@@ -897,7 +897,7 @@ impl ExprFunctionExt for Expr {
897897
ExprFuncBuilder::new(Some(ExprFuncKind::Aggregate(udaf)))
898898
}
899899
Expr::WindowFunction(udwf) => {
900-
ExprFuncBuilder::new(Some(ExprFuncKind::Window(udwf)))
900+
ExprFuncBuilder::new(Some(ExprFuncKind::Window(*udwf)))
901901
}
902902
_ => ExprFuncBuilder::new(None),
903903
};
@@ -937,7 +937,7 @@ impl ExprFunctionExt for Expr {
937937
ExprFuncBuilder::new(Some(ExprFuncKind::Aggregate(udaf)))
938938
}
939939
Expr::WindowFunction(udwf) => {
940-
ExprFuncBuilder::new(Some(ExprFuncKind::Window(udwf)))
940+
ExprFuncBuilder::new(Some(ExprFuncKind::Window(*udwf)))
941941
}
942942
_ => ExprFuncBuilder::new(None),
943943
};
@@ -950,7 +950,7 @@ impl ExprFunctionExt for Expr {
950950
fn partition_by(self, partition_by: Vec<Expr>) -> ExprFuncBuilder {
951951
match self {
952952
Expr::WindowFunction(udwf) => {
953-
let mut builder = ExprFuncBuilder::new(Some(ExprFuncKind::Window(udwf)));
953+
let mut builder = ExprFuncBuilder::new(Some(ExprFuncKind::Window(*udwf)));
954954
builder.partition_by = Some(partition_by);
955955
builder
956956
}
@@ -961,7 +961,7 @@ impl ExprFunctionExt for Expr {
961961
fn window_frame(self, window_frame: WindowFrame) -> ExprFuncBuilder {
962962
match self {
963963
Expr::WindowFunction(udwf) => {
964-
let mut builder = ExprFuncBuilder::new(Some(ExprFuncKind::Window(udwf)));
964+
let mut builder = ExprFuncBuilder::new(Some(ExprFuncKind::Window(*udwf)));
965965
builder.window_frame = Some(window_frame);
966966
builder
967967
}

datafusion/expr/src/function.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ pub type AggregateFunctionSimplification = Box<
7878
>;
7979

8080
/// [crate::udwf::WindowUDFImpl::simplify] simplifier closure
81+
///
8182
/// A closure with two arguments:
8283
/// * 'window_function': [crate::expr::WindowFunction] for which simplified has been invoked
8384
/// * 'info': [crate::simplify::SimplifyInfo]

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2420,19 +2420,24 @@ impl Window {
24202420
.iter()
24212421
.enumerate()
24222422
.filter_map(|(idx, expr)| {
2423-
if let Expr::WindowFunction(WindowFunction {
2423+
let Expr::WindowFunction(window_func) = expr else {
2424+
return None;
2425+
};
2426+
let WindowFunction {
24242427
fun: WindowFunctionDefinition::WindowUDF(udwf),
24252428
partition_by,
24262429
..
2427-
}) = expr
2428-
{
2429-
// When there is no PARTITION BY, row number will be unique
2430-
// across the entire table.
2431-
if udwf.name() == "row_number" && partition_by.is_empty() {
2432-
return Some(idx + input_len);
2433-
}
2430+
} = window_func.as_ref()
2431+
else {
2432+
return None;
2433+
};
2434+
// When there is no PARTITION BY, row number will be unique
2435+
// across the entire table.
2436+
if udwf.name() == "row_number" && partition_by.is_empty() {
2437+
Some(idx + input_len)
2438+
} else {
2439+
None
24342440
}
2435-
None
24362441
})
24372442
.map(|idx| {
24382443
FunctionalDependence::new(vec![idx], vec![], false)

0 commit comments

Comments
 (0)