-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Create datafusion-functions
crate, extract encode and decode to
#8705
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f2846b0
1a1e0f4
304ce2d
259f122
68b9289
5deddb0
f75472c
3d3d03a
6f961be
a959996
1262f52
841e32a
2d21c35
5881756
cc6268e
76b1650
f3c311d
159cfbd
a15178e
af79f35
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,8 +17,9 @@ | |
|
||
//! FunctionRegistry trait | ||
|
||
use datafusion_common::Result; | ||
use datafusion_common::{not_impl_err, plan_datafusion_err, DataFusionError, Result}; | ||
use datafusion_expr::{AggregateUDF, ScalarUDF, UserDefinedLogicalNode, WindowUDF}; | ||
use std::collections::HashMap; | ||
use std::{collections::HashSet, sync::Arc}; | ||
|
||
/// A registry knows how to build logical expressions out of user-defined functions' names | ||
|
@@ -34,6 +35,17 @@ pub trait FunctionRegistry { | |
|
||
/// Returns a reference to the udwf named `name`. | ||
fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>>; | ||
|
||
/// Registers a new [`ScalarUDF`], returning any previously registered | ||
/// implementation. | ||
/// | ||
/// Returns an error (the default) if the function cannot be registered, | ||
/// for example if the registry is read only. | ||
/// | ||
/// The default body ignores `_udf` and returns the error produced by | ||
/// `not_impl_err!("Registering ScalarUDF")`; registries that support | ||
/// registration override this method. | ||
fn register_udf(&mut self, _udf: Arc<ScalarUDF>) -> Result<Option<Arc<ScalarUDF>>> { | ||
not_impl_err!("Registering ScalarUDF") | ||
} | ||
|
||
// TODO add register_udaf and register_udwf | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will file follow-on tickets for this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
} | ||
|
||
/// Serializer and deserializer registry for extensions like [UserDefinedLogicalNode]. | ||
|
@@ -53,3 +65,51 @@ pub trait SerializerRegistry: Send + Sync { | |
bytes: &[u8], | ||
) -> Result<Arc<dyn UserDefinedLogicalNode>>; | ||
} | ||
|
||
/// A [`FunctionRegistry`] that stores its functions in in-memory [`HashMap`]s. | ||
/// | ||
/// One map is kept per function kind (scalar, aggregate, window). Lookups | ||
/// are performed by function name. | ||
#[derive(Default, Debug)] | ||
pub struct MemoryFunctionRegistry { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is currently only used for testing, but I plan to reduce the duplication between SessionContext, and TaskContext using it |
||
/// Scalar functions, keyed by the name reported by the function itself | ||
udfs: HashMap<String, Arc<ScalarUDF>>, | ||
/// Aggregate functions, looked up by name | ||
udafs: HashMap<String, Arc<AggregateUDF>>, | ||
/// Window functions, looked up by name | ||
udwfs: HashMap<String, Arc<WindowUDF>>, | ||
} | ||
|
||
impl MemoryFunctionRegistry { | ||
/// Creates a registry via `Self::default()`, i.e. with all three maps | ||
/// empty. | ||
pub fn new() -> Self { | ||
Self::default() | ||
} | ||
} | ||
|
||
/// [`FunctionRegistry`] implementation backed by the registry's in-memory maps. | ||
/// | ||
/// Only scalar functions can be registered through this impl; no method | ||
/// here inserts into the aggregate or window maps. | ||
impl FunctionRegistry for MemoryFunctionRegistry { | ||
/// Returns the names (map keys) of all registered scalar functions. | ||
fn udfs(&self) -> HashSet<String> { | ||
self.udfs.keys().cloned().collect() | ||
} | ||
|
||
/// Returns a clone of the `Arc` for the scalar function registered under | ||
/// `name`, or the error built by `plan_datafusion_err!` if there is none. | ||
fn udf(&self, name: &str) -> Result<Arc<ScalarUDF>> { | ||
self.udfs | ||
.get(name) | ||
.cloned() | ||
.ok_or_else(|| plan_datafusion_err!("Function {name} not found")) | ||
} | ||
|
||
/// Returns a clone of the `Arc` for the aggregate function stored under | ||
/// `name`, or the error built by `plan_datafusion_err!` if there is none. | ||
fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> { | ||
self.udafs | ||
.get(name) | ||
.cloned() | ||
.ok_or_else(|| plan_datafusion_err!("Aggregate Function {name} not found")) | ||
} | ||
|
||
/// Returns a clone of the `Arc` for the window function stored under | ||
/// `name`, or the error built by `plan_datafusion_err!` if there is none. | ||
fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> { | ||
self.udwfs | ||
.get(name) | ||
.cloned() | ||
.ok_or_else(|| plan_datafusion_err!("Window Function {name} not found")) | ||
} | ||
|
||
/// Stores `udf` under the key `udf.name()`, replacing any existing entry. | ||
/// | ||
/// Always returns `Ok`; the payload is the entry that was replaced, or | ||
/// `None` if the name was not previously registered. | ||
fn register_udf(&mut self, udf: Arc<ScalarUDF>) -> Result<Option<Arc<ScalarUDF>>> { | ||
Ok(self.udfs.insert(udf.name().to_string(), udf)) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -69,14 +69,10 @@ pub enum BuiltinScalarFunction { | |
Cos, | ||
/// cos | ||
Cosh, | ||
/// Decode | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the key point of this PR: to begin removing items from this (giant) enum |
||
Decode, | ||
/// degrees | ||
Degrees, | ||
/// Digest | ||
Digest, | ||
/// Encode | ||
Encode, | ||
/// exp | ||
Exp, | ||
/// factorial | ||
|
@@ -381,9 +377,7 @@ impl BuiltinScalarFunction { | |
BuiltinScalarFunction::Coalesce => Volatility::Immutable, | ||
BuiltinScalarFunction::Cos => Volatility::Immutable, | ||
BuiltinScalarFunction::Cosh => Volatility::Immutable, | ||
BuiltinScalarFunction::Decode => Volatility::Immutable, | ||
BuiltinScalarFunction::Degrees => Volatility::Immutable, | ||
BuiltinScalarFunction::Encode => Volatility::Immutable, | ||
BuiltinScalarFunction::Exp => Volatility::Immutable, | ||
BuiltinScalarFunction::Factorial => Volatility::Immutable, | ||
BuiltinScalarFunction::Floor => Volatility::Immutable, | ||
|
@@ -774,30 +768,6 @@ impl BuiltinScalarFunction { | |
BuiltinScalarFunction::Digest => { | ||
utf8_or_binary_to_binary_type(&input_expr_types[0], "digest") | ||
} | ||
BuiltinScalarFunction::Encode => Ok(match input_expr_types[0] { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This logic is moved into the |
||
Utf8 => Utf8, | ||
LargeUtf8 => LargeUtf8, | ||
Binary => Utf8, | ||
LargeBinary => LargeUtf8, | ||
Null => Null, | ||
_ => { | ||
return plan_err!( | ||
"The encode function can only accept utf8 or binary." | ||
); | ||
} | ||
}), | ||
BuiltinScalarFunction::Decode => Ok(match input_expr_types[0] { | ||
Utf8 => Binary, | ||
LargeUtf8 => LargeBinary, | ||
Binary => Binary, | ||
LargeBinary => LargeBinary, | ||
Null => Null, | ||
_ => { | ||
return plan_err!( | ||
"The decode function can only accept utf8 or binary." | ||
); | ||
} | ||
}), | ||
BuiltinScalarFunction::SplitPart => { | ||
utf8_to_str_type(&input_expr_types[0], "split_part") | ||
} | ||
|
@@ -1089,24 +1059,6 @@ impl BuiltinScalarFunction { | |
], | ||
self.volatility(), | ||
), | ||
BuiltinScalarFunction::Encode => Signature::one_of( | ||
vec![ | ||
Exact(vec![Utf8, Utf8]), | ||
Exact(vec![LargeUtf8, Utf8]), | ||
Exact(vec![Binary, Utf8]), | ||
Exact(vec![LargeBinary, Utf8]), | ||
], | ||
self.volatility(), | ||
), | ||
BuiltinScalarFunction::Decode => Signature::one_of( | ||
vec![ | ||
Exact(vec![Utf8, Utf8]), | ||
Exact(vec![LargeUtf8, Utf8]), | ||
Exact(vec![Binary, Utf8]), | ||
Exact(vec![LargeBinary, Utf8]), | ||
], | ||
self.volatility(), | ||
), | ||
BuiltinScalarFunction::DateTrunc => Signature::one_of( | ||
vec![ | ||
Exact(vec![Utf8, Timestamp(Nanosecond, None)]), | ||
|
@@ -1551,10 +1503,6 @@ impl BuiltinScalarFunction { | |
BuiltinScalarFunction::SHA384 => &["sha384"], | ||
BuiltinScalarFunction::SHA512 => &["sha512"], | ||
|
||
// encode/decode | ||
BuiltinScalarFunction::Encode => &["encode"], | ||
BuiltinScalarFunction::Decode => &["decode"], | ||
|
||
// other functions | ||
BuiltinScalarFunction::ArrowTypeof => &["arrow_typeof"], | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These functions weren't previously tested. I added tests to show that it is still possible to use the fluent API even after moving the `expr_fn` implementation.