Skip to content

[SPARK-46608][SQL] Restore backward compatibility of JdbcDialect.classifyException #44449

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,11 @@
"message" : [
"Check that the table <tableName> exists."
]
},
"UNCLASSIFIED" : {
"message" : [
"<message>"
]
}
},
"sqlState" : "HV000"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"CREATE TABLE $catalogName.new_table (i INT) TBLPROPERTIES('a'='1')")
}
assert(e.getErrorClass == "FAILED_JDBC.CREATE_TABLE")
assert(e.getErrorClass == "FAILED_JDBC.UNCLASSIFIED")
testCreateTableWithProperty(s"$catalogName.new_table")
}
}
Expand Down Expand Up @@ -279,7 +279,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
sql(s"CREATE index i1 ON $catalogName.new_table (col1)")
},
errorClass = "INDEX_ALREADY_EXISTS",
parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`")
parameters = Map("indexName" -> "i1", "tableName" -> "new_table")
)

sql(s"DROP index i1 ON $catalogName.new_table")
Expand All @@ -304,7 +304,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
sql(s"DROP index i1 ON $catalogName.new_table")
},
errorClass = "INDEX_NOT_FOUND",
parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`")
parameters = Map("indexName" -> "i1", "tableName" -> "new_table")
)
}
}
Expand Down
4 changes: 4 additions & 0 deletions docs/sql-error-conditions-failed-jdbc-error-class.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,8 @@ Rename the table `<oldName>` to `<newName>`.

Check that the table `<tableName>` exists.

## UNCLASSIFIED

`<message>`


Original file line number Diff line number Diff line change
Expand Up @@ -1183,12 +1183,14 @@ object JdbcUtils extends Logging with SQLConfHelper {
def classifyException[T](
errorClass: String,
messageParameters: Map[String, String],
dialect: JdbcDialect)(f: => T): T = {
dialect: JdbcDialect,
description: String)(f: => T): T = {
try {
f
} catch {
case e: SparkThrowable with Throwable => throw e
case e: Throwable => throw dialect.classifyException(e, errorClass, messageParameters)
case e: Throwable =>
throw dialect.classifyException(e, errorClass, messageParameters, description)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
"url" -> jdbcOptions.getRedactUrl(),
"indexName" -> toSQLId(indexName),
"tableName" -> toSQLId(name)),
dialect = JdbcDialects.get(jdbcOptions.url)) {
dialect = JdbcDialects.get(jdbcOptions.url),
description = s"Failed to create index $indexName in ${name()}") {
JdbcUtils.createIndex(
conn, indexName, ident, columns, columnsProperties, properties, jdbcOptions)
}
Expand All @@ -90,7 +91,8 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
"url" -> jdbcOptions.getRedactUrl(),
"indexName" -> toSQLId(indexName),
"tableName" -> toSQLId(name)),
dialect = JdbcDialects.get(jdbcOptions.url)) {
dialect = JdbcDialects.get(jdbcOptions.url),
description = s"Failed to drop index $indexName in ${name()}") {
JdbcUtils.dropIndex(conn, indexName, ident, jdbcOptions)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ class JDBCTableCatalog extends TableCatalog
messageParameters = Map(
"url" -> options.getRedactUrl(),
"namespace" -> toSQLId(namespace.toSeq)),
dialect) {
dialect,
description = s"Failed get tables from: ${namespace.mkString(".")}") {
conn.getMetaData.getTables(null, schemaPattern, "%", Array("TABLE"))
}
new Iterator[Identifier] {
Expand All @@ -92,7 +93,8 @@ class JDBCTableCatalog extends TableCatalog
messageParameters = Map(
"url" -> options.getRedactUrl(),
"tableName" -> toSQLId(ident)),
dialect) {
dialect,
description = s"Failed table existence check: $ident") {
JdbcUtils.withConnection(options)(JdbcUtils.tableExists(_, writeOptions))
}
}
Expand All @@ -118,7 +120,8 @@ class JDBCTableCatalog extends TableCatalog
"url" -> options.getRedactUrl(),
"oldName" -> toSQLId(oldIdent),
"newName" -> toSQLId(newIdent)),
dialect) {
dialect,
description = s"Failed table renaming from $oldIdent to $newIdent") {
JdbcUtils.renameTable(conn, oldIdent, newIdent, options)
}
}
Expand Down Expand Up @@ -183,7 +186,8 @@ class JDBCTableCatalog extends TableCatalog
messageParameters = Map(
"url" -> options.getRedactUrl(),
"tableName" -> toSQLId(ident)),
dialect) {
dialect,
description = s"Failed table creation: $ident") {
JdbcUtils.createTable(conn, getTableName(ident), schema, caseSensitive, writeOptions)
}
}
Expand All @@ -199,7 +203,8 @@ class JDBCTableCatalog extends TableCatalog
messageParameters = Map(
"url" -> options.getRedactUrl(),
"tableName" -> toSQLId(ident)),
dialect) {
dialect,
description = s"Failed table altering: $ident") {
JdbcUtils.alterTable(conn, getTableName(ident), changes, options)
}
loadTable(ident)
Expand All @@ -214,7 +219,8 @@ class JDBCTableCatalog extends TableCatalog
messageParameters = Map(
"url" -> options.getRedactUrl(),
"namespace" -> toSQLId(namespace.toSeq)),
dialect) {
dialect,
description = s"Failed namespace exists: ${namespace.mkString}") {
JdbcUtils.schemaExists(conn, options, db)
}
}
Expand All @@ -226,7 +232,8 @@ class JDBCTableCatalog extends TableCatalog
JdbcUtils.classifyException(
errorClass = "FAILED_JDBC.LIST_NAMESPACES",
messageParameters = Map("url" -> options.getRedactUrl()),
dialect) {
dialect,
description = s"Failed list namespaces") {
JdbcUtils.listSchemas(conn, options)
}
}
Expand Down Expand Up @@ -279,7 +286,8 @@ class JDBCTableCatalog extends TableCatalog
messageParameters = Map(
"url" -> options.getRedactUrl(),
"namespace" -> toSQLId(db)),
dialect) {
dialect,
description = s"Failed create name space: $db") {
JdbcUtils.createSchema(conn, options, db, comment)
}
}
Expand All @@ -303,7 +311,8 @@ class JDBCTableCatalog extends TableCatalog
messageParameters = Map(
"url" -> options.getRedactUrl(),
"namespace" -> toSQLId(db)),
dialect) {
dialect,
description = s"Failed create comment on name space: $db") {
JdbcUtils.alterSchemaComment(conn, options, db, set.value)
}
}
Expand All @@ -319,7 +328,8 @@ class JDBCTableCatalog extends TableCatalog
messageParameters = Map(
"url" -> options.getRedactUrl(),
"namespace" -> toSQLId(db)),
dialect) {
dialect,
description = s"Failed remove comment on name space: $db") {
JdbcUtils.removeSchemaComment(conn, options, db)
}
}
Expand All @@ -346,7 +356,8 @@ class JDBCTableCatalog extends TableCatalog
messageParameters = Map(
"url" -> options.getRedactUrl(),
"namespace" -> toSQLId(db)),
dialect) {
dialect,
description = s"Failed drop name space: $db") {
JdbcUtils.dropSchema(conn, options, db, cascade)
true
}
Expand Down
16 changes: 5 additions & 11 deletions sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
Original file line number Diff line number Diff line change
Expand Up @@ -144,22 +144,16 @@ private object DB2Dialect extends JdbcDialect {
s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS ''"
}

