Skip to content

Commit ab7e7ed

Browse files
committed
add catalog::resolve_table_references as a public utility
1 parent c645b55 commit ab7e7ed

File tree

2 files changed

+118
-96
lines changed

2 files changed

+118
-96
lines changed

datafusion/core/src/catalog/mod.rs

Lines changed: 115 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::catalog::schema::SchemaProvider;
2727
use dashmap::DashMap;
2828
use datafusion_common::{exec_err, not_impl_err, Result};
2929
use std::any::Any;
30+
use std::ops::ControlFlow;
3031
use std::sync::Arc;
3132

3233
/// Represent a list of named [`CatalogProvider`]s.
@@ -157,11 +158,11 @@ impl CatalogProviderList for MemoryCatalogProviderList {
157158
/// access required to read table details (e.g. statistics).
158159
///
159160
/// The pattern that DataFusion itself uses to plan SQL queries is to walk over
160-
/// the query to [find all schema / table references in an `async` function],
161+
/// the query to [find all table references],
161162
/// performing required remote catalog lookups in parallel, and then plan the query
162163
/// using that snapshot.
163164
///
164-
/// [find all schema / table references in an `async` function]: crate::execution::context::SessionState::resolve_table_references
165+
/// [find all table references]: resolve_table_references
165166
///
166167
/// # Example Catalog Implementations
167168
///
@@ -295,6 +296,118 @@ impl CatalogProvider for MemoryCatalogProvider {
295296
}
296297
}
297298

