Skip to content

Commit 701e356

Browse files
committed
Revert "[SPARK-50137][HIVE] Avoid fallback to Hive-incompatible ways when table creation fails by thrift exception"
This reverts commit bc27f69.
1 parent bc27f69 commit 701e356

File tree

4 files changed

+14
-47
lines changed

4 files changed

+14
-47
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
417417
logInfo(message)
418418
saveTableIntoHive(table, ignoreIfExists)
419419
} catch {
420-
case NonFatal(e) if !HiveUtils.causedByThrift(e) =>
420+
case NonFatal(e) =>
421421
val warningMessage =
422422
log"Could not persist ${MDC(TABLE_NAME, table.identifier.quotedString)} in a Hive " +
423423
log"compatible way. Persisting it into Hive metastore in Spark SQL specific format."

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -521,19 +521,4 @@ private[spark] object HiveUtils extends Logging {
521521
case PATTERN_FOR_KEY_EQ_VAL(_, v) => FileUtils.unescapePathName(v)
522522
}
523523
}
524-
525-
/**
526-
* Determine if a Hive call exception is caused by thrift error.
527-
*/
528-
def causedByThrift(e: Throwable): Boolean = {
529-
var target = e
530-
while (target != null) {
531-
val msg = target.getMessage()
532-
if (msg != null && msg.matches("(?s).*(TApplication|TProtocol|TTransport)Exception.*")) {
533-
return true
534-
}
535-
target = target.getCause()
536-
}
537-
false
538-
}
539524
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ private[hive] class HiveClientImpl(
236236
try {
237237
return f
238238
} catch {
239-
case e: Exception if HiveUtils.causedByThrift(e) =>
239+
case e: Exception if causedByThrift(e) =>
240240
caughtException = e
241241
logWarning(
242242
log"HiveClient got thrift exception, destroying client and retrying " +
@@ -251,6 +251,18 @@ private[hive] class HiveClientImpl(
251251
throw caughtException
252252
}
253253

254+
private def causedByThrift(e: Throwable): Boolean = {
255+
var target = e
256+
while (target != null) {
257+
val msg = target.getMessage()
258+
if (msg != null && msg.matches("(?s).*(TApplication|TProtocol|TTransport)Exception.*")) {
259+
return true
260+
}
261+
target = target.getCause()
262+
}
263+
false
264+
}
265+
254266
private def client: Hive = {
255267
if (clientLoader.cachedHive != null) {
256268
clientLoader.cachedHive.asInstanceOf[Hive]

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.spark.sql.hive
1919

2020
import org.apache.hadoop.conf.Configuration
21-
import org.apache.logging.log4j.Level
2221

2322
import org.apache.spark.SparkConf
2423
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -242,33 +241,4 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
242241
val alteredTable = externalCatalog.getTable("db1", tableName)
243242
assert(DataTypeUtils.sameType(alteredTable.schema, newSchema))
244243
}
245-
246-
test("SPARK-50137: Avoid fallback to Hive-incompatible ways on thrift exception") {
247-
val hadoopConf = new Configuration()
248-
// Use an unavailable uri to mock client connection timeout.
249-
hadoopConf.set("hive.metastore.uris", "thrift://1.1.1.1:1111")
250-
hadoopConf.set("hive.metastore.client.connection.timeout", "1s")
251-
// Dummy HiveExternalCatalog to mock that the hive client is still available
252-
// when checking database and table.
253-
val catalog = new HiveExternalCatalog(new SparkConf, hadoopConf) {
254-
override def requireDbExists(db: String): Unit = {}
255-
override def tableExists(db: String, table: String): Boolean = false
256-
}
257-
val logAppender = new LogAppender()
258-
withLogAppender(logAppender, level = Some(Level.WARN)) {
259-
val table = CatalogTable(
260-
identifier = TableIdentifier("tbl", Some("default")),
261-
tableType = CatalogTableType.EXTERNAL,
262-
storage = storageFormat.copy(locationUri = Some(newUriForDatabase())),
263-
schema = new StructType()
264-
.add("col1", "string"),
265-
provider = Some("parquet"))
266-
intercept[Throwable] {
267-
catalog.createTable(table, ignoreIfExists = false)
268-
}
269-
}
270-
assert(!logAppender.loggingEvents.map(_.getMessage.getFormattedMessage).contains(
271-
"Could not persist `default`.`tbl` in a Hive compatible way. " +
272-
"Persisting it into Hive metastore in Spark SQL specific format."))
273-
}
274244
}

0 commit comments

Comments
 (0)