override def classifyException(
e: Throwable,
errorClass: String,
messageParameters: Map[String, String]): AnalysisException = {
override def classifyException(message: String, e: Throwable): AnalysisException = {
e match {
case sqlException: SQLException =>
sqlException.getSQLState match {
// https://www.ibm.com/docs/en/db2/11.5?topic=messages-sqlstate
case "42893" =>
throw NonEmptyNamespaceException(
namespace = messageParameters.get("namespace").toArray,
details = sqlException.getMessage,
cause = Some(e))
case _ => super.classifyException(e, errorClass, messageParameters)
case "42893" => throw NonEmptyNamespaceException(
namespace = Array.empty, details = message, cause = Some(e))
case _ => super.classifyException(message, e)
}
case _ => super.classifyException(e, errorClass, messageParameters)
case _ => super.classifyException(message, e)
}
}

Expand Down
33 changes: 17 additions & 16 deletions sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import scala.util.control.NonFatal
import org.apache.commons.lang3.StringUtils

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.util.quoteNameParts
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.catalog.index.TableIndex
Expand Down Expand Up @@ -194,10 +195,7 @@ private[sql] object H2Dialect extends JdbcDialect {
(ident.namespace() :+ indexName).map(quoteIdentifier).mkString(".")
}

override def classifyException(
e: Throwable,
errorClass: String,
messageParameters: Map[String, String]): AnalysisException = {
override def classifyException(message: String, e: Throwable): AnalysisException = {
e match {
case exception: SQLException =>
// Error codes are from https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html
Expand All @@ -208,16 +206,15 @@ private[sql] object H2Dialect extends JdbcDialect {
val regex = """"((?:[^"\\]|\\[\\"ntbrf])+)"""".r
val name = regex.findFirstMatchIn(e.getMessage).get.group(1)
val quotedName = org.apache.spark.sql.catalyst.util.quoteIdentifier(name)
throw new TableAlreadyExistsException(
errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS",
throw new TableAlreadyExistsException(errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS",
messageParameters = Map("relationName" -> quotedName),
cause = Some(e))
// TABLE_OR_VIEW_NOT_FOUND_1
case 42102 =>
val relationName = messageParameters.getOrElse("tableName", "")
val quotedName = quoteNameParts(UnresolvedAttribute.parseAttributeName(message))
throw new NoSuchTableException(
errorClass = "TABLE_OR_VIEW_NOT_FOUND",
messageParameters = Map("relationName" -> relationName),
messageParameters = Map("relationName" -> quotedName),
cause = Some(e))
// SCHEMA_NOT_FOUND_1
case 90079 =>
Expand All @@ -227,21 +224,25 @@ private[sql] object H2Dialect extends JdbcDialect {
throw new NoSuchNamespaceException(errorClass = "SCHEMA_NOT_FOUND",
messageParameters = Map("schemaName" -> quotedName))
// INDEX_ALREADY_EXISTS_1
case 42111 if errorClass == "FAILED_JDBC.CREATE_INDEX" =>
val indexName = messageParameters("indexName")
val tableName = messageParameters("tableName")
case 42111 =>
// The message is: Failed to create index indexName in tableName
val regex = "(?s)Failed to create index (.*) in (.*)".r
val indexName = regex.findFirstMatchIn(message).get.group(1)
val tableName = regex.findFirstMatchIn(message).get.group(2)
throw new IndexAlreadyExistsException(
indexName = indexName, tableName = tableName, cause = Some(e))
// INDEX_NOT_FOUND_1
case 42112 if errorClass == "FAILED_JDBC.DROP_INDEX" =>
val indexName = messageParameters("indexName")
val tableName = messageParameters("tableName")
case 42112 =>
// The message is: Failed to drop index indexName in tableName
val regex = "(?s)Failed to drop index (.*) in (.*)".r
val indexName = regex.findFirstMatchIn(message).get.group(1)
val tableName = regex.findFirstMatchIn(message).get.group(2)
throw new NoSuchIndexException(indexName, tableName, cause = Some(e))
case _ => // do nothing
}
case _ => // do nothing
}
super.classifyException(e, errorClass, messageParameters)
super.classifyException(message, e)
}

override def compileExpression(expr: Expression): Option[String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,13 +633,31 @@ abstract class JdbcDialect extends Serializable with Logging {
* @param e The dialect specific exception.
* @param errorClass The error class assigned in the case of an unclassified `e`
* @param messageParameters The message parameters of `errorClass`
* @param description The error description
* @return `AnalysisException` or its sub-class.
*/
def classifyException(
e: Throwable,
errorClass: String,
messageParameters: Map[String, String]): AnalysisException = {
new AnalysisException(errorClass, messageParameters, cause = Some(e))
messageParameters: Map[String, String],
description: String): AnalysisException = {
classifyException(description, e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of passing description around, shall we generate the description here using the error class name and the tableName in the parameters?

Copy link
Member Author

@MaxGekk MaxGekk Jan 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might not be fully compatible with existing JDBC dialects, because the values in messageParameters have already been preprocessed (e.g., quoted), and a JDBC dialect could use a regexp to parse parameters out of the message/description. Theoretically, this could break those regexps.

And if we pass the quoted values to a JDBC dialect, it can throw a Spark exception that performs quoting internally, like PostgresDialect throwing NonEmptyNamespaceException. The latter does quoting inside its constructor. As a result, we would get doubly quoted values, which is itself a bug.

}

/**
 * Gets a dialect-specific exception, classifies it and wraps it into an `AnalysisException`.
 *
 * Legacy (pre-error-class) entry point; the error-class based overload delegates here,
 * presumably to keep third-party dialects that override this method working — new
 * dialects should override the error-class overload instead (see the deprecation note).
 *
 * @param message The error message to be placed into the returned exception.
 * @param e The dialect specific exception to classify.
 * @return `AnalysisException` or its sub-class.
 */
@deprecated("Please override the classifyException method with an error class", "4.0.0")
def classifyException(message: String, e: Throwable): AnalysisException = {
new AnalysisException(
// Fallback error class used when no dialect-specific rule matched the exception.
errorClass = "FAILED_JDBC.UNCLASSIFIED",
messageParameters = Map(
// NOTE(review): "jdbc:" is a constant placeholder — the actual connection URL is not
// available at this call site; confirm this is intended rather than a lost parameter.
"url" -> "jdbc:",
"message" -> message),
cause = Some(e))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,21 +190,15 @@ private object MsSqlServerDialect extends JdbcDialect {
if (limit > 0) s"TOP ($limit)" else ""
}

override def classifyException(
e: Throwable,
errorClass: String,
messageParameters: Map[String, String]): AnalysisException = {
override def classifyException(message: String, e: Throwable): AnalysisException = {
e match {
case sqlException: SQLException =>
sqlException.getErrorCode match {
case 3729 =>
throw NonEmptyNamespaceException(
namespace = messageParameters.get("namespace").toArray,
details = sqlException.getMessage,
cause = Some(e))
case _ => super.classifyException(e, errorClass, messageParameters)
case 3729 => throw NonEmptyNamespaceException(
namespace = Array.empty, details = message, cause = Some(e))
case _ => super.classifyException(message, e)
}
case _ => super.classifyException(e, errorClass, messageParameters)
case _ => super.classifyException(message, e)
}
}

Expand Down
Loading