Skip to content

Commit 09fcef5

Browse files
committed
fix: filter pushdown for nested fields
In #5295, we accidentally broke nested filter pushdown. The issue is that `FileSource::try_pushdown_filters` seems like it's meant to evaluate using the whole file schema, rather than any projected schema.

As an example, in the GitHub Archive benchmark dataset, we have the following query, which should trivially push down and be pruned, executing in about 30ms or so:

```
SELECT COUNT(*) from events WHERE payload.ref = 'refs/head/main'
```

However, after #5295, pushdown of this field was failing, pushing query time up 100x.

The root cause is that the old logic attempted to apply the file schema to the `source_expr` directly. Concretely, for the gharchive query, the whole expression is something like:

```text
BinaryExpr {
    lhs: GetField {
        source_expr: Column { name: "payload", index: 0 },
        field_expr: Literal { value: "ref" }
    }
    rhs: Literal { value: "refs/head/main" }
    operator: Eq
}
```

The issue is that the column index 0 is wrong for the whole file. Instead, we need to recursively ensure that the `source_expr` is a valid sequence of `Column` and `GetField` expressions that resolve properly.

Note how we already were doing this for checking if a standalone `Column` expression can be pushed down:

```
} else if let Some(col) = expr.downcast_ref::<df_expr::Column>() {
    schema
        .field_with_name(col.name())
        .ok()
        .is_some_and(|field| supported_data_types(field.data_type()))
```

Signed-off-by: Andrew Duffy <andrew@a10y.dev>
1 parent 3df9296 commit 09fcef5

File tree

1 file changed

+40
-45
lines changed

1 file changed

+40
-45
lines changed

vortex-datafusion/src/convert/exprs.rs

Lines changed: 40 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -228,8 +228,8 @@ pub(crate) fn can_be_pushed_down(df_expr: &PhysicalExprRef, schema: &Schema) ->
228228
} else if let Some(in_list) = expr.downcast_ref::<df_expr::InListExpr>() {
229229
can_be_pushed_down(in_list.expr(), schema)
230230
&& in_list.list().iter().all(|e| can_be_pushed_down(e, schema))
231-
} else if let Some(scalar_fn) = expr.downcast_ref::<ScalarFunctionExpr>() {
232-
can_scalar_fn_be_pushed_down(scalar_fn, schema)
231+
} else if let Some(_) = expr.downcast_ref::<ScalarFunctionExpr>() {
232+
get_source_data_type(df_expr, schema).is_some()
233233
} else {
234234
tracing::debug!(%df_expr, "DataFusion expression can't be pushed down");
235235
false
@@ -276,51 +276,46 @@ fn supported_data_types(dt: &DataType) -> bool {
276276
is_supported
277277
}
278278

279-
/// Checks if a GetField scalar function can be pushed down.
280-
fn can_scalar_fn_be_pushed_down(scalar_fn: &ScalarFunctionExpr, schema: &Schema) -> bool {
281-
let Some(get_field_fn) = ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(scalar_fn)
282-
else {
283-
// Only get_field pushdown is supported.
284-
return false;
285-
};
279+
/// Evaluate the source `expr` within the scope of `schema` and return its data type. If the source
280+
/// expression is not composed of valid field accesses that we can pushdown to Vortex, fail.
281+
fn get_source_data_type(expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> Option<DataType> {
282+
if let Some(col) = expr.as_any().downcast_ref::<df_expr::Column>() {
283+
// Column expression handler
284+
let Ok(field) = schema.field_with_name(col.name()) else {
285+
return None;
286+
};
287+
288+
// Get back the data type here instead.
289+
Some(field.data_type().clone())
290+
} else if let Some(scalar_fn) = expr.as_any().downcast_ref::<ScalarFunctionExpr>() {
291+
// Struct field access handler
292+
let get_field_fn = ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(scalar_fn)?;
293+
294+
let args = get_field_fn.args();
295+
if args.len() != 2 {
296+
return None;
297+
}
286298

287-
let args = get_field_fn.args();
288-
if args.len() != 2 {
289-
tracing::debug!(
290-
"Expected 2 arguments for GetField, not pushing down {} arguments",
291-
args.len()
292-
);
293-
return false;
299+
let source = &args[0];
300+
let field_name_expr = &args[1];
301+
302+
let DataType::Struct(fields) = get_source_data_type(source, schema)? else {
303+
return None;
304+
};
305+
306+
let field_name = field_name_expr
307+
.as_any()
308+
.downcast_ref::<df_expr::Literal>()
309+
.and_then(|l| l.value().try_as_str())
310+
.flatten()?;
311+
312+
// Extract the named field from the struct type
313+
fields
314+
.find(field_name)
315+
.map(|(_, dt)| dt.data_type().clone())
316+
} else {
317+
None
294318
}
295-
let source_expr = &args[0];
296-
let field_name_expr = &args[1];
297-
let Some(field_name) = field_name_expr
298-
.as_any()
299-
.downcast_ref::<df_expr::Literal>()
300-
.and_then(|lit| lit.value().try_as_str().flatten())
301-
else {
302-
return false;
303-
};
304-
305-
let Ok(source_dt) = source_expr.data_type(schema) else {
306-
tracing::debug!(
307-
field_name = field_name,
308-
schema = ?schema,
309-
source_expr = ?source_expr,
310-
"Failed to get source type for GetField, not pushing down"
311-
);
312-
return false;
313-
};
314-
let DataType::Struct(fields) = source_dt else {
315-
tracing::debug!(
316-
field_name = field_name,
317-
schema = ?schema,
318-
source_expr = ?source_expr,
319-
"Failed to get source type as struct for GetField, not pushing down"
320-
);
321-
return false;
322-
};
323-
fields.find(field_name).is_some()
324319
}
325320

326321
#[cfg(test)]

0 commit comments

Comments
 (0)