
[SPARK-26794][SQL] SparkSession enableHiveSupport does not point to hive but in-memory while the SparkContext exists #23709


Closed · 15 commits
@@ -127,7 +127,7 @@ class SparkSession private(
@Unstable
@transient
lazy val sharedState: SharedState = {
existingSharedState.getOrElse(new SharedState(sparkContext))
existingSharedState.getOrElse(new SharedState(sparkContext, initialSessionOptions))
}

/**
@@ -39,8 +39,14 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}

/**
* A class that holds all state shared across sessions in a given [[SQLContext]].
*
* @param sparkContext The Spark context associated with this SharedState
* @param initialConfigs The configs from the very first created SparkSession
*/
private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
private[sql] class SharedState(
val sparkContext: SparkContext,
initialConfigs: scala.collection.Map[String, String])
extends Logging {

// Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
// the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
@@ -77,6 +83,27 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
}
logInfo(s"Warehouse path is '$warehousePath'.")

// These two variables should be initialized after `warehousePath`, because we first need to
// load hive-site.xml into hadoopConf and determine the warehouse path, which is then set into
// both the Spark conf and the Hadoop conf so that it is not affected by any SparkSession-level
// options.
private val (conf, hadoopConf) = {
val confClone = sparkContext.conf.clone()
val hadoopConfClone = new Configuration(sparkContext.hadoopConfiguration)
// If `SparkSession` is instantiated with an existing `SparkContext` instance but no existing
// `SharedState`, all `SparkSession`-level configurations take higher priority when generating
// the `SharedState` instance. This is done only once, and the result is then shared across
// `SparkSession`s.
initialConfigs.foreach {
case (k, _) if k == "hive.metastore.warehouse.dir" || k == WAREHOUSE_PATH.key =>
logWarning(s"Not allowing to set ${WAREHOUSE_PATH.key} or hive.metastore.warehouse.dir " +
s"in SparkSession's options, it should be set statically for cross-session usages")
case (k, v) =>
logDebug(s"Applying initial SparkSession options to SparkConf/HadoopConf: $k -> $v")
confClone.set(k, v)
hadoopConfClone.set(k, v)

}
(confClone, hadoopConfClone)
}

/**
* Class for caching query results reused in future executions.
@@ -89,7 +116,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
*/
val statusStore: SQLAppStatusStore = {
val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
val listener = new SQLAppStatusListener(sparkContext.conf, kvStore, live = true)
val listener = new SQLAppStatusListener(conf, kvStore, live = true)
sparkContext.listenerBus.addToStatusQueue(listener)
val statusStore = new SQLAppStatusStore(kvStore, Some(listener))
sparkContext.ui.foreach(new SQLTab(statusStore, _))
@@ -101,9 +128,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
*/
lazy val externalCatalog: ExternalCatalogWithListener = {
val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
SharedState.externalCatalogClassName(sparkContext.conf),
sparkContext.conf,
sparkContext.hadoopConfiguration)
SharedState.externalCatalogClassName(conf), conf, hadoopConf)

val defaultDbDefinition = CatalogDatabase(
SessionCatalog.DEFAULT_DATABASE,
@@ -137,7 +162,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
// System preserved database should not exist in metastore. However, it's hard to guarantee it
// for every session, because case-sensitivity differs. Here we always lowercase it to make our
// life easier.
val globalTempDB = sparkContext.conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT)
val globalTempDB = conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT)
if (externalCatalog.databaseExists(globalTempDB)) {
throw new SparkException(
s"$globalTempDB is a system preserved database, please rename your existing database " +
@@ -90,7 +90,7 @@ private[hive] class TestHiveExternalCatalog(
private[hive] class TestHiveSharedState(
sc: SparkContext,
hiveClient: Option[HiveClient] = None)
extends SharedState(sc) {
extends SharedState(sc, initialConfigs = Map.empty[String, String]) {

override lazy val externalCatalog: ExternalCatalogWithListener = {
new ExternalCatalogWithListener(new TestHiveExternalCatalog(
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive

import org.apache.hadoop.hive.conf.HiveConf.ConfVars

import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.sql.internal.SharedState
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.util.Utils

class HiveSharedStateSuite extends SparkFunSuite {

test("initial configs should be passed to SharedState but not SparkContext") {
val conf = new SparkConf().setMaster("local").setAppName("SharedState Test")
val sc = SparkContext.getOrCreate(conf)
val invalidPath = "invalid/path"
val metastorePath = Utils.createTempDir()
val tmpDb = "tmp_db"

// The initial configs used to generate SharedState. None of these should affect the globally
// shared SparkContext's configuration. In particular, all of these configs are passed to the
// cloned confs inside SharedState, except the metastore warehouse dir.
val initialConfigs = Map("spark.foo" -> "bar",
WAREHOUSE_PATH.key -> invalidPath,
ConfVars.METASTOREWAREHOUSE.varname -> invalidPath,
CATALOG_IMPLEMENTATION.key -> "hive",
ConfVars.METASTORECONNECTURLKEY.varname ->
s"jdbc:derby:;databaseName=$metastorePath/metastore_db;create=true",
GLOBAL_TEMP_DATABASE.key -> tmpDb)

val state = new SharedState(sc, initialConfigs)
Contributor: Can we be more real-world and create a SparkSession here?

Member Author: In the hive module tests there is a long-lived cached TestHiveSession, so I use this SharedState explicitly.

Contributor: Why can we build a SparkSession in the above test case?

Member Author: Run individually, the above test works correctly, but when run under the whole hive module it will use an existing SparkSession with Hive support, so I added the second test to ensure all configurations are passed correctly.

Contributor: Shall we add a beforeEach and call SparkSession.clearActiveSession and SparkSession.clearDefaultSession?

Member Author: Without this fix, test 1 fails when run individually but passes in the full suite.

Contributor: I'd like to move test 1 to the sql core module; we can catch the IllegalArgumentException to check it. Then the test is reliable whether we run it individually or with other tests.

Member Author: An IllegalArgumentException does not tell us whether the catalog is Hive or not. The test would pass with or without this fix, because it never even gets to the point where the catalog is initialized.

Contributor: Then let's just remove test 1, but post the code in the PR description to let people know how to reproduce the bug.

Member Author: OK, both done.
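For readers of this page, a hedged sketch of the kind of reproduction the reviewer asks to move into the PR description (the snippet actually posted there may differ; the object name and the final DDL check are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

object Spark26794Repro { // illustrative name, not part of this patch
  def main(args: Array[String]): Unit = {
    // A SparkContext is created first, with no Hive-related settings at all.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("SPARK-26794"))

    // enableHiveSupport() only records spark.sql.catalogImplementation=hive as a session
    // option; before this fix SharedState was built from sc.conf alone, so the external
    // catalog silently stayed in-memory.
    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

    // Hive-specific DDL is expected to fail against the in-memory catalog and to succeed
    // once the session really points at the Hive metastore.
    spark.sql("CREATE TABLE spark_26794_repro (i INT) USING hive")
  }
}
```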

assert(state.warehousePath !== invalidPath,
"warehouse path can't be determined by session options")
assert(sc.conf.get(WAREHOUSE_PATH.key) !== invalidPath,
"warehouse conf in session options can't affect the application-wide Spark conf")
assert(sc.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) !== invalidPath,
"warehouse conf in session options can't affect the application-wide Hadoop conf")

assert(!state.sparkContext.conf.contains("spark.foo"),
"static spark conf should not be affected by session")
assert(state.externalCatalog.unwrapped.isInstanceOf[HiveExternalCatalog],
"Initial SparkSession options can determine the catalog")
val client = state.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client
assert(client.getConf("spark.foo", "") === "bar",
"session level conf should be passed to catalog")
assert(client.getConf(ConfVars.METASTOREWAREHOUSE.varname, invalidPath) !== invalidPath,
"session level conf should be passed to catalog except warehouse dir")

assert(state.globalTempViewManager.database === tmpDb)
}
}