Skip to content

Commit 7a6ad4e

Browse files
committed
fix: resolve qualified column references after aggregation
Adds fallback logic to the `ExprSchemable` implementation for `Expr` (`get_type` and `nullable`) for qualified column references that fail to resolve directly against the schema: the lookup is retried with the unqualified column name. This situation commonly occurs when an aggregation produces an unqualified schema but subsequent operations still reference the qualified column name. The fallback applies only to qualified columns whose initial resolution fails; if the retry also fails, the original error is returned unchanged.
1 parent 9e36ec4 commit 7a6ad4e

File tree

1 file changed

+161
-3
lines changed

1 file changed

+161
-3
lines changed

datafusion/expr/src/expr_schema.rs

Lines changed: 161 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,26 @@ impl ExprSchemable for Expr {
111111
_ => expr.get_type(schema),
112112
},
113113
Expr::Negative(expr) => expr.get_type(schema),
114-
Expr::Column(c) => Ok(schema.data_type(c)?.clone()),
114+
Expr::Column(c) => {
115+
// First try to resolve the column as-is
116+
match schema.data_type(c) {
117+
Ok(data_type) => Ok(data_type.clone()),
118+
Err(e) => {
119+
// If the column has a qualifier but wasn't found, try without the qualifier
120+
// This handles cases where aggregations produce unqualified schemas
121+
// but subsequent operations still reference the qualified names
122+
if c.relation.is_some() {
123+
let unqualified = Column::new_unqualified(&c.name);
124+
match schema.data_type(&unqualified) {
125+
Ok(data_type) => Ok(data_type.clone()),
126+
Err(_) => Err(e), // Return the original error
127+
}
128+
} else {
129+
Err(e)
130+
}
131+
}
132+
}
133+
}
115134
Expr::OuterReferenceColumn(field, _) => Ok(field.data_type().clone()),
116135
Expr::ScalarVariable(ty, _) => Ok(ty.clone()),
117136
Expr::Literal(l, _) => Ok(l.data_type()),
@@ -275,7 +294,26 @@ impl ExprSchemable for Expr {
275294
|| low.nullable(input_schema)?
276295
|| high.nullable(input_schema)?),
277296

