[SPARK-16858] [SQL] [TEST] Removal of TestHiveSharedState #14463

Closed
TestHive.scala

@@ -24,7 +24,6 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.language.implicitConversions
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
@@ -40,7 +39,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.CacheTableCommand
 import org.apache.spark.sql.hive._
-import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 
@@ -86,8 +84,6 @@ class TestHiveContext(
     new TestHiveContext(sparkSession.newSession())
   }
 
-  override def sharedState: TestHiveSharedState = sparkSession.sharedState
-
   override def sessionState: TestHiveSessionState = sparkSession.sessionState
 
   def setCacheTables(c: Boolean): Unit = {
@@ -112,47 +108,51 @@
  * A [[SparkSession]] used in [[TestHiveContext]].
  *
  * @param sc SparkContext
- * @param scratchDirPath scratch directory used by Hive's metastore client
- * @param metastoreTemporaryConf configuration options for Hive's metastore
- * @param existingSharedState optional [[TestHiveSharedState]]
+ * @param existingSharedState optional [[HiveSharedState]]
  * @param loadTestTables if true, load the test tables. They can only be loaded when running
  *                       in the JVM, i.e when calling from Python this flag has to be false.
  */
 private[hive] class TestHiveSparkSession(
     @transient private val sc: SparkContext,
-    scratchDirPath: File,
-    metastoreTemporaryConf: Map[String, String],
-    @transient private val existingSharedState: Option[TestHiveSharedState],
+    @transient private val existingSharedState: Option[HiveSharedState],
     private val loadTestTables: Boolean)
   extends SparkSession(sc) with Logging { self =>
 
   def this(sc: SparkContext, loadTestTables: Boolean) {
     this(
       sc,
-      TestHiveContext.makeScratchDir(),
-      HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false),
-      None,
+      existingSharedState = None,
       loadTestTables)
   }
 
+  { // set the metastore temporary configuration
+    val metastoreTempConf = HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false) ++ Map(
+      ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true",
+      // scratch directory used by Hive's metastore client
+      ConfVars.SCRATCHDIR.varname -> TestHiveContext.makeScratchDir().toURI.toString,
+      ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY.varname -> "1")
+
+    metastoreTempConf.foreach { case (k, v) =>
+      sc.hadoopConfiguration.set(k, v)
+    }
+  }
+
   assume(sc.conf.get(CATALOG_IMPLEMENTATION) == "hive")
 
-  // TODO: Let's remove TestHiveSharedState and TestHiveSessionState. Otherwise,
+  // TODO: Let's remove HiveSharedState and TestHiveSessionState. Otherwise,
   // we are not really testing the reflection logic based on the setting of
   // CATALOG_IMPLEMENTATION.
   @transient
-  override lazy val sharedState: TestHiveSharedState = {
-    existingSharedState.getOrElse(
-      new TestHiveSharedState(sc, scratchDirPath, metastoreTemporaryConf))
+  override lazy val sharedState: HiveSharedState = {
+    existingSharedState.getOrElse(new HiveSharedState(sc))
   }
 
   @transient
   override lazy val sessionState: TestHiveSessionState =
     new TestHiveSessionState(self)
 
   override def newSession(): TestHiveSparkSession = {
-    new TestHiveSparkSession(
-      sc, scratchDirPath, metastoreTemporaryConf, Some(sharedState), loadTestTables)
+    new TestHiveSparkSession(sc, Some(sharedState), loadTestTables)
   }
 
   private var cacheTables: Boolean = false
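
The added initializer block is the heart of the change: instead of threading scratchDirPath and metastoreTemporaryConf through a dedicated TestHiveSharedState, the temporary metastore settings are written straight into the SparkContext's Hadoop configuration, where the stock HiveSharedState will find them. A minimal standalone sketch of the same pattern (the object name and app name are illustrative, not part of the patch):

    import java.nio.file.Files

    import org.apache.hadoop.hive.conf.HiveConf.ConfVars
    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative harness, not part of the patch.
    object MetastoreTempConfSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("metastore-temp-conf"))

        // Throwaway scratch dir, standing in for TestHiveContext.makeScratchDir().
        val scratchDir = Files.createTempDirectory("scratch").toFile
        scratchDir.delete() // Hive expects to create the directory itself

        // The same keys the initializer block sets. Writing them into
        // sc.hadoopConfiguration makes them visible to any Hive metastore
        // client later built from this context's configuration.
        Map(
          ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true",
          ConfVars.SCRATCHDIR.varname -> scratchDir.toURI.toString,
          ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY.varname -> "1"
        ).foreach { case (k, v) => sc.hadoopConfiguration.set(k, v) }

        sc.stop()
      }
    }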
@@ -505,19 +505,6 @@ private[hive] class TestHiveFunctionRegistry extends SimpleFunctionRegistry {
 }
 
 
-private[hive] class TestHiveSharedState(
-    sc: SparkContext,
-    scratchDirPath: File,
-    metastoreTemporaryConf: Map[String, String])
-  extends HiveSharedState(sc) {
-
-  override lazy val metadataHive: HiveClient = {
-    TestHiveContext.newClientForMetadata(
-      sc.conf, sc.hadoopConfiguration, scratchDirPath, metastoreTemporaryConf)
-  }
-}
-
-
 private[hive] class TestHiveSessionState(
     sparkSession: TestHiveSparkSession)
   extends HiveSessionState(sparkSession) { self =>
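
With the temporary configuration already in sc.hadoopConfiguration, the deleted override above no longer buys anything: the base HiveSharedState can build an equivalent metadata client from the context alone. A sketch of the equivalence, using only the HiveUtils calls visible in this diff (both overloads are package-private in Spark 2.0, so this type-checks only inside Spark's own sources):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.hive.HiveUtils
    import org.apache.spark.sql.hive.client.HiveClient

    // What TestHiveSharedState.metadataHive used to compute, spelled out
    // (tempConf also carried the extra ConfVars entries deleted below):
    def oldStyle(conf: SparkConf, hadoopConf: Configuration,
                 tempConf: Map[String, String]): HiveClient =
      HiveUtils.newClientForMetadata(conf, hadoopConf,
        HiveUtils.hiveClientConfigurations(hadoopConf) ++ tempConf)

    // What a plain HiveSharedState computes once tempConf already lives in
    // hadoopConf: the explicit map merge becomes redundant.
    def newStyle(conf: SparkConf, hadoopConf: Configuration): HiveClient =
      HiveUtils.newClientForMetadata(conf, hadoopConf)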
@@ -560,33 +547,6 @@ private[hive] object TestHiveContext {
     SQLConf.SHUFFLE_PARTITIONS.key -> "5"
   )
 
-  /**
-   * Create a [[HiveClient]] used to retrieve metadata from the Hive MetaStore.
-   */
-  def newClientForMetadata(
-      conf: SparkConf,
-      hadoopConf: Configuration,
-      scratchDirPath: File,
-      metastoreTemporaryConf: Map[String, String]): HiveClient = {
-    HiveUtils.newClientForMetadata(
-      conf,
-      hadoopConf,
-      hiveClientConfigurations(hadoopConf, scratchDirPath, metastoreTemporaryConf))
-  }
-
-  /**
-   * Configurations needed to create a [[HiveClient]].
-   */
-  def hiveClientConfigurations(
-      hadoopConf: Configuration,
-      scratchDirPath: File,
-      metastoreTemporaryConf: Map[String, String]): Map[String, String] = {
-    HiveUtils.hiveClientConfigurations(hadoopConf) ++ metastoreTemporaryConf ++ Map(
-      ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true",
-      ConfVars.SCRATCHDIR.varname -> scratchDirPath.toURI.toString,
-      ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY.varname -> "1")
-  }
-
   def makeWarehouseDir(): File = {
     val warehouseDir = Utils.createTempDir(namePrefix = "warehouse")
     warehouseDir.delete()
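
The new initializer references TestHiveContext.makeScratchDir(), which falls outside the visible hunks. Presumably it mirrors makeWarehouseDir above; a hypothetical reconstruction, not taken from this diff:

    import java.io.File
    import org.apache.spark.util.Utils

    // Hypothetical sibling of makeWarehouseDir; the real body is not shown here.
    def makeScratchDir(): File = {
      val scratchDir = Utils.createTempDir(namePrefix = "scratch")
      scratchDir.delete() // Hive creates the scratch dir itself on first use
      scratchDir
    }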
ShowCreateTableSuite.scala

@@ -266,7 +266,7 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
   }
 
   private def createRawHiveTable(ddl: String): Unit = {
-    hiveContext.sharedState.metadataHive.runSqlHive(ddl)
+    hiveContext.sharedState.asInstanceOf[HiveSharedState].metadataHive.runSqlHive(ddl)
   }
 
   private def checkCreateTable(table: String): Unit = {
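
The asInstanceOf here is the visible cost of deleting the sharedState override in TestHiveContext: hiveContext.sharedState is now typed as the base SharedState, so Hive-specific members such as metadataHive need a downcast. A sketch of the same access written as a pattern match instead of a blind cast (package-private types, so this compiles only from Spark's own test sources; the helper name is invented):

    import org.apache.spark.sql.hive.HiveSharedState
    import org.apache.spark.sql.internal.SharedState

    // `state` is what hiveContext.sharedState returns after this PR: the base type.
    def runRawHiveSql(state: SharedState, ddl: String): Unit = state match {
      case hive: HiveSharedState => hive.metadataHive.runSqlHive(ddl)
      case other => sys.error(s"expected a Hive catalog, got ${other.getClass.getName}")
    }

A match fails with a clearer message than a ClassCastException when CATALOG_IMPLEMENTATION is not set to hive, though the suite's cast is fine under TestHiveSingleton, which guarantees a Hive-backed catalog.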