
Commit a6a0810

[SPARK-24360][SQL] Support Hive 3.1 metastore
1 parent c08021c commit a6a0810

10 files changed: +181 lines, -17 lines

docs/sql-data-sources-hive-tables.md

Lines changed: 1 addition & 1 deletion
@@ -115,7 +115,7 @@ The following options can be used to configure the version of Hive that is used
   <td><code>1.2.1</code></td>
   <td>
     Version of the Hive metastore. Available
-    options are <code>0.12.0</code> through <code>2.3.4</code>.
+    options are <code>0.12.0</code> through <code>2.3.4</code> and <code>3.1.0</code> through <code>3.1.1</code>.
   </td>
 </tr>
 <tr>
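
For context, a minimal sketch of a session that opts into the newly documented range; the metastore URI and the jars setting below are illustrative placeholders, not part of this commit:

    import org.apache.spark.sql.SparkSession

    // Hypothetical wiring: spark.sql.hive.metastore.version now also accepts 3.1.0 through 3.1.1.
    val spark = SparkSession.builder()
      .appName("hive-3-1-metastore-example")
      .config("spark.sql.hive.metastore.version", "3.1.1")
      .config("spark.sql.hive.metastore.jars", "maven") // placeholder; a local classpath works too
      .config("hive.metastore.uris", "thrift://metastore-host:9083") // placeholder URI
      .enableHiveSupport()
      .getOrCreate()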

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala

Lines changed: 2 additions & 1 deletion
@@ -62,7 +62,8 @@ private[spark] object HiveUtils extends Logging {

   val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +
-      s"<code>0.12.0</code> through <code>2.3.4</code>.")
+      "<code>0.12.0</code> through <code>2.3.4</code> and " +
+      "<code>3.1.0</code> through <code>3.1.1</code>.")
     .stringConf
     .createWithDefault(builtinHiveVersion)


sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

Lines changed: 12 additions & 5 deletions
@@ -105,6 +105,7 @@ private[hive] class HiveClientImpl(
     case hive.v2_1 => new Shim_v2_1()
     case hive.v2_2 => new Shim_v2_2()
     case hive.v2_3 => new Shim_v2_3()
+    case hive.v3_1 => new Shim_v3_1()
   }

   // Create an internal session state for this HiveClientImpl.
@@ -852,11 +853,17 @@ private[hive] class HiveClientImpl(
     client.getAllTables("default").asScala.foreach { t =>
       logDebug(s"Deleting table $t")
       val table = client.getTable("default", t)
-      client.getIndexes("default", t, 255).asScala.foreach { index =>
-        shim.dropIndex(client, "default", t, index.getIndexName)
-      }
-      if (!table.isIndexTable) {
-        client.dropTable("default", t)
+      try {
+        client.getIndexes("default", t, 255).asScala.foreach { index =>
+          shim.dropIndex(client, "default", t, index.getIndexName)
+        }
+        if (!table.isIndexTable) {
+          client.dropTable("default", t)
+        }
+      } catch {
+        case _: NoSuchMethodError =>
+          // HIVE-18448 Hive 3.0 remove index APIs
+          client.dropTable("default", t)
       }
     }
     client.getAllDatabases.asScala.filterNot(_ == "default").foreach { db =>
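
The try/catch above relies on the removed index APIs surfacing as a NoSuchMethodError only when the call is actually executed against a Hive 3.x client. A generic, hedged sketch of that fallback pattern (the helper and the commented identifiers are made up for illustration):

    // Hypothetical helper: attempt a call that may no longer exist in the linked
    // library, and fall back when the JVM reports NoSuchMethodError for it.
    def withLegacyApiFallback[T](legacyCall: => T)(fallback: => T): T =
      try legacyCall catch { case _: NoSuchMethodError => fallback }

    // Usage sketch mirroring reset(): drop index metadata where the API exists,
    // otherwise just drop the table (the Hive 3.x path after HIVE-18448).
    // withLegacyApiFallback { dropIndexesThenTable("default", t) } { client.dropTable("default", t) }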

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala

Lines changed: 126 additions & 0 deletions
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.serde.serdeConstants

 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
 import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, CatalogUtils, FunctionResource, FunctionResourceType}
@@ -1179,3 +1180,128 @@ private[client] class Shim_v2_1 extends Shim_v2_0 {
 private[client] class Shim_v2_2 extends Shim_v2_1

 private[client] class Shim_v2_3 extends Shim_v2_1
+
+private[client] class Shim_v3_1 extends Shim_v2_3 {
+  // Spark supports only non-ACID operations
+  protected lazy val isAcidIUDoperation = JBoolean.FALSE
+
+  // Writer ID can be 0 for non-ACID operations
+  protected lazy val writeIdInLoadTableOrPartition: JLong = 0L
+
+  // Statement ID
+  protected lazy val stmtIdInLoadTableOrPartition: JInteger = 0
+
+  protected lazy val listBucketingLevel: JInteger = 0
+
+  private lazy val clazzLoadFileType = getClass.getClassLoader.loadClass(
+    "org.apache.hadoop.hive.ql.plan.LoadTableDesc$LoadFileType")
+
+  private lazy val loadPartitionMethod =
+    findMethod(
+      classOf[Hive],
+      "loadPartition",
+      classOf[Path],
+      classOf[Table],
+      classOf[JMap[String, String]],
+      clazzLoadFileType,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      classOf[JLong],
+      JInteger.TYPE,
+      JBoolean.TYPE)
+  private lazy val loadTableMethod =
+    findMethod(
+      classOf[Hive],
+      "loadTable",
+      classOf[Path],
+      classOf[String],
+      clazzLoadFileType,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      classOf[JLong],
+      JInteger.TYPE,
+      JBoolean.TYPE)
+  private lazy val loadDynamicPartitionsMethod =
+    findMethod(
+      classOf[Hive],
+      "loadDynamicPartitions",
+      classOf[Path],
+      classOf[String],
+      classOf[JMap[String, String]],
+      clazzLoadFileType,
+      JInteger.TYPE,
+      JInteger.TYPE,
+      JBoolean.TYPE,
+      JLong.TYPE,
+      JInteger.TYPE,
+      JBoolean.TYPE,
+      classOf[AcidUtils.Operation],
+      JBoolean.TYPE)
+
+  override def loadPartition(
+      hive: Hive,
+      loadPath: Path,
+      tableName: String,
+      partSpec: JMap[String, String],
+      replace: Boolean,
+      inheritTableSpecs: Boolean,
+      isSkewedStoreAsSubdir: Boolean,
+      isSrcLocal: Boolean): Unit = {
+    val session = SparkSession.getActiveSession
+    assert(session.nonEmpty)
+    val database = session.get.sessionState.catalog.getCurrentDatabase
+    val table = hive.getTable(database, tableName)
+    val loadFileType = if (replace) {
+      clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
+    } else {
+      clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
+    }
+    assert(loadFileType.isDefined)
+    loadPartitionMethod.invoke(hive, loadPath, table, partSpec, loadFileType.get,
+      inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
+      isSrcLocal: JBoolean, isAcid, hasFollowingStatsTask,
+      writeIdInLoadTableOrPartition, stmtIdInLoadTableOrPartition, replace: JBoolean)
+  }
+
+  override def loadTable(
+      hive: Hive,
+      loadPath: Path,
+      tableName: String,
+      replace: Boolean,
+      isSrcLocal: Boolean): Unit = {
+    val loadFileType = if (replace) {
+      clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
+    } else {
+      clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
+    }
+    assert(loadFileType.isDefined)
+    loadTableMethod.invoke(hive, loadPath, tableName, loadFileType.get, isSrcLocal: JBoolean,
+      isSkewedStoreAsSubdir, isAcidIUDoperation, hasFollowingStatsTask,
+      writeIdInLoadTableOrPartition, stmtIdInLoadTableOrPartition: JInteger, replace: JBoolean)
+  }
+
+  override def loadDynamicPartitions(
+      hive: Hive,
+      loadPath: Path,
+      tableName: String,
+      partSpec: JMap[String, String],
+      replace: Boolean,
+      numDP: Int,
+      listBucketingEnabled: Boolean): Unit = {
+    val loadFileType = if (replace) {
+      clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
+    } else {
+      clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
+    }
+    assert(loadFileType.isDefined)
+    loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, loadFileType.get,
+      numDP: JInteger, listBucketingLevel, isAcid, writeIdInLoadTableOrPartition,
+      stmtIdInLoadTableOrPartition, hasFollowingStatsTask, AcidUtils.Operation.NOT_ACID,
+      replace: JBoolean)
+  }
+}
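
Shim_v3_1 resolves LoadTableDesc$LoadFileType and the new load* signatures reflectively, so the shim does not need Hive 3 classes at compile time. A self-contained sketch of the same enum-by-name lookup, using a JDK enum (java.util.concurrent.TimeUnit) as a stand-in so it runs without Hive on the classpath:

    // Load the enum class by name and pick a constant case-insensitively,
    // the same way the shim selects REPLACE_ALL or KEEP_EXISTING.
    val clazz = Class.forName("java.util.concurrent.TimeUnit")
    val constant = clazz.getEnumConstants.find(_.toString.equalsIgnoreCase("seconds"))
    assert(constant.isDefined)
    println(constant.get) // SECONDS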

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala

Lines changed: 1 addition & 0 deletions
@@ -100,6 +100,7 @@ private[hive] object IsolatedClientLoader extends Logging {
     case "2.1" | "2.1.0" | "2.1.1" => hive.v2_1
     case "2.2" | "2.2.0" => hive.v2_2
     case "2.3" | "2.3.0" | "2.3.1" | "2.3.2" | "2.3.3" | "2.3.4" => hive.v2_3
+    case "3.1" | "3.1.0" | "3.1.1" => hive.v3_1
     case version =>
       throw new UnsupportedOperationException(s"Unsupported Hive Metastore version ($version). " +
         s"Please set ${HiveUtils.HIVE_METASTORE_VERSION.key} with a valid version.")

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala

Lines changed: 8 additions & 1 deletion
@@ -79,7 +79,14 @@ package object client {
     exclusions = Seq("org.apache.curator:*",
       "org.pentaho:pentaho-aggdesigner-algorithm"))

-  val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3)
+  // Since Hive 3.0, HookUtils uses org.apache.logging.log4j.util.Strings
+  case object v3_1 extends HiveVersion("3.1.1",
+    extraDeps = Seq("org.apache.logging.log4j:log4j-api:2.10.0",
+      "org.apache.derby:derby:10.14.1.0"),
+    exclusions = Seq("org.apache.curator:*",
+      "org.pentaho:pentaho-aggdesigner-algorithm"))
+
+  val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_1)
 }
 // scalastyle:on


sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala

Lines changed: 1 addition & 1 deletion
@@ -114,7 +114,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
     // be removed by Hive when Hive is trying to empty the table directory.
     val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
     val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
-      Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3)
+      Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_1)

     // Ensure all the supported versions are considered here.
     assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala

Lines changed: 1 addition & 1 deletion
@@ -23,5 +23,5 @@ import org.apache.spark.SparkFunSuite

 private[client] trait HiveClientVersions {
   protected val versions =
-    IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3")
+    IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3", "3.1")
 }

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala

Lines changed: 2 additions & 1 deletion
@@ -34,7 +34,8 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu
     // Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
     // hive.metastore.schema.verification from false to true since 2.0
     // For details, see the JIRA HIVE-6113 and HIVE-12463
-    if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") {
+    if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3" ||
+        version == "3.1") {
       hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
       hadoopConf.set("hive.metastore.schema.verification", "false")
     }

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala

Lines changed: 27 additions & 6 deletions
@@ -103,7 +103,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
   }

   private val versions =
-    Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3")
+    Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3", "3.1")

   private var client: HiveClient = null

@@ -118,10 +118,15 @@ class VersionsSuite extends SparkFunSuite with Logging {
         // Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
         // hive.metastore.schema.verification from false to true since 2.0
         // For details, see the JIRA HIVE-6113 and HIVE-12463
-        if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") {
+        if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3" ||
+            version == "3.1") {
           hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
           hadoopConf.set("hive.metastore.schema.verification", "false")
         }
+        // Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`.
+        if (version == "3.1") {
+          hadoopConf.set("hive.in.test", "true")
+        }
         client = buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
         if (versionSpark != null) versionSpark.reset()
         versionSpark = TestHiveVersion(client)
@@ -318,7 +323,20 @@ class VersionsSuite extends SparkFunSuite with Logging {
       properties = Map.empty)

     test(s"$version: sql create partitioned table") {
-      client.runSqlHive("CREATE TABLE src_part (value INT) PARTITIONED BY (key1 INT, key2 INT)")
+      val table = CatalogTable(
+        identifier = TableIdentifier("src_part", Some("default")),
+        tableType = CatalogTableType.MANAGED,
+        schema = new StructType().add("value", "int").add("key1", "int").add("key2", "int"),
+        partitionColumnNames = Seq("key1", "key2"),
+        storage = CatalogStorageFormat(
+          locationUri = None,
+          inputFormat = Some(classOf[TextInputFormat].getName),
+          outputFormat = Some(classOf[HiveIgnoreKeyTextOutputFormat[_, _]].getName),
+          serde = Some(classOf[LazySimpleSerDe].getName()),
+          compressed = false,
+          properties = Map.empty
+        ))
+      client.createTable(table, ignoreIfExists = false)
     }

     val testPartitionCount = 2
@@ -556,9 +574,12 @@ class VersionsSuite extends SparkFunSuite with Logging {
     }

     test(s"$version: sql create index and reset") {
-      client.runSqlHive("CREATE TABLE indexed_table (key INT)")
-      client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " +
-        "as 'COMPACT' WITH DEFERRED REBUILD")
+      // HIVE-18448 Since Hive 3.0, INDEX is not supported.
+      if (version != "3.1") {
+        client.runSqlHive("CREATE TABLE indexed_table (key INT)")
+        client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " +
+          "as 'COMPACT' WITH DEFERRED REBUILD")
+      }
     }

     ///////////////////////////////////////////////////////////////////////////
