Skip to content

Add Support for Dynamic SQL Macros for Flexible Column Selection #15926

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 58 additions & 5 deletions datafusion-examples/examples/sql_frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::common::MacroCatalog;
use datafusion::common::{plan_err, TableReference};
use datafusion::config::ConfigOptions;
use datafusion::error::Result;
Expand All @@ -26,6 +27,7 @@ use datafusion::logical_expr::{
use datafusion::optimizer::{
Analyzer, AnalyzerRule, Optimizer, OptimizerConfig, OptimizerContext, OptimizerRule,
};
use datafusion::sql::macro_context::MacroContextProvider;
use datafusion::sql::planner::{ContextProvider, SqlToRel};
use datafusion::sql::sqlparser::dialect::PostgreSqlDialect;
use datafusion::sql::sqlparser::parser::Parser;
Expand All @@ -52,12 +54,63 @@ pub fn main() -> Result<()> {
let sql = "SELECT name FROM person WHERE age BETWEEN 21 AND 32";
let statements = Parser::parse_sql(&dialect, sql)?;

// Now, use DataFusion's SQL planner, called `SqlToRel` to create a
// `LogicalPlan` from the parsed statement
/// Adapter that owns a [`ContextProvider`] in its `inner` field so that
/// additional planner traits can be implemented on the wrapper type.
struct MacroAdapter<T>
where
    T: ContextProvider,
{
    /// The wrapped provider.
    inner: T,
}

impl<T> MacroContextProvider for MacroAdapter<T>
where
    T: ContextProvider,
{
    /// Always returns a planning error: this adapter does not hold a macro
    /// catalog.
    fn macro_catalog(&self) -> Result<Arc<dyn MacroCatalog>> {
        plan_err!("SQL macros are not supported in this example")
    }
}

/// Each method below hands its arguments to the same method on the wrapped
/// `inner` provider and returns that result unchanged.
impl<T> ContextProvider for MacroAdapter<T>
where
    T: ContextProvider,
{
    fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
        T::get_table_source(&self.inner, name)
    }

    fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
        T::get_function_meta(&self.inner, name)
    }

    fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
        T::get_aggregate_meta(&self.inner, name)
    }

    fn get_variable_type(&self, var_names: &[String]) -> Option<DataType> {
        T::get_variable_type(&self.inner, var_names)
    }

    fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>> {
        T::get_window_meta(&self.inner, name)
    }

    fn options(&self) -> &ConfigOptions {
        T::options(&self.inner)
    }

    fn udf_names(&self) -> Vec<String> {
        T::udf_names(&self.inner)
    }

    fn udaf_names(&self) -> Vec<String> {
        T::udaf_names(&self.inner)
    }

    fn udwf_names(&self) -> Vec<String> {
        T::udwf_names(&self.inner)
    }
}

// Now, to use DataFusion's SQL planner (SqlToRel) to create a LogicalPlan,
// we must provide schema and function information via an object that
// implements the ContextProvider trait.
//
// To invoke SqlToRel we must provide it schema and function information
// via an object that implements the `ContextProvider` trait
let context_provider = MyContextProvider::default();
// DataFusion also requires the MacroContextProvider trait for SQL macro support
let base_provider = MyContextProvider::default();
let context_provider = MacroAdapter {
inner: base_provider,
};
let sql_to_rel = SqlToRel::new(&context_provider);
let logical_plan = sql_to_rel.sql_statement_to_plan(statements[0].clone())?;

Expand Down
2 changes: 2 additions & 0 deletions datafusion/catalog/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,5 @@ pub trait CatalogProviderList: Debug + Sync + Send {
/// Retrieves a specific catalog by name, provided it exists.
fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>>;
}

pub use datafusion_common::MacroCatalog;
2 changes: 2 additions & 0 deletions datafusion/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@ pub mod view;
mod r#async;
mod catalog;
mod dynamic_file;
mod macro_catalog;
mod schema;
mod table;

pub use catalog::*;
pub use datafusion_session::Session;
pub use dynamic_file::catalog::*;
pub use macro_catalog::*;
pub use memory::{
MemTable, MemoryCatalogProvider, MemoryCatalogProviderList, MemorySchemaProvider,
};
Expand Down
82 changes: 82 additions & 0 deletions datafusion/catalog/src/macro_catalog.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use datafusion_common::{exec_err, MacroCatalog, MacroDefinition, Result};

