Skip to content

Commit c4fd754

Browse files
authored
Add catalog::resolve_table_references (#10876)
* resolve information_schema references only when necessary
* add `catalog::resolve_table_references` as a public utility
* collect CTEs separately in resolve_table_references
* test CTE name shadowing
* handle CTE name shadowing in resolve_table_references
* handle unions, recursive and nested CTEs in resolve_table_references
1 parent 378b9ee commit c4fd754

File tree

3 files changed

+256
-86
lines changed

3 files changed

+256
-86
lines changed

datafusion/core/src/catalog/mod.rs

Lines changed: 237 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ use crate::catalog::schema::SchemaProvider;
2727
use dashmap::DashMap;
2828
use datafusion_common::{exec_err, not_impl_err, Result};
2929
use std::any::Any;
30+
use std::collections::BTreeSet;
31+
use std::ops::ControlFlow;
3032
use std::sync::Arc;
3133

3234
/// Represent a list of named [`CatalogProvider`]s.
@@ -157,11 +159,11 @@ impl CatalogProviderList for MemoryCatalogProviderList {
157159
/// access required to read table details (e.g. statistics).
158160
///
159161
/// The pattern that DataFusion itself uses to plan SQL queries is to walk over
160-
/// the query to [find all schema / table references in an `async` function],
162+
/// the query to [find all table references],
161163
/// perform the required remote catalog accesses in parallel, and then plan the query
162164
/// using that snapshot.
163165
///
164-
/// [find all schema / table references in an `async` function]: crate::execution::context::SessionState::resolve_table_references
166+
/// [find all table references]: resolve_table_references
165167
///
166168
/// # Example Catalog Implementations
167169
///
@@ -295,6 +297,182 @@ impl CatalogProvider for MemoryCatalogProvider {
295297
}
296298
}
297299

300+
/// Collects all tables and views referenced in the SQL statement. CTEs are collected separately.
301+
/// This can be used to determine which tables need to be in the catalog for a query to be planned.
302+
///
303+
/// # Returns
304+
///
305+
/// A `(table_refs, ctes)` tuple, the first element contains table and view references and the second
306+
/// element contains any CTE aliases that were defined and possibly referenced.
307+
///
308+
/// ## Example
309+
///
310+
/// ```
311+
/// # use datafusion_sql::parser::DFParser;
312+
/// # use datafusion::catalog::resolve_table_references;
313+
/// let query = "SELECT a FROM foo where x IN (SELECT y FROM bar)";
314+
/// let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
315+
/// let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
316+
/// assert_eq!(table_refs.len(), 2);
317+
/// assert_eq!(table_refs[0].to_string(), "bar");
318+
/// assert_eq!(table_refs[1].to_string(), "foo");
319+
/// assert_eq!(ctes.len(), 0);
320+
/// ```
321+
///
322+
/// ## Example with CTEs
323+
///
324+
/// ```
325+
/// # use datafusion_sql::parser::DFParser;
326+
/// # use datafusion::catalog::resolve_table_references;
327+
/// let query = "with my_cte as (values (1), (2)) SELECT * from my_cte;";
328+
/// let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
329+
/// let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
330+
/// assert_eq!(table_refs.len(), 0);
331+
/// assert_eq!(ctes.len(), 1);
332+
/// assert_eq!(ctes[0].to_string(), "my_cte");
333+
/// ```
334+
pub fn resolve_table_references(
335+
statement: &datafusion_sql::parser::Statement,
336+
enable_ident_normalization: bool,
337+
) -> datafusion_common::Result<(Vec<TableReference>, Vec<TableReference>)> {
338+
use crate::sql::planner::object_name_to_table_reference;
339+
use datafusion_sql::parser::{
340+
CopyToSource, CopyToStatement, Statement as DFStatement,
341+
};
342+
use information_schema::INFORMATION_SCHEMA;
343+
use information_schema::INFORMATION_SCHEMA_TABLES;
344+
use sqlparser::ast::*;
345+
346+
struct RelationVisitor {
347+
relations: BTreeSet<ObjectName>,
348+
all_ctes: BTreeSet<ObjectName>,
349+
ctes_in_scope: Vec<ObjectName>,
350+
}
351+
352+
impl RelationVisitor {
353+
/// Record the reference to `relation`, if it's not a CTE reference.
354+
fn insert_relation(&mut self, relation: &ObjectName) {
355+
if !self.relations.contains(relation)
356+
&& !self.ctes_in_scope.contains(relation)
357+
{
358+
self.relations.insert(relation.clone());
359+
}
360+
}
361+
}
362+
363+
impl Visitor for RelationVisitor {
364+
type Break = ();
365+
366+
fn pre_visit_relation(&mut self, relation: &ObjectName) -> ControlFlow<()> {
367+
self.insert_relation(relation);
368+
ControlFlow::Continue(())
369+
}
370+
371+
fn pre_visit_query(&mut self, q: &Query) -> ControlFlow<Self::Break> {
372+
if let Some(with) = &q.with {
373+
for cte in &with.cte_tables {
374+
// The non-recursive CTE name is not in scope when evaluating the CTE itself, so this is valid:
375+
// `WITH t AS (SELECT * FROM t) SELECT * FROM t`
376+
// Where the first `t` refers to a predefined table. So we are careful here
377+
// to visit the CTE first, before putting it in scope.
378+
if !with.recursive {
379+
// This is a bit hackish as the CTE will be visited again as part of visiting `q`,
380+
// but thankfully `insert_relation` is idempotent.
381+
cte.visit(self);
382+
}
383+
self.ctes_in_scope
384+
.push(ObjectName(vec![cte.alias.name.clone()]));
385+
}
386+
}
387+
ControlFlow::Continue(())
388+
}
389+
390+
fn post_visit_query(&mut self, q: &Query) -> ControlFlow<Self::Break> {
391+
if let Some(with) = &q.with {
392+
for _ in &with.cte_tables {
393+
// Unwrap: We just pushed these in `pre_visit_query`
394+
self.all_ctes.insert(self.ctes_in_scope.pop().unwrap());
395+
}
396+
}
397+
ControlFlow::Continue(())
398+
}
399+
400+
fn pre_visit_statement(&mut self, statement: &Statement) -> ControlFlow<()> {
401+
if let Statement::ShowCreate {
402+
obj_type: ShowCreateObject::Table | ShowCreateObject::View,
403+
obj_name,
404+
} = statement
405+
{
406+
self.insert_relation(obj_name)
407+
}
408+
409+
// SHOW statements will later be rewritten into a SELECT from the information_schema
410+
let requires_information_schema = matches!(
411+
statement,
412+
Statement::ShowFunctions { .. }
413+
| Statement::ShowVariable { .. }
414+
| Statement::ShowStatus { .. }
415+
| Statement::ShowVariables { .. }
416+
| Statement::ShowCreate { .. }
417+
| Statement::ShowColumns { .. }
418+
| Statement::ShowTables { .. }
419+
| Statement::ShowCollation { .. }
420+
);
421+
if requires_information_schema {
422+
for s in INFORMATION_SCHEMA_TABLES {
423+
self.relations.insert(ObjectName(vec![
424+
Ident::new(INFORMATION_SCHEMA),
425+
Ident::new(*s),
426+
]));
427+
}
428+
}
429+
ControlFlow::Continue(())
430+
}
431+
}
432+
433+
let mut visitor = RelationVisitor {
434+
relations: BTreeSet::new(),
435+
all_ctes: BTreeSet::new(),
436+
ctes_in_scope: vec![],
437+
};
438+
439+
fn visit_statement(statement: &DFStatement, visitor: &mut RelationVisitor) {
440+
match statement {
441+
DFStatement::Statement(s) => {
442+
let _ = s.as_ref().visit(visitor);
443+
}
444+
DFStatement::CreateExternalTable(table) => {
445+
visitor
446+
.relations
447+
.insert(ObjectName(vec![Ident::from(table.name.as_str())]));
448+
}
449+
DFStatement::CopyTo(CopyToStatement { source, .. }) => match source {
450+
CopyToSource::Relation(table_name) => {
451+
visitor.insert_relation(table_name);
452+
}
453+
CopyToSource::Query(query) => {
454+
query.visit(visitor);
455+
}
456+
},
457+
DFStatement::Explain(explain) => visit_statement(&explain.statement, visitor),
458+
}
459+
}
460+
461+
visit_statement(statement, &mut visitor);
462+
463+
let table_refs = visitor
464+
.relations
465+
.into_iter()
466+
.map(|x| object_name_to_table_reference(x, enable_ident_normalization))
467+
.collect::<datafusion_common::Result<_>>()?;
468+
let ctes = visitor
469+
.all_ctes
470+
.into_iter()
471+
.map(|x| object_name_to_table_reference(x, enable_ident_normalization))
472+
.collect::<datafusion_common::Result<_>>()?;
473+
Ok((table_refs, ctes))
474+
}
475+
298476
#[cfg(test)]
299477
mod tests {
300478
use super::*;
@@ -363,4 +541,61 @@ mod tests {
363541
let cat = Arc::new(MemoryCatalogProvider::new()) as Arc<dyn CatalogProvider>;
364542
assert!(cat.deregister_schema("foo", false).unwrap().is_none());
365543
}
544+
545+
#[test]
fn resolve_table_references_shadowed_cte() {
    use datafusion_sql::parser::DFParser;

    // Parses `sql`, resolves its references with identifier normalization
    // enabled, and returns the rendered `(table_refs, ctes)` names.
    let resolve = |sql: &str| -> (Vec<String>, Vec<String>) {
        let statement = DFParser::parse_sql(sql).unwrap().pop_back().unwrap();
        let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
        (
            table_refs.iter().map(|r| r.to_string()).collect(),
            ctes.iter().map(|r| r.to_string()).collect(),
        )
    };

    // An interesting edge case where the `t` name is used both as an ordinary table reference
    // and as a CTE reference.
    let (table_refs, ctes) = resolve("WITH t AS (SELECT * FROM t) SELECT * FROM t");
    assert_eq!(table_refs, ["t"]);
    assert_eq!(ctes, ["t"]);

    // UNION is a special case where the CTE is not in scope for the second branch.
    let (table_refs, ctes) =
        resolve("(with t as (select 1) select * from t) union (select * from t)");
    assert_eq!(table_refs, ["t"]);
    assert_eq!(ctes, ["t"]);

    // Nested CTEs are also handled.
    // Here the first `u` is a CTE, but the second `u` is a table reference.
    // While `t` is always a CTE.
    let (table_refs, ctes) = resolve(
        "(with t as (with u as (select 1) select * from u) select * from u cross join t)",
    );
    assert_eq!(table_refs, ["u"]);
    assert_eq!(ctes, ["t", "u"]);
}
580+
581+
#[test]
fn resolve_table_references_recursive_cte() {
    use datafusion_sql::parser::DFParser;

    // A recursive CTE refers to itself inside its own definition; that
    // self-reference must not be reported as a table reference.
    let query = "
WITH RECURSIVE nodes AS (
SELECT 1 as id
UNION ALL
SELECT id + 1 as id
FROM nodes
WHERE id < 10
)
SELECT * FROM nodes
";
    let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
    let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();

    let cte_names: Vec<String> = ctes.iter().map(|cte| cte.to_string()).collect();
    assert!(table_refs.is_empty());
    assert_eq!(cte_names, ["nodes"]);
}
366601
}

datafusion/core/src/execution/session_state.rs

Lines changed: 12 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,12 @@ use datafusion_optimizer::{
6666
use datafusion_physical_expr::create_physical_expr;
6767
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
6868
use datafusion_physical_plan::ExecutionPlan;
69-
use datafusion_sql::parser::{CopyToSource, CopyToStatement, DFParser, Statement};
70-
use datafusion_sql::planner::{
71-
object_name_to_table_reference, ContextProvider, ParserOptions, SqlToRel,
72-
};
69+
use datafusion_sql::parser::{DFParser, Statement};
70+
use datafusion_sql::planner::{ContextProvider, ParserOptions, SqlToRel};
7371
use sqlparser::dialect::dialect_from_str;
7472
use std::collections::hash_map::Entry;
7573
use std::collections::{HashMap, HashSet};
7674
use std::fmt::Debug;
77-
use std::ops::ControlFlow;
7875
use std::sync::Arc;
7976
use url::Url;
8077
use uuid::Uuid;
@@ -493,91 +490,22 @@ impl SessionState {
493490
Ok(statement)
494491
}
495492

496-
/// Resolve all table references in the SQL statement.
493+
/// Resolve all table references in the SQL statement. Does not include CTE references.
494+
///
495+
/// See [`catalog::resolve_table_references`] for more information.
496+
///
497+
/// [`catalog::resolve_table_references`]: crate::catalog::resolve_table_references
497498
pub fn resolve_table_references(
498499
&self,
499500
statement: &datafusion_sql::parser::Statement,
500501
) -> datafusion_common::Result<Vec<TableReference>> {
501-
use crate::catalog::information_schema::INFORMATION_SCHEMA_TABLES;
502-
use datafusion_sql::parser::Statement as DFStatement;
503-
use sqlparser::ast::*;
504-
505-
// Getting `TableProviders` is async but planing is not -- thus pre-fetch
506-
// table providers for all relations referenced in this query
507-
let mut relations = hashbrown::HashSet::with_capacity(10);
508-
509-
struct RelationVisitor<'a>(&'a mut hashbrown::HashSet<ObjectName>);
510-
511-
impl<'a> RelationVisitor<'a> {
512-
/// Record that `relation` was used in this statement
513-
fn insert(&mut self, relation: &ObjectName) {
514-
self.0.get_or_insert_with(relation, |_| relation.clone());
515-
}
516-
}
517-
518-
impl<'a> Visitor for RelationVisitor<'a> {
519-
type Break = ();
520-
521-
fn pre_visit_relation(&mut self, relation: &ObjectName) -> ControlFlow<()> {
522-
self.insert(relation);
523-
ControlFlow::Continue(())
524-
}
525-
526-
fn pre_visit_statement(&mut self, statement: &Statement) -> ControlFlow<()> {
527-
if let Statement::ShowCreate {
528-
obj_type: ShowCreateObject::Table | ShowCreateObject::View,
529-
obj_name,
530-
} = statement
531-
{
532-
self.insert(obj_name)
533-
}
534-
ControlFlow::Continue(())
535-
}
536-
}
537-
538-
let mut visitor = RelationVisitor(&mut relations);
539-
fn visit_statement(statement: &DFStatement, visitor: &mut RelationVisitor<'_>) {
540-
match statement {
541-
DFStatement::Statement(s) => {
542-
let _ = s.as_ref().visit(visitor);
543-
}
544-
DFStatement::CreateExternalTable(table) => {
545-
visitor
546-
.0
547-
.insert(ObjectName(vec![Ident::from(table.name.as_str())]));
548-
}
549-
DFStatement::CopyTo(CopyToStatement { source, .. }) => match source {
550-
CopyToSource::Relation(table_name) => {
551-
visitor.insert(table_name);
552-
}
553-
CopyToSource::Query(query) => {
554-
query.visit(visitor);
555-
}
556-
},
557-
DFStatement::Explain(explain) => {
558-
visit_statement(&explain.statement, visitor)
559-
}
560-
}
561-
}
562-
563-
visit_statement(statement, &mut visitor);
564-
565-
// Always include information_schema if available
566-
if self.config.information_schema() {
567-
for s in INFORMATION_SCHEMA_TABLES {
568-
relations.insert(ObjectName(vec![
569-
Ident::new(INFORMATION_SCHEMA),
570-
Ident::new(*s),
571-
]));
572-
}
573-
}
574-
575502
let enable_ident_normalization =
576503
self.config.options().sql_parser.enable_ident_normalization;
577-
relations
578-
.into_iter()
579-
.map(|x| object_name_to_table_reference(x, enable_ident_normalization))
580-
.collect::<datafusion_common::Result<_>>()
504+
let (table_refs, _) = crate::catalog::resolve_table_references(
505+
statement,
506+
enable_ident_normalization,
507+
)?;
508+
Ok(table_refs)
581509
}
582510

583511
/// Convert an AST Statement into a LogicalPlan

datafusion/sqllogictest/test_files/cte.slt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -828,3 +828,10 @@ SELECT * FROM non_recursive_cte, recursive_cte;
828828
----
829829
1 1
830830
1 3
831+
832+
# Name shadowing:
833+
# The first `t` refers to the table, the second to the CTE.
834+
query I
835+
WITH t AS (SELECT * FROM t where t.a < 2) SELECT * FROM t
836+
----
837+
1

0 commit comments

Comments
 (0)