Skip to content

Commit ebd2b78

Browse files
committed
[SPARK-46727][SQL] Port classifyException() in JDBC dialects on error classes
### What changes were proposed in this pull request? In the PR, I propose to port the existing `classifyException()` method which accepts a description to new one w/ an error class added by #44358. The modified JDBC dialects are: DB2, H2, Oracle, MS SQL Server, MySQL and PostgreSQL. ### Why are the changes needed? The old method `classifyException()` which accepts a `description` only has been deprecated already by ... ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? By existing integration tests, and the modified test suite: ``` $ build/sbt "test:testOnly *JDBCV2Suite" ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44739 from MaxGekk/port-jdbc-classifyException. Authored-by: Max Gekk <max.gekk@gmail.com> Signed-off-by: Max Gekk <max.gekk@gmail.com>
1 parent 872fbc3 commit ebd2b78

File tree

7 files changed

+91
-72
lines changed

7 files changed

+91
-72
lines changed

connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
279279
sql(s"CREATE index i1 ON $catalogName.new_table (col1)")
280280
},
281281
errorClass = "INDEX_ALREADY_EXISTS",
282-
parameters = Map("indexName" -> "i1", "tableName" -> "new_table")
282+
parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`")
283283
)
284284

285285
sql(s"DROP index i1 ON $catalogName.new_table")
@@ -304,7 +304,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
304304
sql(s"DROP index i1 ON $catalogName.new_table")
305305
},
306306
errorClass = "INDEX_NOT_FOUND",
307-
parameters = Map("indexName" -> "i1", "tableName" -> "new_table")
307+
parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`")
308308
)
309309
}
310310
}

sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -144,16 +144,23 @@ private object DB2Dialect extends JdbcDialect {
144144
s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS ''"
145145
}
146146

147-
override def classifyException(message: String, e: Throwable): AnalysisException = {
147+
override def classifyException(
148+
e: Throwable,
149+
errorClass: String,
150+
messageParameters: Map[String, String],
151+
description: String): AnalysisException = {
148152
e match {
149153
case sqlException: SQLException =>
150154
sqlException.getSQLState match {
151155
// https://www.ibm.com/docs/en/db2/11.5?topic=messages-sqlstate
152-
case "42893" => throw NonEmptyNamespaceException(
153-
namespace = Array.empty, details = message, cause = Some(e))
154-
case _ => super.classifyException(message, e)
156+
case "42893" =>
157+
throw NonEmptyNamespaceException(
158+
namespace = messageParameters.get("namespace").toArray,
159+
details = sqlException.getMessage,
160+
cause = Some(e))
161+
case _ => super.classifyException(e, errorClass, messageParameters, description)
155162
}
156-
case _ => super.classifyException(message, e)
163+
case _ => super.classifyException(e, errorClass, messageParameters, description)
157164
}
158165
}
159166

sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ import scala.util.control.NonFatal
2828
import org.apache.commons.lang3.StringUtils
2929

3030
import org.apache.spark.sql.AnalysisException
31-
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException, UnresolvedAttribute}
32-
import org.apache.spark.sql.catalyst.util.quoteNameParts
31+
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
3332
import org.apache.spark.sql.connector.catalog.Identifier
3433
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
3534
import org.apache.spark.sql.connector.catalog.index.TableIndex
@@ -195,7 +194,11 @@ private[sql] object H2Dialect extends JdbcDialect {
195194
(ident.namespace() :+ indexName).map(quoteIdentifier).mkString(".")
196195
}
197196

