Skip to content

[SPARK-46727][SQL] Port classifyException() in JDBC dialects on error classes #44739

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
sql(s"CREATE index i1 ON $catalogName.new_table (col1)")
},
errorClass = "INDEX_ALREADY_EXISTS",
parameters = Map("indexName" -> "i1", "tableName" -> "new_table")
parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`")
)

sql(s"DROP index i1 ON $catalogName.new_table")
Expand All @@ -304,7 +304,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
sql(s"DROP index i1 ON $catalogName.new_table")
},
errorClass = "INDEX_NOT_FOUND",
parameters = Map("indexName" -> "i1", "tableName" -> "new_table")
parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`")
)
}
}
Expand Down
17 changes: 12 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,23 @@ private object DB2Dialect extends JdbcDialect {
s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS ''"
}

override def classifyException(message: String, e: Throwable): AnalysisException = {
override def classifyException(
e: Throwable,
errorClass: String,
messageParameters: Map[String, String],
description: String): AnalysisException = {
e match {
case sqlException: SQLException =>
sqlException.getSQLState match {
// https://www.ibm.com/docs/en/db2/11.5?topic=messages-sqlstate
case "42893" => throw NonEmptyNamespaceException(
namespace = Array.empty, details = message, cause = Some(e))
case _ => super.classifyException(message, e)
case "42893" =>
throw NonEmptyNamespaceException(
namespace = messageParameters.get("namespace").toArray,
details = sqlException.getMessage,
cause = Some(e))
case _ => super.classifyException(e, errorClass, messageParameters, description)
}
case _ => super.classifyException(message, e)
case _ => super.classifyException(e, errorClass, messageParameters, description)
}
}

Expand Down
34 changes: 17 additions & 17 deletions sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ import scala.util.control.NonFatal
import org.apache.commons.lang3.StringUtils

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.util.quoteNameParts
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.catalog.index.TableIndex
Expand Down Expand Up @@ -195,7 +194,11 @@ private[sql] object H2Dialect extends JdbcDialect {
(ident.namespace() :+ indexName).map(quoteIdentifier).mkString(".")
}

override def classifyException(message: String, e: Throwable): AnalysisException = {
override def classifyException(
e: Throwable,
errorClass: String,
messageParameters: Map[String, String],
description: String): AnalysisException = {
e match {
case exception: SQLException =>
// Error codes are from https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html
Expand All @@ -206,15 +209,16 @@ private[sql] object H2Dialect extends JdbcDialect {
val regex = """"((?:[^"\\]|\\[\\"ntbrf])+)"""".r
val name = regex.findFirstMatchIn(e.getMessage).get.group(1)
val quotedName = org.apache.spark.sql.catalyst.util.quoteIdentifier(name)
throw new TableAlreadyExistsException(errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS",
throw new TableAlreadyExistsException(
errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS",
messageParameters = Map("relationName" -> quotedName),
cause = Some(e))
// TABLE_OR_VIEW_NOT_FOUND_1
case 42102 =>
val quotedName = quoteNameParts(UnresolvedAttribute.parseAttributeName(message))
val relationName = messageParameters.getOrElse("tableName", "")
throw new NoSuchTableException(
errorClass = "TABLE_OR_VIEW_NOT_FOUND",
messageParameters = Map("relationName" -> quotedName),
messageParameters = Map("relationName" -> relationName),
cause = Some(e))
// SCHEMA_NOT_FOUND_1
case 90079 =>
Expand All @@ -224,25 +228,21 @@ private[sql] object H2Dialect extends JdbcDialect {
throw new NoSuchNamespaceException(errorClass = "SCHEMA_NOT_FOUND",
messageParameters = Map("schemaName" -> quotedName))
// INDEX_ALREADY_EXISTS_1
case 42111 =>
// The message is: Failed to create index indexName in tableName
val regex = "(?s)Failed to create index (.*) in (.*)".r
val indexName = regex.findFirstMatchIn(message).get.group(1)
val tableName = regex.findFirstMatchIn(message).get.group(2)
case 42111 if errorClass == "FAILED_JDBC.CREATE_INDEX" =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there only need error code?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is some kind of a guard for accessing messageParameters. Previous implementation implicitly assumes that the error happens on creating an index, see the regexp. Here I just made the assumption explicit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM.

val indexName = messageParameters("indexName")
val tableName = messageParameters("tableName")
throw new IndexAlreadyExistsException(
indexName = indexName, tableName = tableName, cause = Some(e))
// INDEX_NOT_FOUND_1
case 42112 =>
// The message is: Failed to drop index indexName in tableName
val regex = "(?s)Failed to drop index (.*) in (.*)".r
val indexName = regex.findFirstMatchIn(message).get.group(1)
val tableName = regex.findFirstMatchIn(message).get.group(2)
case 42112 if errorClass == "FAILED_JDBC.DROP_INDEX" =>
val indexName = messageParameters("indexName")
val tableName = messageParameters("tableName")
throw new NoSuchIndexException(indexName, tableName, cause = Some(e))
case _ => // do nothing
}
case _ => // do nothing
}
super.classifyException(message, e)
super.classifyException(e, errorClass, messageParameters, description)
}

override def compileExpression(expr: Expression): Option[String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,22 @@ private object MsSqlServerDialect extends JdbcDialect {
if (limit > 0) s"TOP ($limit)" else ""
}

override def classifyException(message: String, e: Throwable): AnalysisException = {
override def classifyException(
e: Throwable,
errorClass: String,
messageParameters: Map[String, String],
description: String): AnalysisException = {
e match {
case sqlException: SQLException =>
sqlException.getErrorCode match {
case 3729 => throw NonEmptyNamespaceException(
namespace = Array.empty, details = message, cause = Some(e))
case _ => super.classifyException(message, e)
case 3729 =>
throw NonEmptyNamespaceException(
namespace = messageParameters.get("namespace").toArray,
details = sqlException.getMessage,
cause = Some(e))
case _ => super.classifyException(e, errorClass, messageParameters, description)
}
case _ => super.classifyException(message, e)
case _ => super.classifyException(e, errorClass, messageParameters, description)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,28 +270,27 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
indexMap.values.toArray
}

override def classifyException(message: String, e: Throwable): AnalysisException = {
override def classifyException(
e: Throwable,
errorClass: String,
messageParameters: Map[String, String],
description: String): AnalysisException = {
e match {
case sqlException: SQLException =>
sqlException.getErrorCode match {
// ER_DUP_KEYNAME
case 1061 =>
// The message is: Failed to create index indexName in tableName
val regex = "(?s)Failed to create index (.*) in (.*)".r
val indexName = regex.findFirstMatchIn(message).get.group(1)
val tableName = regex.findFirstMatchIn(message).get.group(2)
throw new IndexAlreadyExistsException(
indexName = indexName, tableName = tableName, cause = Some(e))
case 1091 =>
// The message is: Failed to drop index indexName in tableName
val regex = "(?s)Failed to drop index (.*) in (.*)".r
val indexName = regex.findFirstMatchIn(message).get.group(1)
val tableName = regex.findFirstMatchIn(message).get.group(2)
case 1061 if errorClass == "FAILED_JDBC.CREATE_INDEX" =>
val indexName = messageParameters("indexName")
val tableName = messageParameters("tableName")
throw new IndexAlreadyExistsException(indexName, tableName, cause = Some(e))
case 1091 if errorClass == "FAILED_JDBC.DROP_INDEX" =>
val indexName = messageParameters("indexName")
val tableName = messageParameters("tableName")
throw new NoSuchIndexException(indexName, tableName, cause = Some(e))
case _ => super.classifyException(message, e)
case _ => super.classifyException(e, errorClass, messageParameters, description)
}
case unsupported: UnsupportedOperationException => throw unsupported
case _ => super.classifyException(message, e)
case _ => super.classifyException(e, errorClass, messageParameters, description)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,42 +225,48 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
s"DROP INDEX ${quoteIdentifier(indexName)}"
}

override def classifyException(message: String, e: Throwable): AnalysisException = {
// Message pattern defined by postgres specification
private final val pgAlreadyExistsRegex = """(?:.*)relation "(.*)" already exists""".r

override def classifyException(
e: Throwable,
errorClass: String,
messageParameters: Map[String, String],
description: String): AnalysisException = {
e match {
case sqlException: SQLException =>
sqlException.getSQLState match {
// https://www.postgresql.org/docs/14/errcodes-appendix.html
case "42P07" =>
// Message patterns defined at caller sides of spark
val indexRegex = "(?s)Failed to create index (.*) in (.*)".r
val renameRegex = "(?s)Failed table renaming from (.*) to (.*)".r
// Message pattern defined by postgres specification
val pgRegex = """(?:.*)relation "(.*)" already exists""".r

message match {
case indexRegex(index, table) =>
throw new IndexAlreadyExistsException(
indexName = index, tableName = table, cause = Some(e))
case renameRegex(_, newTable) =>
throw QueryCompilationErrors.tableAlreadyExistsError(newTable)
case _ if pgRegex.findFirstMatchIn(sqlException.getMessage).nonEmpty =>
val tableName = pgRegex.findFirstMatchIn(sqlException.getMessage).get.group(1)
throw QueryCompilationErrors.tableAlreadyExistsError(tableName)
case _ => super.classifyException(message, e)
if (errorClass == "FAILED_JDBC.CREATE_INDEX") {
throw new IndexAlreadyExistsException(
indexName = messageParameters("indexName"),
tableName = messageParameters("tableName"),
cause = Some(e))
} else if (errorClass == "FAILED_JDBC.RENAME_TABLE") {
val newTable = messageParameters("newName")
throw QueryCompilationErrors.tableAlreadyExistsError(newTable)
} else {
val tblRegexp = pgAlreadyExistsRegex.findFirstMatchIn(sqlException.getMessage)
if (tblRegexp.nonEmpty) {
throw QueryCompilationErrors.tableAlreadyExistsError(tblRegexp.get.group(1))
} else {
super.classifyException(e, errorClass, messageParameters, description)
}
}
case "42704" =>
// The message is: Failed to drop index indexName in tableName
val regex = "(?s)Failed to drop index (.*) in (.*)".r
val indexName = regex.findFirstMatchIn(message).get.group(1)
val tableName = regex.findFirstMatchIn(message).get.group(2)
case "42704" if errorClass == "FAILED_JDBC.DROP_INDEX" =>
val indexName = messageParameters("indexName")
val tableName = messageParameters("tableName")
throw new NoSuchIndexException(indexName, tableName, cause = Some(e))
case "2BP01" =>
throw NonEmptyNamespaceException(
namespace = Array.empty, details = message, cause = Some(e))
case _ => super.classifyException(message, e)
namespace = messageParameters.get("namespace").toArray,
details = sqlException.getMessage,
cause = Some(e))
case _ => super.classifyException(e, errorClass, messageParameters, description)
}
case unsupported: UnsupportedOperationException => throw unsupported
case _ => super.classifyException(message, e)
case _ => super.classifyException(e, errorClass, messageParameters, description)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2980,8 +2980,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
},
errorClass = "INDEX_ALREADY_EXISTS",
parameters = Map(
"indexName" -> "people_index",
"tableName" -> "test.people"
"indexName" -> "`people_index`",
"tableName" -> "`test`.`people`"
)
)
assert(jdbcTable.indexExists("people_index"))
Expand All @@ -2997,7 +2997,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
sql(s"DROP INDEX people_index ON TABLE h2.test.people")
},
errorClass = "INDEX_NOT_FOUND",
parameters = Map("indexName" -> "people_index", "tableName" -> "test.people")
parameters = Map("indexName" -> "`people_index`", "tableName" -> "`test`.`people`")
)
assert(jdbcTable.indexExists("people_index") == false)
val indexes3 = jdbcTable.listIndexes()
Expand Down