Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1358,6 +1358,9 @@ impl DefaultPhysicalPlanner {
physical_name(expr),
))?])),
}
} else if group_expr.is_empty() {
// No GROUP BY clause - create empty PhysicalGroupBy
Ok(PhysicalGroupBy::new(vec![], vec![], vec![]))
Copy link
Contributor Author

@NGA-TRAN NGA-TRAN Jul 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of the places that fixes bug 2 that should return nothing for non-groupby

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should look into changing group_expr: Option<&[Expr]> or something so the compiler makes it more likely that we handle the difference between an emptyVecvsNone`

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me try

Copy link
Contributor Author

@NGA-TRAN NGA-TRAN Jul 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb : I had a look. This would be very challenging and a lot of changes needed in physical plan, logical plan, parsing logic, serialization/deserialization. I do not think we want to make a big change just for this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe something we can ask an AI agent to try in a follow on PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did, and those were the results I got. It even flagged a semantic error due to not distinguishing between no group and an empty one. Coding without AI these days feels almost impossible. 😄

} else {
Ok(PhysicalGroupBy::new_single(
group_expr
Expand Down
1 change: 1 addition & 0 deletions datafusion/ffi/src/udaf/accumulator_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ impl TryFrom<AccumulatorArgs<'_>> for FFI_AccumulatorArgs {
ignore_nulls: args.ignore_nulls,
fun_definition: None,
aggregate_function: None,
human_display: args.name.to_string(),
};
let physical_expr_def = physical_expr_def.encode_to_vec().into();

Expand Down
9 changes: 8 additions & 1 deletion datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,17 @@ impl PhysicalGroupBy {
)
.collect();
let num_exprs = expr.len();
let groups = if self.expr.is_empty() {
// No GROUP BY expressions - should have no groups
vec![]
} else {
// Has GROUP BY expressions - create a single group
vec![vec![false; num_exprs]]
};
Self {
expr,
null_expr: vec![],
groups: vec![vec![false; num_exprs]],
groups,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,5 @@ datafusion-functions = { workspace = true, default-features = true }
datafusion-functions-aggregate = { workspace = true }
datafusion-functions-window-common = { workspace = true }
doc-comment = { workspace = true }
pretty_assertions = "1.4"
tokio = { workspace = true, features = ["rt-multi-thread"] }
2 changes: 2 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,7 @@ message PhysicalScalarUdfNode {
optional bytes fun_definition = 3;
datafusion_common.ArrowType return_type = 4;
bool nullable = 5;
string return_field_name = 6;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix for bug 3 that does not serialize ScarlarFunction name

}

message PhysicalAggregateExprNode {
Expand All @@ -870,6 +871,7 @@ message PhysicalAggregateExprNode {
bool distinct = 3;
bool ignore_nulls = 6;
optional bytes fun_definition = 7;
string human_display = 8;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix for bug 1 that does not serialize human_display for aggregate function

}

message PhysicalWindowExprNode {
Expand Down
36 changes: 36 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,12 @@ pub fn parse_physical_expr(
e.name.as_str(),
scalar_fun_def,
args,
Field::new("f", convert_required!(e.return_type)?, true).into(),
Field::new(
&e.return_field_name,
convert_required!(e.return_type)?,
true,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this field could ever be non nullable, for example for udfs defined by DF users, then the deserialization would result in a mismatch (not in the scope of this fixes though)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change here is to use e.return_field_name in deserialization. So whatever name of the UDF we serialize at line 356 of the file to_proto.rs in this PR, it will be deserialized here; and it will match. Does this answer your concern?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was refering to the nullability of the return Field, that is always being set to true (even before this PR) i was wondering if this could be an issue later if the output of the scalar function could contain nulls, but is unrelated to the changes & fixes of this PR, sorry for the confusion 😄

)
.into(),
)
.with_nullable(e.nullable),
)
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1108,6 +1108,7 @@ impl protobuf::PhysicalPlanNode {
AggregateExprBuilder::new(agg_udf, input_phy_expr)
.schema(Arc::clone(&physical_schema))
.alias(name)
.human_display(agg_node.human_display.clone())
.with_ignore_nulls(agg_node.ignore_nulls)
.with_distinct(agg_node.distinct)
.order_by(order_bys)
Expand Down
6 changes: 6 additions & 0 deletions datafusion/proto/src/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use std::sync::Arc;

use arrow::datatypes::Schema;
#[cfg(feature = "parquet")]
use datafusion::datasource::file_format::parquet::ParquetSink;
use datafusion::datasource::physical_plan::FileSink;
Expand Down Expand Up @@ -69,6 +70,7 @@ pub fn serialize_physical_aggr_expr(
distinct: aggr_expr.is_distinct(),
ignore_nulls: aggr_expr.ignore_nulls(),
fun_definition: (!buf.is_empty()).then_some(buf),
human_display: aggr_expr.human_display().to_string(),
},
)),
})
Expand Down Expand Up @@ -351,6 +353,10 @@ pub fn serialize_physical_expr(
fun_definition: (!buf.is_empty()).then_some(buf),
return_type: Some(expr.return_type().try_into()?),
nullable: expr.nullable(),
return_field_name: expr
.return_field(&Schema::empty())?
.name()
.to_string(),
},
)),
})
Expand Down
Loading