Skip to content

[SPARK-33074][SQL] Classify dialect exceptions in JDBC v2 Table Catalog #29952

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class NamespaceAlreadyExistsException(message: String) extends AnalysisException
}
}

class TableAlreadyExistsException(message: String) extends AnalysisException(message) {
class TableAlreadyExistsException(message: String, cause: Option[Throwable] = None)
extends AnalysisException(message, cause = cause) {
def this(db: String, table: String) = {
this(s"Table or view '$table' already exists in database '$db'")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ import org.apache.spark.sql.types.StructType
class NoSuchDatabaseException(
val db: String) extends NoSuchNamespaceException(s"Database '$db' not found")

class NoSuchNamespaceException(message: String) extends AnalysisException(message) {
/**
 * Thrown when a catalog namespace (schema) cannot be resolved.
 *
 * @param message Human-readable description of the failure.
 * @param cause   Optional underlying exception (e.g. a dialect-specific
 *                `SQLException`) that triggered this error.
 */
class NoSuchNamespaceException(message: String, cause: Option[Throwable] = None)
  extends AnalysisException(message, cause = cause) {
  // Convenience constructor for a multi-part namespace path.
  def this(namespace: Array[String]) = {
    this(s"Namespace '${namespace.quoted}' not found")
  }
}

class NoSuchTableException(message: String) extends AnalysisException(message) {
class NoSuchTableException(message: String, cause: Option[Throwable] = None)
extends AnalysisException(message, cause = cause) {
def this(db: String, table: String) = {
this(s"Table or view '$table' not found in database '$db'")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ class JDBCTableCatalog extends TableCatalog with Logging {
checkNamespace(ident.namespace())
val writeOptions = new JdbcOptionsInWrite(
options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident)))
withConnection(JdbcUtils.tableExists(_, writeOptions))
classifyException(s"Failed table existence check: $ident") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we be consistent and always put classifyException inside withConnection?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that mixing two conceptually different things will make them consistent.

withConnection(JdbcUtils.tableExists(_, writeOptions))
}
}

override def dropTable(ident: Identifier): Boolean = {
Expand All @@ -88,7 +90,9 @@ class JDBCTableCatalog extends TableCatalog with Logging {
override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
checkNamespace(oldIdent.namespace())
withConnection { conn =>
JdbcUtils.renameTable(conn, getTableName(oldIdent), getTableName(newIdent), options)
classifyException(s"Failed table renaming from $oldIdent to $newIdent") {
JdbcUtils.renameTable(conn, getTableName(oldIdent), getTableName(newIdent), options)
}
}
}

Expand Down Expand Up @@ -123,7 +127,9 @@ class JDBCTableCatalog extends TableCatalog with Logging {
options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident)))
val caseSensitive = SQLConf.get.caseSensitiveAnalysis
withConnection { conn =>
JdbcUtils.createTable(conn, getTableName(ident), schema, caseSensitive, writeOptions)
classifyException(s"Failed table creation: $ident") {
JdbcUtils.createTable(conn, getTableName(ident), schema, caseSensitive, writeOptions)
}
}

JDBCTable(ident, schema, writeOptions)
Expand All @@ -132,7 +138,9 @@ class JDBCTableCatalog extends TableCatalog with Logging {
override def alterTable(ident: Identifier, changes: TableChange*): Table = {
checkNamespace(ident.namespace())
withConnection { conn =>
JdbcUtils.alterTable(conn, getTableName(ident), changes, options)
classifyException(s"Failed table altering: $ident") {
JdbcUtils.alterTable(conn, getTableName(ident), changes, options)
}
loadTable(ident)
}
}
Expand All @@ -156,4 +164,12 @@ class JDBCTableCatalog extends TableCatalog with Logging {
private def getTableName(ident: Identifier): String = {
(ident.namespace() :+ ident.name()).map(dialect.quoteIdentifier).mkString(".")
}

private def classifyException[T](message: String)(f: => T): T = {
try {
f
} catch {
case e: Throwable => throw dialect.classifyException(message, e)
}
}
}
48 changes: 48 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.jdbc

import java.sql.SQLException
import java.util.Locale

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}

private object H2Dialect extends JdbcDialect {
  override def canHandle(url: String): Boolean =
    url.toLowerCase(Locale.ROOT).startsWith("jdbc:h2")

  /**
   * Classifies H2-specific `SQLException`s into Spark's catalog exceptions,
   * falling back to the generic `AnalysisException` wrapper for unrecognized
   * errors. Error codes are taken from
   * https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html
   *
   * @param message The error message placed into the thrown exception.
   * @param e       The dialect-specific exception to classify.
   */
  override def classifyException(message: String, e: Throwable): AnalysisException = {
    // Pattern match instead of isInstanceOf/asInstanceOf for idiomatic,
    // exhaustively-readable type dispatch.
    e match {
      case sqlException: SQLException =>
        sqlException.getErrorCode match {
          // TABLE_OR_VIEW_ALREADY_EXISTS_1
          case 42101 =>
            throw new TableAlreadyExistsException(message, cause = Some(e))
          // TABLE_OR_VIEW_NOT_FOUND_1
          case 42102 =>
            throw new NoSuchTableException(message, cause = Some(e))
          // SCHEMA_NOT_FOUND_1
          case 90079 =>
            throw new NoSuchNamespaceException(message, cause = Some(e))
          // Unrecognized error code: fall through to the generic wrapper.
          case _ =>
        }
      case _ =>
    }
    super.classifyException(message, e)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuilder
import org.apache.commons.lang3.StringUtils

import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
Expand Down Expand Up @@ -253,6 +254,16 @@ abstract class JdbcDialect extends Serializable {
val nullable = if (isNullable) "NULL" else "NOT NULL"
s"ALTER TABLE $tableName ALTER COLUMN $columnName SET $nullable"
}

/**
* Gets a dialect exception, classifies it and wraps it by `AnalysisException`.
* @param message The error message to be placed to the returned exception.
* @param e The dialect specific exception.
* @return `AnalysisException` or its sub-class.
*/
def classifyException(message: String, e: Throwable): AnalysisException = {
new AnalysisException(message, cause = Some(e))
}
}

/**
Expand Down Expand Up @@ -297,6 +308,7 @@ object JdbcDialects {
registerDialect(DerbyDialect)
registerDialect(OracleDialect)
registerDialect(TeradataDialect)
registerDialect(H2Dialect)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we already have a testH2Dialect, how about we update testH2Dialect to implement classifyException, and use it in JDBCTableCatalogSuite? Then we don't need to have an official H2Dialect.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you look at testH2Dialect, it has test specific settings. Why should we have the settings in JDBCTableCatalogSuite?

Then we don't need to have an official H2Dialect.

What is the problem to have built-in H2Dialect?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably, not related to this. If we don't want to support H2 officially as a dialect, why do we test it so broadly in Spark? Maybe it makes sense to switch all internal Spark tests to Derby?


/**
* Fetch the JdbcDialect class corresponding to a given database url.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -101,15 +101,18 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
Seq(Row("test", "dst_table"), Row("test", "people")))
}
// Rename not existing table or namespace
Seq(
"h2.test.not_existing_table" -> "Table \"not_existing_table\" not found",
"h2.bad_test.not_existing_table" -> "Schema \"bad_test\" not found"
).foreach { case (table, expectedMsg) =>
val msg = intercept[org.h2.jdbc.JdbcSQLException] {
sql(s"ALTER TABLE $table RENAME TO test.dst_table")
}.getMessage
assert(msg.contains(expectedMsg))
val exp1 = intercept[NoSuchTableException] {
sql(s"ALTER TABLE h2.test.not_existing_table RENAME TO test.dst_table")
}
assert(exp1.getMessage.contains(
"Failed table renaming from test.not_existing_table to test.dst_table"))
assert(exp1.cause.get.getMessage.contains("Table \"not_existing_table\" not found"))
val exp2 = intercept[NoSuchNamespaceException] {
sql(s"ALTER TABLE h2.bad_test.not_existing_table RENAME TO test.dst_table")
}
assert(exp2.getMessage.contains(
"Failed table renaming from bad_test.not_existing_table to test.dst_table"))
assert(exp2.cause.get.getMessage.contains("Schema \"bad_test\" not found"))
// Rename to an existing table
withTable("h2.test.dst_table") {
withConnection { conn =>
Expand All @@ -119,10 +122,12 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
withConnection { conn =>
conn.prepareStatement("""CREATE TABLE "test"."src_table" (id INTEGER)""").executeUpdate()
}
val msg = intercept[org.h2.jdbc.JdbcSQLException] {
val exp = intercept[TableAlreadyExistsException] {
sql("ALTER TABLE h2.test.src_table RENAME TO test.dst_table")
}.getMessage
assert(msg.contains("Table \"dst_table\" already exists"))
}
assert(exp.getMessage.contains(
"Failed table renaming from test.src_table to test.dst_table"))
assert(exp.cause.get.getMessage.contains("Table \"dst_table\" already exists"))
}
}
}
Expand Down Expand Up @@ -156,10 +161,11 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
}.getMessage
assert(msg.contains("Table test.new_table already exists"))
}
val msg = intercept[org.h2.jdbc.JdbcSQLException] {
val exp = intercept[NoSuchNamespaceException] {
sql("CREATE TABLE h2.bad_test.new_table(i INT, j STRING) USING _")
}.getMessage
assert(msg.contains("Schema \"bad_test\" not found"))
}
assert(exp.getMessage.contains("Failed table creation: bad_test.new_table"))
assert(exp.cause.get.getMessage.contains("Schema \"bad_test\" not found"))
}

test("alter table ... add column") {
Expand Down Expand Up @@ -289,15 +295,16 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
test("alter table ... update column comment not supported") {
withTable("h2.test.alt_table") {
sql("CREATE TABLE h2.test.alt_table (ID INTEGER) USING _")
val msg1 = intercept[java.sql.SQLFeatureNotSupportedException] {
val exp = intercept[AnalysisException] {
sql("ALTER TABLE h2.test.alt_table ALTER COLUMN ID COMMENT 'test'")
}.getMessage
assert(msg1.contains("Unsupported TableChange"))
}
assert(exp.getMessage.contains("Failed table altering: test.alt_table"))
assert(exp.cause.get.getMessage.contains("Unsupported TableChange"))
// Update comment for not existing column
val msg2 = intercept[AnalysisException] {
val msg = intercept[AnalysisException] {
sql("ALTER TABLE h2.test.alt_table ALTER COLUMN bad_column COMMENT 'test'")
}.getMessage
assert(msg2.contains("Cannot update missing field bad_column in test.alt_table"))
assert(msg.contains("Cannot update missing field bad_column in test.alt_table"))
}
// Update column comments in not existing table and namespace
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,9 +770,14 @@ class JDBCSuite extends QueryTest
}

test("Dialect unregister") {
JdbcDialects.registerDialect(testH2Dialect)
JdbcDialects.unregisterDialect(testH2Dialect)
assert(JdbcDialects.get(urlWithUserAndPass) == NoopDialect)
JdbcDialects.unregisterDialect(H2Dialect)
try {
JdbcDialects.registerDialect(testH2Dialect)
JdbcDialects.unregisterDialect(testH2Dialect)
assert(JdbcDialects.get(urlWithUserAndPass) == NoopDialect)
} finally {
JdbcDialects.registerDialect(H2Dialect)
}
}

test("Aggregated dialects") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,24 +194,29 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
}

test("Truncate") {
JdbcDialects.registerDialect(testH2Dialect)
val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)
val df3 = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)

df.write.jdbc(url1, "TEST.TRUNCATETEST", properties)
df2.write.mode(SaveMode.Overwrite).option("truncate", true)
.jdbc(url1, "TEST.TRUNCATETEST", properties)
assert(1 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count())
assert(2 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length)
JdbcDialects.unregisterDialect(H2Dialect)
try {
JdbcDialects.registerDialect(testH2Dialect)
val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)
val df3 = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)

val m = intercept[AnalysisException] {
df3.write.mode(SaveMode.Overwrite).option("truncate", true)
df.write.jdbc(url1, "TEST.TRUNCATETEST", properties)
df2.write.mode(SaveMode.Overwrite).option("truncate", true)
.jdbc(url1, "TEST.TRUNCATETEST", properties)
}.getMessage
assert(m.contains("Column \"seq\" not found"))
assert(0 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count())
JdbcDialects.unregisterDialect(testH2Dialect)
assert(1 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count())
assert(2 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length)

val m = intercept[AnalysisException] {
df3.write.mode(SaveMode.Overwrite).option("truncate", true)
.jdbc(url1, "TEST.TRUNCATETEST", properties)
}.getMessage
assert(m.contains("Column \"seq\" not found"))
assert(0 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count())
} finally {
JdbcDialects.unregisterDialect(testH2Dialect)
JdbcDialects.registerDialect(H2Dialect)
}
}

test("createTableOptions") {
Expand Down