Conversation

@a10y (Contributor)

a10y commented Nov 27, 2025

This PR fixes a handful of broken things in the overlapping Venn diagram of

  1. vortex-datafusion
  2. pushdown over nested fields
  3. schema evolution

See #5406 and #5550 for context on recent issues.

This PR brings in a modified version of the DefaultPhysicalExprRewriter from DataFusion main, and includes a fix that I've filed upstream as apache/datafusion#18957.

Concretely:

  • Fixes the bug where we weren't pushing down filters in VortexSource over nested struct fields.
  • Fixes the issue found in #5550 ("add test to demonstrate schema evolution fail"), where filtering on a table field that is not present in every file would cause a runtime error. (Both failure modes are illustrated in the sketch after this list.)
  • Makes a couple of breaking changes to the VortexSource struct by removing the pub fields for overriding the SchemaAdapterFactory and PhysicalExprAdapterFactory, since we need to use our vendored versions with the fix to ensure correctness.
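
A minimal repro sketch of the first two bullets, assuming a hypothetical events table with a nested meta struct column (the table name, field names, and query are illustrative, not taken from this PR):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

// Hypothetical shape of the failing queries: one file contains the nested
// field `meta.tag`, while an older file was written before it existed.
async fn repro(ctx: &SessionContext) -> Result<()> {
    // Previously, this nested-field predicate was never pushed down into
    // VortexSource; and once a scanned file lacked `meta.tag` entirely,
    // evaluating the filter raised a runtime error instead of yielding NULLs.
    let df = ctx.sql("SELECT * FROM events WHERE meta['tag'] = 'x'").await?;
    df.show().await?;
    Ok(())
}
```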

I can also confirm that this fixes the gharchive benchmark regression, bringing it from 3 seconds back down to 30ms.

[screenshot: gharchive benchmark results]

a10y added 6 commits November 26, 2025 12:36, each Signed-off-by: Andrew Duffy <andrew@a10y.dev>

a10y added the performance and fix labels Nov 27, 2025

a10y added a commit, Signed-off-by: Andrew Duffy <andrew@a10y.dev>
@codspeed-hq

codspeed-hq bot commented Nov 27, 2025

CodSpeed Performance Report

Merging #5555 will not alter performance

Comparing aduffy/fix-schema-adapters (0cd1a7d) with develop (91a62f3)

Summary

✅ 1158 untouched
⏩ 617 skipped ¹

Footnotes

  ¹ 617 benchmarks were skipped, so the baseline results were used instead. If they were deleted from the codebase, they can be archived from the CodSpeed report page to remove them from future performance reports.

    can_be_pushed_down(in_list.expr(), schema)
        && in_list.list().iter().all(|e| can_be_pushed_down(e, schema))
} else if let Some(scalar_fn) = expr.downcast_ref::<ScalarFunctionExpr>() {
    can_scalar_fn_be_pushed_down(scalar_fn, schema)
a10y (Contributor, Author) commented:

As discussed in the other PR, this was never needed, and in fact it's not possible to check in the source, because we don't know which fields are present in a given file.

By the time we've reached this point, the DF planner has already determined that all column references are valid; we just need to ensure that the only scalar function that gets called is GetFieldFunc.
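
A minimal sketch of that rule, reusing the try_downcast_func helper that appears later in this diff (the function name and import paths below are assumptions, not the PR's code):

```rust
use std::sync::Arc;

use datafusion::functions::core::getfield::GetFieldFunc;
use datafusion::physical_expr::ScalarFunctionExpr;
use datafusion::physical_plan::PhysicalExpr;

// By this point the planner has validated every column reference, so the
// remaining gate is function identity: a scalar function is pushable only
// if it is nested-field access (GetFieldFunc).
fn scalar_fn_is_pushable(expr: &Arc<dyn PhysicalExpr>) -> bool {
    ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(expr.as_ref()).is_some()
}
```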

//!
//! `datafusion/datasource/src/schema_adapter.rs` -> for can_cast_field (which is crate-private)
//!
//! See https://github.com/apache/datafusion/issues/18957

Comment on lines +289 to +291
// This has been changed to replace the DF upstream version which just does can_cast_types,
// which ignores struct fields with compatible but differing columns.
let is_compatible = can_cast_field(physical_field, logical_field)?;
a10y (Contributor, Author) commented:

This is the only line in this file that I've changed from the original.
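
Roughly what the distinction looks like, as a hedged sketch rather than the vendored code (arrow's can_cast_types is the real upstream check; the recursion below is illustrative):

```rust
use arrow_schema::{DataType, Field};

// arrow's can_cast_types() compares two Struct types wholesale, so structs
// whose fields are individually compatible but differ (missing, extra, or
// reordered fields) are rejected. A field-wise check instead matches struct
// fields by name and recurses into them.
fn can_cast_field_sketch(physical: &Field, logical: &Field) -> bool {
    match (physical.data_type(), logical.data_type()) {
        (DataType::Struct(phys), DataType::Struct(log)) => log.iter().all(|lf| {
            match phys.iter().find(|pf| pf.name() == lf.name()) {
                // Field exists in the file: recurse into it.
                Some(pf) => can_cast_field_sketch(pf, lf),
                // Field missing from the file: fill with nulls if allowed.
                None => lf.is_nullable(),
            }
        }),
        (p, l) => arrow::compute::can_cast_types(p, l),
    }
}
```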

Comment on lines +52 to +56
static DEFAULT_EXPR_ADAPTER: LazyLock<Arc<dyn PhysicalExprAdapterFactory>> =
LazyLock::new(|| Arc::new(crate::adapter::DefaultPhysicalExprAdapterFactory));

static DEFAULT_SCHEMA_ADAPTER: LazyLock<Arc<dyn SchemaAdapterFactory>> =
LazyLock::new(|| Arc::new(DefaultSchemaAdapterFactory));
a10y (Contributor, Author) commented:

We override to use the vendored adapter here.
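
A small sketch of how the opener can consume these process-wide defaults now that the pub override fields are gone (the accessor function below is hypothetical, not part of the PR):

```rust
// Uses the DEFAULT_EXPR_ADAPTER / DEFAULT_SCHEMA_ADAPTER statics above.
// Cloning an Arc out of a LazyLock is cheap: the factory is constructed
// once on first access and shared for the life of the process.
fn default_adapters() -> (
    Arc<dyn PhysicalExprAdapterFactory>,
    Arc<dyn SchemaAdapterFactory>,
) {
    (DEFAULT_EXPR_ADAPTER.clone(), DEFAULT_SCHEMA_ADAPTER.clone())
}
```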

Comment on lines -64 to -71
pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
/// Hive-style partitioning columns
pub partition_fields: Vec<Arc<Field>>,
pub file_cache: VortexFileCache,
/// This is the table's schema without partition columns. It might be different than
/// the physical schema, and the stream's type will be a projection of it.
pub logical_schema: SchemaRef,
a10y (Contributor, Author) commented:

These are not breaking changes because VortexOpener is pub(crate).

a10y added a commit, Signed-off-by: Andrew Duffy <andrew@a10y.dev>
@codecov

codecov bot commented Nov 27, 2025

Codecov Report

❌ Patch coverage is 78.61446% with 71 lines in your changes missing coverage. Please review.
✅ Project coverage is 85.44%. Comparing base (7e1d377) to head (c79419a).
⚠️ Report is 3 commits behind head on develop.

Files with missing lines                     Patch %   Lines
vortex-datafusion/src/persistent/adapter.rs  58.57%    70 Missing ⚠️
vortex-datafusion/src/convert/exprs.rs       50.00%    1 Missing ⚠️

☔ View full report in Codecov by Sentry.

@AdamGS (Contributor)

AdamGS commented Nov 27, 2025

I'll try to review everything on Friday, when I'll probably have some time to burn at the airport, but from the description: shouldn't we rely on a fix upstream?

} else if let Some(scalar_fn) = expr.downcast_ref::<ScalarFunctionExpr>() {
    can_scalar_fn_be_pushed_down(scalar_fn, schema)
} else if ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(df_expr.as_ref()).is_some() {
    true
A reviewer (Contributor) commented:

Will returning true here avoid planning a downstream filter? We've talked about edge cases before, e.g. IS NOT NULL on a column that's absent from one file and therefore filled with nulls. I don't have a good answer for this, and tbh I care more about perf than correctness for these edge cases.

One thing we've mentioned is passing the list of file schemas into the source so it can check for these cases and we get the best of both worlds; not sure how you view that.

I'm also not very familiar with the try_pushdown_filters interface, so this might not be a problem. Maybe the issue is that we can only return PushedDown::Yes or PushedDown::No, and we might need to upstream an equivalent of the Inexact result that table providers return for filters (a sketch of such a variant follows).
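
To make that last point concrete, a hedged sketch of the suggested upstream addition; the variant is hypothetical (only TableProviderFilterPushDown::Inexact exists today, on the logical side):

```rust
// What an Inexact-style variant on the physical pushdown result might look
// like, mirroring what table providers can already return for logical filters.
enum PushedDownSketch {
    Yes,     // source filters exactly; the downstream FilterExec can be dropped
    No,      // source cannot apply the filter at all
    Inexact, // source filters best-effort; keep the downstream FilterExec
}
```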

let partition_fields = self.partition_fields.clone();
let file_cache = self.file_cache.clone();
let logical_schema = self.logical_schema.clone();
let table_schema = self.table_schema.clone();
A reviewer (Contributor) commented:

Nice, I like the nomenclature change.

metadata_size_hint: None,
}
}

A reviewer (Contributor) commented:

Could you add back the test that was deleted from source.rs, which checked that GetField pushdown on a valid table-schema field doesn't error out when that field is missing from the file?

&record_batch!(("b", Utf8, vec![Some("four"), Some("five"), Some("six")])).unwrap(),
)
.await;

A reviewer (Contributor) commented:

If you want to enhance this test a little, you could also:

  • Add file3 with ["b", "a"] and file4 with ["a", "b"] in that order, or just one file whose field order differs from the table schema (a sketch follows this list).
  • Include a struct column in the table schema with different orderings/missing fields there too.
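
A hypothetical sketch of those extra files, mirroring the record_batch! usage quoted above (the values and the Int32 type for "a" are made up):

```rust
// file3 writes columns in ["b", "a"] order; file4 in ["a", "b"] order.
// Both should read back correctly against a table schema of ("a", "b").
let file3 = record_batch!(
    ("b", Utf8, vec![Some("seven"), Some("eight")]),
    ("a", Int32, vec![Some(7), Some(8)])
)
.unwrap();
let file4 = record_batch!(
    ("a", Int32, vec![Some(9), Some(10)]),
    ("b", Utf8, vec![Some("nine"), Some("ten")])
)
.unwrap();
```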

brancz (Contributor) left a comment:

Looks good! I'm hoping that the unit tests I added adequately cover the behavior we were seeing; if not, I'll make sure to open follow-ups.

@brancz (Contributor)

brancz commented Nov 27, 2025

btw this might be relevant in case you haven't seen it already: apache/arrow-rs#8871

(I suppose it's only half of the equation though; if I understand correctly, you're looking to "cast" to a struct that has a new field which didn't exist before, so apache/arrow-rs#7176 is more relevant, but there the recommendation was to make it the adapter's responsibility, haha. You may also be interested in apache/arrow-rs#8840.)

a10y added a commit, Signed-off-by: Andrew Duffy <andrew@a10y.dev>

Labels

fix, performance

5 participants