198-
override def classifyException(message: String, e: Throwable): AnalysisException = {
197+
override def classifyException(
198+
e: Throwable,
199+
errorClass: String,
200+
messageParameters: Map[String, String],
201+
description: String): AnalysisException = {
199202
e match {
200203
case exception: SQLException =>
201204
// Error codes are from https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html
@@ -206,15 +209,16 @@ private[sql] object H2Dialect extends JdbcDialect {
206209
val regex = """"((?:[^"\\]|\\[\\"ntbrf])+)"""".r
207210
val name = regex.findFirstMatchIn(e.getMessage).get.group(1)
208211
val quotedName = org.apache.spark.sql.catalyst.util.quoteIdentifier(name)
209-
throw new TableAlreadyExistsException(errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS",
212+
throw new TableAlreadyExistsException(
213+
errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS",
210214
messageParameters = Map("relationName" -> quotedName),
211215
cause = Some(e))
212216
// TABLE_OR_VIEW_NOT_FOUND_1
213217
case 42102 =>
214-
val quotedName = quoteNameParts(UnresolvedAttribute.parseAttributeName(message))
218+
val relationName = messageParameters.getOrElse("tableName", "")
215219
throw new NoSuchTableException(
216220
errorClass = "TABLE_OR_VIEW_NOT_FOUND",
217-
messageParameters = Map("relationName" -> quotedName),
221+
messageParameters = Map("relationName" -> relationName),
218222
cause = Some(e))
219223
// SCHEMA_NOT_FOUND_1
220224
case 90079 =>
@@ -224,25 +228,21 @@ private[sql] object H2Dialect extends JdbcDialect {
224228
throw new NoSuchNamespaceException(errorClass = "SCHEMA_NOT_FOUND",
225229
messageParameters = Map("schemaName" -> quotedName))
226230
// INDEX_ALREADY_EXISTS_1
227-
case 42111 =>
228-
// The message is: Failed to create index indexName in tableName
229-
val regex = "(?s)Failed to create index (.*) in (.*)".r
230-
val indexName = regex.findFirstMatchIn(message).get.group(1)
231-
val tableName = regex.findFirstMatchIn(message).get.group(2)
231+
case 42111 if errorClass == "FAILED_JDBC.CREATE_INDEX" =>
232+
val indexName = messageParameters("indexName")
233+
val tableName = messageParameters("tableName")
232234
throw new IndexAlreadyExistsException(
233235
indexName = indexName, tableName = tableName, cause = Some(e))
234236
// INDEX_NOT_FOUND_1
235-
case 42112 =>
236-
// The message is: Failed to drop index indexName in tableName
237-
val regex = "(?s)Failed to drop index (.*) in (.*)".r
238-
val indexName = regex.findFirstMatchIn(message).get.group(1)
239-
val tableName = regex.findFirstMatchIn(message).get.group(2)
237+
case 42112 if errorClass == "FAILED_JDBC.DROP_INDEX" =>
238+
val indexName = messageParameters("indexName")
239+
val tableName = messageParameters("tableName")
240240
throw new NoSuchIndexException(indexName, tableName, cause = Some(e))
241241
case _ => // do nothing
242242
}
243243
case _ => // do nothing
244244
}
245-
super.classifyException(message, e)
245+
super.classifyException(e, errorClass, messageParameters, description)
246246
}
247247

248248
override def compileExpression(expr: Expression): Option[String] = {

sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -190,15 +190,22 @@ private object MsSqlServerDialect extends JdbcDialect {
190190
if (limit > 0) s"TOP ($limit)" else ""
191191
}
192192

193-
override def classifyException(message: String, e: Throwable): AnalysisException = {
193+
override def classifyException(
194+
e: Throwable,
195+
errorClass: String,
196+
messageParameters: Map[String, String],
197+
description: String): AnalysisException = {
194198
e match {
195199
case sqlException: SQLException =>
196200
sqlException.getErrorCode match {
197-
case 3729 => throw NonEmptyNamespaceException(
198-
namespace = Array.empty, details = message, cause = Some(e))
199-
case _ => super.classifyException(message, e)
201+
case 3729 =>
202+
throw NonEmptyNamespaceException(
203+
namespace = messageParameters.get("namespace").toArray,
204+
details = sqlException.getMessage,
205+
cause = Some(e))
206+
case _ => super.classifyException(e, errorClass, messageParameters, description)
200207
}
201-
case _ => super.classifyException(message, e)
208+
case _ => super.classifyException(e, errorClass, messageParameters, description)
202209
}
203210
}
204211

sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -270,28 +270,27 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
270270
indexMap.values.toArray
271271
}
272272

273-
override def classifyException(message: String, e: Throwable): AnalysisException = {
273+
override def classifyException(
274+
e: Throwable,
275+
errorClass: String,
276+
messageParameters: Map[String, String],
277+
description: String): AnalysisException = {
274278
e match {
275279
case sqlException: SQLException =>
276280
sqlException.getErrorCode match {
277281
// ER_DUP_KEYNAME
278-
case 1061 =>
279-
// The message is: Failed to create index indexName in tableName
280-
val regex = "(?s)Failed to create index (.*) in (.*)".r
281-
val indexName = regex.findFirstMatchIn(message).get.group(1)
282-
val tableName = regex.findFirstMatchIn(message).get.group(2)
283-
throw new IndexAlreadyExistsException(
284-
indexName = indexName, tableName = tableName, cause = Some(e))
285-
case 1091 =>
286-
// The message is: Failed to drop index indexName in tableName
287-
val regex = "(?s)Failed to drop index (.*) in (.*)".r
288-
val indexName = regex.findFirstMatchIn(message).get.group(1)
289-
val tableName = regex.findFirstMatchIn(message).get.group(2)
282+
case 1061 if errorClass == "FAILED_JDBC.CREATE_INDEX" =>
283+
val indexName = messageParameters("indexName")
284+
val tableName = messageParameters("tableName")
285+
throw new IndexAlreadyExistsException(indexName, tableName, cause = Some(e))
286+
case 1091 if errorClass == "FAILED_JDBC.DROP_INDEX" =>
287+
val indexName = messageParameters("indexName")
288+
val tableName = messageParameters("tableName")
290289
throw new NoSuchIndexException(indexName, tableName, cause = Some(e))
291-
case _ => super.classifyException(message, e)
290+
case _ => super.classifyException(e, errorClass, messageParameters, description)
292291
}
293292
case unsupported: UnsupportedOperationException => throw unsupported
294-
case _ => super.classifyException(message, e)
293+
case _ => super.classifyException(e, errorClass, messageParameters, description)
295294
}
296295
}
297296

sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -225,42 +225,48 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
225225
s"DROP INDEX ${quoteIdentifier(indexName)}"
226226
}
227227

228-
override def classifyException(message: String, e: Throwable): AnalysisException = {
228+
// Message pattern defined by postgres specification
229+
private final val pgAlreadyExistsRegex = """(?:.*)relation "(.*)" already exists""".r
230+
231+
override def classifyException(
232+
e: Throwable,
233+
errorClass: String,
234+
messageParameters: Map[String, String],
235+
description: String): AnalysisException = {
229236
e match {
230237
case sqlException: SQLException =>
231238
sqlException.getSQLState match {
232239
// https://www.postgresql.org/docs/14/errcodes-appendix.html
233240
case "42P07" =>
234-
// Message patterns defined at caller sides of spark
235-
val indexRegex = "(?s)Failed to create index (.*) in (.*)".r
236-
val renameRegex = "(?s)Failed table renaming from (.*) to (.*)".r
237-
// Message pattern defined by postgres specification
238-
val pgRegex = """(?:.*)relation "(.*)" already exists""".r
239-
240-
message match {
241-
case indexRegex(index, table) =>
242-
throw new IndexAlreadyExistsException(
243-
indexName = index, tableName = table, cause = Some(e))
244-
case renameRegex(_, newTable) =>
245-
throw QueryCompilationErrors.tableAlreadyExistsError(newTable)
246-
case _ if pgRegex.findFirstMatchIn(sqlException.getMessage).nonEmpty =>
247-
val tableName = pgRegex.findFirstMatchIn(sqlException.getMessage).get.group(1)
248-
throw QueryCompilationErrors.tableAlreadyExistsError(tableName)
249-
case _ => super.classifyException(message, e)
241+
if (errorClass == "FAILED_JDBC.CREATE_INDEX") {
242+
throw new IndexAlreadyExistsException(
243+
indexName = messageParameters("indexName"),
244+
tableName = messageParameters("tableName"),
245+
cause = Some(e))
246+
} else if (errorClass == "FAILED_JDBC.RENAME_TABLE") {
247+
val newTable = messageParameters("newName")
248+
throw QueryCompilationErrors.tableAlreadyExistsError(newTable)
249+
} else {
250+
val tblRegexp = pgAlreadyExistsRegex.findFirstMatchIn(sqlException.getMessage)
251+
if (tblRegexp.nonEmpty) {
252+
throw QueryCompilationErrors.tableAlreadyExistsError(tblRegexp.get.group(1))
253+
} else {
254+
super.classifyException(e, errorClass, messageParameters, description)
255+
}
250256
}
251-
case "42704" =>
252-
// The message is: Failed to drop index indexName in tableName
253-
val regex = "(?s)Failed to drop index (.*) in (.*)".r
254-
val indexName = regex.findFirstMatchIn(message).get.group(1)
255-
val tableName = regex.findFirstMatchIn(message).get.group(2)
257+
case "42704" if errorClass == "FAILED_JDBC.DROP_INDEX" =>
258+
val indexName = messageParameters("indexName")
259+
val tableName = messageParameters("tableName")
256260
throw new NoSuchIndexException(indexName, tableName, cause = Some(e))
257261
case "2BP01" =>
258262
throw NonEmptyNamespaceException(
259-
namespace = Array.empty, details = message, cause = Some(e))
260-
case _ => super.classifyException(message, e)
263+
namespace = messageParameters.get("namespace").toArray,
264+
details = sqlException.getMessage,
265+
cause = Some(e))
266+
case _ => super.classifyException(e, errorClass, messageParameters, description)
261267
}
262268
case unsupported: UnsupportedOperationException => throw unsupported
263-
case _ => super.classifyException(message, e)
269+
case _ => super.classifyException(e, errorClass, messageParameters, description)
264270
}
265271
}
266272

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2980,8 +2980,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
29802980
},
29812981
errorClass = "INDEX_ALREADY_EXISTS",
29822982
parameters = Map(
2983-
"indexName" -> "people_index",
2984-
"tableName" -> "test.people"
2983+
"indexName" -> "`people_index`",
2984+
"tableName" -> "`test`.`people`"
29852985
)
29862986
)
29872987
assert(jdbcTable.indexExists("people_index"))
@@ -2997,7 +2997,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
29972997
sql(s"DROP INDEX people_index ON TABLE h2.test.people")
29982998
},
29992999
errorClass = "INDEX_NOT_FOUND",
3000-
parameters = Map("indexName" -> "people_index", "tableName" -> "test.people")
3000+
parameters = Map("indexName" -> "`people_index`", "tableName" -> "`test`.`people`")
30013001
)
30023002
assert(jdbcTable.indexExists("people_index") == false)
30033003
val indexes3 = jdbcTable.listIndexes()

0 commit comments

Comments
 (0)