Commit 00294fc

[SC-5452] Do not assume that an ExternalCatalog is a HiveExternalCatalog
## What changes were proposed in this pull request?

The Spark extensions suite added a wrapper for the `ExternalCatalog`. This wrapper is injected into the `SharedState` when the Spark extensions contain catalog hooks, and it sits between the `SharedState` and the actual `ExternalCatalog`. Unfortunately, this breaks `InsertIntoHiveTable` and a number of tests that assume the `ExternalCatalog` in the `SharedState` is always a `HiveExternalCatalog`, from which they extract the `HiveClient`. This PR fixes the problem by adding a utility method that properly extracts the `HiveClient` from the `SharedState`, and it replaces every use of `spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client` with the new utility method.

## How was this patch tested?

Existing tests.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes apache#153 from hvanhovell/SC-5452.
1 parent 3e372a1 commit 00294fc
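
To make the failure mode concrete, here is a minimal, self-contained sketch. The type names mirror the ones in this commit, but the bodies are simplified stand-ins rather than the actual Spark classes:

```scala
// Simplified stand-ins for the real Spark types (illustration only).
trait ExternalCatalog

class HiveExternalCatalog extends ExternalCatalog {
  def client: String = "hive-client" // stands in for the real HiveClient
}

// A delegating wrapper, like the one the extensions suite injects into SharedState.
case class HookCallingExternalCatalog(delegate: ExternalCatalog) extends ExternalCatalog

object CastDemo extends App {
  val catalog: ExternalCatalog = HookCallingExternalCatalog(new HiveExternalCatalog)

  // The old code path throws a ClassCastException: the wrapper delegates to a
  // HiveExternalCatalog but is not one itself.
  // catalog.asInstanceOf[HiveExternalCatalog].client

  // The fixed code path pattern-matches through the wrapper instead.
  val client = catalog match {
    case c: HiveExternalCatalog => c.client
    case HookCallingExternalCatalog(c: HiveExternalCatalog) => c.client
    case other => sys.error(s"Cannot extract Hive client from $other")
  }
  println(client) // prints "hive-client"
}
```

The new `HiveUtils.getHiveClient` in the diffs below performs exactly this match over `state.externalCatalog`.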

File tree

11 files changed (+23, -24 lines)

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala

Lines changed: 2 additions & 7 deletions
@@ -36,13 +36,8 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
   /**
    * A Hive client used for interacting with the metastore.
    */
-  lazy val metadataHive: HiveClient = {
-    val catalog = sparkSession.sharedState.externalCatalog match {
-      case catalog: HiveExternalCatalog => catalog
-      case HookCallingExternalCatalog(catalog: HiveExternalCatalog) => catalog
-    }
-    catalog.client.newSession()
-  }
+  lazy val metadataHive: HiveClient =
+    HiveUtils.getHiveClient(sparkSession.sharedState).newSession()
 
   /**
    * Internal catalog for managing table and database states.

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala

Lines changed: 8 additions & 1 deletion
@@ -37,7 +37,7 @@ import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.hive.client._
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{HookCallingExternalCatalog, SharedState, SQLConf}
 import org.apache.spark.sql.internal.SQLConf._
 import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, WAREHOUSE_PATH}
 import org.apache.spark.sql.types._
@@ -455,4 +455,11 @@ private[spark] object HiveUtils extends Logging {
     case (decimal, DecimalType()) => decimal.toString
     case (other, tpe) if primitiveTypes contains tpe => other.toString
   }
+
+  /** Get the Hive client from a SharedState. */
+  def getHiveClient(state: SharedState): HiveClient = state.externalCatalog match {
+    case c: HiveExternalCatalog => c.client
+    case HookCallingExternalCatalog(c: HiveExternalCatalog) => c.client
+    case _ => sys.error(s"Cannot extract Hive client from $state")
+  }
 }
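
The second `case` above relies on `HookCallingExternalCatalog` exposing its delegate through an extractor, i.e. being a case class (or defining an `unapply`) over the wrapped catalog; its definition lives in `org.apache.spark.sql.internal` and is not part of this diff. A usage sketch follows (hypothetical call site: `HiveUtils` is `private[spark]`, so real callers sit inside Spark's own packages, as the test diffs below show):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveUtils

object GetHiveClientExample {
  def main(args: Array[String]): Unit = {
    // Hive support must be enabled so that SharedState holds a Hive-backed catalog.
    val spark = SparkSession.builder()
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    // Works whether the catalog is a bare HiveExternalCatalog or the
    // hook-calling wrapper around one.
    val hiveClient = HiveUtils.getHiveClient(spark.sharedState)
    println(hiveClient.version) // the same accessor InsertIntoHiveTable now uses
  }
}
```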

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 1 addition & 1 deletion
@@ -131,7 +131,7 @@ case class InsertIntoHiveTable(
   def getExternalTmpPath(path: Path): Path = {
     import org.apache.spark.sql.hive.client.hive._
 
-    val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
+    val hiveVersion = HiveUtils.getHiveClient(sqlContext.sharedState).version
     // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
     // a common scratch directory. After the writing is finished, Hive will simply empty the table
     // directory and move the staging directory to it.

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -589,7 +589,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
   }
 
   test("SPARK-15887: hive-site.xml should be loaded") {
-    val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+    val hiveClient = HiveUtils.getHiveClient(spark.sharedState)
     assert(hiveClient.getConf("hive.in.test", "") == "true")
   }
 }

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala

Lines changed: 1 addition & 2 deletions
@@ -35,8 +35,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
   with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
 
   // To test `HiveExternalCatalog`, we need to read/write the raw table meta from/to hive client.
-  val hiveClient: HiveClient =
-    spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+  val hiveClient: HiveClient = HiveUtils.getHiveClient(spark.sharedState)
 
   val tempDir = Utils.createTempDir().getCanonicalFile
 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala

Lines changed: 2 additions & 3 deletions
@@ -396,8 +396,7 @@ object SetMetastoreURLTest extends Logging {
     }
 
     // HiveExternalCatalog is used when Hive support is enabled.
-    val actualMetastoreURL =
-      spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+    val actualMetastoreURL = HiveUtils.getHiveClient(spark.sharedState)
       .getConf("javax.jdo.option.ConnectionURL", "this_is_a_wrong_URL")
     logInfo(s"javax.jdo.option.ConnectionURL is $actualMetastoreURL")
 
@@ -823,7 +822,7 @@ object SPARK_18360 {
     val defaultDbLocation = spark.catalog.getDatabase("default").locationUri
     assert(new Path(defaultDbLocation) == new Path(spark.sharedState.warehousePath))
 
-    val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+    val hiveClient = HiveUtils.getHiveClient(spark.sharedState)
 
     try {
       val tableMeta = CatalogTable(

sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala

Lines changed: 1 addition & 2 deletions
@@ -53,8 +53,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
   // To test `HiveExternalCatalog`, we need to read the raw table metadata(schema, partition
   // columns and bucket specification are still in table properties) from hive client.
-  private def hiveClient: HiveClient =
-    sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+  private def hiveClient: HiveClient = HiveUtils.getHiveClient(sharedState)
 
   test("persistent JSON table") {
     withTable("jsonTable") {

sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -294,7 +294,7 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
   }
 
   private def createRawHiveTable(ddl: String): Unit = {
-    hiveContext.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.runSqlHive(ddl)
+    HiveUtils.getHiveClient(hiveContext.sharedState).runSqlHive(ddl)
   }
 
   private def checkCreateTable(table: String): Unit = {

sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -285,7 +285,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     sql(s"analyze table $tableName compute STATISTICS FOR COLUMNS " + stats.keys.mkString(", "))
 
     // Validate statistics
-    val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+    val hiveClient = HiveUtils.getHiveClient(spark.sharedState)
     val table = hiveClient.getTable("default", tableName)
 
     val props = table.properties.filterKeys(_.startsWith("spark.sql.statistics.colStats"))

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 3 additions & 3 deletions
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAl
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
@@ -990,7 +990,7 @@ class HiveDDLSuite
     val indexName = tabName + "_index"
     withTable(tabName) {
       // Spark SQL does not support creating index. Thus, we have to use Hive client.
-      val client = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+      val client = HiveUtils.getHiveClient(spark.sharedState)
       sql(s"CREATE TABLE $tabName(a int)")
 
       try {
@@ -1020,7 +1020,7 @@ class HiveDDLSuite
     val tabName = "tab1"
     withTable(tabName) {
      // Spark SQL does not support creating skewed table. Thus, we have to use Hive client.
-      val client = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+      val client = HiveUtils.getHiveClient(spark.sharedState)
       client.runSqlHive(
         s"""
           |CREATE Table $tabName(col1 int, col2 int)

sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -22,7 +22,7 @@ import java.io.File
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.{QueryTest, Row}
-import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -153,7 +153,7 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
   }
 
   test("SPARK-18220: read Hive orc table with varchar column") {
-    val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+    val hiveClient = HiveUtils.getHiveClient(spark.sharedState)
     try {
       hiveClient.runSqlHive("CREATE TABLE orc_varchar(a VARCHAR(10)) STORED AS orc")
      hiveClient.runSqlHive("INSERT INTO TABLE orc_varchar SELECT 'a' FROM (SELECT 1) t")