/// Simple in-memory implementation of a macro catalog.
///
/// This catalog stores macro definitions in memory with no persistence.
/// Macro definitions are stored in a map with the macro name as key.
/// Names are used as map keys exactly as given, so lookups only match the
/// exact string that was registered.
#[derive(Debug, Default)]
pub struct MemoryMacroCatalog {
    // Registered macros keyed by name. The `RwLock` lets the `&self`
    // catalog methods read and modify the map.
    macros: RwLock<HashMap<String, Arc<MacroDefinition>>>,
}

impl MemoryMacroCatalog {
pub fn new() -> Self {
Self {
macros: RwLock::new(HashMap::new()),
}
}
}

/// [`MacroCatalog`] implementation backed by the in-memory map.
///
/// Every method except `as_any` acquires the lock and calls `unwrap()` on
/// the result, so it panics if the lock has been poisoned.
impl MacroCatalog for MemoryMacroCatalog {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns `true` when a macro is stored under exactly `name`.
    fn macro_exists(&self, name: &str) -> bool {
        let registry = self.macros.read().unwrap();
        registry.contains_key(name)
    }

    /// Stores `macro_def` under `name`.
    ///
    /// An existing entry is overwritten only when `or_replace` is `true`;
    /// otherwise an execution error is returned and the map is unchanged.
    fn register_macro(
        &self,
        name: &str,
        macro_def: Arc<MacroDefinition>,
        or_replace: bool,
    ) -> Result<()> {
        let mut registry = self.macros.write().unwrap();

        if or_replace || !registry.contains_key(name) {
            registry.insert(name.to_string(), macro_def);
            Ok(())
        } else {
            exec_err!(
                "Macro {name} already exists and OR REPLACE was not specified"
            )
        }
    }

    /// Returns a shared handle to the macro stored under `name`, or an
    /// execution error when there is none.
    fn get_macro(&self, name: &str) -> Result<Arc<MacroDefinition>> {
        let registry = self.macros.read().unwrap();

        if let Some(found) = registry.get(name) {
            return Ok(Arc::clone(found));
        }
        exec_err!("Macro {name} does not exist")
    }

    /// Removes the macro stored under `name`, returning it if it was present.
    /// This implementation never returns `Err`.
    fn drop_macro(&self, name: &str) -> Result<Option<Arc<MacroDefinition>>> {
        let removed = self.macros.write().unwrap().remove(name);
        Ok(removed)
    }
}
106 changes: 106 additions & 0 deletions datafusion/catalog/tests/macro_catalog_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion_catalog::MemoryMacroCatalog;
use datafusion_common::{MacroCatalog, MacroDefinition};
use std::sync::Arc;

/// Walks a [`MemoryMacroCatalog`] through registration, lookup, replacement,
/// duplicate rejection, and registration of a second macro.
#[test]
fn test_memory_macro_catalog() {
    // Builds a definition from string slices so each step only states what differs.
    let definition = |name: &str, parameters: &[&str], body: &str| MacroDefinition {
        name: name.to_string(),
        parameters: parameters.iter().map(|p| p.to_string()).collect(),
        body: body.to_string(),
    };

    let catalog = MemoryMacroCatalog::new();

    // First registration succeeds and the macro can be read back.
    let initial = definition(
        "test_macro",
        &["param1", "param2"],
        "SELECT * FROM table WHERE id > param1 AND name LIKE param2",
    );
    catalog
        .register_macro("test_macro", Arc::new(initial), false)
        .unwrap();
    assert!(catalog.macro_exists("test_macro"));

    let fetched = catalog.get_macro("test_macro").unwrap();
    assert_eq!(fetched.name, "test_macro");
    assert_eq!(fetched.parameters.len(), 2);
    assert_eq!(fetched.parameters[0], "param1");
    assert_eq!(fetched.parameters[1], "param2");

    // Registering with `or_replace = true` overwrites the stored definition.
    let replacement = definition(
        "test_macro",
        &["single_param"],
        "SELECT id FROM table WHERE id = single_param",
    );
    catalog
        .register_macro("test_macro", Arc::new(replacement), true)
        .unwrap();

    let refreshed = catalog.get_macro("test_macro").unwrap();
    assert_eq!(refreshed.parameters.len(), 1);
    assert_eq!(refreshed.parameters[0], "single_param");

    // Registering the same name again without `or_replace` is rejected.
    let duplicate = definition("test_macro", &["x", "y"], "SELECT x, y FROM table");
    let rejected = catalog.register_macro("test_macro", Arc::new(duplicate), false);
    assert!(rejected.is_err());
    assert!(rejected.unwrap_err().to_string().contains("already exists"));

    // A different name registers alongside the first macro.
    let second = definition("another_macro", &["x", "y"], "SELECT x, y FROM table");
    catalog
        .register_macro("another_macro", Arc::new(second), false)
        .unwrap();

    assert!(catalog.macro_exists("test_macro"));
    assert!(catalog.macro_exists("another_macro"));
}

