
Conversation

@a10y
Contributor

@a10y a10y commented Nov 19, 2025

In #5295, we accidentally broke nested filter pushdown. The issue is that FileSource::try_pushdown_filters seems like it's meant to evaluate using the whole file schema, rather than any projected schema. As an example, in the GitHub Archive benchmark dataset, we have the following query, which should trivially push down and be pruned, executing in about 30ms:

SELECT COUNT(*) from events WHERE payload.ref = 'refs/head/main'

However, after that change, pushdown of this field was failing, pushing query time up 100x. The root cause is that the old logic attempted to apply the file schema to the source_expr directly.

Concretely, for the gharchive query, the whole expression is something like:

BinaryExpr {
    lhs: GetField {
        source_expr: Column { name: "payload", index: 0 },
        field_expr: Literal { value: "ref" }
    },
    rhs: Literal { value: "refs/head/main" },
    operator: Eq
}

The issue is that the column index 0 is wrong against the whole file schema. Instead, we need to recursively ensure that the source_expr is a valid chain of Column and GetField expressions that resolves properly.

Note how we already were doing this for checking if a standalone Column expression can be pushed down:

    } else if let Some(col) = expr.downcast_ref::<df_expr::Column>() {
        schema
            .field_with_name(col.name())
            .ok()
            .is_some_and(|field| supported_data_types(field.data_type()))
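
For illustration, here is a minimal sketch of the recursive resolution described above. The FieldAccess type and helper names are hypothetical stand-ins, not the actual DataFusion physical-expr types; the real check walks PhysicalExpr nodes, but the idea is the same: resolve each Column/GetField step by name against the file schema and only treat the chain as pushable if every step resolves.

```rust
use arrow_schema::{DataType, Schema};

// Hypothetical model of a Column/GetField chain; the real code walks
// DataFusion PhysicalExpr nodes instead.
enum FieldAccess<'a> {
    Column { name: &'a str },
    GetField { source: Box<FieldAccess<'a>>, field: &'a str },
}

// Resolve the chain against the file schema by *name*, ignoring any column
// index baked into the expression, and return the leaf data type if every
// step resolves.
fn resolve<'a>(access: &FieldAccess<'_>, schema: &'a Schema) -> Option<&'a DataType> {
    match access {
        FieldAccess::Column { name } => {
            schema.field_with_name(name).ok().map(|f| f.data_type())
        }
        FieldAccess::GetField { source, field } => match resolve(source, schema)? {
            DataType::Struct(children) => children
                .iter()
                .find(|child| child.name() == *field)
                .map(|child| child.data_type()),
            _ => None,
        },
    }
}

// Stand-in for the existing supported_data_types check quoted above.
fn supported_data_types(_dt: &DataType) -> bool {
    true
}

fn can_push_down_nested(access: &FieldAccess<'_>, file_schema: &Schema) -> bool {
    resolve(access, file_schema).is_some_and(supported_data_types)
}
```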

@a10y
Contributor Author

a10y commented Nov 19, 2025

GH Archive query 0 before: 3 seconds


After: 30ms


@a10y a10y requested review from asubiotto and robert3005 November 19, 2025 19:46
@a10y a10y force-pushed the aduffy/filter-pushdown-fix branch from 09fcef5 to 7046d3a on November 19, 2025 19:48
@a10y a10y added the fix label Nov 19, 2025
@a10y a10y requested a review from gatesn November 19, 2025 19:50
@codecov

codecov bot commented Nov 19, 2025

Codecov Report

❌ Patch coverage is 96.71533% with 9 lines in your changes missing coverage. Please review.
✅ Project coverage is 85.50%. Comparing base (fe4c81b) to head (17a6089).

Files with missing lines | Patch % | Lines
vortex-datafusion/src/convert/exprs.rs | 92.59% | 6 Missing ⚠️
vortex-datafusion/src/persistent/opener.rs | 98.54% | 2 Missing ⚠️
vortex-datafusion/src/persistent/source.rs | 95.00% | 1 Missing ⚠️


@a10y
Contributor Author

a10y commented Nov 19, 2025

Actually, I'm not sure why this check was added. AFAICT, attempts to create a GetFieldFunction should fail at planning time, before we even try to push down any filter expressions to the source.


@a10y
Contributor Author

a10y commented Nov 19, 2025

I'm actually fairly confident that we just need to validate that the ScalarFunction being pushed down to us is GetFieldFunc and let DF do the rest.

@robert3005
Contributor

The case I found while reading the Parquet logic for this is that there might be a constant synthetic column created, and you might have a filter on it where you could get to see the column. There might be test cases for this in the DataFusion Parquet code.

@asubiotto
Contributor

asubiotto commented Nov 20, 2025

The reason I added this check was that I was seeing getfield fail unexpectedly at execution time: the table schema (the merged schema over all files) does have the field, so DF planning works fine (as it should), but a specific file does not. I do think we need to check field existence.

@a10y
Contributor Author

a10y commented Nov 20, 2025

I was seeing getfield fail at execution time unexpectedly because the table schema (merged schema over all files) does have the field so the df planning works fine (as it should), but a specific file does not.

That makes sense, wasn't thinking about schema evolution.

The Source reports what can be pushed down, and it has access to the table schema but doesn't know the individual file schemas. So, I think we should be adapting the predicate in the FileOpener instead of the FileSource.

I've also noticed that some of these APIs have changed in DF 51 so I can double-check that today.

@asubiotto
Contributor

Ahh, that makes sense. We already do pushdown checks in the opener against the file schema, so can_be_pushed_down should return true in try_pushdown_filters at the source level against the table schema, but each individual opener should check field existence in the file schema. I think this should basically already work, since can_be_pushed_down is called at both levels.

@a10y
Contributor Author

a10y commented Nov 20, 2025

let filter = filter
    .and_then(|f| {
        let exprs = split_conjunction(&f)
            .into_iter()
            .filter(|expr| can_be_pushed_down(expr, &predicate_file_schema))
            .collect::<Vec<_>>();
        make_vortex_predicate(&exprs).transpose()
    })
    .transpose()
    .map_err(|e| DataFusionError::External(e.into()))?;

I think we should error here instead of dropping the predicate silently after we had told DF that we're going to handle it.

By my reading of DF, when we report PushDownPredicate::supported back up to DF, it assumes that the filter is wholly executed by the DataSource. Here's the Filter node, for reference: https://github.com/apache/datafusion/blob/f17cc09fb839431b469e7c707364c1cf99042650/datafusion/physical-plan/src/filter.rs#L514

I think once we add some protection there this should be gtg

@a10y a10y force-pushed the aduffy/filter-pushdown-fix branch from eae7eed to 0426d3d on November 20, 2025 21:18
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::ops::Range;
Contributor Author

this just got moved out of opener.rs since it was getting long, and having these tests there was distracting

| Timestamp(_, _)
| Time32(_)
| Time64(_)
| Struct(_)
Contributor Author

I think we should probably expand this list further? I just added Struct to make one of the tests pass

Contributor

@robert3005 robert3005 Nov 20, 2025

I think we are missing List/ListView/FixedSizeList and interval types. Map and Union as well.
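
For illustration, a rough sketch of how the container variants mentioned above could recurse into their child types. The function name is made up and this is illustrative only; the real supported_data_types in vortex-datafusion decides which leaf types (including the interval types) are actually convertible.

```rust
use arrow_schema::DataType;

// Illustrative sketch only: recurse into the children of the container
// variants mentioned in the review. Which leaf types are actually supported
// is decided by the real supported_data_types check, elided here.
fn supports_nested(dt: &DataType) -> bool {
    use DataType::*;
    match dt {
        Struct(fields) => fields.iter().all(|f| supports_nested(f.data_type())),
        List(f) | LargeList(f) | ListView(f) | LargeListView(f) | FixedSizeList(f, _) => {
            supports_nested(f.data_type())
        }
        Map(entries, _) => supports_nested(entries.data_type()),
        Union(fields, _) => fields.iter().all(|(_, f)| supports_nested(f.data_type())),
        // Leaf types: the real code has an allow-list here; accept them in
        // this sketch so the recursion is the only thing being shown.
        _ => true,
    }
}
```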

@codspeed-hq

codspeed-hq bot commented Nov 20, 2025

CodSpeed Performance Report

Merging #5406 will improve performance by 17.18%

Comparing aduffy/filter-pushdown-fix (373a365) with develop (fe34efa)

Summary

⚡ 5 improvements
✅ 1473 untouched
⏩ 235 skipped [1]

Benchmarks breakdown

Benchmark | BASE | HEAD | Change
slice_arrow_buffer[1024] | 397.8 ns | 339.4 ns | +17.18%
slice_arrow_buffer[128] | 397.8 ns | 339.4 ns | +17.18%
slice_arrow_buffer[16384] | 397.8 ns | 339.4 ns | +17.18%
slice_arrow_buffer[2048] | 397.8 ns | 339.4 ns | +17.18%
slice_arrow_buffer[65536] | 397.8 ns | 339.4 ns | +17.18%

Footnotes

[1] 235 benchmarks were skipped, so the baseline results were used instead.

Comment on lines 302 to 359
if !can_be_pushed_down(expr, &predicate_file_schema) {
    internal_datafusion_err!(
        "DataFusion predicate {expr} cannot be pushed down to Vortex file {} with schema {predicate_file_schema}",
        file_meta.object_meta.location
    );
}
Contributor Author

Here be dragons when we have filters being applied to a column that doesn't exist in every file in the source.

If we have filters that touch columns which are not in the file's physical schema, we can't just skip them, because the default value returned by the schema adapter might actually have failed the filter.

Contributor

I'm not sure about this behavior, though: doesn't this break cases where we would like to push down an expression as much as possible, even if it operates on a column that doesn't exist in certain files? Wouldn't an error here fail the whole query?

Contributor Author

Yes, you're right: the correct thing to do is to ignore any predicates over missing columns in the opener (the develop behavior) and just rely on DF to post-filter for us.

return false;
}

let _expr_str = format!("{:?}", df_expr);
Contributor

I assume this is debugging? dbg! is useful in these cases

Contributor Author

This was leftover from debugging, but the rationale was that I wanted a string variable when I stepped through this in the debugger.

@asubiotto
Contributor

I had a more general message on discord:

One other thing I'd like to add is that IIRC, if the file source returns that it cannot push down a filter, there is currently no way to communicate that filter to the vortex opener. This is currently done after a successful try_pushdown_filters by setting source.vortex_predicate. Parquet has a with_predicate option so whoever's creating a data source can independently pass down a filter that will be best-effort applied to the scan layer with no guarantees to readers.

Mis-applying filters on missing columns filled in by the schema adapter is a more general concern, so I think short-term we should just do what everyone else does. We just need to make sure that even if we return that we can't push down filters at the source level, we still apply these in the opener.

But related to this PR is the opener erroring out on the predicate. I think it should not error even if the expression is on a missing column.
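
To make the pattern concrete, here is a hedged sketch of the Parquet-style with_predicate builder described above. VortexSourceSketch and its field are hypothetical names, the import path assumes a recent DataFusion, and the real vortex-datafusion source may expose this differently (or not at all yet).

```rust
use std::sync::Arc;

use datafusion::physical_expr::PhysicalExpr;

// Hypothetical sketch of a best-effort predicate slot, modeled on
// ParquetSource::with_predicate. The opener would apply this predicate
// opportunistically (skipping conjuncts on columns missing from a given
// file), while DataFusion keeps a Filter above the scan to guarantee
// correctness.
#[derive(Clone, Default)]
struct VortexSourceSketch {
    best_effort_predicate: Option<Arc<dyn PhysicalExpr>>,
}

impl VortexSourceSketch {
    fn with_predicate(mut self, predicate: Arc<dyn PhysicalExpr>) -> Self {
        self.best_effort_predicate = Some(predicate);
        self
    }
}
```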

@cloudflare-workers-and-pages

cloudflare-workers-and-pages bot commented Nov 25, 2025

Deploying vortex-bench with Cloudflare Pages

Latest commit: 0959703
Status: ✅  Deploy successful!
Preview URL: https://3e31eebe.vortex-93b.pages.dev
Branch Preview URL: https://aduffy-filter-pushdown-fix.vortex-93b.pages.dev


a10y and others added 7 commits November 25, 2025 14:33
…tex (#5521)

requires some non-auto changes to `bench-vortex`


Signed-off-by: Joe Isaacs <joe.isaacs@live.co.uk>
@a10y a10y force-pushed the aduffy/filter-pushdown-fix branch from 0959703 to 5ddd409 on November 25, 2025 19:41
a10y added 8 commits November 25, 2025 14:43
@a10y
Contributor Author

a10y commented Nov 25, 2025

Here's the GHArchive query SELECT * FROM events WHERE payload.ref = 'main' with the current state of this branch: https://share.firefox.dev/481IcOj

Post-filtering the string match in DF is actually a trivial amount of the overall runtime (0.3%)


The bigger problem is that when we tell DF that we can't push the filter, it prompts us to return a projection of payload, so it forces us to read and decode the entire payload column and all of its nested fields. So instead of getting a 10x speedup it's more like a 35% speedup. boo!
