-
Notifications
You must be signed in to change notification settings - Fork 98
fix: correctly handle schema evolution in DF #5555
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
0a336bd
e549411
df70214
3c85227
ba46169
b9efe82
abfadb0
c79419a
0cd1a7d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -244,8 +244,8 @@ pub(crate) fn can_be_pushed_down(df_expr: &PhysicalExprRef, schema: &Schema) -> | |
| } else if let Some(in_list) = expr.downcast_ref::<df_expr::InListExpr>() { | ||
| can_be_pushed_down(in_list.expr(), schema) | ||
| && in_list.list().iter().all(|e| can_be_pushed_down(e, schema)) | ||
| } else if let Some(scalar_fn) = expr.downcast_ref::<ScalarFunctionExpr>() { | ||
| can_scalar_fn_be_pushed_down(scalar_fn, schema) | ||
| } else if ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(df_expr.as_ref()).is_some() { | ||
| true | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Will returning `true` here cause issues? One thing we've mentioned is to pass in the list of file schemas to the source so it can check for these cases, so we can get the best of both worlds — not sure how you view that. I also am not very familiar with the |
||
| } else { | ||
| tracing::debug!(%df_expr, "DataFusion expression can't be pushed down"); | ||
| false | ||
|
|
@@ -292,60 +292,12 @@ fn supported_data_types(dt: &DataType) -> bool { | |
| is_supported | ||
| } | ||
|
|
||
| /// Checks if a GetField scalar function can be pushed down. | ||
| fn can_scalar_fn_be_pushed_down(scalar_fn: &ScalarFunctionExpr, schema: &Schema) -> bool { | ||
| let Some(get_field_fn) = ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(scalar_fn) | ||
| else { | ||
| // Only get_field pushdown is supported. | ||
| return false; | ||
| }; | ||
|
|
||
| let args = get_field_fn.args(); | ||
| if args.len() != 2 { | ||
| tracing::debug!( | ||
| "Expected 2 arguments for GetField, not pushing down {} arguments", | ||
| args.len() | ||
| ); | ||
| return false; | ||
| } | ||
| let source_expr = &args[0]; | ||
| let field_name_expr = &args[1]; | ||
| let Some(field_name) = field_name_expr | ||
| .as_any() | ||
| .downcast_ref::<df_expr::Literal>() | ||
| .and_then(|lit| lit.value().try_as_str().flatten()) | ||
| else { | ||
| return false; | ||
| }; | ||
|
|
||
| let Ok(source_dt) = source_expr.data_type(schema) else { | ||
| tracing::debug!( | ||
| field_name = field_name, | ||
| schema = ?schema, | ||
| source_expr = ?source_expr, | ||
| "Failed to get source type for GetField, not pushing down" | ||
| ); | ||
| return false; | ||
| }; | ||
| let DataType::Struct(fields) = source_dt else { | ||
| tracing::debug!( | ||
| field_name = field_name, | ||
| schema = ?schema, | ||
| source_expr = ?source_expr, | ||
| "Failed to get source type as struct for GetField, not pushing down" | ||
| ); | ||
| return false; | ||
| }; | ||
| fields.find(field_name).is_some() | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use std::sync::Arc; | ||
|
|
||
| use arrow_schema::DataType; | ||
| use arrow_schema::Field; | ||
| use arrow_schema::Fields; | ||
| use arrow_schema::Schema; | ||
| use arrow_schema::TimeUnit as ArrowTimeUnit; | ||
| use datafusion::functions::core::getfield::GetFieldFunc; | ||
|
|
@@ -673,34 +625,4 @@ mod tests { | |
| └── input: vortex.root | ||
| "#); | ||
| } | ||
|
|
||
| #[rstest] | ||
| #[case::valid_field("field1", true)] | ||
| #[case::missing_field("nonexistent_field", false)] | ||
| fn test_can_be_pushed_down_get_field(#[case] field_name: &str, #[case] expected: bool) { | ||
| let struct_fields = Fields::from(vec![ | ||
| Field::new("field1", DataType::Utf8, true), | ||
| Field::new("field2", DataType::Int32, true), | ||
| ]); | ||
| let schema = Schema::new(vec![Field::new( | ||
| "my_struct", | ||
| DataType::Struct(struct_fields), | ||
| true, | ||
| )]); | ||
|
|
||
| let struct_col = Arc::new(df_expr::Column::new("my_struct", 0)) as Arc<dyn PhysicalExpr>; | ||
| let field_name_lit = Arc::new(df_expr::Literal::new(ScalarValue::Utf8(Some( | ||
| field_name.to_string(), | ||
| )))) as Arc<dyn PhysicalExpr>; | ||
|
|
||
| let get_field_expr = Arc::new(ScalarFunctionExpr::new( | ||
| "get_field", | ||
| Arc::new(ScalarUDF::from(GetFieldFunc::new())), | ||
| vec![struct_col, field_name_lit], | ||
| Arc::new(Field::new(field_name, DataType::Utf8, true)), | ||
| Arc::new(ConfigOptions::new()), | ||
| )) as Arc<dyn PhysicalExpr>; | ||
|
|
||
| assert_eq!(can_be_pushed_down(&get_field_expr, &schema), expected); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed in the other PR, this was never needed, and in fact it is not possible to check in the source because we don't know which fields are present in the file or not.
When we've reached this point, the DF planner has already determined that all column references are valid; we just need to ensure that the only scalar function that gets called is the `GetFieldFunc`.