Skip to content

Commit 2957bc9

Browse files
committed
[SPARK-46608][SQL] Restore backward compatibility of JdbcDialect.classifyException
### What changes were proposed in this pull request? In the PR, I propose to restore `classifyException()` of `JdbcDialect` before the commit 14a933b, and extends `classifyException()` with the error class parameter by `description`: ```scala def classifyException( e: Throwable, errorClass: String, messageParameters: Map[String, String], description: String): AnalysisException ``` The `description` parameter has the same meaning as `message` in the old version of `classifyException()` which is deprecated. Also old implementation of `classifyException()` has been restored in JDBC dialects: MySQL, PostgreSQL and so on. ### Why are the changes needed? To restore compatibility with existing JDBC dialects. ### Does this PR introduce _any_ user-facing change? No, this PR restores the behaviour prior #44358. ### How was this patch tested? By running the affected test suite: ``` $ build/sbt "core/testOnly *SparkThrowableSuite" ``` and modified test suite: ``` $ build/sbt "test:testOnly *JDBCV2Suite" $ build/sbt "test:testOnly *JDBCTableCatalogSuite" ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44449 from MaxGekk/restore-jdbc-classifyException. Authored-by: Max Gekk <max.gekk@gmail.com> Signed-off-by: Max Gekk <max.gekk@gmail.com>
1 parent 5a74e5a commit 2957bc9

File tree

14 files changed

+143
-122
lines changed

14 files changed

+143
-122
lines changed

common/utils/src/main/resources/error/error-classes.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1185,6 +1185,11 @@
11851185
"message" : [
11861186
"Check that the table <tableName> exists."
11871187
]
1188+
},
1189+
"UNCLASSIFIED" : {
1190+
"message" : [
1191+
"<message>"
1192+
]
11881193
}
11891194
},
11901195
"sqlState" : "HV000"

connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
224224
val e = intercept[AnalysisException] {
225225
sql(s"CREATE TABLE $catalogName.new_table (i INT) TBLPROPERTIES('a'='1')")
226226
}
227-
assert(e.getErrorClass == "FAILED_JDBC.CREATE_TABLE")
227+
assert(e.getErrorClass == "FAILED_JDBC.UNCLASSIFIED")
228228
testCreateTableWithProperty(s"$catalogName.new_table")
229229
}
230230
}
@@ -279,7 +279,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
279279
sql(s"CREATE index i1 ON $catalogName.new_table (col1)")
280280
},
281281
errorClass = "INDEX_ALREADY_EXISTS",
282-
parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`")
282+
parameters = Map("indexName" -> "i1", "tableName" -> "new_table")
283283
)
284284

285285
sql(s"DROP index i1 ON $catalogName.new_table")
@@ -304,7 +304,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
304304
sql(s"DROP index i1 ON $catalogName.new_table")
305305
},
306306
errorClass = "INDEX_NOT_FOUND",
307-
parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`")
307+
parameters = Map("indexName" -> "i1", "tableName" -> "new_table")
308308
)
309309
}
310310
}

docs/sql-error-conditions-failed-jdbc-error-class.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,8 @@ Rename the table `<oldName>` to `<newName>`.
7777

7878
Check that the table `<tableName>` exists.
7979

