
Commit 2cab41c

Refactoring.
1 parent acf9299 commit 2cab41c

13 files changed: +241 −192 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala

Lines changed: 9 additions & 8 deletions
@@ -53,14 +53,15 @@ trait FunctionRegistry {
   /** Drop a function and return whether the function existed. */
   def dropFunction(name: String): Boolean
 
-  /* Return the FunctionBuilder and ExpressionInfo for the specified function name and classname. */
-  def makeFunctionBuilderAndInfo(
-      name: String,
-      functionClassName: String): (ExpressionInfo, FunctionBuilder) = {
-    val clazz = Utils.getContextOrSparkClassLoader.loadClass(functionClassName)
-    val (_, (info, builder)) =
-      FunctionRegistry.expression(name, clazz.asInstanceOf[Class[Expression]])
-    (info, builder)
+  /**
+   * Construct a [[FunctionBuilder]] based on the provided class that represents a function.
+   *
+   * This performs reflection to decide what type of [[Expression]] to return in the builder.
+   * This is useful for creating temporary functions.
+   */
+  def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
+    // TODO: at least support UDAFs here
+    throw new UnsupportedOperationException("Use sqlContext.udf.register(...) instead.")
   }
 }

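For context on the new signature: in FunctionRegistry, FunctionBuilder is an alias for Seq[Expression] => Expression, so the method now returns a factory from child expressions to an Expression rather than a prebuilt (ExpressionInfo, FunctionBuilder) pair. The default implementation above deliberately throws (per the TODO); a concrete override could look roughly like the sketch below. The reflective constructor lookup is an assumption for illustration, not code from this commit.

    // Sketch only: one plausible override of makeFunctionBuilder.
    override def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
      val clazz = Utils.getContextOrSparkClassLoader.loadClass(functionClassName)
      (children: Seq[Expression]) =>
        // Assumes (hypothetically) that the loaded class has a Seq[Expression] constructor.
        clazz.getConstructor(classOf[Seq[_]])
          .newInstance(children)
          .asInstanceOf[Expression]
    }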
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 13 additions & 1 deletion
@@ -24,7 +24,7 @@ import scala.collection.mutable
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, SimpleFunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchFunctionException, SimpleFunctionRegistry}
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
@@ -458,6 +458,18 @@ class SessionCatalog(
     externalCatalog.getFunction(db, name.funcName)
   }
 
+  /**
+   * Check if a function already exists.
+   */
+  def functionExists(name: FunctionIdentifier): Boolean = {
+    try {
+      getFunction(name) != null
+    } catch {
+      case _: NoSuchFunctionException => false
+      case _: AnalysisException => false // HiveExternalCatalog wraps all exceptions with it.
+    }
+  }
 
   // ----------------------------------------------------------------
   // | Methods that interact with temporary and metastore functions |

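The new functionExists helper folds the getFunction-plus-exception dance into a single call; the ddl.scala changes below are its first callers. A minimal usage sketch, with catalog standing in for a SessionCatalog instance and the identifier purely illustrative:

    val func = FunctionIdentifier("my_upper", Some("db1"))  // hypothetical function/database
    if (catalog.functionExists(func)) {
      throw new AnalysisException(s"Function '${func.funcName}' already exists.")
    }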
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala

Lines changed: 2 additions & 2 deletions
@@ -218,7 +218,7 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends CatalystQl
         }
       case _ => parseFailed("Invalid CREATE FUNCTION command", node)
     }
-    CreateFunction(dbName, funcName, alias, resources, temp.isDefined)(node.source)
+    CreateFunction(dbName, funcName, alias, resources, temp.isDefined)
 
     // DROP [TEMPORARY] FUNCTION [IF EXISTS] function_name;
     case Token("TOK_DROPFUNCTION", args) =>
@@ -248,7 +248,7 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends CatalystQl
       val Seq(ifExists, temp) = getClauses(Seq(
         "TOK_IFEXISTS", "TOK_TEMPORARY"), otherArgs)
 
-      DropFunction(dbName, funcName, ifExists.isDefined, temp.isDefined)(node.source)
+      DropFunction(dbName, funcName, ifExists.isDefined, temp.isDefined)
 
     case Token("TOK_ALTERTABLE", alterTableArgs) =>
       AlterTableCommandParser.parse(node)

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 2 additions & 3 deletions
@@ -301,8 +301,7 @@ class SparkSqlAstBuilder extends AstBuilder {
       function,
       string(ctx.className), // TODO this is not an alias.
       resources,
-      ctx.TEMPORARY != null)(
-      command(ctx))
+      ctx.TEMPORARY != null)
   }
 
   /**
@@ -315,7 +314,7 @@ class SparkSqlAstBuilder extends AstBuilder {
    */
   override def visitDropFunction(ctx: DropFunctionContext): LogicalPlan = withOrigin(ctx) {
     val (database, function) = visitFunctionName(ctx.qualifiedName)
-    DropFunction(database, function, ctx.EXISTS != null, ctx.TEMPORARY != null)(command(ctx))
+    DropFunction(database, function, ctx.EXISTS != null, ctx.TEMPORARY != null)
   }
 
   /**

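The `(sql: String)` removed in both parser files is the second parameter list of the old curried case classes. Scala generates equals/hashCode only from the first parameter list, so the SQL text never participated in plan comparison anyway; dropping it lets the commands extend RunnableCommand directly (see ddl.scala below) and simplifies the test expectations. A schematic illustration with a made-up MyCommand:

    // Curried form: the second list is ignored by the generated equals/hashCode.
    case class MyCommand(name: String)(val sql: String)
    MyCommand("f")("SELECT 1") == MyCommand("f")("SELECT 2")  // true: sql is invisible

    // Single-list form: equality is fully structural, no dangling (sql) argument.
    case class MyCommand2(name: String)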
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 14 additions & 22 deletions
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogFunction}
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, ExpressionInfo}
 import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.types._
 
@@ -188,28 +188,21 @@ case class CreateFunction(
     functionName: String,
     alias: String,
     resources: Seq[(String, String)],
-    isTemp: Boolean)(sql: String)
-  extends NativeDDLCommand(sql) with Logging {
+    isTemp: Boolean)
+  extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val func = FunctionIdentifier(functionName, databaseName)
     val catalogFunc = CatalogFunction(func, alias, resources)
     if (isTemp) {
-      // Set `ignoreIfExists` to false, so if the temporary function already exists,
-      // an exception will be thrown.
-      val (info, builder) =
-        sqlContext.sessionState.functionRegistry.makeFunctionBuilderAndInfo(functionName, alias)
-      sqlContext.sessionState.catalog.createTempFunction(functionName, info, builder, false)
+      val info = new ExpressionInfo(alias, functionName)
+      val builder =
+        sqlContext.sessionState.functionRegistry.makeFunctionBuilder(functionName, alias)
+      sqlContext.sessionState.catalog.createTempFunction(
+        functionName, info, builder, ignoreIfExists = false)
     } else {
       // Check if the function to create already exists. If so, throw an exception.
-      var funcExisting: Boolean =
-        try {
-          sqlContext.sessionState.catalog.getFunction(func) != null
-        } catch {
-          case _: NoSuchFunctionException => false
-          case _: AnalysisException => false // HiveExternalCatalog wraps all exceptions with it.
-        }
-      if (funcExisting) {
+      if (sqlContext.sessionState.catalog.functionExists(func)) {
         val dbName = databaseName.getOrElse(sqlContext.sessionState.catalog.getCurrentDatabase)
         throw new AnalysisException(
           s"Function '$functionName' already exists in database '$dbName'.")
@@ -229,8 +222,8 @@ case class DropFunction(
     databaseName: Option[String],
     functionName: String,
     ifExists: Boolean,
-    isTemp: Boolean)(sql: String)
-  extends NativeDDLCommand(sql) with Logging {
+    isTemp: Boolean)
+  extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
     if (isTemp) {
@@ -240,11 +233,10 @@ case class DropFunction(
     } else {
       val func = FunctionIdentifier(functionName, databaseName)
       if (!ifExists) {
-        // getFunction can throw NoSuchFunctionException itself, or return null.
-        val existingFunc = sqlContext.sessionState.catalog.getFunction(func)
-        if (existingFunc == null) {
+        if (!sqlContext.sessionState.catalog.functionExists(func)) {
           val dbName = databaseName.getOrElse(sqlContext.sessionState.catalog.getCurrentDatabase)
-          throw new NoSuchFunctionException(dbName, functionName)
+          throw new AnalysisException(
+            s"Function '$functionName' does not exist in database '$dbName'.")
         }
       }
       sqlContext.sessionState.catalog.dropFunction(func)

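At the SQL level, the refactored commands now behave as follows (function names, class name, and database are hypothetical; this is a behavioral sketch, not a test from the commit):

    // CreateFunction(isTemp = true): registers a temp function via makeFunctionBuilder,
    // which currently throws UnsupportedOperationException pending UDF/UDAF support.
    sqlContext.sql("CREATE TEMPORARY FUNCTION my_upper AS 'com.example.MyUpper'")

    // CreateFunction(isTemp = false): checks catalog.functionExists first and raises
    // AnalysisException if the function is already defined in the target database.
    sqlContext.sql("CREATE FUNCTION db1.my_upper AS 'com.example.MyUpper'")

    // DropFunction without IF EXISTS now raises AnalysisException (instead of
    // NoSuchFunctionException) when the function is missing.
    sqlContext.sql("DROP FUNCTION db1.my_upper")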
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala

Lines changed: 16 additions & 0 deletions
@@ -112,4 +112,20 @@ private[sql] class SessionState(ctx: SQLContext) {
    */
   lazy val continuousQueryManager: ContinuousQueryManager = new ContinuousQueryManager(ctx)
 
+  /**
+   * Loads a resource into the SQLContext.
+   */
+  def loadResource(resource: Resource): Unit = {
+    resource.resourceType.toLowerCase match {
+      case "jar" => ctx.addJar(resource.path)
+      case _ => ctx.sparkContext.addFile(resource.path)
+    }
+  }
+
+  /**
+   * Loads resources such as JARs and files into the SQLContext.
+   */
+  def loadResources(resources: Seq[Resource]): Unit = resources.foreach(loadResource(_))
 }
+
+case class Resource(resourceType: String, path: String)

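A minimal sketch of the new helpers in use, assuming the resources were parsed from a CREATE FUNCTION ... USING clause (paths hypothetical):

    val resources = Seq(
      Resource("jar", "/tmp/udfs.jar"),      // "jar" is routed to ctx.addJar
      Resource("file", "/tmp/lookup.txt"))   // everything else goes to sparkContext.addFile
    sqlContext.sessionState.loadResources(resources)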
sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala

Lines changed: 6 additions & 6 deletions
@@ -146,13 +146,13 @@ class DDLCommandSuite extends PlanTest {
       "helloworld",
       "com.matthewrathbone.example.SimpleUDFExample",
       Seq(("jar", "/path/to/jar1"), ("jar", "/path/to/jar2")),
-      isTemp = true)(sql1)
+      isTemp = true)
     val expected2 = CreateFunction(
       Some("hello"),
       "world",
       "com.matthewrathbone.example.SimpleUDFExample",
       Seq(("archive", "/path/to/archive"), ("file", "/path/to/file")),
-      isTemp = false)(sql2)
+      isTemp = false)
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)
   }
@@ -172,22 +172,22 @@ class DDLCommandSuite extends PlanTest {
       None,
       "helloworld",
       ifExists = false,
-      isTemp = true)(sql1)
+      isTemp = true)
     val expected2 = DropFunction(
       None,
       "helloworld",
       ifExists = true,
-      isTemp = true)(sql2)
+      isTemp = true)
     val expected3 = DropFunction(
       Some("hello"),
       "world",
       ifExists = false,
-      isTemp = false)(sql3)
+      isTemp = false)
     val expected4 = DropFunction(
       Some("hello"),
       "world",
       ifExists = true,
-      isTemp = false)(sql4)
+      isTemp = false)
 
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala

Lines changed: 73 additions & 65 deletions
@@ -491,46 +491,50 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
 
   test("SPARK-11595 ADD JAR with input path having URL scheme") {
     withJdbcStatement { statement =>
-      val jarPath = "../hive/src/test/resources/TestUDTF.jar"
-      val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"
+      try {
+        val jarPath = "../hive/src/test/resources/TestUDTF.jar"
+        val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"
 
-      Seq(
-        s"ADD JAR $jarURL",
-        s"""CREATE TEMPORARY FUNCTION udtf_count2
-           |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
-         """.stripMargin
-      ).foreach(statement.execute)
+        Seq(
+          s"ADD JAR $jarURL",
+          s"""CREATE TEMPORARY FUNCTION udtf_count2
+             |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
+           """.stripMargin
+        ).foreach(statement.execute)
 
-      val rs1 = statement.executeQuery("DESCRIBE FUNCTION udtf_count2")
+        val rs1 = statement.executeQuery("DESCRIBE FUNCTION udtf_count2")
 
-      assert(rs1.next())
-      assert(rs1.getString(1) === "Function: udtf_count2")
+        assert(rs1.next())
+        assert(rs1.getString(1) === "Function: udtf_count2")
 
-      assert(rs1.next())
-      assertResult("Class: org.apache.spark.sql.hive.execution.GenericUDTFCount2") {
-        rs1.getString(1)
-      }
+        assert(rs1.next())
+        assertResult("Class: org.apache.spark.sql.hive.execution.GenericUDTFCount2") {
+          rs1.getString(1)
+        }
 
-      assert(rs1.next())
-      assert(rs1.getString(1) === "Usage: To be added.")
+        assert(rs1.next())
+        assert(rs1.getString(1) === "Usage: To be added.")
 
-      val dataPath = "../hive/src/test/resources/data/files/kv1.txt"
+        val dataPath = "../hive/src/test/resources/data/files/kv1.txt"
 
-      Seq(
-        s"CREATE TABLE test_udtf(key INT, value STRING)",
-        s"LOAD DATA LOCAL INPATH '$dataPath' OVERWRITE INTO TABLE test_udtf"
-      ).foreach(statement.execute)
+        Seq(
+          s"CREATE TABLE test_udtf(key INT, value STRING)",
+          s"LOAD DATA LOCAL INPATH '$dataPath' OVERWRITE INTO TABLE test_udtf"
+        ).foreach(statement.execute)
 
-      val rs2 = statement.executeQuery(
-        "SELECT key, cc FROM test_udtf LATERAL VIEW udtf_count2(value) dd AS cc")
+        val rs2 = statement.executeQuery(
+          "SELECT key, cc FROM test_udtf LATERAL VIEW udtf_count2(value) dd AS cc")
 
-      assert(rs2.next())
-      assert(rs2.getInt(1) === 97)
-      assert(rs2.getInt(2) === 500)
+        assert(rs2.next())
+        assert(rs2.getInt(1) === 97)
+        assert(rs2.getInt(2) === 500)
 
-      assert(rs2.next())
-      assert(rs2.getInt(1) === 97)
-      assert(rs2.getInt(2) === 500)
+        assert(rs2.next())
+        assert(rs2.getInt(1) === 97)
+        assert(rs2.getInt(2) === 500)
+      } finally {
+        statement.executeQuery("DROP TEMPORARY FUNCTION udtf_count2")
+      }
     }
   }
 
@@ -548,43 +552,47 @@ class SingleSessionSuite extends HiveThriftJdbcTest {
     "--conf spark.sql.hive.thriftServer.singleSession=true" :: Nil
 
   test("test single session") {
-    withMultipleConnectionJdbcStatement(
-      { statement =>
-        val jarPath = "../hive/src/test/resources/TestUDTF.jar"
-        val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"
-
-        // Configurations and temporary functions added in this session should be visible to all
-        // the other sessions.
-        Seq(
-          "SET foo=bar",
-          s"ADD JAR $jarURL",
-          s"""CREATE TEMPORARY FUNCTION udtf_count2
-             |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
-           """.stripMargin
-        ).foreach(statement.execute)
-      },
-
-      { statement =>
-        val rs1 = statement.executeQuery("SET foo")
-
-        assert(rs1.next())
-        assert(rs1.getString(1) === "foo")
-        assert(rs1.getString(2) === "bar")
-
-        val rs2 = statement.executeQuery("DESCRIBE FUNCTION udtf_count2")
-
-        assert(rs2.next())
-        assert(rs2.getString(1) === "Function: udtf_count2")
+    try {
+      withMultipleConnectionJdbcStatement(
+        { statement =>
+          val jarPath = "../hive/src/test/resources/TestUDTF.jar"
+          val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"
+
+          // Configurations and temporary functions added in this session should be visible to all
+          // the other sessions.
+          Seq(
+            "SET foo=bar",
+            s"ADD JAR $jarURL",
+            s"""CREATE TEMPORARY FUNCTION udtf_count2
+               |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
+             """.stripMargin
+          ).foreach(statement.execute)
+        },
+
+        { statement =>
+          val rs1 = statement.executeQuery("SET foo")
+
+          assert(rs1.next())
+          assert(rs1.getString(1) === "foo")
+          assert(rs1.getString(2) === "bar")
+
+          val rs2 = statement.executeQuery("DESCRIBE FUNCTION udtf_count2")
+
+          assert(rs2.next())
+          assert(rs2.getString(1) === "Function: udtf_count2")
+
+          assert(rs2.next())
+          assertResult("Class: org.apache.spark.sql.hive.execution.GenericUDTFCount2") {
+            rs2.getString(1)
+          }
 
-        assert(rs2.next())
-        assertResult("Class: org.apache.spark.sql.hive.execution.GenericUDTFCount2") {
-          rs2.getString(1)
+          assert(rs2.next())
+          assert(rs2.getString(1) === "Usage: To be added.")
         }
-
-        assert(rs2.next())
-        assert(rs2.getString(1) === "Usage: To be added.")
-      }
-    )
+      )
+    } finally {
+      statement.executeQuery("DROP TEMPORARY FUNCTION udtf_count2")
+    }
   }
 }
