[SPARK-46410][SQL] Assign error classes/subclasses to JdbcUtils.classifyException #44358

Closed · wants to merge 15 commits
78 changes: 73 additions & 5 deletions common/utils/src/main/resources/error/error-classes.json
@@ -1096,6 +1096,79 @@
],
"sqlState" : "38000"
},
"FAILED_JDBC" : {
Contributor: FAILED_JDBC -> FAILED_JDBC_OPERATION

Member Author: Too long; does `_OPERATION` bring any benefits?

Contributor: Just because `FAILED_JDBC` looks confusing.

Member Author: JDBC means Java Database Connectivity. The connectivity might fail at any time: during operations, while establishing a connection, or while keeping it alive. For instance, this is a real example:

org.postgresql.util.PSQLException: The connection attempt failed.
    at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:331)
    at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
    at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:223)

Contributor: I guess the connectivity is not changed if create index fails.
"message" : [
"Failed JDBC <url> on the operation:"
],
"subClass" : {
"ALTER_TABLE" : {
"message" : [
"Alter the table <tableName>."
]
},
"CREATE_INDEX" : {
"message" : [
"Create the index <indexName> in the <tableName> table."
]
},
"CREATE_NAMESPACE" : {
"message" : [
"Create the namespace <namespace>."
]
},
"CREATE_NAMESPACE_COMMENT" : {
"message" : [
"Create a comment on the namespace: <namespace>."
]
},
"CREATE_TABLE" : {
"message" : [
"Create the table <tableName>."
]
},
"DROP_INDEX" : {
"message" : [
"Drop the index <indexName> in the <tableName> table."
]
},
"DROP_NAMESPACE" : {
"message" : [
"Drop the namespace <namespace>."
]
},
"GET_TABLES" : {
"message" : [
"Get tables from the namespace: <namespace>."
]
},
"LIST_NAMESPACES" : {
"message" : [
"List namespaces."
]
},
"NAMESPACE_EXISTS" : {
"message" : [
"Check that the namespace <namespace> exists."
]
},
"REMOVE_NAMESPACE_COMMENT" : {
"message" : [
"Remove a comment on the namespace: <namespace>."
]
},
"RENAME_TABLE" : {
"message" : [
"Rename the table <oldName> to <newName>."
]
},
"TABLE_EXISTS" : {
"message" : [
"Check that the table <tableName> exists."
]
}
},
"sqlState" : "HV000"
},
"FAILED_PARSE_STRUCT_TYPE" : {
"message" : [
"Failed parsing struct: <raw>."
@@ -6778,11 +6851,6 @@
"pivot is not supported on a streaming DataFrames/Datasets"
]
},
"_LEGACY_ERROR_TEMP_3064" : {
"message" : [
"<msg>"
]
},
"_LEGACY_ERROR_TEMP_3065" : {
"message" : [
"<clazz>: <msg>"
@@ -221,10 +221,10 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu

test("CREATE TABLE with table property") {
withTable(s"$catalogName.new_table") {
-      val m = intercept[AnalysisException] {
+      val e = intercept[AnalysisException] {
        sql(s"CREATE TABLE $catalogName.new_table (i INT) TBLPROPERTIES('a'='1')")
-      }.message
-      assert(m.contains("Failed table creation"))
+      }
+      assert(e.getErrorClass == "FAILED_JDBC.CREATE_TABLE")
testCreateTableWithProperty(s"$catalogName.new_table")
}
}
@@ -279,7 +279,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
sql(s"CREATE index i1 ON $catalogName.new_table (col1)")
},
errorClass = "INDEX_ALREADY_EXISTS",
-      parameters = Map("indexName" -> "i1", "tableName" -> "new_table")
+      parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`")
)

sql(s"DROP index i1 ON $catalogName.new_table")
@@ -304,7 +304,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
sql(s"DROP index i1 ON $catalogName.new_table")
},
errorClass = "INDEX_NOT_FOUND",
-      parameters = Map("indexName" -> "i1", "tableName" -> "new_table")
+      parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`")
)
}
}
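The backticks expected in the test parameters above come from SQL-identifier quoting (the PR routes names through `toSQLId`). A minimal illustrative sketch of that kind of quoting, assuming single-part identifiers (not Spark's actual implementation, which also handles multi-part names):

```java
public class QuoteDemo {
    // Wrap an identifier in backticks, escaping any embedded backticks by
    // doubling them -- the standard quoting rule for backtick-quoted names.
    static String toSQLId(String name) {
        return "`" + name.replace("`", "``") + "`";
    }

    public static void main(String[] args) {
        System.out.println(toSQLId("new_table")); // `new_table`
        System.out.println(toSQLId("odd`name"));  // `odd``name`
    }
}
```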
80 changes: 80 additions & 0 deletions docs/sql-error-conditions-failed-jdbc-error-class.md
@@ -0,0 +1,80 @@
---
layout: global
title: FAILED_JDBC error class
displayTitle: FAILED_JDBC error class
license: |
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---

SQLSTATE: HV000

Failed JDBC `<url>` on the operation:

This error class has the following derived error classes:

## ALTER_TABLE

Alter the table `<tableName>`.

## CREATE_INDEX

Create the index `<indexName>` in the `<tableName>` table.

## CREATE_NAMESPACE

Create the namespace `<namespace>`.

## CREATE_NAMESPACE_COMMENT

Create a comment on the namespace: `<namespace>`.

## CREATE_TABLE

Create the table `<tableName>`.

## DROP_INDEX

Drop the index `<indexName>` in the `<tableName>` table.

## DROP_NAMESPACE

Drop the namespace `<namespace>`.

## GET_TABLES

Get tables from the namespace: `<namespace>`.

## LIST_NAMESPACES

List namespaces.

## NAMESPACE_EXISTS

Check that the namespace `<namespace>` exists.

## REMOVE_NAMESPACE_COMMENT

Remove a comment on the namespace: `<namespace>`.

## RENAME_TABLE

Rename the table `<oldName>` to `<newName>`.

## TABLE_EXISTS

Check that the table `<tableName>` exists.
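The `<url>`, `<tableName>`, and similar placeholders in the messages above are substituted from parameters supplied at the error site. A minimal sketch of that substitution (an illustrative helper, not Spark's implementation):

```java
import java.util.Map;

public class ErrorFormatter {
    // Replace each <param> placeholder in an error-message template with
    // its value from the parameter map.
    static String format(String template, Map<String, String> params) {
        String result = template;
        for (Map.Entry<String, String> e : params.entrySet()) {
            result = result.replace("<" + e.getKey() + ">", e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        String main = "Failed JDBC <url> on the operation:";
        String sub = "Create the index <indexName> in the <tableName> table.";
        System.out.println(format(main + " " + sub, Map.of(
            "url", "jdbc:postgresql://host/db",
            "indexName", "`i1`",
            "tableName", "`new_table`")));
    }
}
```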


8 changes: 8 additions & 0 deletions docs/sql-error-conditions.md
@@ -665,6 +665,14 @@ User defined function (`<functionName>`: (`<signature>`) => `<result>`) failed d

Failed preparing of the function `<funcName>` for call. Please, double check function's arguments.

### [FAILED_JDBC](sql-error-conditions-failed-jdbc-error-class.html)

SQLSTATE: HV000

Failed JDBC `<url>` on the operation:

For more details see [FAILED_JDBC](sql-error-conditions-failed-jdbc-error-class.html)
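The HV000 SQLSTATE above is the one Spark assigns to the new error condition; the underlying driver failure carries its own vendor SQLSTATE, which dialects can inspect when classifying. Plain JDBC exposes it via `SQLException.getSQLState` (a standalone sketch; the SQLSTATE value here is illustrative):

```java
import java.sql.SQLException;

public class SqlStateDemo {
    public static void main(String[] args) {
        // JDBC drivers attach a vendor SQLSTATE to each SQLException;
        // dialect code can branch on it to pick a more precise error class.
        SQLException e =
            new SQLException("The connection attempt failed.", "08001");
        System.out.println(e.getSQLState()); // 08001
    }
}
```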

### FAILED_PARSE_STRUCT_TYPE

[SQLSTATE: 22018](sql-error-conditions-sqlstates.html#class-22-data-exception)
3 changes: 3 additions & 0 deletions project/MimaExcludes.scala
@@ -100,6 +100,9 @@ object MimaExcludes {
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.storage.CacheId$"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.CacheId.apply"),

// SPARK-46410: Assign error classes/subclasses to JdbcUtils.classifyException
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.jdbc.JdbcDialect.classifyException"),

(problem: Problem) => problem match {
case MissingClassProblem(cls) => !cls.fullName.startsWith("org.sparkproject.jpmml") &&
!cls.fullName.startsWith("org.sparkproject.dmg.pmml")
@@ -36,7 +36,4 @@ case class NonEmptyNamespaceException(
"details" -> details)) {

def this(namespace: Array[String]) = this(namespace, "", None)

def this(details: String, cause: Option[Throwable]) =
this(Array.empty, details, cause)
}
@@ -1180,12 +1180,15 @@ object JdbcUtils extends Logging with SQLConfHelper {
}
}

-  def classifyException[T](message: String, dialect: JdbcDialect)(f: => T): T = {
+  def classifyException[T](
Contributor: This is a breaking change, isn't it? JDBCDialect is a public developer API.

Contributor: When was this API added? @beliefer do you know?

Contributor: Perhaps we should keep two versions.

Member Author: And how will you combine exceptions from both versions?

Member Author: There are no nested exceptions here. We pass a default error class to classifyException; if a dialect cannot classify an exception from the JDBC driver, it throws the default one, otherwise it forms and throws another one. The exception from the JDBC driver is added as the cause.

Contributor: So if a dialect can classify an exception, we will lose the error class, which is actually worse?

Member: Can we also enable MiMa for this? I believe this is being skipped because it's under the execution package.

Member Author:
> So if a dialect can classify an exception, we will lose the error class, which is actually worse?

@cloud-fan A dialect can override the proposed error class and make our default error class more precise using driver-specific info. And yes, the original error class will be lost.

> Can we also enable MiMa for this? I believe this is being skipped because it's under the execution package.

@HyukjinKwon I made the modification because MiMa complained. Did you change MiMa to expect different behaviour?

Contributor: I think losing the original error class is bad. We added classifyException for getting a more precise Java exception type, but I don't think this is useful anymore as we have error classes. My suggestion is to deprecate classifyException, saying that Spark never calls it anymore.
+      errorClass: String,
+      messageParameters: Map[String, String],
+      dialect: JdbcDialect)(f: => T): T = {
try {
f
} catch {
      case e: SparkThrowable with Throwable => throw e
-      case e: Throwable => throw dialect.classifyException(message, e)
+      case e: Throwable => throw dialect.classifyException(e, errorClass, messageParameters)
}
}
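The classify-or-fall-back flow the author describes in the thread can be sketched as follows. All names here (`ClassifiedException`, `Dialect`) are illustrative stand-ins, not Spark API; the point is that the dialect gets first chance at the driver exception, the caller's default error class is the fallback, and the original exception survives as the cause:

```java
import java.util.Map;
import java.util.function.Supplier;

class ClassifiedException extends RuntimeException {
    final String errorClass;
    ClassifiedException(String errorClass, Map<String, String> params, Throwable cause) {
        super(errorClass + " " + params, cause);
        this.errorClass = errorClass;
    }
}

interface Dialect {
    // Map a driver exception to a classified one, or keep the default class.
    ClassifiedException classify(Throwable e, String defaultErrorClass,
                                 Map<String, String> params);
}

public class ClassifyDemo {
    static <T> T classifyException(String errorClass, Map<String, String> params,
                                   Dialect dialect, Supplier<T> f) {
        try {
            return f.get();
        } catch (RuntimeException e) {
            // The dialect decides the final error class; the driver
            // exception is preserved as the cause.
            throw dialect.classify(e, errorClass, params);
        }
    }

    public static void main(String[] args) {
        // A "dialect" that cannot refine the error, so it keeps the default.
        Dialect d = (e, cls, p) -> new ClassifiedException(cls, p, e);
        try {
            classifyException("FAILED_JDBC.CREATE_INDEX",
                Map.of("url", "jdbc:h2:mem:db"), d,
                () -> { throw new RuntimeException("duplicate index"); });
        } catch (ClassifiedException ce) {
            System.out.println(ce.errorClass); // FAILED_JDBC.CREATE_INDEX
        }
    }
}
```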

@@ -26,13 +26,18 @@ import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.errors.DataTypeErrorsBase
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcOptionsInWrite, JdbcUtils}
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOptions)
-  extends Table with SupportsRead with SupportsWrite with SupportsIndex {
+  extends Table
+    with SupportsRead
+    with SupportsWrite
+    with SupportsIndex
+    with DataTypeErrorsBase {

override def name(): String = ident.toString

@@ -58,8 +63,13 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
columnsProperties: util.Map[NamedReference, util.Map[String, String]],
properties: util.Map[String, String]): Unit = {
JdbcUtils.withConnection(jdbcOptions) { conn =>
-      JdbcUtils.classifyException(s"Failed to create index $indexName in ${name()}",
-        JdbcDialects.get(jdbcOptions.url)) {
+      JdbcUtils.classifyException(
+        errorClass = "FAILED_JDBC.CREATE_INDEX",
+        messageParameters = Map(
+          "url" -> jdbcOptions.url,
+          "indexName" -> toSQLId(indexName),
+          "tableName" -> toSQLId(name)),
+        dialect = JdbcDialects.get(jdbcOptions.url)) {
JdbcUtils.createIndex(
conn, indexName, ident, columns, columnsProperties, properties, jdbcOptions)
}
@@ -74,8 +84,13 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt

override def dropIndex(indexName: String): Unit = {
JdbcUtils.withConnection(jdbcOptions) { conn =>
-      JdbcUtils.classifyException(s"Failed to drop index $indexName in ${name()}",
-        JdbcDialects.get(jdbcOptions.url)) {
+      JdbcUtils.classifyException(
+        errorClass = "FAILED_JDBC.DROP_INDEX",
+        messageParameters = Map(
+          "url" -> jdbcOptions.url,
+          "indexName" -> toSQLId(indexName),
+          "tableName" -> toSQLId(name)),
+        dialect = JdbcDialects.get(jdbcOptions.url)) {
JdbcUtils.dropIndex(conn, indexName, ident, jdbcOptions)
}
}