Skip to content

Commit 73cf5de

Browse files
yhuaimarmbrus
authored andcommitted
[SPARK-8306] [SQL] AddJar command needs to set the new class loader to the HiveConf inside executionHive.state.
https://issues.apache.org/jira/browse/SPARK-8306 I will try to add a test later. marmbrus aarondav Author: Yin Huai <yhuai@databricks.com> Closes #6758 from yhuai/SPARK-8306 and squashes the following commits: 1292346 [Yin Huai] [SPARK-8306] AddJar command needs to set the new class loader to the HiveConf inside executionHive.state. (cherry picked from commit 302556f) Signed-off-by: Michael Armbrust <michael@databricks.com> Conflicts: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
1 parent 5aedfa2 commit 73cf5de

File tree

4 files changed

+46
-3
lines changed

4 files changed

+46
-3
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ private[hive] class ClientWrapper(
9090
}
9191
}
9292

93+
// Create an internal session state for this ClientWrapper.
9394
val state = {
9495
val original = Thread.currentThread().getContextClassLoader
9596
Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
@@ -126,8 +127,16 @@ private[hive] class ClientWrapper(
126127
*/
127128
private def withHiveState[A](f: => A): A = synchronized {
128129
val original = Thread.currentThread().getContextClassLoader
130+
// This setContextClassLoader is used for Hive 0.12's metastore since Hive 0.12 will not
131+
// internally override the context class loader of the current thread with the class loader
132+
// associated with the HiveConf in `state`.
129133
Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
134+
// Set the thread local metastore client to the client associated with this ClientWrapper.
130135
Hive.set(client)
136+
137+
// Starting from Hive 0.13.0, setCurrentSessionState will use the classLoader associated
138+
// with the HiveConf in `state` to override the context class loader of the current
139+
// thread.
131140
version match {
132141
case hive.v12 =>
133142
classOf[SessionState]

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,15 @@ case class AddJar(path: String) extends RunnableCommand {
9191
val jarURL = new java.io.File(path).toURL
9292
val newClassLoader = new java.net.URLClassLoader(Array(jarURL), currentClassLoader)
9393
Thread.currentThread.setContextClassLoader(newClassLoader)
94-
org.apache.hadoop.hive.ql.metadata.Hive.get().getConf().setClassLoader(newClassLoader)
95-
96-
// Add jar to isolated hive classloader
94+
// We need to explicitly set the class loader associated with the conf in executionHive's
95+
// state because this class loader will be used as the context class loader of the current
96+
// thread to execute any Hive command.
97+
// We cannot use `org.apache.hadoop.hive.ql.metadata.Hive.get().getConf()` because Hive.get()
98+
// returns the value of a thread local variable and its HiveConf may not be the HiveConf
99+
// associated with `executionHive.state` (for example, HiveContext is created in one thread
100+
// and then add jar is called from another thread).
101+
hiveContext.executionHive.state.getConf.setClassLoader(newClassLoader)
102+
// Add jar to isolated hive (metadataHive) class loader.
97103
hiveContext.runSqlHive(s"ADD JAR $path")
98104

99105
// Add jar to executors
112 KB
Binary file not shown.

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -906,4 +906,32 @@ class SQLQuerySuite extends QueryTest {
906906
sql("set hive.exec.dynamic.partition.mode=strict")
907907
}
908908
}
909+
910+
test("Call add jar in a different thread (SPARK-8306)") {
911+
@volatile var error: Option[Throwable] = None
912+
val thread = new Thread {
913+
override def run() {
914+
// To make sure this test works, this jar should not be loaded in another place.
915+
TestHive.sql(
916+
s"ADD JAR ${TestHive.getHiveFile("hive-contrib-0.13.1.jar").getCanonicalPath()}")
917+
try {
918+
TestHive.sql(
919+
"""
920+
|CREATE TEMPORARY FUNCTION example_max
921+
|AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax'
922+
""".stripMargin)
923+
} catch {
924+
case throwable: Throwable =>
925+
error = Some(throwable)
926+
}
927+
}
928+
}
929+
thread.start()
930+
thread.join()
931+
error match {
932+
case Some(throwable) =>
933+
fail("CREATE TEMPORARY FUNCTION should not fail.", throwable)
934+
case None => // OK
935+
}
936+
}
909937
}

0 commit comments

Comments
 (0)