80+
## UNCLASSIFIED
81+
82+
`<message>`
83+
8084

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1183,12 +1183,14 @@ object JdbcUtils extends Logging with SQLConfHelper {
11831183
def classifyException[T](
11841184
errorClass: String,
11851185
messageParameters: Map[String, String],
1186-
dialect: JdbcDialect)(f: => T): T = {
1186+
dialect: JdbcDialect,
1187+
description: String)(f: => T): T = {
11871188
try {
11881189
f
11891190
} catch {
11901191
case e: SparkThrowable with Throwable => throw e
1191-
case e: Throwable => throw dialect.classifyException(e, errorClass, messageParameters)
1192+
case e: Throwable =>
1193+
throw dialect.classifyException(e, errorClass, messageParameters, description)
11921194
}
11931195
}
11941196

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
6969
"url" -> jdbcOptions.getRedactUrl(),
7070
"indexName" -> toSQLId(indexName),
7171
"tableName" -> toSQLId(name)),
72-
dialect = JdbcDialects.get(jdbcOptions.url)) {
72+
dialect = JdbcDialects.get(jdbcOptions.url),
73+
description = s"Failed to create index $indexName in ${name()}") {
7374
JdbcUtils.createIndex(
7475
conn, indexName, ident, columns, columnsProperties, properties, jdbcOptions)
7576
}
@@ -90,7 +91,8 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
9091
"url" -> jdbcOptions.getRedactUrl(),
9192
"indexName" -> toSQLId(indexName),
9293
"tableName" -> toSQLId(name)),
93-
dialect = JdbcDialects.get(jdbcOptions.url)) {
94+
dialect = JdbcDialects.get(jdbcOptions.url),
95+
description = s"Failed to drop index $indexName in ${name()}") {
9496
JdbcUtils.dropIndex(conn, indexName, ident, jdbcOptions)
9597
}
9698
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ class JDBCTableCatalog extends TableCatalog
7373
messageParameters = Map(
7474
"url" -> options.getRedactUrl(),
7575
"namespace" -> toSQLId(namespace.toSeq)),
76-
dialect) {
76+
dialect,
77+
description = s"Failed get tables from: ${namespace.mkString(".")}") {
7778
conn.getMetaData.getTables(null, schemaPattern, "%", Array("TABLE"))
7879
}
7980
new Iterator[Identifier] {
@@ -92,7 +93,8 @@ class JDBCTableCatalog extends TableCatalog
9293
messageParameters = Map(
9394
"url" -> options.getRedactUrl(),
9495
"tableName" -> toSQLId(ident)),
95-
dialect) {
96+
dialect,
97+
description = s"Failed table existence check: $ident") {
9698
JdbcUtils.withConnection(options)(JdbcUtils.tableExists(_, writeOptions))
9799
}
98100
}
@@ -118,7 +120,8 @@ class JDBCTableCatalog extends TableCatalog
118120
"url" -> options.getRedactUrl(),
119121
"oldName" -> toSQLId(oldIdent),
120122
"newName" -> toSQLId(newIdent)),
121-
dialect) {
123+
dialect,
124+
description = s"Failed table renaming from $oldIdent to $newIdent") {
122125
JdbcUtils.renameTable(conn, oldIdent, newIdent, options)
123126
}
124127
}
@@ -183,7 +186,8 @@ class JDBCTableCatalog extends TableCatalog
183186
messageParameters = Map(
184187
"url" -> options.getRedactUrl(),
185188
"tableName" -> toSQLId(ident)),
186-
dialect) {
189+
dialect,
190+
description = s"Failed table creation: $ident") {
187191
JdbcUtils.createTable(conn, getTableName(ident), schema, caseSensitive, writeOptions)
188192
}
189193
}
@@ -199,7 +203,8 @@ class JDBCTableCatalog extends TableCatalog
199203
messageParameters = Map(
200204
"url" -> options.getRedactUrl(),
201205
"tableName" -> toSQLId(ident)),
202-
dialect) {
206+
dialect,
207+
description = s"Failed table altering: $ident") {
203208
JdbcUtils.alterTable(conn, getTableName(ident), changes, options)
204209
}
205210
loadTable(ident)
@@ -214,7 +219,8 @@ class JDBCTableCatalog extends TableCatalog
214219
messageParameters = Map(
215220
"url" -> options.getRedactUrl(),
216221
"namespace" -> toSQLId(namespace.toSeq)),
217-
dialect) {
222+
dialect,
223+
description = s"Failed namespace exists: ${namespace.mkString}") {
218224
JdbcUtils.schemaExists(conn, options, db)
219225
}
220226
}
@@ -226,7 +232,8 @@ class JDBCTableCatalog extends TableCatalog
226232
JdbcUtils.classifyException(
227233
errorClass = "FAILED_JDBC.LIST_NAMESPACES",
228234
messageParameters = Map("url" -> options.getRedactUrl()),
229-
dialect) {
235+
dialect,
236+
description = s"Failed list namespaces") {
230237
JdbcUtils.listSchemas(conn, options)
231238
}
232239
}
@@ -279,7 +286,8 @@ class JDBCTableCatalog extends TableCatalog
279286
messageParameters = Map(
280287
"url" -> options.getRedactUrl(),
281288
"namespace" -> toSQLId(db)),
282-
dialect) {
289+
dialect,
290+
description = s"Failed create name space: $db") {
283291
JdbcUtils.createSchema(conn, options, db, comment)
284292
}
285293
}
@@ -303,7 +311,8 @@ class JDBCTableCatalog extends TableCatalog
303311
messageParameters = Map(
304312
"url" -> options.getRedactUrl(),
305313
"namespace" -> toSQLId(db)),
306-
dialect) {
314+
dialect,
315+
description = s"Failed create comment on name space: $db") {
307316
JdbcUtils.alterSchemaComment(conn, options, db, set.value)
308317
}
309318
}
@@ -319,7 +328,8 @@ class JDBCTableCatalog extends TableCatalog
319328
messageParameters = Map(
320329
"url" -> options.getRedactUrl(),
321330
"namespace" -> toSQLId(db)),
322-
dialect) {
331+
dialect,
332+
description = s"Failed remove comment on name space: $db") {
323333
JdbcUtils.removeSchemaComment(conn, options, db)
324334
}
325335
}
@@ -346,7 +356,8 @@ class JDBCTableCatalog extends TableCatalog
346356
messageParameters = Map(
347357
"url" -> options.getRedactUrl(),
348358
"namespace" -> toSQLId(db)),
349-
dialect) {
359+
dialect,
360+
description = s"Failed drop name space: $db") {
350361
JdbcUtils.dropSchema(conn, options, db, cascade)
351362
true
352363
}

sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -144,22 +144,16 @@ private object DB2Dialect extends JdbcDialect {
144144
s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS ''"
145145
}
146146

147-
override def classifyException(
148-
e: Throwable,
149-
errorClass: String,
150-
messageParameters: Map[String, String]): AnalysisException = {
147+
override def classifyException(message: String, e: Throwable): AnalysisException = {
151148
e match {
152149
case sqlException: SQLException =>
153150
sqlException.getSQLState match {
154151
// https://www.ibm.com/docs/en/db2/11.5?topic=messages-sqlstate
155-
case "42893" =>
156-
throw NonEmptyNamespaceException(
157-
namespace = messageParameters.get("namespace").toArray,
158-
details = sqlException.getMessage,
159-
cause = Some(e))
160-
case _ => super.classifyException(e, errorClass, messageParameters)
152+
case "42893" => throw NonEmptyNamespaceException(
153+
namespace = Array.empty, details = message, cause = Some(e))
154+
case _ => super.classifyException(message, e)
161155
}
162-
case _ => super.classifyException(e, errorClass, messageParameters)
156+
case _ => super.classifyException(message, e)
163157
}
164158
}
165159

sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ import scala.util.control.NonFatal
2828
import org.apache.commons.lang3.StringUtils
2929

3030
import org.apache.spark.sql.AnalysisException
31-
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
31+
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException, UnresolvedAttribute}
32+
import org.apache.spark.sql.catalyst.util.quoteNameParts
3233
import org.apache.spark.sql.connector.catalog.Identifier
3334
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
3435
import org.apache.spark.sql.connector.catalog.index.TableIndex
@@ -194,10 +195,7 @@ private[sql] object H2Dialect extends JdbcDialect {
194195
(ident.namespace() :+ indexName).map(quoteIdentifier).mkString(".")
195196
}
196197

