Skip to content

Commit

Permalink
remove's gone from the trait ...
Browse files Browse the repository at this point in the history
... `DROP FUNCTION` will look for function name
in all available registries (udf, udaf, udwf).

`remove` may be necessary if UDaF and UDwF do not
get `simplify` method from apache#9304.
  • Loading branch information
milenkovicm committed Mar 5, 2024
1 parent 262f0f6 commit fe63c31
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 64 deletions.
54 changes: 20 additions & 34 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -827,20 +827,23 @@ impl SessionContext {
}

async fn drop_function(&self, stmt: DropFunction) -> Result<DataFrame> {
let _function = {
let state = self.state.read().clone();
let function_factory = &state.function_factory;

match function_factory {
Some(f) => f.remove(state.config(), stmt).await?,
None => Err(DataFusionError::Configuration(
"Function factory has not been configured".into(),
))?,
}
};

// TODO: Once we have unregister UDF we need to implement it here
self.return_empty_dataframe()
// we don't know function type at this point
// decision has been made to drop all functions
let mut dropped = false;
dropped |= self.state.write().deregister_udf(&stmt.name)?.is_some();
dropped |= self.state.write().deregister_udaf(&stmt.name)?.is_some();
dropped |= self.state.write().deregister_udwf(&stmt.name)?.is_some();

// `DROP FUNCTION IF EXISTS` drops the function only if it exists, so a
// missing function is not treated as an error. Plain `DROP FUNCTION`
// behaves the same way, except that it returns an error when the
// function does not exist.

if !stmt.if_exists && !dropped {
Err(DataFusionError::Execution("Function does not exist".into()))
} else {
self.return_empty_dataframe()
}
}

/// Registers a variable provider within this context.
Expand Down Expand Up @@ -1310,18 +1313,9 @@ impl QueryPlanner for DefaultQueryPlanner {
.await
}
}
/// Creates and registers a function from [CreateFunction] statement
///
/// It is intended to handle `CREATE FUNCTION` statements
/// and interact with [SessionState] to registers new udfs.
///
/// Datafusion `SQL` dialect does not support `CREATE FUNCTION`
/// in generic dialect, so dialect should be changed to `PostgreSQL`
///
/// ```rust, no_run
/// # use datafusion::execution::config::SessionConfig;
/// SessionConfig::new().set_str("datafusion.sql_parser.dialect", "PostgreSQL");
/// ```
/// A pluggable interface to handle `CREATE FUNCTION` statements
/// and interact with [SessionState] to register new udf, udaf or udwf.

#[async_trait]
pub trait FunctionFactory: Sync + Send {
/// Handles creation of user defined function specified in [CreateFunction] statement
Expand All @@ -1330,14 +1324,6 @@ pub trait FunctionFactory: Sync + Send {
state: &SessionConfig,
statement: CreateFunction,
) -> Result<RegisterFunction>;

/// Drops user defined function from [SessionState]
// Naming it `drop` would make more sense but it's already taken in Rust
async fn remove(
&self,
state: &SessionConfig,
statement: DropFunction,
) -> Result<RegisterFunction>;
}

/// Type of function to create
Expand Down
45 changes: 15 additions & 30 deletions datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ use datafusion_common::{
};
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_expr::{
create_udaf, create_udf, Accumulator, ColumnarValue, CreateFunction, DropFunction,
ExprSchemable, LogicalPlanBuilder, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
create_udaf, create_udf, Accumulator, ColumnarValue, CreateFunction, ExprSchemable,
LogicalPlanBuilder, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
};
use parking_lot::Mutex;
use rand::{thread_rng, Rng};
Expand Down Expand Up @@ -693,28 +693,10 @@ impl FunctionFactory for MockFunctionFactory {

Ok(RegisterFunction::Scalar(Arc::new(mock_udf)))
}

async fn remove(
&self,
_config: &SessionConfig,
_statement: DropFunction,
) -> datafusion::error::Result<RegisterFunction> {
// TODO: I don't like that remove returns RegisterFunction
// we have to keep two states in FunctionFactory impl and
// SessionState
//
// It would be better to return (function_name, function type) tuple
//
// at the moment state does not support unregister user defined functions

Err(datafusion_common::DataFusionError::NotImplemented(
"remove function has not been implemented".into(),
))
}
}

#[tokio::test]
async fn create_scalar_function_from_sql_statement() {
async fn create_scalar_function_from_sql_statement() -> Result<()> {
let function_factory = Arc::new(MockFunctionFactory::default());
let runtime_config = RuntimeConfig::new();
let runtime_environment = RuntimeEnv::new(runtime_config).unwrap();
Expand All @@ -732,23 +714,26 @@ async fn create_scalar_function_from_sql_statement() {
RETURNS DOUBLE
RETURN $1 + $2
"#;
let _ = ctx.sql(sql).await.unwrap();
let _ = ctx.sql(sql).await?;

ctx.sql("select better_add(2.0, 2.0)")
.await
.unwrap()
.show()
.await
.unwrap();
ctx.sql("select better_add(2.0, 2.0)").await?.show().await?;

// check that the sql expr has been converted to a datafusion expr
let captured_expression = function_factory.captured_expr.lock().clone().unwrap();

// is there some better way to test this
assert_eq!("$1 + $2", captured_expression.to_string());

// no support at the moment
// ctx.sql("drop function better_add").await.unwrap();
// statement drops function
assert!(ctx.sql("drop function better_add").await.is_ok());
// no function, it returns an error
assert!(ctx.sql("drop function better_add").await.is_err());
// no function, but with `if exists` it does not care
assert!(ctx.sql("drop function if exists better_add").await.is_ok());
// query should fail as there is no function
assert!(ctx.sql("select better_add(2.0, 2.0)").await.is_err());

Ok(())
}

fn create_udf_context() -> SessionContext {
Expand Down

0 comments on commit fe63c31

Please sign in to comment.