Use PhysicalExtensionCodec consistently #10075

Merged 4 commits on Apr 15, 2024

Changes from 2 commits
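For context on the diff below: these deserialization helpers previously constructed a `DefaultPhysicalExtensionCodec` internally, so a caller-supplied codec never reached nested expressions and UDFs known only to that codec could not be decoded there. The sketch below illustrates the intended usage after this change; `MyUdfCodec`, `decode_expr`, and the exact trait method signatures are illustrative assumptions written from memory around this release, not code from this PR.

```rust
use std::sync::Arc;

use arrow::datatypes::Schema;
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::ScalarUDF;
use datafusion::physical_plan::{ExecutionPlan, PhysicalExpr};
use datafusion_common::{internal_err, Result};
use datafusion_proto::physical_plan::from_proto::parse_physical_expr;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use datafusion_proto::protobuf;

/// A codec that can resolve one custom UDF by name; custom execution plan
/// (de)serialization is not needed for this sketch.
#[derive(Debug)]
struct MyUdfCodec {
    my_udf: Arc<ScalarUDF>,
}

impl PhysicalExtensionCodec for MyUdfCodec {
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn ExecutionPlan>],
        _registry: &dyn FunctionRegistry,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        internal_err!("this codec does not decode custom execution plans")
    }

    fn try_encode(&self, _node: Arc<dyn ExecutionPlan>, _buf: &mut Vec<u8>) -> Result<()> {
        internal_err!("this codec does not encode custom execution plans")
    }

    // Assumed signature of the UDF decoding hook on PhysicalExtensionCodec.
    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
        if name == self.my_udf.name() {
            Ok(Arc::clone(&self.my_udf))
        } else {
            internal_err!("unknown UDF: {name}")
        }
    }
}

/// With this PR, the caller's codec is forwarded to every nested expression,
/// so a custom UDF buried inside e.g. a binary or CASE expression can be
/// resolved by `MyUdfCodec` rather than by the default codec.
fn decode_expr(
    node: &protobuf::PhysicalExprNode,
    registry: &dyn FunctionRegistry,
    input_schema: &Schema,
    codec: &MyUdfCodec,
) -> Result<Arc<dyn PhysicalExpr>> {
    parse_physical_expr(node, registry, input_schema, codec)
}
```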
88 changes: 72 additions & 16 deletions datafusion/proto/src/physical_plan/from_proto.rs
@@ -76,9 +76,10 @@ impl From<&protobuf::PhysicalColumn> for Column {
/// # Arguments
///
/// * `proto` - Input proto with physical sort expression node
/// * `registry` - A registry knows how to build logical expressions out of user-defined function' names
/// * `registry` - A registry knows how to build logical expressions out of user-defined function names
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
/// when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
Contributor: thank you for improving the comments here as well ❤️

pub fn parse_physical_sort_expr(
proto: &protobuf::PhysicalSortExprNode,
registry: &dyn FunctionRegistry,
@@ -102,9 +103,10 @@ pub fn parse_physical_sort_expr(
/// # Arguments
///
/// * `proto` - Input proto with vector of physical sort expression node
/// * `registry` - A registry knows how to build logical expressions out of user-defined function' names
/// * `registry` - A registry knows how to build logical expressions out of user-defined function names
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
/// when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
pub fn parse_physical_sort_exprs(
proto: &[protobuf::PhysicalSortExprNode],
registry: &dyn FunctionRegistry,
@@ -123,25 +125,39 @@ pub fn parse_physical_sort_exprs(
///
/// # Arguments
///
/// * `proto` - Input proto with physical window exprression node.
/// * `proto` - Input proto with physical window expression node.
/// * `name` - Name of the window expression.
/// * `registry` - A registry knows how to build logical expressions out of user-defined function' names
/// * `registry` - A registry knows how to build logical expressions out of user-defined function names
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
/// when performing type coercion.
pub fn parse_physical_window_expr(
proto: &protobuf::PhysicalWindowExprNode,
registry: &dyn FunctionRegistry,
input_schema: &Schema,
) -> Result<Arc<dyn WindowExpr>> {
let codec = DefaultPhysicalExtensionCodec {};
parse_physical_window_expr_ext(
proto,
registry,
input_schema,
&DefaultPhysicalExtensionCodec {},
)
}

// TODO: Make this the public function on next major release.
Contributor: I think what is on main will effectively become the next major release (38.0.0) so there is no reason to avoid API changes here. If we want to backport this to a stable 37.1.0 release (e.g. #9904) then we should avoid API changes.

Contributor Author: Ok, I see, so if we want the backwards compatible changes it would be against branch-37.

Contributor Author: I added the breaking changes here in 1f0040c

pub(crate) fn parse_physical_window_expr_ext(
proto: &protobuf::PhysicalWindowExprNode,
registry: &dyn FunctionRegistry,
input_schema: &Schema,
codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn WindowExpr>> {
let window_node_expr =
parse_physical_exprs(&proto.args, registry, input_schema, &codec)?;
parse_physical_exprs(&proto.args, registry, input_schema, codec)?;

let partition_by =
parse_physical_exprs(&proto.partition_by, registry, input_schema, &codec)?;
parse_physical_exprs(&proto.partition_by, registry, input_schema, codec)?;

let order_by =
parse_physical_sort_exprs(&proto.order_by, registry, input_schema, &codec)?;
parse_physical_sort_exprs(&proto.order_by, registry, input_schema, codec)?;

let window_frame = proto
.window_frame
@@ -187,9 +203,10 @@ where
/// # Arguments
///
/// * `proto` - Input proto with physical expression node
/// * `registry` - A registry knows how to build logical expressions out of user-defined function' names
/// * `registry` - A registry knows how to build logical expressions out of user-defined function names
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
/// when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
pub fn parse_physical_expr(
proto: &protobuf::PhysicalExprNode,
registry: &dyn FunctionRegistry,
@@ -213,13 +230,15 @@ pub fn parse_physical_expr(
registry,
"left",
input_schema,
codec,
)?,
logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?,
parse_required_physical_expr(
binary_expr.r.as_deref(),
registry,
"right",
input_schema,
codec,
)?,
)),
ExprType::AggregateExpr(_) => {
@@ -241,6 +260,7 @@ pub fn parse_physical_expr(
registry,
"expr",
input_schema,
codec,
)?))
}
ExprType::IsNotNullExpr(e) => {
@@ -249,20 +269,23 @@ pub fn parse_physical_expr(
registry,
"expr",
input_schema,
codec,
)?))
}
ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr(
e.expr.as_deref(),
registry,
"expr",
input_schema,
codec,
)?)),
ExprType::Negative(e) => {
Arc::new(NegativeExpr::new(parse_required_physical_expr(
e.expr.as_deref(),
registry,
"expr",
input_schema,
codec,
)?))
}
ExprType::InList(e) => in_list(
@@ -271,6 +294,7 @@ pub fn parse_physical_expr(
registry,
"expr",
input_schema,
codec,
)?,
parse_physical_exprs(&e.list, registry, input_schema, codec)?,
&e.negated,
@@ -290,12 +314,14 @@ pub fn parse_physical_expr(
registry,
"when_expr",
input_schema,
codec,
)?,
parse_required_physical_expr(
e.then_expr.as_ref(),
registry,
"then_expr",
input_schema,
codec,
)?,
))
})
@@ -311,6 +337,7 @@ pub fn parse_physical_expr(
registry,
"expr",
input_schema,
codec,
)?,
convert_required!(e.arrow_type)?,
None,
@@ -321,6 +348,7 @@ pub fn parse_physical_expr(
registry,
"expr",
input_schema,
codec,
)?,
convert_required!(e.arrow_type)?,
)),
@@ -371,12 +399,14 @@ pub fn parse_physical_expr(
registry,
"expr",
input_schema,
codec,
)?,
parse_required_physical_expr(
like_expr.pattern.as_deref(),
registry,
"pattern",
input_schema,
codec,
)?,
)),
};
@@ -389,9 +419,9 @@ fn parse_required_physical_expr(
registry: &dyn FunctionRegistry,
field: &str,
input_schema: &Schema,
codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn PhysicalExpr>> {
let codec = DefaultPhysicalExtensionCodec {};
expr.map(|e| parse_physical_expr(e, registry, input_schema, &codec))
expr.map(|e| parse_physical_expr(e, registry, input_schema, codec))
.transpose()?
.ok_or_else(|| {
DataFusionError::Internal(format!("Missing required field {field:?}"))
@@ -433,15 +463,29 @@ pub fn parse_protobuf_hash_partitioning(
partitioning: Option<&protobuf::PhysicalHashRepartition>,
registry: &dyn FunctionRegistry,
input_schema: &Schema,
) -> Result<Option<Partitioning>> {
parse_protobuf_hash_partitioning_ext(
partitioning,
registry,
input_schema,
&DefaultPhysicalExtensionCodec {},
)
}

// TODO: Make this the public function on next major release.
fn parse_protobuf_hash_partitioning_ext(
partitioning: Option<&protobuf::PhysicalHashRepartition>,
registry: &dyn FunctionRegistry,
input_schema: &Schema,
codec: &dyn PhysicalExtensionCodec,
) -> Result<Option<Partitioning>> {
match partitioning {
Some(hash_part) => {
let codec = DefaultPhysicalExtensionCodec {};
let expr = parse_physical_exprs(
&hash_part.hash_expr,
registry,
input_schema,
&codec,
codec,
)?;

Ok(Some(Partitioning::Hash(
@@ -456,6 +500,19 @@ pub fn parse_protobuf_file_scan_config(
pub fn parse_protobuf_file_scan_config(
proto: &protobuf::FileScanExecConf,
registry: &dyn FunctionRegistry,
) -> Result<FileScanConfig> {
parse_protobuf_file_scan_config_ext(
proto,
registry,
&DefaultPhysicalExtensionCodec {},
)
}

// TODO: Make this the public function on next major release.
pub(crate) fn parse_protobuf_file_scan_config_ext(
proto: &protobuf::FileScanExecConf,
registry: &dyn FunctionRegistry,
codec: &dyn PhysicalExtensionCodec,
) -> Result<FileScanConfig> {
let schema: Arc<Schema> = Arc::new(convert_required!(proto.schema)?);
let projection = proto
@@ -489,7 +546,7 @@ pub fn parse_protobuf_file_scan_config(
.collect::<Result<Vec<_>>>()?;

// Remove partition columns from the schema after recreating table_partition_cols
// because the partition columns are not in the file. They are present to allow the
// because the partition columns are not in the file. They are present to allow
// the partition column types to be reconstructed after serde.
let file_schema = Arc::new(Schema::new(
schema
@@ -502,12 +559,11 @@

let mut output_ordering = vec![];
for node_collection in &proto.output_ordering {
let codec = DefaultPhysicalExtensionCodec {};
let sort_expr = parse_physical_sort_exprs(
&node_collection.physical_sort_expr_nodes,
registry,
&schema,
&codec,
codec,
)?;
output_ordering.push(sort_expr);
}
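At a higher level, downstream users typically go through the byte-level helpers rather than calling `parse_physical_expr` directly. A minimal round-trip sketch is below, assuming the `*_with_extension_codec` helpers in `datafusion_proto::bytes` (names written from memory, not shown in this diff); with the codec now threaded through consistently, custom UDFs appearing in sort keys, window arguments, and `FileScanConfig` output orderings can survive the round trip instead of falling back to the default codec.

```rust
use std::sync::Arc;

use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_common::Result;
use datafusion_proto::bytes::{
    physical_plan_from_bytes_with_extension_codec, physical_plan_to_bytes_with_extension_codec,
};
use datafusion_proto::physical_plan::PhysicalExtensionCodec;

/// Serialize a physical plan and decode it again with the same
/// caller-supplied codec.
fn roundtrip_plan(
    plan: Arc<dyn ExecutionPlan>,
    ctx: &SessionContext,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn ExecutionPlan>> {
    let bytes = physical_plan_to_bytes_with_extension_codec(plan, codec)?;
    physical_plan_from_bytes_with_extension_codec(&bytes, ctx, codec)
}
```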