fix: correctly handle schema evolution in DF #5555
base: develop
Conversation
Signed-off-by: Andrew Duffy <andrew@a10y.dev>
CodSpeed Performance Report: merging #5555 will not alter performance.
        can_be_pushed_down(in_list.expr(), schema)
            && in_list.list().iter().all(|e| can_be_pushed_down(e, schema))
    } else if let Some(scalar_fn) = expr.downcast_ref::<ScalarFunctionExpr>() {
        can_scalar_fn_be_pushed_down(scalar_fn, schema)
As discussed in the other PR, this was never needed, and in fact it's not possible to check in the source, because we don't know which fields are or aren't present in the file.
By the time we've reached this point, the DF planner has already determined that all column references are valid; we just need to ensure that the only scalar function that gets called is GetFieldFunc.
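In other words, the scalar-function part of the check reduces to something like the sketch below. This is deliberately simplified: the import path and accessor names are assumed from DataFusion's public API, and the actual code in this PR matches on the GetFieldFunc UDF type (and also validates the function's arguments) rather than comparing a name string.

```rust
use datafusion_physical_expr::ScalarFunctionExpr;

// Simplified sketch: the only scalar function that is safe to push into the
// Vortex scan is struct field access (`get_field`); any other scalar UDF is
// left for DataFusion to evaluate on top of the scan.
fn can_scalar_fn_be_pushed_down_sketch(scalar_fn: &ScalarFunctionExpr) -> bool {
    scalar_fn.name() == "get_field"
}
```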
//!
//! `datafusion/datasource/src/schema_adapter.rs` -> for can_cast_field (which is crate-private)
//!
//! See https://github.com/apache/datafusion/issues/18957
// This has been changed to replace the DF upstream version which just does can_cast_types,
// which ignores struct fields with compatible but differing columns.
let is_compatible = can_cast_field(physical_field, logical_field)?;
this is the only line of this file that I have changed from the original
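For readers without the upstream context, the idea behind a field-wise check is roughly the sketch below. This is a hypothetical illustration using plain Arrow, not the vendored DataFusion code (which also threads through a Result and more nullability handling): `can_cast_types` compares whole DataTypes, so a struct that is missing a field, or has its fields in a different order than the table schema, fails even though every field that is present is individually castable.

```rust
use arrow_cast::can_cast_types;
use arrow_schema::{DataType, Field};

/// Hypothetical field-wise compatibility check: structs are compared
/// field-by-field by name, so a file struct that is missing a field (or has
/// its fields in a different order) is still compatible as long as every
/// field that is present can be cast, and every missing field is nullable.
fn can_cast_field_sketch(physical: &Field, logical: &Field) -> bool {
    match (physical.data_type(), logical.data_type()) {
        (DataType::Struct(phys_fields), DataType::Struct(log_fields)) => {
            log_fields.iter().all(|log_child| {
                match phys_fields
                    .iter()
                    .find(|phys_child| phys_child.name() == log_child.name())
                {
                    // Field present in the file: recurse into it.
                    Some(phys_child) => {
                        can_cast_field_sketch(phys_child.as_ref(), log_child.as_ref())
                    }
                    // Field missing from the file: it can only be filled with
                    // nulls if the logical field is nullable.
                    None => log_child.is_nullable(),
                }
            })
        }
        // Non-struct fields fall back to Arrow's regular cast check.
        (phys, log) => can_cast_types(phys, log),
    }
}
```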
static DEFAULT_EXPR_ADAPTER: LazyLock<Arc<dyn PhysicalExprAdapterFactory>> =
    LazyLock::new(|| Arc::new(crate::adapter::DefaultPhysicalExprAdapterFactory));

static DEFAULT_SCHEMA_ADAPTER: LazyLock<Arc<dyn SchemaAdapterFactory>> =
    LazyLock::new(|| Arc::new(DefaultSchemaAdapterFactory));
we override to use the vendored adapter here
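That override amounts to pointing the same LazyLock at the vendored factory instead of DataFusion's default, roughly like the sketch below; `crate::vendored::VortexSchemaAdapterFactory` is a placeholder name for whatever the vendored type is actually called in this PR.

```rust
use std::sync::{Arc, LazyLock};
use datafusion::datasource::schema_adapter::SchemaAdapterFactory;

// Placeholder import: stands in for the vendored factory that carries the
// struct-aware can_cast_field fix.
use crate::vendored::VortexSchemaAdapterFactory;

// Same lazily-initialized default as upstream, but constructing the vendored
// factory rather than DataFusion's DefaultSchemaAdapterFactory.
static DEFAULT_SCHEMA_ADAPTER: LazyLock<Arc<dyn SchemaAdapterFactory>> =
    LazyLock::new(|| Arc::new(VortexSchemaAdapterFactory));
```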
pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
/// Hive-style partitioning columns
pub partition_fields: Vec<Arc<Field>>,
pub file_cache: VortexFileCache,
/// This is the table's schema without partition columns. It might be different from
/// the physical schema, and the stream's type will be a projection of it.
pub logical_schema: SchemaRef,
these are not breaking changes b/c VortexOpener is pub(crate)
I'll try and review everything on Friday when I'll probably have some time to burn at the airport, but from the description, shouldn't we rely on a fix upstream?
} else if let Some(scalar_fn) = expr.downcast_ref::<ScalarFunctionExpr>() {
    can_scalar_fn_be_pushed_down(scalar_fn, schema)
} else if ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(df_expr.as_ref()).is_some() {
    true
Will returning true here avoid the planning of a downstream filter? We've talked about edge cases before, e.g. IS NOT NULL on a column that is missing from one file and therefore filled with nulls. I don't have a good answer for this, and tbh I care more about perf than correctness for these edge cases.
One thing we've mentioned is to pass the list of file schemas to the source so it can check for these cases, giving us the best of both worlds; not sure how you view that.
I'm also not very familiar with the try_pushdown_filters interface, so this might not be a problem. Maybe the issue is that we can only return PushedDown::Yes or PushedDown::No, and we might need to upstream an equivalent of the Inexact result that table providers return for filters.
let partition_fields = self.partition_fields.clone();
let file_cache = self.file_cache.clone();
let logical_schema = self.logical_schema.clone();
let table_schema = self.table_schema.clone();
Nice, I like the nomenclature change
        metadata_size_hint: None,
    }
}
Could you add back the test that was deleted from source.rs, which checked that a get_field pushdown on a valid table-schema field doesn't error out when that field is missing from the file?
        &record_batch!(("b", Utf8, vec![Some("four"), Some("five"), Some("six")])).unwrap(),
    )
    .await;
If you want to enhance this test a little you could also:
- Add file3 with ["b", "a"] and file4 with ["a", "b"] in that order, or just one file with a different order of fields than the table schema.
- Include a struct column in the table schema and have different orderings/missing fields there too (rough sketch below).
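Something along these lines for the batch construction, just using Arrow directly; the table schema, column names, and values here are made up for illustration, and writing the batch out would go through whatever helper the existing test already uses.

```rust
use std::sync::Arc;
use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray, StructArray};
use arrow_schema::{DataType, Field, Fields, Schema};

// Illustrative only: assume the table schema is
// ("a" Int64, "b" Utf8, "s" Struct<"x" Int64, "y" Utf8>).
// file3 writes its top-level fields in the opposite order to the table schema,
// and its struct is missing "y"; the scan should still project everything back
// into the table schema's layout, null-filling "s"."y".
fn file3_batch() -> RecordBatch {
    let struct_fields = Fields::from(vec![Field::new("x", DataType::Int64, true)]);
    let s = StructArray::new(
        struct_fields.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef],
        None,
    );
    let schema = Schema::new(vec![
        Field::new("b", DataType::Utf8, true),
        Field::new("a", DataType::Int64, true),
        Field::new("s", DataType::Struct(struct_fields), true),
    ]);
    RecordBatch::try_new(
        Arc::new(schema),
        vec![
            Arc::new(StringArray::from(vec!["seven", "eight", "nine"])) as ArrayRef,
            Arc::new(Int64Array::from(vec![7, 8, 9])) as ArrayRef,
            Arc::new(s) as ArrayRef,
        ],
    )
    .unwrap()
}
```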
brancz left a comment:
Looks good! I'm hoping the unit tests I added adequately cover the behavior we were seeing; if not, I'll make sure to open follow-ups.
btw this might be relevant in case you haven't seen it already: apache/arrow-rs#8871 (I suppose it's only half of the equation though; iiuc you are looking to "cast" to a struct that has a new field that didn't exist before, so apache/arrow-rs#7176 is more relevant, but there the recommendation was to make it the adapter's responsibility haha .. you may also be interested in apache/arrow-rs#8840)
This PR fixes a handful of broken things in the overlapping venn diagrams of
See #5406 and #5550 for context on recent issues.
This PR brings in a modified version of the DefaultPhysicalExprRewriter from DataFusion main, and makes a fix that I've filed as apache/datafusion#18957 to upstream.
Concretely, it changes the VortexSource struct by removing the pub fields for overriding the SchemaAdapterFactory and PhysicalExprAdapterFactory, since we need to use our vendored version with the fix to ensure correctness.
I can also confirm that this fixes the gharchive benchmark, bringing it from 3 seconds back down to 30ms.
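For a concrete picture of the schema-evolution case this is about: a file written before a struct gained a new field has an Arrow struct type that differs from the table schema's, and a whole-struct cast check rejects it. The sketch below is a hypothetical illustration in plain Arrow (not the vendored adapter code) of the per-field adaptation that makes such a file readable: reuse the children that exist and null-fill the ones that don't.

```rust
use arrow_array::{new_null_array, Array, ArrayRef, StructArray};
use arrow_schema::Fields;

// Hypothetical sketch: rebuild a file's struct column against the table
// schema's field list, keeping existing children and null-filling any field
// the file doesn't have.
fn widen_struct_sketch(file_struct: &StructArray, table_fields: &Fields) -> StructArray {
    let children: Vec<ArrayRef> = table_fields
        .iter()
        .map(|field| match file_struct.column_by_name(field.name()) {
            // The file has this field: reuse it as-is (a real adapter would
            // also cast it to the table type if needed).
            Some(existing) => existing.clone(),
            // The file predates this field: fill it with nulls.
            None => new_null_array(field.data_type(), file_struct.len()),
        })
        .collect();
    StructArray::new(table_fields.clone(), children, file_struct.nulls().cloned())
}
```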