-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use PhysicalExtensionCodec consistently #10075
Changes from 2 commits
a6993eb
7d16e38
280feda
1f0040c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -76,9 +76,10 @@ impl From<&protobuf::PhysicalColumn> for Column { | |
/// # Arguments | ||
/// | ||
/// * `proto` - Input proto with physical sort expression node | ||
/// * `registry` - A registry knows how to build logical expressions out of user-defined function' names | ||
/// * `registry` - A registry knows how to build logical expressions out of user-defined function names | ||
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types | ||
/// when performing type coercion. | ||
/// * `codec` - An extension codec used to decode custom UDFs. | ||
pub fn parse_physical_sort_expr( | ||
proto: &protobuf::PhysicalSortExprNode, | ||
registry: &dyn FunctionRegistry, | ||
|
@@ -102,9 +103,10 @@ pub fn parse_physical_sort_expr( | |
/// # Arguments | ||
/// | ||
/// * `proto` - Input proto with vector of physical sort expression node | ||
/// * `registry` - A registry knows how to build logical expressions out of user-defined function' names | ||
/// * `registry` - A registry knows how to build logical expressions out of user-defined function names | ||
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types | ||
/// when performing type coercion. | ||
/// * `codec` - An extension codec used to decode custom UDFs. | ||
pub fn parse_physical_sort_exprs( | ||
proto: &[protobuf::PhysicalSortExprNode], | ||
registry: &dyn FunctionRegistry, | ||
|
@@ -123,25 +125,39 @@ pub fn parse_physical_sort_exprs( | |
/// | ||
/// # Arguments | ||
/// | ||
/// * `proto` - Input proto with physical window exprression node. | ||
/// * `proto` - Input proto with physical window expression node. | ||
/// * `name` - Name of the window expression. | ||
/// * `registry` - A registry knows how to build logical expressions out of user-defined function' names | ||
/// * `registry` - A registry knows how to build logical expressions out of user-defined function names | ||
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types | ||
/// when performing type coercion. | ||
pub fn parse_physical_window_expr( | ||
proto: &protobuf::PhysicalWindowExprNode, | ||
registry: &dyn FunctionRegistry, | ||
input_schema: &Schema, | ||
) -> Result<Arc<dyn WindowExpr>> { | ||
let codec = DefaultPhysicalExtensionCodec {}; | ||
parse_physical_window_expr_ext( | ||
proto, | ||
registry, | ||
input_schema, | ||
&DefaultPhysicalExtensionCodec {}, | ||
) | ||
} | ||
|
||
// TODO: Make this the public function on next major release. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think what is on main will effectively become the next major release ( If we want to backport this to a stable There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, I see so if we want the backwards compatible changes it would be against There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added the breaking changes here in 1f0040c |
||
pub(crate) fn parse_physical_window_expr_ext( | ||
proto: &protobuf::PhysicalWindowExprNode, | ||
registry: &dyn FunctionRegistry, | ||
input_schema: &Schema, | ||
codec: &dyn PhysicalExtensionCodec, | ||
) -> Result<Arc<dyn WindowExpr>> { | ||
let window_node_expr = | ||
parse_physical_exprs(&proto.args, registry, input_schema, &codec)?; | ||
parse_physical_exprs(&proto.args, registry, input_schema, codec)?; | ||
|
||
let partition_by = | ||
parse_physical_exprs(&proto.partition_by, registry, input_schema, &codec)?; | ||
parse_physical_exprs(&proto.partition_by, registry, input_schema, codec)?; | ||
|
||
let order_by = | ||
parse_physical_sort_exprs(&proto.order_by, registry, input_schema, &codec)?; | ||
parse_physical_sort_exprs(&proto.order_by, registry, input_schema, codec)?; | ||
|
||
let window_frame = proto | ||
.window_frame | ||
|
@@ -187,9 +203,10 @@ where | |
/// # Arguments | ||
/// | ||
/// * `proto` - Input proto with physical expression node | ||
/// * `registry` - A registry knows how to build logical expressions out of user-defined function' names | ||
/// * `registry` - A registry knows how to build logical expressions out of user-defined function names | ||
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types | ||
/// when performing type coercion. | ||
/// * `codec` - An extension codec used to decode custom UDFs. | ||
pub fn parse_physical_expr( | ||
proto: &protobuf::PhysicalExprNode, | ||
registry: &dyn FunctionRegistry, | ||
|
@@ -213,13 +230,15 @@ pub fn parse_physical_expr( | |
registry, | ||
"left", | ||
input_schema, | ||
codec, | ||
)?, | ||
logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?, | ||
parse_required_physical_expr( | ||
binary_expr.r.as_deref(), | ||
registry, | ||
"right", | ||
input_schema, | ||
codec, | ||
)?, | ||
)), | ||
ExprType::AggregateExpr(_) => { | ||
|
@@ -241,6 +260,7 @@ pub fn parse_physical_expr( | |
registry, | ||
"expr", | ||
input_schema, | ||
codec, | ||
)?)) | ||
} | ||
ExprType::IsNotNullExpr(e) => { | ||
|
@@ -249,20 +269,23 @@ pub fn parse_physical_expr( | |
registry, | ||
"expr", | ||
input_schema, | ||
codec, | ||
)?)) | ||
} | ||
ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr( | ||
e.expr.as_deref(), | ||
registry, | ||
"expr", | ||
input_schema, | ||
codec, | ||
)?)), | ||
ExprType::Negative(e) => { | ||
Arc::new(NegativeExpr::new(parse_required_physical_expr( | ||
e.expr.as_deref(), | ||
registry, | ||
"expr", | ||
input_schema, | ||
codec, | ||
)?)) | ||
} | ||
ExprType::InList(e) => in_list( | ||
|
@@ -271,6 +294,7 @@ pub fn parse_physical_expr( | |
registry, | ||
"expr", | ||
input_schema, | ||
codec, | ||
)?, | ||
parse_physical_exprs(&e.list, registry, input_schema, codec)?, | ||
&e.negated, | ||
|
@@ -290,12 +314,14 @@ pub fn parse_physical_expr( | |
registry, | ||
"when_expr", | ||
input_schema, | ||
codec, | ||
)?, | ||
parse_required_physical_expr( | ||
e.then_expr.as_ref(), | ||
registry, | ||
"then_expr", | ||
input_schema, | ||
codec, | ||
)?, | ||
)) | ||
}) | ||
|
@@ -311,6 +337,7 @@ pub fn parse_physical_expr( | |
registry, | ||
"expr", | ||
input_schema, | ||
codec, | ||
)?, | ||
convert_required!(e.arrow_type)?, | ||
None, | ||
|
@@ -321,6 +348,7 @@ pub fn parse_physical_expr( | |
registry, | ||
"expr", | ||
input_schema, | ||
codec, | ||
)?, | ||
convert_required!(e.arrow_type)?, | ||
)), | ||
|
@@ -371,12 +399,14 @@ pub fn parse_physical_expr( | |
registry, | ||
"expr", | ||
input_schema, | ||
codec, | ||
)?, | ||
parse_required_physical_expr( | ||
like_expr.pattern.as_deref(), | ||
registry, | ||
"pattern", | ||
input_schema, | ||
codec, | ||
)?, | ||
)), | ||
}; | ||
|
@@ -389,9 +419,9 @@ fn parse_required_physical_expr( | |
registry: &dyn FunctionRegistry, | ||
field: &str, | ||
input_schema: &Schema, | ||
codec: &dyn PhysicalExtensionCodec, | ||
) -> Result<Arc<dyn PhysicalExpr>> { | ||
let codec = DefaultPhysicalExtensionCodec {}; | ||
expr.map(|e| parse_physical_expr(e, registry, input_schema, &codec)) | ||
expr.map(|e| parse_physical_expr(e, registry, input_schema, codec)) | ||
.transpose()? | ||
.ok_or_else(|| { | ||
DataFusionError::Internal(format!("Missing required field {field:?}")) | ||
|
@@ -433,15 +463,29 @@ pub fn parse_protobuf_hash_partitioning( | |
partitioning: Option<&protobuf::PhysicalHashRepartition>, | ||
registry: &dyn FunctionRegistry, | ||
input_schema: &Schema, | ||
) -> Result<Option<Partitioning>> { | ||
parse_protobuf_hash_partitioning_ext( | ||
partitioning, | ||
registry, | ||
input_schema, | ||
&DefaultPhysicalExtensionCodec {}, | ||
) | ||
} | ||
|
||
// TODO: Make this the public function on next major release. | ||
fn parse_protobuf_hash_partitioning_ext( | ||
partitioning: Option<&protobuf::PhysicalHashRepartition>, | ||
registry: &dyn FunctionRegistry, | ||
input_schema: &Schema, | ||
codec: &dyn PhysicalExtensionCodec, | ||
) -> Result<Option<Partitioning>> { | ||
match partitioning { | ||
Some(hash_part) => { | ||
let codec = DefaultPhysicalExtensionCodec {}; | ||
let expr = parse_physical_exprs( | ||
&hash_part.hash_expr, | ||
registry, | ||
input_schema, | ||
&codec, | ||
codec, | ||
)?; | ||
|
||
Ok(Some(Partitioning::Hash( | ||
|
@@ -456,6 +500,19 @@ pub fn parse_protobuf_hash_partitioning( | |
pub fn parse_protobuf_file_scan_config( | ||
proto: &protobuf::FileScanExecConf, | ||
registry: &dyn FunctionRegistry, | ||
) -> Result<FileScanConfig> { | ||
parse_protobuf_file_scan_config_ext( | ||
proto, | ||
registry, | ||
&DefaultPhysicalExtensionCodec {}, | ||
) | ||
} | ||
|
||
// TODO: Make this the public function on next major release. | ||
pub(crate) fn parse_protobuf_file_scan_config_ext( | ||
proto: &protobuf::FileScanExecConf, | ||
registry: &dyn FunctionRegistry, | ||
codec: &dyn PhysicalExtensionCodec, | ||
) -> Result<FileScanConfig> { | ||
let schema: Arc<Schema> = Arc::new(convert_required!(proto.schema)?); | ||
let projection = proto | ||
|
@@ -489,7 +546,7 @@ pub fn parse_protobuf_file_scan_config( | |
.collect::<Result<Vec<_>>>()?; | ||
|
||
// Remove partition columns from the schema after recreating table_partition_cols | ||
// because the partition columns are not in the file. They are present to allow the | ||
// because the partition columns are not in the file. They are present to allow | ||
// the partition column types to be reconstructed after serde. | ||
let file_schema = Arc::new(Schema::new( | ||
schema | ||
|
@@ -502,12 +559,11 @@ pub fn parse_protobuf_file_scan_config( | |
|
||
let mut output_ordering = vec![]; | ||
for node_collection in &proto.output_ordering { | ||
let codec = DefaultPhysicalExtensionCodec {}; | ||
let sort_expr = parse_physical_sort_exprs( | ||
&node_collection.physical_sort_expr_nodes, | ||
registry, | ||
&schema, | ||
&codec, | ||
codec, | ||
)?; | ||
output_ordering.push(sort_expr); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thank you for improving the comments here as well ❤️