Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add schema_err! error macros with optional backtrace #8620

Merged
merged 9 commits into from
Jan 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ jobs:
RUSTFLAGS: "-C debuginfo=0 -C opt-level=0 -C incremental=false -C codegen-units=256"
RUST_BACKTRACE: "1"
# avoid rust stack overflows on tpc-ds tests
RUST_MINSTACK: "3000000"
RUST_MIN_STACK: "3000000"
- name: Verify Working Directory Clean
run: git diff --exit-code

Expand Down Expand Up @@ -316,7 +316,7 @@ jobs:
RUSTFLAGS: "-C debuginfo=line-tables-only"
RUST_BACKTRACE: "1"
# avoid rust stack overflows on tpc-ds tests
RUST_MINSTACK: "3000000"
RUST_MIN_STACK: "3000000"
macos:
name: cargo test (mac)
runs-on: macos-latest
Expand Down Expand Up @@ -356,7 +356,7 @@ jobs:
RUSTFLAGS: "-C debuginfo=0 -C opt-level=0 -C incremental=false -C codegen-units=256"
RUST_BACKTRACE: "1"
# avoid rust stack overflows on tpc-ds tests
RUST_MINSTACK: "3000000"
RUST_MIN_STACK: "3000000"

test-datafusion-pyarrow:
name: cargo test pyarrow (amd64)
Expand Down
17 changes: 8 additions & 9 deletions datafusion/common/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Column

use crate::error::_schema_err;
use crate::utils::{parse_identifiers_normalized, quote_identifier};
use crate::{DFSchema, DataFusionError, OwnedTableReference, Result, SchemaError};
use std::collections::HashSet;
Expand Down Expand Up @@ -211,13 +212,13 @@ impl Column {
}
}

Err(DataFusionError::SchemaError(SchemaError::FieldNotFound {
_schema_err!(SchemaError::FieldNotFound {
field: Box::new(Column::new(self.relation.clone(), self.name)),
valid_fields: schemas
.iter()
.flat_map(|s| s.fields().iter().map(|f| f.qualified_column()))
.collect(),
}))
})
}

/// Qualify column if not done yet.
Expand Down Expand Up @@ -299,23 +300,21 @@ impl Column {
}

// If not due to USING columns then due to ambiguous column name
return Err(DataFusionError::SchemaError(
SchemaError::AmbiguousReference {
field: Column::new_unqualified(self.name),
},
));
return _schema_err!(SchemaError::AmbiguousReference {
field: Column::new_unqualified(self.name),
});
}
}
}

Err(DataFusionError::SchemaError(SchemaError::FieldNotFound {
_schema_err!(SchemaError::FieldNotFound {
field: Box::new(self),
valid_fields: schemas
.iter()
.flat_map(|s| s.iter())
.flat_map(|s| s.fields().iter().map(|f| f.qualified_column()))
.collect(),
}))
})
}
}

Expand Down
35 changes: 15 additions & 20 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::sync::Arc;