197-
override def classifyException(
198-
e: Throwable,
199-
errorClass: String,
200-
messageParameters: Map[String, String]): AnalysisException = {
198+
override def classifyException(message: String, e: Throwable): AnalysisException = {
201199
e match {
202200
case exception: SQLException =>
203201
// Error codes are from https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html
@@ -208,16 +206,15 @@ private[sql] object H2Dialect extends JdbcDialect {
208206
val regex = """"((?:[^"\\]|\\[\\"ntbrf])+)"""".r
209207
val name = regex.findFirstMatchIn(e.getMessage).get.group(1)
210208
val quotedName = org.apache.spark.sql.catalyst.util.quoteIdentifier(name)
211-
throw new TableAlreadyExistsException(
212-
errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS",
209+
throw new TableAlreadyExistsException(errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS",
213210
messageParameters = Map("relationName" -> quotedName),
214211
cause = Some(e))
215212
// TABLE_OR_VIEW_NOT_FOUND_1
216213
case 42102 =>
217-
val relationName = messageParameters.getOrElse("tableName", "")
214+
val quotedName = quoteNameParts(UnresolvedAttribute.parseAttributeName(message))
218215
throw new NoSuchTableException(
219216
errorClass = "TABLE_OR_VIEW_NOT_FOUND",
220-
messageParameters = Map("relationName" -> relationName),
217+
messageParameters = Map("relationName" -> quotedName),
221218
cause = Some(e))
222219
// SCHEMA_NOT_FOUND_1
223220
case 90079 =>
@@ -227,21 +224,25 @@ private[sql] object H2Dialect extends JdbcDialect {
227224
throw new NoSuchNamespaceException(errorClass = "SCHEMA_NOT_FOUND",
228225
messageParameters = Map("schemaName" -> quotedName))
229226
// INDEX_ALREADY_EXISTS_1
230-
case 42111 if errorClass == "FAILED_JDBC.CREATE_INDEX" =>
231-
val indexName = messageParameters("indexName")
232-
val tableName = messageParameters("tableName")
227+
case 42111 =>
228+
// The message is: Failed to create index indexName in tableName
229+
val regex = "(?s)Failed to create index (.*) in (.*)".r
230+
val indexName = regex.findFirstMatchIn(message).get.group(1)
231+
val tableName = regex.findFirstMatchIn(message).get.group(2)
233232
throw new IndexAlreadyExistsException(
234233
indexName = indexName, tableName = tableName, cause = Some(e))
235234
// INDEX_NOT_FOUND_1
236-
case 42112 if errorClass == "FAILED_JDBC.DROP_INDEX" =>
237-
val indexName = messageParameters("indexName")
238-
val tableName = messageParameters("tableName")
235+
case 42112 =>
236+
// The message is: Failed to drop index indexName in tableName
237+
val regex = "(?s)Failed to drop index (.*) in (.*)".r
238+
val indexName = regex.findFirstMatchIn(message).get.group(1)
239+
val tableName = regex.findFirstMatchIn(message).get.group(2)
239240
throw new NoSuchIndexException(indexName, tableName, cause = Some(e))
240241
case _ => // do nothing
241242
}
242243
case _ => // do nothing
243244
}
244-
super.classifyException(e, errorClass, messageParameters)
245+
super.classifyException(message, e)
245246
}
246247

247248
override def compileExpression(expr: Expression): Option[String] = {

sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -633,13 +633,31 @@ abstract class JdbcDialect extends Serializable with Logging {
633633
* @param e The dialect specific exception.
634634
* @param errorClass The error class assigned in the case of an unclassified `e`
635635
* @param messageParameters The message parameters of `errorClass`
636+
* @param description The error description
636637
* @return `AnalysisException` or its sub-class.
637638
*/
638639
def classifyException(
639640
e: Throwable,
640641
errorClass: String,
641-
messageParameters: Map[String, String]): AnalysisException = {
642-
new AnalysisException(errorClass, messageParameters, cause = Some(e))
642+
messageParameters: Map[String, String],
643+
description: String): AnalysisException = {
644+
classifyException(description, e)
645+
}
646+
647+
/**
648+
* Gets a dialect exception, classifies it and wraps it by `AnalysisException`.
649+
* @param message The error message to be placed to the returned exception.
650+
* @param e The dialect specific exception.
651+
* @return `AnalysisException` or its sub-class.
652+
*/
653+
@deprecated("Please override the classifyException method with an error class", "4.0.0")
654+
def classifyException(message: String, e: Throwable): AnalysisException = {
655+
new AnalysisException(
656+
errorClass = "FAILED_JDBC.UNCLASSIFIED",
657+
messageParameters = Map(
658+
"url" -> "jdbc:",
659+
"message" -> message),
660+
cause = Some(e))
643661
}
644662

645663
/**

sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -190,21 +190,15 @@ private object MsSqlServerDialect extends JdbcDialect {
190190
if (limit > 0) s"TOP ($limit)" else ""
191191
}
192192

193-
override def classifyException(
194-
e: Throwable,
195-
errorClass: String,
196-
messageParameters: Map[String, String]): AnalysisException = {
193+
override def classifyException(message: String, e: Throwable): AnalysisException = {
197194
e match {
198195
case sqlException: SQLException =>
199196
sqlException.getErrorCode match {
200-
case 3729 =>
201-
throw NonEmptyNamespaceException(
202-
namespace = messageParameters.get("namespace").toArray,
203-
details = sqlException.getMessage,
204-
cause = Some(e))
205-
case _ => super.classifyException(e, errorClass, messageParameters)
197+
case 3729 => throw NonEmptyNamespaceException(
198+
namespace = Array.empty, details = message, cause = Some(e))
199+
case _ => super.classifyException(message, e)
206200
}
207-
case _ => super.classifyException(e, errorClass, messageParameters)
201+
case _ => super.classifyException(message, e)
208202
}
209203
}
210204

0 commit comments

Comments
 (0)