Skip to content

Commit ebf32b0

Browse files
committed
fix: resolve qualified column references after aggregation
Adds fallback logic to handle qualified column references when they fail to resolve directly in the schema. This commonly occurs when aggregations produce unqualified schemas but subsequent operations still reference qualified column names. The fix preserves original error messages and only applies the fallback for qualified columns that fail initial resolution.
1 parent e2a5b57 commit ebf32b0

File tree

1 file changed

+164
-3
lines changed

1 file changed

+164
-3
lines changed

datafusion/expr/src/expr_schema.rs

Lines changed: 164 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -111,7 +111,26 @@ impl ExprSchemable for Expr {
111111
_ => expr.get_type(schema),
112112
},
113113
Expr::Negative(expr) => expr.get_type(schema),
114-
Expr::Column(c) => Ok(schema.data_type(c)?.clone()),
114+
Expr::Column(c) => {
115+
// First try to resolve the column as-is
116+
match schema.data_type(c) {
117+
Ok(data_type) => Ok(data_type.clone()),
118+
Err(e) => {
119+
// If the column has a qualifier but wasn't found, try without the qualifier
120+
// This handles cases where aggregations produce unqualified schemas
121+
// but subsequent operations still reference the qualified names
122+
if c.relation.is_some() {
123+
let unqualified = Column::new_unqualified(&c.name);
124+
match schema.data_type(&unqualified) {
125+
Ok(data_type) => Ok(data_type.clone()),
126+
Err(_) => Err(e), // Return the original error
127+
}
128+
} else {
129+
Err(e)
130+
}
131+
}
132+
}
133+
}
115134
Expr::OuterReferenceColumn(field, _) => Ok(field.data_type().clone()),
116135
Expr::ScalarVariable(ty, _) => Ok(ty.clone()),
117136
Expr::Literal(l, _) => Ok(l.data_type()),
@@ -275,7 +294,26 @@ impl ExprSchemable for Expr {
275294
|| low.nullable(input_schema)?
276295
|| high.nullable(input_schema)?),
277296