278-
Expr::Column(c) => input_schema.nullable(c),
297+
Expr::Column(c) => {
298+
// First try to resolve the column as-is
299+
match input_schema.nullable(c) {
300+
Ok(nullable) => Ok(nullable),
301+
Err(e) => {
302+
// If the column has a qualifier but wasn't found, try without the qualifier
303+
// This handles cases where aggregations produce unqualified schemas
304+
// but subsequent operations still reference the qualified names
305+
if c.relation.is_some() {
306+
let unqualified = Column::new_unqualified(&c.name);
307+
match input_schema.nullable(&unqualified) {
308+
Ok(nullable) => Ok(nullable),
309+
Err(_) => Err(e), // Return the original error
310+
}
311+
} else {
312+
Err(e)
313+
}
314+
}
315+
}
316+
}
279317
Expr::OuterReferenceColumn(field, _) => Ok(field.is_nullable()),
280318
Expr::Literal(value, _) => Ok(value.is_null()),
281319
Expr::Case(case) => {
@@ -777,9 +815,10 @@ pub fn cast_subquery(subquery: Subquery, cast_to_type: &DataType) -> Result<Subq
777815
#[cfg(test)]
778816
mod tests {
779817
use super::*;
818+
use crate::test::function_stub::avg;
780819
use crate::{col, lit, out_ref_col_with_metadata};
781820

782-
use datafusion_common::{internal_err, DFSchema, HashMap, ScalarValue};
821+
use datafusion_common::{internal_err, DFSchema, HashMap, ScalarValue, Column, TableReference};
783822

784823
macro_rules! test_is_expr_nullable {
785824
($EXPR_TYPE:ident) => {{
@@ -881,6 +920,125 @@ mod tests {
881920
);
882921
}
883922

923+
#[test]
924+
fn test_qualified_column_after_aggregation() {
925+
// Regression test: a qualified column reference must still resolve when the
926+
// schema it is evaluated against contains only unqualified fields, which is
927+
// the post-aggregation situation this test simulates. Covers get_type,
928+
// nullable, and a binary expression built on the qualified column.
929+
930+
// Hand-built schema standing in for an aggregation result: a single
931+
// unqualified, non-nullable Float64 field named "value".
932+
let unqualified_schema = DFSchema::from_unqualified_fields(
933+
vec![Field::new("value", DataType::Float64, false)].into(),
934+
std::collections::HashMap::new(),
935+
)
936+
.unwrap();
937+
938+
// Qualified reference to that field, modelling a query such as
939+
// avg(memory_usage_bytes) / 1024
940+
// where the aggregate output is just "value" while the enclosing expression
941+
// still names it through the "memory_usage_bytes" qualifier.
942+
let qualified_col = col("memory_usage_bytes.value");
943+
944+
// Without the fallback this lookup fails with:
945+
// "No field named memory_usage_bytes.value. Valid fields are value."
946+
// With the fallback it resolves to the unqualified "value" field.
947+
let data_type = qualified_col.get_type(&unqualified_schema).unwrap();
948+
assert_eq!(data_type, DataType::Float64);
949+
950+
// Nullability uses the same fallback; the field above was declared non-nullable.
951+
let nullable = qualified_col.nullable(&unqualified_schema).unwrap();
952+
assert!(!nullable);
953+
954+
// Dividing the qualified column by an integer literal must still type-check as Float64.
955+
let expr = qualified_col / lit(1024);
956+
let data_type = expr.get_type(&unqualified_schema).unwrap();
957+
assert_eq!(data_type, DataType::Float64);
958+
}
959+
960+
#[test]
961+
fn test_qualified_column_fallback_behavior() {
962+
// Checks the limits of the fallback: it rescues qualified columns whose bare name exists, and otherwise surfaces the original error.
963+
let unqualified_schema = DFSchema::from_unqualified_fields(
964+
vec![Field::new("existing_col", DataType::Int32, true)].into(),
965+
std::collections::HashMap::new(),
966+
)
967+
.unwrap();
968+
969+
// Case 1: a qualified column whose bare name is in the schema resolves (only success is asserted, not the resulting type or nullability).
970+
let qualified_existing = col("table.existing_col");
971+
assert!(qualified_existing.get_type(&unqualified_schema).is_ok());
972+
assert!(qualified_existing.nullable(&unqualified_schema).is_ok());
973+
974+
// Case 2: when the bare name is missing too, the error must still name the qualified column, i.e. the first lookup's error is the one returned.
975+
let qualified_nonexistent = col("table.nonexistent_col");
976+
let error = qualified_nonexistent
977+
.get_type(&unqualified_schema)
978+
.unwrap_err();
979+
assert!(error.to_string().contains("table.nonexistent_col"));
980+
981+
// Case 3: an unqualified column that is missing fails as before; the fallback is not involved.
982+
let unqualified_nonexistent = col("nonexistent_col");
983+
let error = unqualified_nonexistent
984+
.get_type(&unqualified_schema)
985+
.unwrap_err();
986+
assert!(error.to_string().contains("nonexistent_col"));
987+
// The message must not contain the "table." qualifier that this column never had.
988+
assert!(!error.to_string().contains("table.nonexistent_col"));
989+
}
990+
991+
#[test]
992+
fn test_aggregation_scenario() {
993+
// End-to-end check: exercise the fallback against the schema of a built aggregate plan rather than a hand-built schema.
994+
use crate::logical_plan::builder::LogicalPlanBuilder;
995+
use arrow::datatypes::Schema;
996+
use crate::logical_plan::builder::LogicalTableSource;
997+
use std::sync::Arc;
998+
999+
// Arrow schema of the input table; field names are plain here, the "metrics" table name is supplied to the scan below.
1000+
let table_schema = Arc::new(Schema::new(vec![
1001+
Field::new("usage_bytes", DataType::Int64, false),
1002+
Field::new(
1003+
"timestamp",
1004+
DataType::Timestamp(arrow::datatypes::TimeUnit::Second, None),
1005+
false,
1006+
),
1007+
]));
1008+
1009+
// Scan "metrics" and aggregate the whole input into a single avg(...) with no grouping.
1010+
let plan = LogicalPlanBuilder::scan(
1011+
"metrics",
1012+
Arc::new(LogicalTableSource::new(table_schema)),
1013+
None,
1014+
)
1015+
.unwrap()
1016+
.aggregate(
1017+
Vec::<Expr>::new(), // no group by
1018+
vec![avg(col("metrics.usage_bytes"))], // avg with qualified column
1019+
)
1020+
.unwrap()
1021+
.build()
1022+
.unwrap();
1023+
1024+
// Schema produced by the aggregate plan.
1025+
let agg_schema = plan.schema();
1026+
1027+
// NOTE(review): assumes the aggregate's output field is not qualified by "metrics"; this is not asserted here — confirm.
1028+
// Re-qualify the first output field with "metrics" so that, if the output is indeed unqualified, the lookup goes through the fallback.
1029+
let actual_column_name = agg_schema.field(0).name();
1030+
let qualified_ref = Column::new(Some(TableReference::bare("metrics")), actual_column_name);
1031+
1032+
// Expected to resolve (via the fallback); the avg over the Int64 column is asserted to be Float64.
1033+
let result = Expr::Column(qualified_ref).get_type(agg_schema);
1034+
assert!(
1035+
result.is_ok(),
1036+
"Failed to resolve qualified column after aggregation: {:?}",
1037+
result.err()
1038+
);
1039+
assert_eq!(result.unwrap(), DataType::Float64);
1040+
}
1041+
8841042
#[test]
8851043
fn test_expr_metadata() {
8861044
let mut meta = HashMap::new();

0 commit comments

Comments
 (0)