[SPARK-50137][HIVE] Avoid fallback to Hive-incompatible ways when table creation fails by thrift exception #48668

Open · wants to merge 1 commit into master
2 changes: 1 addition & 1 deletion sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

@@ -391,7 +391,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
      logInfo(message)
      saveTableIntoHive(table, ignoreIfExists)
    } catch {
-     case NonFatal(e) =>
+     case NonFatal(e) if !HiveUtils.causedByThrift(e) =>
        val warningMessage =
          log"Could not persist ${MDC(TABLE_NAME, table.identifier.quotedString)} in a Hive " +
            log"compatible way. Persisting it into Hive metastore in Spark SQL specific format."
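This guard is the heart of the fix: previously any `NonFatal` failure of `saveTableIntoHive` triggered the Spark-SQL-specific fallback, so a transient metastore outage could silently persist the table in a Hive-incompatible format. A minimal sketch of the resulting control flow, with hypothetical stand-ins (`isThriftFailure`, `saveHiveCompatible`, `saveSparkSpecific`) for the catalog internals:

```scala
import scala.util.control.NonFatal

object FallbackSketch {
  // isThriftFailure, saveHiveCompatible and saveSparkSpecific are hypothetical
  // stand-ins for HiveUtils.causedByThrift, saveTableIntoHive and the
  // Spark-SQL-specific persistence path.
  def createTable(
      isThriftFailure: Throwable => Boolean,
      saveHiveCompatible: () => Unit,
      saveSparkSpecific: () => Unit): Unit = {
    try {
      saveHiveCompatible()
    } catch {
      // Only non-thrift failures take the fallback path; a metastore
      // connectivity error now propagates instead of degrading the format.
      case NonFatal(e) if !isThriftFailure(e) => saveSparkSpecific()
    }
  }
}
```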
15 changes: 15 additions & 0 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -520,4 +520,19 @@ private[spark] object HiveUtils extends Logging {
      case PATTERN_FOR_KEY_EQ_VAL(_, v) => FileUtils.unescapePathName(v)
    }
  }
+
+  /**
+   * Determine if a Hive call exception is caused by thrift error.
+   */
+  def causedByThrift(e: Throwable): Boolean = {
+    var target = e
+    while (target != null) {
+      val msg = target.getMessage()
+      if (msg != null && msg.matches("(?s).*(TApplication|TProtocol|TTransport)Exception.*")) {
+        return true
+      }
+      target = target.getCause()
+    }
+    false
+  }
}
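The check is message-based rather than type-based: it walks the cause chain and matches thrift class names embedded in exception messages, which also catches thrift errors that Hive has already wrapped in its own exception types. A rough illustration with made-up messages (note that `HiveUtils` is `private[spark]`, so this only compiles inside Spark's own packages):

```scala
// Hypothetical messages mirroring how a thrift failure typically surfaces
// after Hive wraps it in higher-level exceptions.
val thriftCause = new RuntimeException(
  "org.apache.thrift.transport.TTransportException: connect timed out")
val wrapped = new RuntimeException("Unable to create table default.tbl", thriftCause)

assert(HiveUtils.causedByThrift(wrapped))                       // matched via the cause chain
assert(!HiveUtils.causedByThrift(new RuntimeException("boom"))) // no thrift marker anywhere
```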
14 changes: 1 addition & 13 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

@@ -228,7 +228,7 @@ private[hive] class HiveClientImpl(
    try {
      return f
    } catch {
-     case e: Exception if causedByThrift(e) =>
+     case e: Exception if HiveUtils.causedByThrift(e) =>
        caughtException = e
        logWarning(
          log"HiveClient got thrift exception, destroying client and retrying " +
@@ -243,18 +243,6 @@
    throw caughtException
  }

-  private def causedByThrift(e: Throwable): Boolean = {
-    var target = e
-    while (target != null) {
-      val msg = target.getMessage()
-      if (msg != null && msg.matches("(?s).*(TApplication|TProtocol|TTransport)Exception.*")) {
-        return true
-      }
-      target = target.getCause()
-    }
-    false
-  }
-
  private def client: Hive = {
    if (clientLoader.cachedHive != null) {
      clientLoader.cachedHive.asInstanceOf[Hive]
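The method itself is unchanged, only hoisted from `HiveClientImpl` to `HiveUtils` so that `HiveExternalCatalog` can reuse it. For context, the retry loop it originally served (partially visible in the hunks above) follows roughly this shape; `maxAttempts` and the client-destruction comment are simplified assumptions, not the exact `HiveClientImpl` code:

```scala
// Sketch of the retry pattern visible above; assumes maxAttempts >= 1 and
// that it runs inside Spark's packages (HiveUtils is private[spark]).
def retryOnThrift[A](maxAttempts: Int)(f: => A): A = {
  var caughtException: Exception = null
  var attempts = 0
  while (attempts < maxAttempts) {
    try {
      return f
    } catch {
      case e: Exception if HiveUtils.causedByThrift(e) =>
        caughtException = e
        attempts += 1
        // The real client also destroys and recreates the cached Hive
        // client before the next attempt.
    }
  }
  throw caughtException
}
```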
30 changes: 30 additions & 0 deletions sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala

@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive

import org.apache.hadoop.conf.Configuration
+import org.apache.logging.log4j.Level

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.TableIdentifier

@@ -241,4 +242,33 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
    val alteredTable = externalCatalog.getTable("db1", tableName)
    assert(DataTypeUtils.sameType(alteredTable.schema, newSchema))
  }
+
+  test("SPARK-50137: Avoid fallback to Hive-incompatible ways on thrift exception") {
+    val hadoopConf = new Configuration()
+    // Use an unavailable uri to mock client connection timeout.
+    hadoopConf.set("hive.metastore.uris", "thrift://1.1.1.1:1111")
+    hadoopConf.set("hive.metastore.client.connection.timeout", "1s")
+    // Dummy HiveExternalCatalog to mock that the hive client is still available
+    // when checking database and table.
+    val catalog = new HiveExternalCatalog(new SparkConf, hadoopConf) {
+      override def requireDbExists(db: String): Unit = {}
+      override def tableExists(db: String, table: String): Boolean = false
+    }
+    val logAppender = new LogAppender()
+    withLogAppender(logAppender, level = Some(Level.WARN)) {
+      val table = CatalogTable(
+        identifier = TableIdentifier("tbl", Some("default")),
+        tableType = CatalogTableType.EXTERNAL,
+        storage = storageFormat.copy(locationUri = Some(newUriForDatabase())),
+        schema = new StructType()
+          .add("col1", "string"),
+        provider = Some("parquet"))
+      intercept[Throwable] {
+        catalog.createTable(table, ignoreIfExists = false)
+      }
+    }
+    assert(!logAppender.loggingEvents.map(_.getMessage.getFormattedMessage).contains(
+      "Could not persist `default`.`tbl` in a Hive compatible way. " +
+      "Persisting it into Hive metastore in Spark SQL specific format."))
+  }
 }