-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-14123][SQL] Implement function related DDL commands #12036
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7d00184
9af70af
35ad7ae
cb29f0f
77848c9
b8dda84
ee957db
133ce1a
6b76980
e05b108
314c4db
c370c47
acf9299
2cab41c
65d9dbd
05709f0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder | |
import org.apache.spark.sql.catalyst.expressions._ | ||
import org.apache.spark.sql.catalyst.expressions.aggregate._ | ||
import org.apache.spark.sql.catalyst.util.StringKeyHashMap | ||
import org.apache.spark.util.Utils | ||
|
||
|
||
/** A catalog for looking up user defined functions, used by an [[Analyzer]]. */ | ||
|
@@ -52,6 +53,16 @@ trait FunctionRegistry { | |
/** Drop a function and return whether the function existed. */ | ||
def dropFunction(name: String): Boolean | ||
|
||
/** | ||
* Construct a [[FunctionBuilder]] based on the provided class that represents a function. | ||
* | ||
* This performs reflection to decide what type of [[Expression]] to return in the builder. | ||
* This is useful for creating temporary functions. | ||
*/ | ||
def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = { | ||
// TODO: at least support UDAFs here | ||
throw new UnsupportedOperationException("Use sqlContext.udf.register(...) instead.") | ||
} | ||
} | ||
|
||
class SimpleFunctionRegistry extends FunctionRegistry { | ||
|
@@ -67,9 +78,14 @@ class SimpleFunctionRegistry extends FunctionRegistry { | |
} | ||
|
||
override def lookupFunction(name: String, children: Seq[Expression]): Expression = { | ||
val builder = functionBuilders.get(name) | ||
if (builder.isEmpty) { | ||
throw new AnalysisException(s"undefined function $name") | ||
} | ||
val func = synchronized { | ||
functionBuilders.get(name).map(_._2).getOrElse { | ||
throw new AnalysisException(s"undefined function $name") | ||
Try(builder.map(_._2)) match { | ||
case Success(e) => e.get | ||
case Failure(e) => throw new AnalysisException(e.getMessage) | ||
} | ||
} | ||
func(children) | ||
|
@@ -337,13 +353,12 @@ object FunctionRegistry { | |
fr | ||
} | ||
|
||
/** See usage above. */ | ||
def expression[T <: Expression](name: String) | ||
(implicit tag: ClassTag[T]): (String, (ExpressionInfo, FunctionBuilder)) = { | ||
|
||
def expression[T <: Expression]( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We create |
||
name: String, | ||
runtimeClass: Class[T]): (String, (ExpressionInfo, FunctionBuilder)) = { | ||
// See if we can find a constructor that accepts Seq[Expression] | ||
val varargCtor = Try(tag.runtimeClass.getDeclaredConstructor(classOf[Seq[_]])).toOption | ||
val builder = (expressions: Seq[Expression]) => { | ||
val varargCtor = Try(runtimeClass.getDeclaredConstructor(classOf[Seq[_]])).toOption | ||
val builder: FunctionBuilder = (expressions: Seq[Expression]) => { | ||
if (varargCtor.isDefined) { | ||
// If there is an apply method that accepts Seq[Expression], use that one. | ||
Try(varargCtor.get.newInstance(expressions).asInstanceOf[Expression]) match { | ||
|
@@ -353,7 +368,7 @@ object FunctionRegistry { | |
} else { | ||
// Otherwise, find a ctor method that matches the number of arguments, and use that. | ||
val params = Seq.fill(expressions.size)(classOf[Expression]) | ||
val f = Try(tag.runtimeClass.getDeclaredConstructor(params : _*)) match { | ||
val f = Try(runtimeClass.getDeclaredConstructor(params : _*)) match { | ||
case Success(e) => | ||
e | ||
case Failure(e) => | ||
|
@@ -366,14 +381,19 @@ object FunctionRegistry { | |
} | ||
} | ||
|
||
val clazz = tag.runtimeClass | ||
val df = clazz.getAnnotation(classOf[ExpressionDescription]) | ||
val df = runtimeClass.getAnnotation(classOf[ExpressionDescription]) | ||
if (df != null) { | ||
(name, | ||
(new ExpressionInfo(clazz.getCanonicalName, name, df.usage(), df.extended()), | ||
(new ExpressionInfo(runtimeClass.getCanonicalName, name, df.usage(), df.extended()), | ||
builder)) | ||
} else { | ||
(name, (new ExpressionInfo(clazz.getCanonicalName, name), builder)) | ||
(name, (new ExpressionInfo(runtimeClass.getCanonicalName, name), builder)) | ||
} | ||
} | ||
|
||
/** See usage above. */ | ||
def expression[T <: Expression](name: String) | ||
(implicit tag: ClassTag[T]): (String, (ExpressionInfo, FunctionBuilder)) = { | ||
expression(name, tag.runtimeClass.asInstanceOf[Class[T]]) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,11 +18,12 @@ | |
package org.apache.spark.sql.execution.command | ||
|
||
import org.apache.spark.internal.Logging | ||
import org.apache.spark.sql.{Row, SQLContext} | ||
import org.apache.spark.sql.catalyst.TableIdentifier | ||
import org.apache.spark.sql.catalyst.catalog.CatalogDatabase | ||
import org.apache.spark.sql.{AnalysisException, Row, SQLContext} | ||
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} | ||
import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException | ||
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogFunction} | ||
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec | ||
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} | ||
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, ExpressionInfo} | ||
import org.apache.spark.sql.execution.datasources.BucketSpec | ||
import org.apache.spark.sql.types._ | ||
|
||
|
@@ -175,13 +176,42 @@ case class DescribeDatabase( | |
} | ||
} | ||
|
||
/** | ||
* The DDL command that creates a function. | ||
* alias: the class name that implements the created function. | ||
* resources: Jars, files, or archives which need to be added to the environment when the function | ||
* is referenced for the first time by a session. | ||
* isTemp: indicates if it is a temporary function. | ||
*/ | ||
case class CreateFunction( | ||
databaseName: Option[String], | ||
functionName: String, | ||
alias: String, | ||
resources: Seq[(String, String)], | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. resources is not used here? I think we need to change There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. yah, I should put the resources into it. |
||
isTemp: Boolean)(sql: String) | ||
extends NativeDDLCommand(sql) with Logging | ||
isTemp: Boolean) | ||
extends RunnableCommand { | ||
|
||
override def run(sqlContext: SQLContext): Seq[Row] = { | ||
val func = FunctionIdentifier(functionName, databaseName) | ||
val catalogFunc = CatalogFunction(func, alias, resources) | ||
if (isTemp) { | ||
val info = new ExpressionInfo(alias, functionName) | ||
val builder = | ||
sqlContext.sessionState.functionRegistry.makeFunctionBuilder(functionName, alias) | ||
sqlContext.sessionState.catalog.createTempFunction( | ||
functionName, info, builder, ignoreIfExists = false) | ||
} else { | ||
// Check if the function to create is already existing. If so, throw exception. | ||
if (sqlContext.sessionState.catalog.functionExists(func)) { | ||
val dbName = databaseName.getOrElse(sqlContext.sessionState.catalog.getCurrentDatabase) | ||
throw new AnalysisException( | ||
s"Function '$functionName' already exists in database '$dbName'.") | ||
} | ||
sqlContext.sessionState.catalog.createFunction(catalogFunc) | ||
} | ||
Seq.empty[Row] | ||
} | ||
} | ||
|
||
/** | ||
* The DDL command that drops a function. | ||
|
@@ -192,8 +222,28 @@ case class DropFunction( | |
databaseName: Option[String], | ||
functionName: String, | ||
ifExists: Boolean, | ||
isTemp: Boolean)(sql: String) | ||
extends NativeDDLCommand(sql) with Logging | ||
isTemp: Boolean) | ||
extends RunnableCommand { | ||
|
||
override def run(sqlContext: SQLContext): Seq[Row] = { | ||
if (isTemp) { | ||
require(databaseName.isEmpty, | ||
"attempted to drop a temporary function while specifying a database") | ||
sqlContext.sessionState.catalog.dropTempFunction(functionName, ifExists) | ||
} else { | ||
val func = FunctionIdentifier(functionName, databaseName) | ||
if (!ifExists) { | ||
if (!sqlContext.sessionState.catalog.functionExists(func)) { | ||
val dbName = databaseName.getOrElse(sqlContext.sessionState.catalog.getCurrentDatabase) | ||
throw new AnalysisException( | ||
s"Function '$functionName' does not exist in database '$dbName'.") | ||
} | ||
} | ||
sqlContext.sessionState.catalog.dropFunction(func) | ||
} | ||
Seq.empty[Row] | ||
} | ||
} | ||
|
||
/** Rename in ALTER TABLE/VIEW: change the name of a table/view to a different name. */ | ||
case class AlterTableRename( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What exception could `Option.map` possibly throw? This method doesn't need to change at all and the old code is more concise.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hive's exception will be thrown. I faced the exception thrown while working on the changes. One Jenkins test (`HiveUDFSuite`) checks the error message. We would miss the actual exception and message and only get an `AnalysisException` with `undefined function`
here. I think it is proper to show the real error message.