Skip to content

Commit 1292346

Browse files
committed
[SPARK-8306] AddJar command needs to set the new class loader to the HiveConf inside executionHive.state.
1 parent 104f30c commit 1292346

File tree

4 files changed

+45
-3
lines changed

4 files changed

+45
-3
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ private[hive] class ClientWrapper(
9595
case hive.v14 => new Shim_v0_14()
9696
}
9797

98+
// Create an internal session state for this ClientWrapper.
9899
val state = {
99100
val original = Thread.currentThread().getContextClassLoader
100101
Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
@@ -131,8 +132,15 @@ private[hive] class ClientWrapper(
131132
*/
132133
private def withHiveState[A](f: => A): A = synchronized {
133134
val original = Thread.currentThread().getContextClassLoader
135+
// This setContextClassLoader is used for Hive 0.12's metastore since Hive 0.12 will not
136+
// internally override the context class loader of the current thread with the class loader
137+
// associated with the HiveConf in `state`.
134138
Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
139+
// Set the thread local metastore client to the client associated with this ClientWrapper.
135140
Hive.set(client)
141+
// Starting from Hive 0.13.0, setCurrentSessionState will use the classLoader associated
142+
// with the HiveConf in `state` to override the context class loader of the current
143+
// thread.
136144
shim.setCurrentSessionState(state)
137145
val ret = try f finally {
138146
Thread.currentThread().setContextClassLoader(original)

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,15 @@ case class AddJar(path: String) extends RunnableCommand {
9191
val jarURL = new java.io.File(path).toURL
9292
val newClassLoader = new java.net.URLClassLoader(Array(jarURL), currentClassLoader)
9393
Thread.currentThread.setContextClassLoader(newClassLoader)
94-
org.apache.hadoop.hive.ql.metadata.Hive.get().getConf().setClassLoader(newClassLoader)
95-
96-
// Add jar to isolated hive classloader
94+
// We need to explicitly set the class loader associated with the conf in executionHive's
95+
// state because this class loader will be used as the context class loader of the current
96+
// thread to execute any Hive command.
97+
// We cannot use `org.apache.hadoop.hive.ql.metadata.Hive.get().getConf()` because Hive.get()
98+
// returns the value of a thread local variable and its HiveConf may not be the HiveConf
99+
// associated with `executionHive.state` (for example, HiveContext is created in one thread
100+
// and then add jar is called from another thread).
101+
hiveContext.executionHive.state.getConf.setClassLoader(newClassLoader)
102+
// Add jar to isolated hive (metadataHive) class loader.
97103
hiveContext.runSqlHive(s"ADD JAR $path")
98104

99105
// Add jar to executors
112 KB
Binary file not shown.

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -934,4 +934,32 @@ class SQLQuerySuite extends QueryTest {
934934
sql("set hive.exec.dynamic.partition.mode=strict")
935935
}
936936
}
937+
938+
test("Call add jar in a different thread (SPARK-8306)") {
939+
@volatile var error: Option[Throwable] = None
940+
val thread = new Thread {
941+
override def run() {
942+
// To make sure this test works, this jar should not be loaded in another place.
943+
TestHive.sql(
944+
s"ADD JAR ${TestHive.getHiveFile("hive-contrib-0.13.1.jar").getCanonicalPath()}")
945+
try {
946+
TestHive.sql(
947+
"""
948+
|CREATE TEMPORARY FUNCTION example_max
949+
|AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax'
950+
""".stripMargin)
951+
} catch {
952+
case throwable: Throwable =>
953+
error = Some(throwable)
954+
}
955+
}
956+
}
957+
thread.start()
958+
thread.join()
959+
error match {
960+
case Some(throwable) =>
961+
fail("CREATE TEMPORARY FUNCTION should not fail.", throwable)
962+
case None => // OK
963+
}
964+
}
937965
}

0 commit comments

Comments
 (0)