278-
Expr::Column(c) => input_schema.nullable(c),
297+
Expr::Column(c) => {
298+
// First try to resolve the column as-is
299+
match input_schema.nullable(c) {
300+
Ok(nullable) => Ok(nullable),
301+
Err(e) => {
302+
// If the column has a qualifier but wasn't found, try without the qualifier
303+
// This handles cases where aggregations produce unqualified schemas
304+
// but subsequent operations still reference the qualified names
305+
if c.relation.is_some() {
306+
let unqualified = Column::new_unqualified(&c.name);
307+
match input_schema.nullable(&unqualified) {
308+
Ok(nullable) => Ok(nullable),
309+
Err(_) => Err(e), // Return the original error
310+
}
311+
} else {
312+
Err(e)
313+
}
314+
}
315+
}
316+
}
279317
Expr::OuterReferenceColumn(field, _) => Ok(field.is_nullable()),
280318
Expr::Literal(value, _) => Ok(value.is_null()),
281319
Expr::Case(case) => {
@@ -777,9 +815,12 @@ pub fn cast_subquery(subquery: Subquery, cast_to_type: &DataType) -> Result<Subq
777815
#[cfg(test)]
778816
mod tests {
779817
use super::*;
818+
use crate::test::function_stub::avg;
780819
use crate::{col, lit, out_ref_col_with_metadata};
781820

782-
use datafusion_common::{internal_err, DFSchema, HashMap, ScalarValue};
821+
use datafusion_common::{
822+
internal_err, Column, DFSchema, HashMap, ScalarValue, TableReference,
823+
};
783824

784825
macro_rules! test_is_expr_nullable {
785826
($EXPR_TYPE:ident) => {{
@@ -881,6 +922,126 @@ mod tests {
881922
);
882923
}
883924

925+
#[test]
926+
fn test_qualified_column_after_aggregation() {
927+
// Test for qualified column reference resolution after aggregation
928+
// This test verifies the fix for the issue where binary expressions
929+
// fail when referencing qualified column names after aggregation
930+
// produces unqualified schemas.
931+
932+
// Create a schema that simulates the result of an aggregation
933+
// where the output field is unqualified (just "value")
934+
let unqualified_schema = DFSchema::from_unqualified_fields(
935+
vec![Field::new("value", DataType::Float64, false)].into(),
936+
std::collections::HashMap::new(),
937+
)
938+
.unwrap();
939+
940+
// Create a qualified column reference as would be produced
941+
// in a query like: avg(memory_usage_bytes) / 1024
942+
// where the aggregation produces "value" but the binary expression
943+
// still references the original qualified name
944+
let qualified_col = col("memory_usage_bytes.value");
945+
946+
// Before the fix, this would fail with:
947+
// "No field named memory_usage_bytes.value. Valid fields are value."
948+
// After the fix, it should successfully resolve to the unqualified "value" field
949+
let data_type = qualified_col.get_type(&unqualified_schema).unwrap();
950+
assert_eq!(data_type, DataType::Float64);
951+
952+
// Test nullable resolution as well
953+
let nullable = qualified_col.nullable(&unqualified_schema).unwrap();
954+
assert!(!nullable);
955+
956+
// Test with binary expression
957+
let expr = qualified_col / lit(1024);
958+
let data_type = expr.get_type(&unqualified_schema).unwrap();
959+
assert_eq!(data_type, DataType::Float64);
960+
}
961+
962+
#[test]
963+
fn test_qualified_column_fallback_behavior() {
964+
// Test that the fallback only happens for qualified columns and preserves error messages
965+
let unqualified_schema = DFSchema::from_unqualified_fields(
966+
vec![Field::new("existing_col", DataType::Int32, true)].into(),
967+
std::collections::HashMap::new(),
968+
)
969+
.unwrap();
970+
971+
// Test 1: Qualified column that exists unqualified should work
972+
let qualified_existing = col("table.existing_col");
973+
assert!(qualified_existing.get_type(&unqualified_schema).is_ok());
974+
assert!(qualified_existing.nullable(&unqualified_schema).is_ok());
975+
976+
// Test 2: Qualified column that doesn't exist should return original error
977+
let qualified_nonexistent = col("table.nonexistent_col");
978+
let error = qualified_nonexistent
979+
.get_type(&unqualified_schema)
980+
.unwrap_err();
981+
assert!(error.to_string().contains("table.nonexistent_col"));
982+
983+
// Test 3: Unqualified column that doesn't exist should return original error (no fallback)
984+
let unqualified_nonexistent = col("nonexistent_col");
985+
let error = unqualified_nonexistent
986+
.get_type(&unqualified_schema)
987+
.unwrap_err();
988+
assert!(error.to_string().contains("nonexistent_col"));
989+
// Make sure the error message does not mention a qualified table prefix
990+
assert!(!error.to_string().contains("table.nonexistent_col"));
991+
}
992+
993+
#[test]
994+
fn test_aggregation_scenario() {
995+
// Test a realistic aggregation scenario
996+
use crate::logical_plan::builder::LogicalPlanBuilder;
997+
use crate::logical_plan::builder::LogicalTableSource;
998+
use arrow::datatypes::Schema;
999+
use std::sync::Arc;
1000+
1001+
// Create input table schema with qualified columns
1002+
let table_schema = Arc::new(Schema::new(vec![
1003+
Field::new("usage_bytes", DataType::Int64, false),
1004+
Field::new(
1005+
"timestamp",
1006+
DataType::Timestamp(arrow::datatypes::TimeUnit::Second, None),
1007+
false,
1008+
),
1009+
]));
1010+
1011+
// Build a plan that does aggregation
1012+
let plan = LogicalPlanBuilder::scan(
1013+
"metrics",
1014+
Arc::new(LogicalTableSource::new(table_schema)),
1015+
None,
1016+
)
1017+
.unwrap()
1018+
.aggregate(
1019+
Vec::<Expr>::new(), // no group by
1020+
vec![avg(col("metrics.usage_bytes"))], // avg with qualified column
1021+
)
1022+
.unwrap()
1023+
.build()
1024+
.unwrap();
1025+
1026+
// Get the output schema from the aggregation
1027+
let agg_schema = plan.schema();
1028+
1029+
// The aggregation output should have unqualified column names
1030+
// Let's create a qualified reference to test the fallback mechanism
1031+
let actual_column_name = agg_schema.field(0).name();
1032+
let qualified_ref =
1033+
Column::new(Some(TableReference::bare("metrics")), actual_column_name);
1034+
1035+
// This should work due to the fallback mechanism
1036+
let result = Expr::Column(qualified_ref).get_type(agg_schema);
1037+
assert!(
1038+
result.is_ok(),
1039+
"Failed to resolve qualified column after aggregation: {:?}",
1040+
result.err()
1041+
);
1042+
assert_eq!(result.unwrap(), DataType::Float64);
1043+
}
1044+
8841045
#[test]
8851046
fn test_expr_metadata() {
8861047
let mut meta = HashMap::new();

0 commit comments

Comments
 (0)