/// Checks that macro names are matched exactly, including letter case.
#[test]
fn test_macro_catalog_case_sensitivity() {
    let catalog = MemoryMacroCatalog::new();

    let mixed_case = Arc::new(MacroDefinition {
        name: "MixedCaseMacro".to_string(),
        parameters: vec!["Param".to_string()],
        body: "SELECT * FROM table WHERE id = Param".to_string(),
    });
    catalog
        .register_macro("MixedCaseMacro", mixed_case, false)
        .unwrap();

    // Only the exact spelling is visible.
    assert!(catalog.macro_exists("MixedCaseMacro"));
    for other_casing in ["mixedcasemacro", "MIXEDCASEMACRO"].iter() {
        assert!(!catalog.macro_exists(other_casing));
    }

    assert!(catalog.get_macro("MixedCaseMacro").is_ok());
    assert!(catalog.get_macro("mixedcasemacro").is_err());
}
2 changes: 2 additions & 0 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ mod column;
mod dfschema;
mod functional_dependencies;
mod join_type;
mod macros;
mod param_value;
#[cfg(feature = "pyarrow")]
mod pyarrow;
Expand Down Expand Up @@ -78,6 +79,7 @@ pub use functional_dependencies::{
};
use hashbrown::hash_map::DefaultHashBuilder;
pub use join_type::{JoinConstraint, JoinSide, JoinType};
pub use macros::{MacroCatalog, MacroDefinition};
pub use param_value::ParamValues;
pub use scalar::{ScalarType, ScalarValue};
pub use schema_reference::SchemaReference;
Expand Down
75 changes: 75 additions & 0 deletions datafusion/common/src/macros.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! SQL macro catalog interface definitions for DataFusion

use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;

use crate::Result;

/// Trait for SQL macro catalog functionality, which manages
/// macro definitions used in SQL statements.
///
/// This trait enables support for SQL macros in DataFusion, which allow
/// users to define reusable SQL templates with parameters that can be
/// expanded at query planning time.
///
/// Every method takes `&self` and implementors must be `Send + Sync`, so an
/// implementation that stores macros needs some form of interior mutability.
pub trait MacroCatalog: Debug + Send + Sync {
    /// Returns the macro catalog as [`Any`] so that it can be downcast to a
    /// specific implementation.
    fn as_any(&self) -> &dyn Any;

    /// Check if a macro with the given name exists in the catalog.
    fn macro_exists(&self, name: &str) -> bool;

    /// Get a macro definition by name.
    ///
    /// # Errors
    ///
    /// Returns an error if the macro does not exist.
    fn get_macro(&self, name: &str) -> Result<Arc<MacroDefinition>>;

    /// Register a macro definition with the catalog under `name`.
    ///
    /// If no macro named `name` exists, the definition is added. If one
    /// already exists, it is replaced when `or_replace` is true.
    ///
    /// # Errors
    ///
    /// Returns an error if a macro named `name` already exists and
    /// `or_replace` is false.
    // NOTE(review): nothing in this signature ties `name` to
    // `macro_def.name` — confirm whether the two are expected to match.
    fn register_macro(
        &self,
        name: &str,
        macro_def: Arc<MacroDefinition>,
        or_replace: bool,
    ) -> Result<()>;

    /// Remove a macro definition by name.
    ///
    /// Returns the removed macro if it existed, `None` otherwise.
    fn drop_macro(&self, name: &str) -> Result<Option<Arc<MacroDefinition>>>;
}

/// A named, parameterized SQL macro.
///
/// The macro is kept as plain text: a name, the list of parameter names,
/// and the SQL body that is expanded at query planning time.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct MacroDefinition {
    /// The macro's name.
    pub name: String,

    /// Parameter names that can be substituted in [`Self::body`].
    pub parameters: Vec<String>,

    /// SQL text of the macro, expanded when the macro is used.
    pub body: String,
}
Loading
Loading