
[SPARK-14123][SQL] Implement function related DDL commands #12036


Closed · wants to merge 16 commits
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.util.StringKeyHashMap
import org.apache.spark.util.Utils


/** A catalog for looking up user defined functions, used by an [[Analyzer]]. */
@@ -52,6 +53,16 @@ trait FunctionRegistry {
/** Drop a function and return whether the function existed. */
def dropFunction(name: String): Boolean

/**
* Construct a [[FunctionBuilder]] based on the provided class that represents a function.
*
* This performs reflection to decide what type of [[Expression]] to return in the builder.
* This is useful for creating temporary functions.
*/
def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
// TODO: at least support UDAFs here
throw new UnsupportedOperationException("Use sqlContext.udf.register(...) instead.")
}
}
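
The default implementation above just fails fast. As a sketch of what an override might look like — a hypothetical subclass, not code from this PR — reflection can map the class name to a constructor taking the children expressions:

```scala
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.Expression

// Hypothetical subclass: builds an Expression reflectively from a class name.
class ReflectiveFunctionRegistry extends SimpleFunctionRegistry {
  override def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
    (children: Seq[Expression]) => {
      val clazz = Class.forName(functionClassName)
      // Assumes the function class has a Seq[Expression] constructor.
      val ctor = clazz.getDeclaredConstructor(classOf[Seq[_]])
      ctor.newInstance(children).asInstanceOf[Expression]
    }
  }
}
```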

class SimpleFunctionRegistry extends FunctionRegistry {
@@ -67,9 +78,14 @@ class SimpleFunctionRegistry extends FunctionRegistry {
}

override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
val builder = functionBuilders.get(name)
if (builder.isEmpty) {
throw new AnalysisException(s"undefined function $name")
}
val func = synchronized {
functionBuilders.get(name).map(_._2).getOrElse {
throw new AnalysisException(s"undefined function $name")
Try(builder.map(_._2)) match {
Contributor: What exception could Option.map possibly throw? This method doesn't need to change at all, and the old code is more concise.

Member Author: Hive's exception will be thrown here; I hit it while working on these changes. One Jenkins test (HiveUDFSuite) checks the error message. With the old code we would lose the actual exception and message and only get an AnalysisException saying "undefined function". I think it is proper to surface the real error message.
case Success(e) => e.get
case Failure(e) => throw new AnalysisException(e.getMessage)
}
}
func(children)
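
To illustrate the pattern under discussion — Try converts whatever the lookup throws into an AnalysisException that keeps the original message — here is a self-contained sketch (the failing lookup is made up):

```scala
import scala.util.{Failure, Success, Try}

// Made-up stand-in for a lookup whose evaluation throws a Hive-style error.
def failingLookup(): Option[Int] =
  throw new RuntimeException("No handler for udf class com.example.BadUdf")

Try(failingLookup()) match {
  case Success(result) => println(result)
  case Failure(e) =>
    // The original message survives instead of a generic "undefined function".
    println(s"AnalysisException: ${e.getMessage}")
}
```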
@@ -337,13 +353,12 @@ object FunctionRegistry {
fr
}

/** See usage above. */
def expression[T <: Expression](name: String)
(implicit tag: ClassTag[T]): (String, (ExpressionInfo, FunctionBuilder)) = {

def expression[T <: Expression](
Member Author: We create the FunctionBuilder here for temporary functions.

name: String,
runtimeClass: Class[T]): (String, (ExpressionInfo, FunctionBuilder)) = {
// See if we can find a constructor that accepts Seq[Expression]
val varargCtor = Try(tag.runtimeClass.getDeclaredConstructor(classOf[Seq[_]])).toOption
val builder = (expressions: Seq[Expression]) => {
val varargCtor = Try(runtimeClass.getDeclaredConstructor(classOf[Seq[_]])).toOption
val builder: FunctionBuilder = (expressions: Seq[Expression]) => {
if (varargCtor.isDefined) {
// If there is a constructor that accepts Seq[Expression], use that one.
Try(varargCtor.get.newInstance(expressions).asInstanceOf[Expression]) match {
@@ -353,7 +368,7 @@
} else {
// Otherwise, find a constructor that matches the number of arguments, and use that.
val params = Seq.fill(expressions.size)(classOf[Expression])
val f = Try(tag.runtimeClass.getDeclaredConstructor(params : _*)) match {
val f = Try(runtimeClass.getDeclaredConstructor(params : _*)) match {
case Success(e) =>
e
case Failure(e) =>
@@ -366,14 +381,19 @@
}
}

val clazz = tag.runtimeClass
val df = clazz.getAnnotation(classOf[ExpressionDescription])
val df = runtimeClass.getAnnotation(classOf[ExpressionDescription])
if (df != null) {
(name,
(new ExpressionInfo(clazz.getCanonicalName, name, df.usage(), df.extended()),
(new ExpressionInfo(runtimeClass.getCanonicalName, name, df.usage(), df.extended()),
builder))
} else {
(name, (new ExpressionInfo(clazz.getCanonicalName, name), builder))
(name, (new ExpressionInfo(runtimeClass.getCanonicalName, name), builder))
}
}

/** See usage above. */
def expression[T <: Expression](name: String)
(implicit tag: ClassTag[T]): (String, (ExpressionInfo, FunctionBuilder)) = {
expression(name, tag.runtimeClass.asInstanceOf[Class[T]])
}
}
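
A hedged usage sketch of the new Class-based overload, for the case where the expression class is only resolved at runtime (the function name is illustrative):

```scala
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, SimpleFunctionRegistry}
import org.apache.spark.sql.catalyst.expressions.Expression

val registry = new SimpleFunctionRegistry
// Resolve the expression class at runtime instead of through a ClassTag.
val clazz = Class.forName("org.apache.spark.sql.catalyst.expressions.Upper")
  .asInstanceOf[Class[Expression]]
val (name, (info, builder)) = FunctionRegistry.expression("my_upper", clazz)
registry.registerFunction(name, info, builder)
```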
@@ -24,7 +24,7 @@ import scala.collection.mutable
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, SimpleFunctionRegistry}
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchFunctionException, SimpleFunctionRegistry}
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
@@ -458,6 +458,18 @@ class SessionCatalog(
externalCatalog.getFunction(db, name.funcName)
}

/**
* Check whether a function with the specified name exists.
*/
def functionExists(name: FunctionIdentifier): Boolean = {
try {
getFunction(name) != null
} catch {
case _: NoSuchFunctionException => false
case _: AnalysisException => false // HiveExternalCatalog wraps all exceptions with it.
}
}
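
Usage sketch (catalog and identifier are illustrative); the CreateFunction command below relies on this check to fail early on duplicates:

```scala
val func = FunctionIdentifier("my_func", Some("db1"))
if (catalog.functionExists(func)) {
  throw new AnalysisException(s"Function '${func.funcName}' already exists.")
}
```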

// ----------------------------------------------------------------
// | Methods that interact with temporary and metastore functions |
@@ -469,12 +481,13 @@
*/
def createTempFunction(
name: String,
info: ExpressionInfo,
funcDefinition: FunctionBuilder,
ignoreIfExists: Boolean): Unit = {
if (functionRegistry.lookupFunctionBuilder(name).isDefined && !ignoreIfExists) {
throw new AnalysisException(s"Temporary function '$name' already exists.")
}
functionRegistry.registerFunction(name, funcDefinition)
functionRegistry.registerFunction(name, info, funcDefinition)
}
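
Callers now pass the ExpressionInfo along with the builder. A minimal sketch with made-up names:

```scala
val info = new ExpressionInfo("com.example.MyUdf", "my_udf") // hypothetical class and name
val headOfArgs = (children: Seq[Expression]) => children.head
catalog.createTempFunction("my_udf", info, headOfArgs, ignoreIfExists = false)
```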

/**
@@ -171,8 +171,12 @@ abstract class ExternalCatalog {
*
* @param identifier name of the function
* @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc"
* @param resources resource types and Uris used by the function
*/
case class CatalogFunction(identifier: FunctionIdentifier, className: String)
case class CatalogFunction(
identifier: FunctionIdentifier,
className: String,
resources: Seq[(String, String)])
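
Each resource pairs a type with a URI. An illustrative entry (identifier and path are made up):

```scala
val fn = CatalogFunction(
  FunctionIdentifier("my_func", Some("db1")),
  "org.apache.spark.util.MyFunc",
  resources = Seq(("jar", "hdfs:///tmp/udfs/my-udfs.jar")))
```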


/**
@@ -433,7 +433,8 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
test("get function") {
val catalog = newBasicCatalog()
assert(catalog.getFunction("db2", "func1") ==
CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass))
CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass,
Seq.empty[(String, String)]))
intercept[AnalysisException] {
catalog.getFunction("db2", "does_not_exist")
}
@@ -557,7 +558,7 @@ abstract class CatalogTestUtils {
}

def newFunc(name: String, database: Option[String] = None): CatalogFunction = {
CatalogFunction(FunctionIdentifier(name, database), funcClass)
CatalogFunction(FunctionIdentifier(name, database), funcClass, Seq.empty[(String, String)])
}

/**
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias}


@@ -685,18 +685,21 @@ class SessionCatalogSuite extends SparkFunSuite {
val catalog = new SessionCatalog(newBasicCatalog())
val tempFunc1 = (e: Seq[Expression]) => e.head
val tempFunc2 = (e: Seq[Expression]) => e.last
catalog.createTempFunction("temp1", tempFunc1, ignoreIfExists = false)
catalog.createTempFunction("temp2", tempFunc2, ignoreIfExists = false)
val info1 = new ExpressionInfo("tempFunc1", "temp1")
val info2 = new ExpressionInfo("tempFunc2", "temp2")
catalog.createTempFunction("temp1", info1, tempFunc1, ignoreIfExists = false)
catalog.createTempFunction("temp2", info2, tempFunc2, ignoreIfExists = false)
assert(catalog.getTempFunction("temp1") == Some(tempFunc1))
assert(catalog.getTempFunction("temp2") == Some(tempFunc2))
assert(catalog.getTempFunction("temp3") == None)
val tempFunc3 = (e: Seq[Expression]) => Literal(e.size)
val info3 = new ExpressionInfo("tempFunc3", "temp1")
// Temporary function already exists
intercept[AnalysisException] {
catalog.createTempFunction("temp1", tempFunc3, ignoreIfExists = false)
catalog.createTempFunction("temp1", info3, tempFunc3, ignoreIfExists = false)
}
// Temporary function is overridden
catalog.createTempFunction("temp1", tempFunc3, ignoreIfExists = true)
catalog.createTempFunction("temp1", info3, tempFunc3, ignoreIfExists = true)
assert(catalog.getTempFunction("temp1") == Some(tempFunc3))
}

@@ -726,8 +729,9 @@

test("drop temp function") {
val catalog = new SessionCatalog(newBasicCatalog())
val info = new ExpressionInfo("tempFunc", "func1")
val tempFunc = (e: Seq[Expression]) => e.head
catalog.createTempFunction("func1", tempFunc, ignoreIfExists = false)
catalog.createTempFunction("func1", info, tempFunc, ignoreIfExists = false)
assert(catalog.getTempFunction("func1") == Some(tempFunc))
catalog.dropTempFunction("func1", ignoreIfNotExists = false)
assert(catalog.getTempFunction("func1") == None)
@@ -739,7 +743,9 @@

test("get function") {
val catalog = new SessionCatalog(newBasicCatalog())
val expected = CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass)
val expected =
CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass,
Seq.empty[(String, String)])
assert(catalog.getFunction(FunctionIdentifier("func1", Some("db2"))) == expected)
// Get function without explicitly specifying database
catalog.setCurrentDatabase("db2")
@@ -758,8 +764,9 @@

test("lookup temp function") {
val catalog = new SessionCatalog(newBasicCatalog())
val info1 = new ExpressionInfo("tempFunc1", "func1")
val tempFunc1 = (e: Seq[Expression]) => e.head
catalog.createTempFunction("func1", tempFunc1, ignoreIfExists = false)
catalog.createTempFunction("func1", info1, tempFunc1, ignoreIfExists = false)
assert(catalog.lookupFunction("func1", Seq(Literal(1), Literal(2), Literal(3))) == Literal(1))
catalog.dropTempFunction("func1", ignoreIfNotExists = false)
intercept[AnalysisException] {
@@ -809,8 +816,9 @@
test("rename temp function") {
val externalCatalog = newBasicCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
val info = new ExpressionInfo("tempFunc", "func1")
val tempFunc = (e: Seq[Expression]) => e.head
sessionCatalog.createTempFunction("func1", tempFunc, ignoreIfExists = false)
sessionCatalog.createTempFunction("func1", info, tempFunc, ignoreIfExists = false)
sessionCatalog.setCurrentDatabase("db2")
// If a database is specified, we'll always rename the function in that database
sessionCatalog.renameFunction(
@@ -853,12 +861,14 @@

test("list functions") {
val catalog = new SessionCatalog(newBasicCatalog())
val info1 = new ExpressionInfo("tempFunc1", "func1")
val info2 = new ExpressionInfo("tempFunc2", "yes_me")
val tempFunc1 = (e: Seq[Expression]) => e.head
val tempFunc2 = (e: Seq[Expression]) => e.last
catalog.createFunction(newFunc("func2", Some("db2")))
catalog.createFunction(newFunc("not_me", Some("db2")))
catalog.createTempFunction("func1", tempFunc1, ignoreIfExists = false)
catalog.createTempFunction("yes_me", tempFunc2, ignoreIfExists = false)
catalog.createTempFunction("func1", info1, tempFunc1, ignoreIfExists = false)
catalog.createTempFunction("yes_me", info2, tempFunc2, ignoreIfExists = false)
assert(catalog.listFunctions("db1", "*").toSet ==
Set(FunctionIdentifier("func1"),
FunctionIdentifier("yes_me")))
@@ -301,8 +301,7 @@ class SparkSqlAstBuilder extends AstBuilder {
function,
string(ctx.className), // TODO this is not an alias.
resources,
ctx.TEMPORARY != null)(
command(ctx))
ctx.TEMPORARY != null)
}

/**
@@ -315,7 +314,7 @@
*/
override def visitDropFunction(ctx: DropFunctionContext): LogicalPlan = withOrigin(ctx) {
val (database, function) = visitFunctionName(ctx.qualifiedName)
DropFunction(database, function, ctx.EXISTS != null, ctx.TEMPORARY != null)(command(ctx))
DropFunction(database, function, ctx.EXISTS != null, ctx.TEMPORARY != null)
}

/**
@@ -18,11 +18,12 @@
package org.apache.spark.sql.execution.command

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogFunction}
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, ExpressionInfo}
import org.apache.spark.sql.execution.datasources.BucketSpec
import org.apache.spark.sql.types._

@@ -175,13 +176,42 @@ case class DescribeDatabase(
}
}

/**
* The DDL command that creates a function.
* alias: the class name that implements the created function.
* resources: jars, files, or archives that need to be added to the environment when the function
* is referenced for the first time by a session.
* isTemp: indicates whether it is a temporary function.
*/
case class CreateFunction(
databaseName: Option[String],
functionName: String,
alias: String,
resources: Seq[(String, String)],
Contributor: resources is not used here? I think we need to change CatalogFunction to include resources so we can propagate that to Hive properly.

Member Author: Yeah, I should put the resources into it.

isTemp: Boolean)(sql: String)
extends NativeDDLCommand(sql) with Logging
isTemp: Boolean)
extends RunnableCommand {

override def run(sqlContext: SQLContext): Seq[Row] = {
val func = FunctionIdentifier(functionName, databaseName)
val catalogFunc = CatalogFunction(func, alias, resources)
if (isTemp) {
val info = new ExpressionInfo(alias, functionName)
val builder =
sqlContext.sessionState.functionRegistry.makeFunctionBuilder(functionName, alias)
sqlContext.sessionState.catalog.createTempFunction(
functionName, info, builder, ignoreIfExists = false)
} else {
// If the function to create already exists, throw an exception.
if (sqlContext.sessionState.catalog.functionExists(func)) {
val dbName = databaseName.getOrElse(sqlContext.sessionState.catalog.getCurrentDatabase)
throw new AnalysisException(
s"Function '$functionName' already exists in database '$dbName'.")
}
sqlContext.sessionState.catalog.createFunction(catalogFunc)
}
Seq.empty[Row]
}
}
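
For context, the DDL this command implements looks roughly like the following (class names and paths are illustrative):

```scala
// Temporary function: registered in the session's FunctionRegistry only.
sqlContext.sql("CREATE TEMPORARY FUNCTION my_upper AS 'com.example.MyUpper'")

// Persistent function: recorded in the catalog together with its JAR dependency.
sqlContext.sql(
  """CREATE FUNCTION db1.my_func AS 'com.example.MyFunc'
    |USING JAR 'hdfs:///tmp/udfs/my-udfs.jar'""".stripMargin)
```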

/**
* The DDL command that drops a function.
@@ -192,8 +222,28 @@ case class DropFunction(
databaseName: Option[String],
functionName: String,
ifExists: Boolean,
isTemp: Boolean)(sql: String)
extends NativeDDLCommand(sql) with Logging
isTemp: Boolean)
extends RunnableCommand {

override def run(sqlContext: SQLContext): Seq[Row] = {
if (isTemp) {
require(databaseName.isEmpty,
"attempted to drop a temporary function while specifying a database")
sqlContext.sessionState.catalog.dropTempFunction(functionName, ifExists)
} else {
val func = FunctionIdentifier(functionName, databaseName)
if (!ifExists) {
if (!sqlContext.sessionState.catalog.functionExists(func)) {
val dbName = databaseName.getOrElse(sqlContext.sessionState.catalog.getCurrentDatabase)
throw new AnalysisException(
s"Function '$functionName' does not exist in database '$dbName'.")
}
}
sqlContext.sessionState.catalog.dropFunction(func)
}
Seq.empty[Row]
}
}
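
And the matching drops (names are illustrative):

```scala
sqlContext.sql("DROP TEMPORARY FUNCTION IF EXISTS my_upper")
sqlContext.sql("DROP FUNCTION IF EXISTS db1.my_func")
```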

/** Rename in ALTER TABLE/VIEW: change the name of a table/view to a different name. */
case class AlterTableRename(
@@ -112,4 +112,20 @@ private[sql] class SessionState(ctx: SQLContext) {
*/
lazy val continuousQueryManager: ContinuousQueryManager = new ContinuousQueryManager(ctx)

/**
* Loads a single resource into the SQLContext.
*/
def loadResource(resource: Resource): Unit = {
resource.resourceType.toLowerCase match {
case "jar" => ctx.addJar(resource.path)
case _ => ctx.sparkContext.addFile(resource.path)
}
}

/**
* Loads resources such as JARs and files into the SQLContext.
*/
def loadResources(resources: Seq[Resource]): Unit = resources.foreach(loadResource(_))
}

case class Resource(resourceType: String, path: String)
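
A sketch of loading a function's declared resources before first use (paths are illustrative):

```scala
val resources = Seq(
  Resource("jar", "hdfs:///tmp/udfs/my-udfs.jar"),
  Resource("file", "hdfs:///tmp/udfs/lookup.txt"))
sqlContext.sessionState.loadResources(resources)
```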