use crate::error::{
unqualified_field_not_found, DataFusionError, Result, SchemaError, _plan_err,
_schema_err,
};
use crate::{
field_not_found, Column, FunctionalDependencies, OwnedTableReference, TableReference,
Expand Down Expand Up @@ -141,11 +142,9 @@ impl DFSchema {
if let Some(qualifier) = field.qualifier() {
qualified_names.insert((qualifier, field.name()));
} else if !unqualified_names.insert(field.name()) {
return Err(DataFusionError::SchemaError(
SchemaError::DuplicateUnqualifiedField {
name: field.name().to_string(),
},
));
return _schema_err!(SchemaError::DuplicateUnqualifiedField {
name: field.name().to_string(),
});
}
}

Expand All @@ -159,14 +158,12 @@ impl DFSchema {
qualified_names.sort();
for (qualifier, name) in &qualified_names {
if unqualified_names.contains(name) {
return Err(DataFusionError::SchemaError(
SchemaError::AmbiguousReference {
field: Column {
relation: Some((*qualifier).clone()),
name: name.to_string(),
},
},
));
return _schema_err!(SchemaError::AmbiguousReference {
field: Column {
relation: Some((*qualifier).clone()),
name: name.to_string(),
}
});
}
}
Ok(Self {
Expand Down Expand Up @@ -392,14 +389,12 @@ impl DFSchema {
if fields_without_qualifier.len() == 1 {
Ok(fields_without_qualifier[0])
} else {
Err(DataFusionError::SchemaError(
SchemaError::AmbiguousReference {
field: Column {
relation: None,
name: name.to_string(),
},
_schema_err!(SchemaError::AmbiguousReference {
field: Column {
relation: None,
name: name.to_string(),
},
))
})
}
}
}
Expand Down
93 changes: 60 additions & 33 deletions datafusion/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ pub enum DataFusionError {
Configuration(String),
/// This error happens with schema-related errors, such as schema inference not possible
/// and non-unique column names.
SchemaError(SchemaError),
/// 2nd argument is for optional backtrace
/// Boxing the optional backtrace to prevent <https://rust-lang.github.io/rust-clippy/master/index.html#/result_large_err>
SchemaError(SchemaError, Box<Option<String>>),
/// Error returned during execution of the query.
/// Examples include files not found, errors in parsing certain types.
Execution(String),
Expand Down Expand Up @@ -125,34 +127,6 @@ pub enum SchemaError {
},
}

/// Create a "field not found" `DataFusionError::SchemaError`.
///
/// The returned error wraps `SchemaError::FieldNotFound`. Its `field` is the
/// column built from `qualifier` and `name`, and its `valid_fields` holds the
/// qualified column of every field in `schema`.
pub fn field_not_found<R: Into<OwnedTableReference>>(
qualifier: Option<R>,
name: &str,
schema: &DFSchema,
) -> DataFusionError {
DataFusionError::SchemaError(SchemaError::FieldNotFound {
field: Box::new(Column::new(qualifier, name)),
valid_fields: schema
.fields()
.iter()
.map(|f| f.qualified_column())
.collect(),
})
}

/// Convenience counterpart of [`field_not_found`] for when there is no qualifier.
///
/// The returned error wraps `SchemaError::FieldNotFound`. Its `field` is the
/// unqualified column named `name`, and its `valid_fields` holds the qualified
/// column of every field in `schema`.
pub fn unqualified_field_not_found(name: &str, schema: &DFSchema) -> DataFusionError {
DataFusionError::SchemaError(SchemaError::FieldNotFound {
field: Box::new(Column::new_unqualified(name)),
valid_fields: schema
.fields()
.iter()
.map(|f| f.qualified_column())
.collect(),
})
}

impl Display for SchemaError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Expand Down Expand Up @@ -298,7 +272,7 @@ impl Display for DataFusionError {
write!(f, "IO error: {desc}")
}
DataFusionError::SQL(ref desc, ref backtrace) => {
let backtrace = backtrace.clone().unwrap_or("".to_owned());
let backtrace: String = backtrace.clone().unwrap_or("".to_owned());
write!(f, "SQL error: {desc:?}{backtrace}")
}
DataFusionError::Configuration(ref desc) => {
Expand All @@ -314,8 +288,10 @@ impl Display for DataFusionError {
DataFusionError::Plan(ref desc) => {
write!(f, "Error during planning: {desc}")
}
DataFusionError::SchemaError(ref desc) => {
write!(f, "Schema error: {desc}")
DataFusionError::SchemaError(ref desc, ref backtrace) => {
let backtrace: &str =
&backtrace.as_ref().clone().unwrap_or("".to_owned());
write!(f, "Schema error: {desc}{backtrace}")
}
DataFusionError::Execution(ref desc) => {
write!(f, "Execution error: {desc}")
Expand Down Expand Up @@ -356,7 +332,7 @@ impl Error for DataFusionError {
DataFusionError::Internal(_) => None,
DataFusionError::Configuration(_) => None,
DataFusionError::Plan(_) => None,
DataFusionError::SchemaError(e) => Some(e),
DataFusionError::SchemaError(e, _) => Some(e),
DataFusionError::Execution(_) => None,
DataFusionError::ResourcesExhausted(_) => None,
DataFusionError::External(e) => Some(e.as_ref()),
Expand Down Expand Up @@ -556,12 +532,63 @@ macro_rules! arrow_err {
};
}

/// Creates a `DataFusionError::SchemaError` with an optional backtrace.
///
/// `$ERR` is the `SchemaError` payload. The second tuple field is filled with
/// whatever `DataFusionError::get_back_trace()` returns, boxed to match the
/// variant's `Box<Option<String>>` slot.
///
/// The error type is named through `$crate`, so the macro expands correctly
/// even at call sites that have not imported `DataFusionError`.
#[macro_export]
macro_rules! schema_datafusion_err {
    ($ERR:expr) => {
        $crate::DataFusionError::SchemaError(
            $ERR,
            Box::new(Some($crate::DataFusionError::get_back_trace())),
        )
    };
}

/// Creates `Err(DataFusionError::SchemaError(..))` with an optional backtrace.
///
/// `$ERR` is the `SchemaError` payload. The second tuple field is filled with
/// whatever `DataFusionError::get_back_trace()` returns, boxed to match the
/// variant's `Box<Option<String>>` slot.
///
/// The error type is named through `$crate`, so the macro expands correctly
/// even at call sites that have not imported `DataFusionError`.
#[macro_export]
macro_rules! schema_err {
    ($ERR:expr) => {
        Err($crate::DataFusionError::SchemaError(
            $ERR,
            Box::new(Some($crate::DataFusionError::get_back_trace())),
        ))
    };
}

// To avoid compiler error when using macro in the same crate:
// macros from the current crate cannot be referred to by absolute paths
pub use internal_datafusion_err as _internal_datafusion_err;
pub use internal_err as _internal_err;
pub use not_impl_err as _not_impl_err;
pub use plan_err as _plan_err;
pub use schema_err as _schema_err;

/// Create a "field not found" DataFusion::SchemaError
pub fn field_not_found<R: Into<OwnedTableReference>>(
qualifier: Option<R>,
name: &str,
schema: &DFSchema,
) -> DataFusionError {
schema_datafusion_err!(SchemaError::FieldNotFound {
field: Box::new(Column::new(qualifier, name)),
valid_fields: schema
.fields()
.iter()
.map(|f| f.qualified_column())
.collect(),
})
}

/// Convenience wrapper over [`field_not_found`] for when there is no qualifier
pub fn unqualified_field_not_found(name: &str, schema: &DFSchema) -> DataFusionError {
schema_datafusion_err!(SchemaError::FieldNotFound {
field: Box::new(Column::new_unqualified(name)),
valid_fields: schema
.fields()
.iter()
.map(|f| f.qualified_column())
.collect(),
})
}

#[cfg(test)]
mod test {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1186,7 +1186,7 @@ impl DataFrame {
let field_to_rename = match self.plan.schema().field_from_column(&old_column) {
Ok(field) => field,
// no-op if field not found
Err(DataFusionError::SchemaError(SchemaError::FieldNotFound { .. })) => {
Err(DataFusionError::SchemaError(SchemaError::FieldNotFound { .. }, _)) => {
return Ok(self)
}
Err(err) => return Err(err),
Expand Down
34 changes: 20 additions & 14 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1845,13 +1845,16 @@ mod tests {
.project(vec![col("id"), col("first_name").alias("id")]);

match plan {
Err(DataFusionError::SchemaError(SchemaError::AmbiguousReference {
field:
Column {
relation: Some(OwnedTableReference::Bare { table }),
name,
},
})) => {
Err(DataFusionError::SchemaError(
SchemaError::AmbiguousReference {
field:
Column {
relation: Some(OwnedTableReference::Bare { table }),
name,
},
},
_,
)) => {
assert_eq!("employee_csv", table);
assert_eq!("id", &name);
Ok(())
Expand All @@ -1872,13 +1875,16 @@ mod tests {
.aggregate(vec![col("state")], vec![sum(col("salary")).alias("state")]);

match plan {
Err(DataFusionError::SchemaError(SchemaError::AmbiguousReference {
field:
Column {
relation: Some(OwnedTableReference::Bare { table }),
name,
},
})) => {
Err(DataFusionError::SchemaError(
SchemaError::AmbiguousReference {
field:
Column {
relation: Some(OwnedTableReference::Bare { table }),
name,
},
},
_,
)) => {
assert_eq!("employee_csv", table);
assert_eq!("state", &name);
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// Default expressions are restricted, column references are not allowed
let empty_schema = DFSchema::empty();
let error_desc = |e: DataFusionError| match e {
DataFusionError::SchemaError(SchemaError::FieldNotFound { .. }) => {
DataFusionError::SchemaError(SchemaError::FieldNotFound { .. }, _) => {
plan_datafusion_err!(
"Column reference is not allowed in the DEFAULT expression : {}",
e
Expand Down
15 changes: 7 additions & 8 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ use arrow_schema::DataType;
use datafusion_common::file_options::StatementOptions;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
not_impl_err, plan_datafusion_err, plan_err, unqualified_field_not_found, Column,
Constraints, DFField, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference,
Result, ScalarValue, SchemaReference, TableReference, ToDFSchema,
not_impl_err, plan_datafusion_err, plan_err, schema_err, unqualified_field_not_found,
Column, Constraints, DFField, DFSchema, DFSchemaRef, DataFusionError,
OwnedTableReference, Result, ScalarValue, SchemaError, SchemaReference,
TableReference, ToDFSchema,
};
use datafusion_expr::dml::{CopyOptions, CopyTo};
use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
Expand Down Expand Up @@ -1138,11 +1139,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.index_of_column_by_name(None, &c)?
.ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;
if value_indices[column_index].is_some() {
return Err(DataFusionError::SchemaError(
datafusion_common::SchemaError::DuplicateUnqualifiedField {
name: c,
},
));
return schema_err!(SchemaError::DuplicateUnqualifiedField {
name: c,
});
} else {
value_indices[column_index] = Some(i);
}
Expand Down
6 changes: 4 additions & 2 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,9 +756,11 @@ fn join_with_ambiguous_column() {
#[test]
fn where_selection_with_ambiguous_column() {
let sql = "SELECT * FROM person a, person b WHERE id = id + 1";
let err = logical_plan(sql).expect_err("query should have failed");
let err = logical_plan(sql)
.expect_err("query should have failed")
.strip_backtrace();
assert_eq!(
"SchemaError(AmbiguousReference { field: Column { relation: None, name: \"id\" } })",
"\"Schema error: Ambiguous reference to unqualified field id\"",
format!("{err:?}")
);
}
Expand Down