SparkSQLCLIDriver.scala
@@ -111,6 +111,11 @@ private[hive] object SparkSQLCLIDriver extends Logging {
 
     // Set all properties specified via command line.
     val conf: HiveConf = sessionState.getConf
+    // From Hive 2.0.0 onwards, HiveConf.getClassLoader returns Hive's own UDFClassLoader
+    // (introduced by HIVE-11878) instead of the context class loader. Spark has already
+    // loaded all of its jars into the context class loader, so restore that loader here;
+    // otherwise Spark cannot find those jars.
+    conf.setClassLoader(Thread.currentThread().getContextClassLoader)
     sessionState.cmdProperties.entrySet().asScala.foreach { item =>
       val key = item.getKey.toString
       val value = item.getValue.toString
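A minimal sketch of the idea behind this reset (illustrative only; the standalone object and setup here are assumptions, not part of the patch):

```scala
import org.apache.hadoop.hive.conf.HiveConf

// Hypothetical, self-contained illustration: HiveConf extends Hadoop's
// Configuration, so its class loader can be swapped and restored freely.
object ClassLoaderResetSketch {
  def main(args: Array[String]): Unit = {
    val conf = new HiveConf()
    // The loader Spark populated with everything passed via --jars.
    val sparkLoader = Thread.currentThread().getContextClassLoader
    // On Hive 2.x, SessionState.start() swaps in a UDFClassLoader, so
    // conf.getClassLoader may no longer see the jars Spark loaded.
    // Restoring the context class loader pins lookups back to them.
    conf.setClassLoader(sparkLoader)
    assert(conf.getClassLoader eq sparkLoader)
  }
}
```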
@@ -133,20 +138,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
     // Clean up after we exit
     ShutdownHookManager.addShutdownHook { () => SparkSQLEnv.stop() }
 
-    val remoteMode = isRemoteMode(sessionState)
-    // "-h" option has been passed, so connect to Hive thrift server.
-    if (!remoteMode) {
-      // Hadoop-20 and above - we need to augment classpath using hiveconf
-      // components.
-      // See also: code in ExecDriver.java
-      var loader = conf.getClassLoader
-      val auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS)
-      if (StringUtils.isNotBlank(auxJars)) {
-        loader = ThriftserverShimUtils.addToClassPath(loader, StringUtils.split(auxJars, ","))
-      }
-      conf.setClassLoader(loader)
-      Thread.currentThread().setContextClassLoader(loader)
-    } else {
+    if (isRemoteMode(sessionState)) {
       // Hive 1.2 + not supported in CLI
       throw new RuntimeException("Remote operations not supported")
     }
@@ -164,6 +156,15 @@ private[hive] object SparkSQLCLIDriver extends Logging {
     val cli = new SparkSQLCLIDriver
     cli.setHiveVariables(oproc.getHiveVariables)
 
+    // The SparkSQL CLI should also pick up jars configured via the hiveconf property
+    // hive.aux.jars.path, so register them with Spark's SessionResourceLoader to make
+    // them available to the session.
+    val auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS)
+    if (StringUtils.isNotBlank(auxJars)) {
+      val resourceLoader = SparkSQLEnv.sqlContext.sessionState.resourceLoader
+      StringUtils.split(auxJars, ",").foreach(resourceLoader.addJar(_))
+    }
+
     // TODO work around for set the log output to console, because the HiveContext
     // will set the output into an invalid buffer.
     sessionState.in = System.in
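For intuition, registering a jar through the session's resource loader has effectively the same result as running ADD JAR from the CLI, which funnels into the same `SessionResourceLoader.addJar` call; a hedged sketch (jar paths are hypothetical):

```scala
import org.apache.spark.sql.SparkSession

object AuxJarsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("aux-jars-sketch")
      .enableHiveSupport()
      .getOrCreate()
    // A comma-separated value, shaped like hive.aux.jars.path (paths made up).
    val auxJars = "/tmp/udfs-a.jar,/tmp/udfs-b.jar"
    // ADD JAR makes each jar visible to the session's function resolution,
    // mirroring what the loop above does through the resource loader.
    auxJars.split(",").foreach(p => spark.sql(s"ADD JAR $p"))
    spark.stop()
  }
}
```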
CliSuite.scala
@@ -305,4 +305,31 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
"SELECT example_format('%o', 93);" -> "135"
)
}

test("SPARK-28840 test --jars command") {
val jarFile = new File("../../sql/hive/src/test/resources/SPARK-21101-1.0.jar").getCanonicalPath
runCliWithin(
1.minute,
Seq("--jars", s"$jarFile"))(
"CREATE TEMPORARY FUNCTION testjar AS" +
" 'org.apache.spark.sql.hive.execution.UDTFStack';" -> "",
"SELECT testjar(1,'TEST-SPARK-TEST-jar', 28840);" -> "TEST-SPARK-TEST-jar\t28840"
)
}

test("SPARK-28840 test --jars and hive.aux.jars.path command") {
val jarFile = new File("../../sql/hive/src/test/resources/SPARK-21101-1.0.jar").getCanonicalPath
val hiveContribJar = HiveTestUtils.getHiveContribJar.getCanonicalPath
runCliWithin(
1.minute,
Seq("--jars", s"$jarFile", "--conf",
s"spark.hadoop.${ConfVars.HIVEAUXJARS}=$hiveContribJar"))(
"CREATE TEMPORARY FUNCTION testjar AS" +
" 'org.apache.spark.sql.hive.execution.UDTFStack';" -> "",
"SELECT testjar(1,'TEST-SPARK-TEST-jar', 28840);" -> "TEST-SPARK-TEST-jar\t28840",
"CREATE TEMPORARY FUNCTION example_max AS " +
"'org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax';" -> "",
"SELECT concat_ws(',', 'First', example_max(1234321), 'Third');" -> "First,1234321,Third"
)
}
}
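End to end, these tests mirror what a user would run against the CLI itself, e.g. `./bin/spark-sql --jars /path/to/udfs.jar --conf spark.hadoop.hive.aux.jars.path=/path/to/hive-contrib.jar` (paths illustrative): functions from the jars supplied both ways should resolve in the same session.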
ThriftserverShimUtils.scala (Hive 1.2 shim)
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.hive.thriftserver
 
 import org.apache.commons.logging.LogFactory
-import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hive.service.cli.{RowSet, RowSetFactory, TableSchema, Type}
 import org.apache.hive.service.cli.Type._
@@ -60,12 +59,6 @@ private[thriftserver] object ThriftserverShimUtils {
     ARRAY_TYPE, MAP_TYPE, STRUCT_TYPE)
   }
 
-  private[thriftserver] def addToClassPath(
-      loader: ClassLoader,
-      auxJars: Array[String]): ClassLoader = {
-    Utilities.addToClassPath(loader, auxJars)
-  }
-
   private[thriftserver] val testedProtocolVersions = Seq(
     HIVE_CLI_SERVICE_PROTOCOL_V1,
     HIVE_CLI_SERVICE_PROTOCOL_V2,
ThriftserverShimUtils.scala (Hive 2.3 shim)
@@ -17,11 +17,6 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
-import java.security.AccessController
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.hive.ql.exec.AddToClassPathAction
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde2.thrift.Type
 import org.apache.hadoop.hive.serde2.thrift.Type._
@@ -65,13 +60,6 @@ private[thriftserver] object ThriftserverShimUtils {
     ARRAY_TYPE, MAP_TYPE, STRUCT_TYPE)
   }
 
-  private[thriftserver] def addToClassPath(
-      loader: ClassLoader,
-      auxJars: Array[String]): ClassLoader = {
-    val addAction = new AddToClassPathAction(loader, auxJars.toList.asJava)
-    AccessController.doPrivileged(addAction)
-  }
-
   private[thriftserver] val testedProtocolVersions = Seq(
     HIVE_CLI_SERVICE_PROTOCOL_V1,
     HIVE_CLI_SERVICE_PROTOCOL_V2,