[SPARK-48585][SQL] Make built-in JdbcDialect's method `classifyException` throw out the `original` exception

### What changes were proposed in this pull request?
This PR makes the `classifyException` method of the built-in JDBC dialects throw an exception that carries the `original` exception as its cause.

### Why are the changes needed?
As discussed in apache#46912 (comment), the following code:
https://github.com/apache/spark/blob/df4156aa3217cf0f58b4c6cbf33c967bb43f7155/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala#L746-L751
loses the original cause of the error; let's correct that.
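
In short, the legacy classification path builds a new `AnalysisException` without attaching the original throwable, so the JDBC driver's stack trace and error details disappear. A minimal before/after sketch (parameter names follow the `NoLegacyJDBCError` trait added in this diff, not the exact legacy code at the linked lines):

```scala
// Before (sketch): the original exception `e` is dropped,
// so callers only ever see the generic FAILED_JDBC error.
new AnalysisException(
  errorClass = errorClass,
  messageParameters = messageParameters)

// After (as in the NoLegacyJDBCError trait below): `e` is preserved,
// so the underlying driver error remains visible as the cause.
new AnalysisException(
  errorClass = errorClass,
  messageParameters = messageParameters,
  cause = Some(e))
```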

### Does this PR introduce _any_ user-facing change?
Yes. End users see more accurate error conditions (for example, `FAILED_JDBC.CREATE_TABLE` or `FAILED_JDBC.LOAD_TABLE` instead of `FAILED_JDBC.UNCLASSIFIED`), and the original exception is preserved as the cause.

### How was this patch tested?
- Manual tests.
- Updated existing UTs; passed GA.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#46937 from panbingkun/improve_JDBCTableCatalog.

Authored-by: panbingkun <panbingkun@baidu.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
panbingkun authored and LuciferYang committed Jun 18, 2024
1 parent c5809b6 commit a3feffd
Showing 14 changed files with 52 additions and 31 deletions.
@@ -83,14 +83,16 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
 
   def testCreateTableWithProperty(tbl: String): Unit = {}
 
-  def checkErrorFailedLoadTable(e: AnalysisException, tbl: String): Unit = {
-    checkError(
+  private def checkErrorFailedJDBC(
+      e: AnalysisException,
+      errorClass: String,
+      tbl: String): Unit = {
+    checkErrorMatchPVals(
       exception = e,
-      errorClass = "FAILED_JDBC.UNCLASSIFIED",
+      errorClass = errorClass,
       parameters = Map(
-        "url" -> "jdbc:",
-        "message" -> s"Failed to load table: $tbl"
-      )
+        "url" -> "jdbc:.*",
+        "tableName" -> s"`$tbl`")
     )
   }

@@ -132,7 +134,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     val e = intercept[AnalysisException] {
       sql(s"ALTER TABLE $catalogName.not_existing_table ADD COLUMNS (C4 STRING)")
     }
-    checkErrorFailedLoadTable(e, "not_existing_table")
+    checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table")
   }
 
   test("SPARK-33034: ALTER TABLE ... drop column") {
@@ -154,7 +156,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     val e = intercept[AnalysisException] {
       sql(s"ALTER TABLE $catalogName.not_existing_table DROP COLUMN C1")
     }
-    checkErrorFailedLoadTable(e, "not_existing_table")
+    checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table")
   }
 
   test("SPARK-33034: ALTER TABLE ... update column type") {
@@ -170,7 +172,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     val e = intercept[AnalysisException] {
       sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN id TYPE DOUBLE")
     }
-    checkErrorFailedLoadTable(e, "not_existing_table")
+    checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table")
   }
 
   test("SPARK-33034: ALTER TABLE ... rename column") {
@@ -198,7 +200,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     val e = intercept[AnalysisException] {
       sql(s"ALTER TABLE $catalogName.not_existing_table RENAME COLUMN ID TO C")
     }
-    checkErrorFailedLoadTable(e, "not_existing_table")
+    checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table")
   }
 
   test("SPARK-33034: ALTER TABLE ... update column nullability") {
@@ -209,7 +211,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     val e = intercept[AnalysisException] {
       sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN ID DROP NOT NULL")
     }
-    checkErrorFailedLoadTable(e, "not_existing_table")
+    checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table")
   }
 
   test("CREATE TABLE with table comment") {
@@ -231,7 +233,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
       val e = intercept[AnalysisException] {
         sql(s"CREATE TABLE $catalogName.new_table (i INT) TBLPROPERTIES('a'='1')")
       }
-      assert(e.getErrorClass == "FAILED_JDBC.UNCLASSIFIED")
+      checkErrorFailedJDBC(e, "FAILED_JDBC.CREATE_TABLE", "new_table")
       testCreateTableWithProperty(s"$catalogName.new_table")
     }
   }
@@ -26,7 +26,8 @@ import org.apache.spark.sql.types.{DataType, MetadataBuilder}
  *
  * @param dialects List of dialects.
  */
-private class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect {
+private class AggregatedDialect(dialects: List[JdbcDialect])
+  extends JdbcDialect with NoLegacyJDBCError {
 
   require(dialects.nonEmpty)
 
@@ -31,7 +31,7 @@ import org.apache.spark.sql.connector.expressions.Expression
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
 import org.apache.spark.sql.types._
 
-private case class DB2Dialect() extends JdbcDialect with SQLConfHelper {
+private case class DB2Dialect() extends JdbcDialect with SQLConfHelper with NoLegacyJDBCError {
 
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:db2")
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
 import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
 import org.apache.spark.sql.types._
 
-private case class DatabricksDialect() extends JdbcDialect {
+private case class DatabricksDialect() extends JdbcDialect with NoLegacyJDBCError {
 
   override def canHandle(url: String): Boolean = {
     url.startsWith("jdbc:databricks")
@@ -25,7 +25,7 @@ import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors
 import org.apache.spark.sql.types._
 
 
-private case class DerbyDialect() extends JdbcDialect {
+private case class DerbyDialect() extends JdbcDialect with NoLegacyJDBCError {
 
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:derby")
@@ -37,7 +37,7 @@ import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, N
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, MetadataBuilder, ShortType, StringType, TimestampType}
 
-private[sql] case class H2Dialect() extends JdbcDialect {
+private[sql] case class H2Dialect() extends JdbcDialect with NoLegacyJDBCError {
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:h2")
 
@@ -841,6 +841,23 @@ abstract class JdbcDialect extends Serializable with Logging {
       metadata: MetadataBuilder): Unit = {}
 }
 
+/**
+ * Make the `classifyException` method throw out the original exception
+ */
+trait NoLegacyJDBCError extends JdbcDialect {
+
+  override def classifyException(
+      e: Throwable,
+      errorClass: String,
+      messageParameters: Map[String, String],
+      description: String): AnalysisException = {
+    new AnalysisException(
+      errorClass = errorClass,
+      messageParameters = messageParameters,
+      cause = Some(e))
+  }
+}
+
 /**
  * :: DeveloperApi ::
  * Registry of dialects that apply to every new jdbc `org.apache.spark.sql.DataFrame`.
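
The new `NoLegacyJDBCError` trait is mixed into every built-in dialect below. A third-party dialect can opt into the same behavior the same way; a minimal sketch, where `MyDialect` and the `jdbc:mydb` prefix are hypothetical:

```scala
import java.util.Locale

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, NoLegacyJDBCError}

// Hypothetical third-party dialect: mixing in NoLegacyJDBCError makes
// classifyException return an AnalysisException that keeps the original
// driver exception as its cause.
case class MyDialect() extends JdbcDialect with NoLegacyJDBCError {
  override def canHandle(url: String): Boolean =
    url.toLowerCase(Locale.ROOT).startsWith("jdbc:mydb")
}

// Register it once, e.g. at application startup:
JdbcDialects.registerDialect(MyDialect())
```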
@@ -34,7 +34,7 @@ import org.apache.spark.sql.jdbc.MsSqlServerDialect.{GEOGRAPHY, GEOMETRY}
 import org.apache.spark.sql.types._
 
 
-private case class MsSqlServerDialect() extends JdbcDialect {
+private case class MsSqlServerDialect() extends JdbcDialect with NoLegacyJDBCError {
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:sqlserver")
 
@@ -35,7 +35,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 import org.apache.spark.sql.types._
 
-private case class MySQLDialect() extends JdbcDialect with SQLConfHelper {
+private case class MySQLDialect() extends JdbcDialect with SQLConfHelper with NoLegacyJDBCError {
 
   override def canHandle(url : String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:mysql")
@@ -30,7 +30,7 @@ import org.apache.spark.sql.jdbc.OracleDialect._
 import org.apache.spark.sql.types._
 
 
-private case class OracleDialect() extends JdbcDialect with SQLConfHelper {
+private case class OracleDialect() extends JdbcDialect with SQLConfHelper with NoLegacyJDBCError {
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:oracle")
 
@@ -37,7 +37,8 @@ import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
 import org.apache.spark.sql.types._
 
 
-private case class PostgresDialect() extends JdbcDialect with SQLConfHelper {
+private case class PostgresDialect()
+  extends JdbcDialect with SQLConfHelper with NoLegacyJDBCError {
 
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:postgresql")
@@ -22,7 +22,7 @@ import java.util.Locale
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 import org.apache.spark.sql.types.{BooleanType, DataType}
 
-private case class SnowflakeDialect() extends JdbcDialect {
+private case class SnowflakeDialect() extends JdbcDialect with NoLegacyJDBCError {
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:snowflake")
 
@@ -24,7 +24,7 @@ import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.types._
 
 
-private case class TeradataDialect() extends JdbcDialect {
+private case class TeradataDialect() extends JdbcDialect with NoLegacyJDBCError {
 
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:teradata")
@@ -619,15 +619,15 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
 
   test("CREATE TABLE with table property") {
     withTable("h2.test.new_table") {
-      checkError(
+      checkErrorMatchPVals(
         exception = intercept[AnalysisException] {
           sql("CREATE TABLE h2.test.new_table(i INT, j STRING)" +
             " TBLPROPERTIES('ENGINE'='tableEngineName')")
         },
-        errorClass = "FAILED_JDBC.UNCLASSIFIED",
+        errorClass = "FAILED_JDBC.CREATE_TABLE",
         parameters = Map(
-          "url" -> "jdbc:",
-          "message" -> "Failed table creation: test.new_table"))
+          "url" -> "jdbc:.*",
+          "tableName" -> "`test`.`new_table`"))
     }
   }
 
@@ -639,14 +639,14 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
   }
 
   test("SPARK-42904: CREATE TABLE with char/varchar with invalid char length") {
-    checkError(
+    checkErrorMatchPVals(
       exception = intercept[AnalysisException]{
         sql("CREATE TABLE h2.test.new_table(c CHAR(1000000001))")
       },
-      errorClass = "FAILED_JDBC.UNCLASSIFIED",
+      errorClass = "FAILED_JDBC.CREATE_TABLE",
       parameters = Map(
-        "url" -> "jdbc:",
-        "message" -> "Failed table creation: test.new_table"))
+        "url" -> "jdbc:.*",
+        "tableName" -> "`test`.`new_table`"))
   }
 
   test("SPARK-42955: Skip classifyException and wrap AnalysisException for SparkThrowable") {
