Skip to content

Commit 2e535f9

Browse files
authored
move dfschema and column (#1758)
1 parent d014ff2 commit 2e535f9

File tree

6 files changed

+912
-855
lines changed

6 files changed

+912
-855
lines changed

datafusion-common/src/column.rs

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Column
19+
20+
use crate::{DFSchema, DataFusionError, Result};
21+
use std::collections::HashSet;
22+
use std::convert::Infallible;
23+
use std::fmt;
24+
use std::str::FromStr;
25+
use std::sync::Arc;
26+
27+
/// A named reference to a qualified field in a schema.
28+
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
29+
pub struct Column {
30+
/// relation/table name.
31+
pub relation: Option<String>,
32+
/// field/column name.
33+
pub name: String,
34+
}
35+
36+
impl Column {
37+
/// Create Column from unqualified name.
38+
pub fn from_name(name: impl Into<String>) -> Self {
39+
Self {
40+
relation: None,
41+
name: name.into(),
42+
}
43+
}
44+
45+
/// Deserialize a fully qualified name string into a column
46+
pub fn from_qualified_name(flat_name: &str) -> Self {
47+
use sqlparser::tokenizer::Token;
48+
49+
let dialect = sqlparser::dialect::GenericDialect {};
50+
let mut tokenizer = sqlparser::tokenizer::Tokenizer::new(&dialect, flat_name);
51+
if let Ok(tokens) = tokenizer.tokenize() {
52+
if let [Token::Word(relation), Token::Period, Token::Word(name)] =
53+
tokens.as_slice()
54+
{
55+
return Column {
56+
relation: Some(relation.value.clone()),
57+
name: name.value.clone(),
58+
};
59+
}
60+
}
61+
// any expression that's not in the form of `foo.bar` will be treated as unqualified column
62+
// name
63+
Column {
64+
relation: None,
65+
name: String::from(flat_name),
66+
}
67+
}
68+
69+
/// Serialize column into a flat name string
70+
pub fn flat_name(&self) -> String {
71+
match &self.relation {
72+
Some(r) => format!("{}.{}", r, self.name),
73+
None => self.name.clone(),
74+
}
75+
}
76+
77+
// Internal implementation of normalize
78+
pub fn normalize_with_schemas(
79+
self,
80+
schemas: &[&Arc<DFSchema>],
81+
using_columns: &[HashSet<Column>],
82+
) -> Result<Self> {
83+
if self.relation.is_some() {
84+
return Ok(self);
85+
}
86+
87+
for schema in schemas {
88+
let fields = schema.fields_with_unqualified_name(&self.name);
89+
match fields.len() {
90+
0 => continue,
91+
1 => {
92+
return Ok(fields[0].qualified_column());
93+
}
94+
_ => {
95+
// More than 1 fields in this schema have their names set to self.name.
96+
//
97+
// This should only happen when a JOIN query with USING constraint references
98+
// join columns using unqualified column name. For example:
99+
//
100+
// ```sql
101+
// SELECT id FROM t1 JOIN t2 USING(id)
102+
// ```
103+
//
104+
// In this case, both `t1.id` and `t2.id` will match unqualified column `id`.
105+
// We will use the relation from the first matched field to normalize self.
106+
107+
// Compare matched fields with one USING JOIN clause at a time
108+
for using_col in using_columns {
109+
let all_matched = fields
110+
.iter()
111+
.all(|f| using_col.contains(&f.qualified_column()));
112+
// All matched fields belong to the same using column set, in orther words
113+
// the same join clause. We simply pick the qualifer from the first match.
114+
if all_matched {
115+
return Ok(fields[0].qualified_column());
116+
}
117+
}
118+
}
119+
}
120+
}
121+
122+
Err(DataFusionError::Plan(format!(
123+
"Column {} not found in provided schemas",
124+
self
125+
)))
126+
}
127+
}
128+
129+
impl From<&str> for Column {
130+
fn from(c: &str) -> Self {
131+
Self::from_qualified_name(c)
132+
}
133+
}
134+
135+
impl FromStr for Column {
136+
type Err = Infallible;
137+
138+
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
139+
Ok(s.into())
140+
}
141+
}
142+
143+
impl fmt::Display for Column {
144+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
145+
match &self.relation {
146+
Some(r) => write!(f, "#{}.{}", r, self.name),
147+
None => write!(f, "#{}", self.name),
148+
}
149+
}
150+
}

0 commit comments

Comments
 (0)