299+
/// Resolve all table references in the SQL statement.
300+
///
301+
/// ## Example
302+
///
303+
/// ```
304+
/// use datafusion_sql::parser::DFParser;
305+
/// use datafusion::catalog::resolve_table_references;
306+
///
307+
/// let query = "SELECT a FROM foo where x IN (SELECT y FROM bar)";
308+
/// let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
309+
/// let table_refs = resolve_table_references(&statement, true).unwrap();
310+
/// assert_eq!(table_refs.len(), 2);
311+
/// assert_eq!(table_refs[0].to_string(), "foo");
312+
/// assert_eq!(table_refs[1].to_string(), "bar");
313+
/// ```
314+
pub fn resolve_table_references(
315+
statement: &datafusion_sql::parser::Statement,
316+
enable_ident_normalization: bool,
317+
) -> datafusion_common::Result<Vec<TableReference>> {
318+
use crate::sql::planner::object_name_to_table_reference;
319+
use datafusion_sql::parser::{
320+
CopyToSource, CopyToStatement, Statement as DFStatement,
321+
};
322+
use information_schema::INFORMATION_SCHEMA;
323+
use information_schema::INFORMATION_SCHEMA_TABLES;
324+
use sqlparser::ast::*;
325+
326+
// Getting `TableProviders` is async but planning is not -- thus pre-fetch
327+
// table providers for all relations referenced in this query
328+
let mut relations = hashbrown::HashSet::with_capacity(10);
329+
330+
struct RelationVisitor<'a>(&'a mut hashbrown::HashSet<ObjectName>);
331+
332+
impl<'a> RelationVisitor<'a> {
333+
/// Record that `relation` was used in this statement
334+
fn insert(&mut self, relation: &ObjectName) {
335+
self.0.get_or_insert_with(relation, |_| relation.clone());
336+
}
337+
}
338+
339+
impl<'a> Visitor for RelationVisitor<'a> {
340+
type Break = ();
341+
342+
fn pre_visit_relation(&mut self, relation: &ObjectName) -> ControlFlow<()> {
343+
self.insert(relation);
344+
ControlFlow::Continue(())
345+
}
346+
347+
fn pre_visit_statement(&mut self, statement: &Statement) -> ControlFlow<()> {
348+
if let Statement::ShowCreate {
349+
obj_type: ShowCreateObject::Table | ShowCreateObject::View,
350+
obj_name,
351+
} = statement
352+
{
353+
self.insert(obj_name)
354+
}
355+
356+
// SHOW statements will later be rewritten into a SELECT from the information_schema
357+
let requires_information_schema = matches!(
358+
statement,
359+
Statement::ShowFunctions { .. }
360+
| Statement::ShowVariable { .. }
361+
| Statement::ShowStatus { .. }
362+
| Statement::ShowVariables { .. }
363+
| Statement::ShowCreate { .. }
364+
| Statement::ShowColumns { .. }
365+
| Statement::ShowTables { .. }
366+
| Statement::ShowCollation { .. }
367+
);
368+
if requires_information_schema {
369+
for s in INFORMATION_SCHEMA_TABLES {
370+
self.0.insert(ObjectName(vec![
371+
Ident::new(INFORMATION_SCHEMA),
372+
Ident::new(*s),
373+
]));
374+
}
375+
}
376+
ControlFlow::Continue(())
377+
}
378+
}
379+
380+
let mut visitor = RelationVisitor(&mut relations);
381+
fn visit_statement(statement: &DFStatement, visitor: &mut RelationVisitor<'_>) {
382+
match statement {
383+
DFStatement::Statement(s) => {
384+
let _ = s.as_ref().visit(visitor);
385+
}
386+
DFStatement::CreateExternalTable(table) => {
387+
visitor
388+
.0
389+
.insert(ObjectName(vec![Ident::from(table.name.as_str())]));
390+
}
391+
DFStatement::CopyTo(CopyToStatement { source, .. }) => match source {
392+
CopyToSource::Relation(table_name) => {
393+
visitor.insert(table_name);
394+
}
395+
CopyToSource::Query(query) => {
396+
query.visit(visitor);
397+
}
398+
},
399+
DFStatement::Explain(explain) => visit_statement(&explain.statement, visitor),
400+
}
401+
}
402+
403+
visit_statement(statement, &mut visitor);
404+
405+
relations
406+
.into_iter()
407+
.map(|x| object_name_to_table_reference(x, enable_ident_normalization))
408+
.collect::<datafusion_common::Result<_>>()
409+
}
410+
298411
#[cfg(test)]
299412
mod tests {
300413
use super::*;

datafusion/core/src/execution/session_state.rs

Lines changed: 3 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,12 @@ use datafusion_optimizer::{
6666
use datafusion_physical_expr::create_physical_expr;
6767
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
6868
use datafusion_physical_plan::ExecutionPlan;
69-
use datafusion_sql::parser::{CopyToSource, CopyToStatement, DFParser, Statement};
70-
use datafusion_sql::planner::{
71-
object_name_to_table_reference, ContextProvider, ParserOptions, SqlToRel,
72-
};
69+
use datafusion_sql::parser::{DFParser, Statement};
70+
use datafusion_sql::planner::{ContextProvider, ParserOptions, SqlToRel};
7371
use sqlparser::dialect::dialect_from_str;
7472
use std::collections::hash_map::Entry;
7573
use std::collections::{HashMap, HashSet};
7674
use std::fmt::Debug;
77-
use std::ops::ControlFlow;
7875
use std::sync::Arc;
7976
use url::Url;
8077
use uuid::Uuid;
@@ -498,97 +495,9 @@ impl SessionState {
498495
&self,
499496
statement: &datafusion_sql::parser::Statement,
500497
) -> datafusion_common::Result<Vec<TableReference>> {
501-
use crate::catalog::information_schema::INFORMATION_SCHEMA_TABLES;
502-
use datafusion_sql::parser::Statement as DFStatement;
503-
use sqlparser::ast::*;
504-
505-
// Getting `TableProviders` is async but planing is not -- thus pre-fetch
506-
// table providers for all relations referenced in this query
507-
let mut relations = hashbrown::HashSet::with_capacity(10);
508-
509-
struct RelationVisitor<'a>(&'a mut hashbrown::HashSet<ObjectName>);
510-
511-
impl<'a> RelationVisitor<'a> {
512-
/// Record that `relation` was used in this statement
513-
fn insert(&mut self, relation: &ObjectName) {
514-
self.0.get_or_insert_with(relation, |_| relation.clone());
515-
}
516-
}
517-
518-
impl<'a> Visitor for RelationVisitor<'a> {
519-
type Break = ();
520-
521-
fn pre_visit_relation(&mut self, relation: &ObjectName) -> ControlFlow<()> {
522-
self.insert(relation);
523-
ControlFlow::Continue(())
524-
}
525-
526-
fn pre_visit_statement(&mut self, statement: &Statement) -> ControlFlow<()> {
527-
if let Statement::ShowCreate {
528-
obj_type: ShowCreateObject::Table | ShowCreateObject::View,
529-
obj_name,
530-
} = statement
531-
{
532-
self.insert(obj_name)
533-
}
534-
535-
// SHOW statements will later be rewritten into a SELECT from the information_schema
536-
let requires_information_schema = matches!(
537-
statement,
538-
Statement::ShowFunctions { .. }
539-
| Statement::ShowVariable { .. }
540-
| Statement::ShowStatus { .. }
541-
| Statement::ShowVariables { .. }
542-
| Statement::ShowCreate { .. }
543-
| Statement::ShowColumns { .. }
544-
| Statement::ShowTables { .. }
545-
| Statement::ShowCollation { .. }
546-
);
547-
if requires_information_schema {
548-
for s in INFORMATION_SCHEMA_TABLES {
549-
self.0.insert(ObjectName(vec![
550-
Ident::new(INFORMATION_SCHEMA),
551-
Ident::new(*s),
552-
]));
553-
}
554-
}
555-
ControlFlow::Continue(())
556-
}
557-
}
558-
559-
let mut visitor = RelationVisitor(&mut relations);
560-
fn visit_statement(statement: &DFStatement, visitor: &mut RelationVisitor<'_>) {
561-
match statement {
562-
DFStatement::Statement(s) => {
563-
let _ = s.as_ref().visit(visitor);
564-
}
565-
DFStatement::CreateExternalTable(table) => {
566-
visitor
567-
.0
568-
.insert(ObjectName(vec![Ident::from(table.name.as_str())]));
569-
}
570-
DFStatement::CopyTo(CopyToStatement { source, .. }) => match source {
571-
CopyToSource::Relation(table_name) => {
572-
visitor.insert(table_name);
573-
}
574-
CopyToSource::Query(query) => {
575-
query.visit(visitor);
576-
}
577-
},
578-
DFStatement::Explain(explain) => {
579-
visit_statement(&explain.statement, visitor)
580-
}
581-
}
582-
}
583-
584-
visit_statement(statement, &mut visitor);
585-
586498
let enable_ident_normalization =
587499
self.config.options().sql_parser.enable_ident_normalization;
588-
relations
589-
.into_iter()
590-
.map(|x| object_name_to_table_reference(x, enable_ident_normalization))
591-
.collect::<datafusion_common::Result<_>>()
500+
crate::catalog::resolve_table_references(statement, enable_ident_normalization)
592501
}
593502

594503
/// Convert an AST Statement into a LogicalPlan

0 commit comments

Comments
 (0)