Skip to content

Commit 14495a3

Browse files
committed
move column, dfschema, etc. to common module
1 parent b15f973 commit 14495a3

File tree

6 files changed

+72
-857
lines changed

6 files changed

+72
-857
lines changed

datafusion-common/src/column.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ impl Column {
7575
}
7676

7777
// Internal implementation of normalize
78-
fn normalize_with_schemas(
78+
pub fn normalize_with_schemas(
7979
self,
8080
schemas: &[&Arc<DFSchema>],
8181
using_columns: &[HashSet<Column>],

datafusion-common/src/dfschema.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,40 @@ impl Display for DFSchema {
402402
}
403403
}
404404

405+
/// Provides schema information needed by [Expr] methods such as
406+
/// [Expr::nullable] and [Expr::data_type].
407+
///
408+
/// Note that this trait is implemented for &[DFSchema] which is
409+
/// widely used in the DataFusion codebase.
410+
pub trait ExprSchema {
411+
/// Is this column reference nullable?
412+
fn nullable(&self, col: &Column) -> Result<bool>;
413+
414+
/// What is the datatype of this column?
415+
fn data_type(&self, col: &Column) -> Result<&DataType>;
416+
}
417+
418+
// Implement `ExprSchema` for `Arc<DFSchema>`
419+
impl<P: AsRef<DFSchema>> ExprSchema for P {
420+
fn nullable(&self, col: &Column) -> Result<bool> {
421+
self.as_ref().nullable(col)
422+
}
423+
424+
fn data_type(&self, col: &Column) -> Result<&DataType> {
425+
self.as_ref().data_type(col)
426+
}
427+
}
428+
429+
impl ExprSchema for DFSchema {
430+
fn nullable(&self, col: &Column) -> Result<bool> {
431+
Ok(self.field_from_column(col)?.is_nullable())
432+
}
433+
434+
fn data_type(&self, col: &Column) -> Result<&DataType> {
435+
Ok(self.field_from_column(col)?.data_type())
436+
}
437+
}
438+
405439
/// DFField wraps an Arrow field and adds an optional qualifier
406440
#[derive(Debug, Clone, PartialEq, Eq)]
407441
pub struct DFField {

datafusion-common/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,5 @@ mod dfschema;
2020
mod error;
2121

2222
pub use column::Column;
23-
pub use dfschema::{DFField, DFSchema, DFSchemaRef, ToDFSchema};
23+
pub use dfschema::{DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema};
2424
pub use error::{DataFusionError, Result};

datafusion/src/logical_plan/builder.rs

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,17 @@ impl LogicalPlanBuilder {
595595
self.join_detailed(right, join_type, join_keys, false)
596596
}
597597

598+
fn normalize(
599+
plan: &LogicalPlan,
600+
column: impl Into<Column> + Clone,
601+
) -> Result<Column> {
602+
let schemas = plan.all_schemas();
603+
let using_columns = plan.using_columns()?;
604+
column
605+
.into()
606+
.normalize_with_schemas(&schemas, &using_columns)
607+
}
608+
598609
/// Apply a join with on constraint and specified null equality
599610
/// If null_equals_null is true then null == null, else null != null
600611
pub fn join_detailed(
@@ -633,7 +644,10 @@ impl LogicalPlanBuilder {
633644
match (l_is_left, l_is_right, r_is_left, r_is_right) {
634645
(_, Ok(_), Ok(_), _) => (Ok(r), Ok(l)),
635646
(Ok(_), _, _, Ok(_)) => (Ok(l), Ok(r)),
636-
_ => (l.normalize(&self.plan), r.normalize(right)),
647+
_ => (
648+
Self::normalize(&self.plan, l),
649+
Self::normalize(right, r),
650+
),
637651
}
638652
}
639653
(Some(lr), None) => {
@@ -643,9 +657,12 @@ impl LogicalPlanBuilder {
643657
right.schema().field_with_qualified_name(lr, &l.name);
644658

645659
match (l_is_left, l_is_right) {
646-
(Ok(_), _) => (Ok(l), r.normalize(right)),
647-
(_, Ok(_)) => (r.normalize(&self.plan), Ok(l)),
648-
_ => (l.normalize(&self.plan), r.normalize(right)),
660+
(Ok(_), _) => (Ok(l), Self::normalize(right, r)),
661+
(_, Ok(_)) => (Self::normalize(&self.plan, r), Ok(l)),
662+
_ => (
663+
Self::normalize(&self.plan, l),
664+
Self::normalize(right, r),
665+
),
649666
}
650667
}
651668
(None, Some(rr)) => {
@@ -655,22 +672,25 @@ impl LogicalPlanBuilder {
655672
right.schema().field_with_qualified_name(rr, &r.name);
656673

657674
match (r_is_left, r_is_right) {
658-
(Ok(_), _) => (Ok(r), l.normalize(right)),
659-
(_, Ok(_)) => (l.normalize(&self.plan), Ok(r)),
660-
_ => (l.normalize(&self.plan), r.normalize(right)),
675+
(Ok(_), _) => (Ok(r), Self::normalize(right, l)),
676+
(_, Ok(_)) => (Self::normalize(&self.plan, l), Ok(r)),
677+
_ => (
678+
Self::normalize(&self.plan, l),
679+
Self::normalize(right, r),
680+
),
661681
}
662682
}
663683
(None, None) => {
664684
let mut swap = false;
665-
let left_key =
666-
l.clone().normalize(&self.plan).or_else(|_| {
685+
let left_key = Self::normalize(&self.plan, l.clone())
686+
.or_else(|_| {
667687
swap = true;
668-
l.normalize(right)
688+
Self::normalize(right, l)
669689
});
670690
if swap {
671-
(r.normalize(&self.plan), left_key)
691+
(Self::normalize(&self.plan, r), left_key)
672692
} else {
673-
(left_key, r.normalize(right))
693+
(left_key, Self::normalize(right, r))
674694
}
675695
}
676696
}
@@ -705,11 +725,11 @@ impl LogicalPlanBuilder {
705725
let left_keys: Vec<Column> = using_keys
706726
.clone()
707727
.into_iter()
708-
.map(|c| c.into().normalize(&self.plan))
728+
.map(|c| Self::normalize(&self.plan, c))
709729
.collect::<Result<_>>()?;
710730
let right_keys: Vec<Column> = using_keys
711731
.into_iter()
712-
.map(|c| c.into().normalize(right))
732+
.map(|c| Self::normalize(right, c))
713733
.collect::<Result<_>>()?;
714734

715735
let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys.into_iter()).collect();

0 commit comments

Comments
 (0)