[SPARK-44786][SQL][CONNECT] Convert common Spark exceptions
### What changes were proposed in this pull request?

- Convert common Spark exceptions thrown by the server into their typed counterparts in the Spark Connect Scala client
- Extend the common Spark exception classes with constructors that take only a message (and, where applicable, an optional cause); the resulting pattern is sketched just below
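The change applies the same constructor pattern to each exception class touched here. The condensed sketch below is illustrative rather than copied from the diff: `SparkFooException` is a made-up class name, and the sketch assumes it lives in the `org.apache.spark` package so that `SparkThrowable` and `SparkThrowableHelper` are visible.

```scala
package org.apache.spark

import scala.jdk.CollectionConverters._

// Hypothetical class for illustration only; each real class in the diff follows this shape.
private[spark] class SparkFooException private(
    message: String,
    errorClass: Option[String],
    messageParameters: Map[String, String])
  extends RuntimeException(message) with SparkThrowable {

  // Existing construction path: from an error class and its parameters.
  def this(errorClass: String, messageParameters: Map[String, String]) =
    this(
      SparkThrowableHelper.getMessage(errorClass, messageParameters),
      Option(errorClass),
      messageParameters)

  // New construction path: from a plain message only, with no error-class metadata.
  def this(message: String) =
    this(message, errorClass = None, messageParameters = Map.empty)

  override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava

  override def getErrorClass: String = errorClass.orNull
}
```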

### Why are the changes needed?

- Achieve exception conversion coverage similar to that of the [Python client](https://github.com/apache/spark/blob/master/python/pyspark/errors/exceptions/connect.py#L57-L89); a rough sketch of the client-side idea follows
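The Python client keeps a mapping from the server-reported exception class to a concrete exception type. A rough Scala sketch of the same idea is below; `toTypedException` and the way the class name reaches the client are illustrative assumptions, not the converter code actually changed in this PR, and the sketch assumes it runs inside the `org.apache.spark` package (or a subpackage) because the target classes are `private[spark]`.

```scala
import org.apache.spark.{SparkArithmeticException, SparkException, SparkIllegalArgumentException,
  SparkUnsupportedOperationException, SparkUpgradeException}

object ExceptionConverterSketch {
  // Hypothetical helper: rebuild a typed Spark exception from the class name and formatted
  // message reported by the server, using the single-message constructors added in this PR.
  def toTypedException(exceptionClassName: String, message: String): Throwable =
    exceptionClassName match {
      case "org.apache.spark.SparkArithmeticException" =>
        new SparkArithmeticException(message)
      case "org.apache.spark.SparkUnsupportedOperationException" =>
        new SparkUnsupportedOperationException(message)
      case "org.apache.spark.SparkIllegalArgumentException" =>
        new SparkIllegalArgumentException(message, cause = None)
      case "org.apache.spark.SparkUpgradeException" =>
        new SparkUpgradeException(message, cause = None)
      case _ =>
        new SparkException(message) // fall back to the generic exception
    }
}
```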

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

- Existing tests

Closes apache#42472 from heyihong/SPARK-44786.

Authored-by: Yihong He <yihong.he@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
heyihong authored and HyukjinKwon committed Aug 22, 2023
1 parent 1a4cacf commit dc900b4
Showing 4 changed files with 242 additions and 54 deletions.
264 changes: 215 additions & 49 deletions common/utils/src/main/scala/org/apache/spark/SparkException.scala
@@ -133,50 +133,107 @@ private[spark] case class ExecutorDeadException(message: String)
/**
 * Exception thrown when Spark returns different result after upgrading to a new version.
 */
-private[spark] class SparkUpgradeException(
+private[spark] class SparkUpgradeException private(
+    message: String,
+    cause: Option[Throwable],
+    errorClass: Option[String],
+    messageParameters: Map[String, String])
+  extends RuntimeException(message, cause.orNull) with SparkThrowable {
+
+  def this(
    errorClass: String,
    messageParameters: Map[String, String],
-    cause: Throwable)
-  extends RuntimeException(
-    SparkThrowableHelper.getMessage(errorClass, messageParameters), cause)
-  with SparkThrowable {
+    cause: Throwable) = {
+    this(
+      SparkThrowableHelper.getMessage(errorClass, messageParameters),
+      Option(cause),
+      Option(errorClass),
+      messageParameters
+    )
+  }
+
+  def this(message: String, cause: Option[Throwable]) = {
+    this(
+      message,
+      cause = cause,
+      errorClass = None,
+      messageParameters = Map.empty
+    )
+  }

  override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava

-  override def getErrorClass: String = errorClass
+  override def getErrorClass: String = errorClass.orNull
}
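As a quick illustration (not part of the diff), the new message-only path differs from the error-class path in what metadata it records. Because these classes are `private[spark]`, the snippet below assumes it runs inside Spark's own code or tests:

```scala
// Rebuild the exception from a bare message, as a client with only the formatted error
// text would: no error class and no message parameters are recorded.
val e = new SparkUpgradeException(
  "You may get a different result due to the upgrading to Spark >= 3.0", cause = None)

assert(e.getErrorClass == null)          // errorClass is None, so getErrorClass yields null
assert(e.getMessageParameters.isEmpty)   // no structured message parameters either
```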

/**
 * Arithmetic exception thrown from Spark with an error class.
 */
-private[spark] class SparkArithmeticException(
+private[spark] class SparkArithmeticException private(
+    message: String,
+    errorClass: Option[String],
+    messageParameters: Map[String, String],
+    context: Array[QueryContext])
+  extends ArithmeticException(message) with SparkThrowable {
+
+  def this(
    errorClass: String,
    messageParameters: Map[String, String],
    context: Array[QueryContext],
-    summary: String)
-  extends ArithmeticException(
-    SparkThrowableHelper.getMessage(errorClass, messageParameters, summary))
-  with SparkThrowable {
+    summary: String) = {
+    this(
+      SparkThrowableHelper.getMessage(errorClass, messageParameters, summary),
+      Option(errorClass),
+      messageParameters,
+      context
+    )
+  }
+
+  def this(message: String) = {
+    this(
+      message,
+      errorClass = None,
+      messageParameters = Map.empty,
+      context = Array.empty
+    )
+  }

  override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava

-  override def getErrorClass: String = errorClass
+  override def getErrorClass: String = errorClass.orNull
  override def getQueryContext: Array[QueryContext] = context
}

/**
 * Unsupported operation exception thrown from Spark with an error class.
 */
-private[spark] class SparkUnsupportedOperationException(
+private[spark] class SparkUnsupportedOperationException private(
+    message: String,
+    errorClass: Option[String],
+    messageParameters: Map[String, String])
+  extends UnsupportedOperationException(message) with SparkThrowable {
+
+  def this(
    errorClass: String,
-    messageParameters: Map[String, String])
-  extends UnsupportedOperationException(
-    SparkThrowableHelper.getMessage(errorClass, messageParameters))
-  with SparkThrowable {
+    messageParameters: Map[String, String]) = {
+    this(
+      SparkThrowableHelper.getMessage(errorClass, messageParameters),
+      Option(errorClass),
+      messageParameters
+    )
+  }
+
+  def this(message: String) = {
+    this(
+      message,
+      errorClass = None,
+      messageParameters = Map.empty
+    )
+  }

  override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava

-  override def getErrorClass: String = errorClass
+  override def getErrorClass: String = errorClass.orNull
}

/**
@@ -214,18 +271,38 @@ private[spark] class SparkConcurrentModificationException(
/**
 * Datetime exception thrown from Spark with an error class.
 */
-private[spark] class SparkDateTimeException(
+private[spark] class SparkDateTimeException private(
+    message: String,
+    errorClass: Option[String],
+    messageParameters: Map[String, String],
+    context: Array[QueryContext])
+  extends DateTimeException(message) with SparkThrowable {
+
+  def this(
    errorClass: String,
    messageParameters: Map[String, String],
    context: Array[QueryContext],
-    summary: String)
-  extends DateTimeException(
-    SparkThrowableHelper.getMessage(errorClass, messageParameters, summary))
-  with SparkThrowable {
+    summary: String) = {
+    this(
+      SparkThrowableHelper.getMessage(errorClass, messageParameters, summary),
+      Option(errorClass),
+      messageParameters,
+      context
+    )
+  }
+
+  def this(message: String) = {
+    this(
+      message,
+      errorClass = None,
+      messageParameters = Map.empty,
+      context = Array.empty
+    )
+  }

  override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava

-  override def getErrorClass: String = errorClass
+  override def getErrorClass: String = errorClass.orNull
  override def getQueryContext: Array[QueryContext] = context
}

@@ -247,54 +324,122 @@ private[spark] class SparkFileNotFoundException(
/**
 * Number format exception thrown from Spark with an error class.
 */
-private[spark] class SparkNumberFormatException(
+private[spark] class SparkNumberFormatException private(
+    message: String,
+    errorClass: Option[String],
+    messageParameters: Map[String, String],
+    context: Array[QueryContext])
+  extends NumberFormatException(message)
+  with SparkThrowable {
+
+  def this(
    errorClass: String,
    messageParameters: Map[String, String],
    context: Array[QueryContext],
-    summary: String)
-  extends NumberFormatException(
-    SparkThrowableHelper.getMessage(errorClass, messageParameters, summary))
-  with SparkThrowable {
+    summary: String) = {
+    this(
+      SparkThrowableHelper.getMessage(errorClass, messageParameters, summary),
+      Option(errorClass),
+      messageParameters,
+      context
+    )
+  }
+
+  def this(message: String) = {
+    this(
+      message,
+      errorClass = None,
+      messageParameters = Map.empty,
+      context = Array.empty
+    )
+  }

  override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava

-  override def getErrorClass: String = errorClass
+  override def getErrorClass: String = errorClass.orNull
  override def getQueryContext: Array[QueryContext] = context
}

/**
 * Illegal argument exception thrown from Spark with an error class.
 */
-private[spark] class SparkIllegalArgumentException(
+private[spark] class SparkIllegalArgumentException private(
+    message: String,
+    cause: Option[Throwable],
+    errorClass: Option[String],
+    messageParameters: Map[String, String],
+    context: Array[QueryContext])
+  extends IllegalArgumentException(message, cause.orNull)
+  with SparkThrowable {
+
+  def this(
    errorClass: String,
    messageParameters: Map[String, String],
    context: Array[QueryContext] = Array.empty,
    summary: String = "",
-    cause: Throwable = null)
-  extends IllegalArgumentException(
-    SparkThrowableHelper.getMessage(errorClass, messageParameters, summary), cause)
-  with SparkThrowable {
+    cause: Throwable = null) = {
+    this(
+      SparkThrowableHelper.getMessage(errorClass, messageParameters, summary),
+      Option(cause),
+      Option(errorClass),
+      messageParameters,
+      context
+    )
+  }
+
+  def this(message: String, cause: Option[Throwable]) = {
+    this(
+      message,
+      cause = cause,
+      errorClass = None,
+      messageParameters = Map.empty,
+      context = Array.empty
+    )
+  }

  override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava

-  override def getErrorClass: String = errorClass
+  override def getErrorClass: String = errorClass.orNull
  override def getQueryContext: Array[QueryContext] = context
}

-private[spark] class SparkRuntimeException(
+private[spark] class SparkRuntimeException private(
+    message: String,
+    cause: Option[Throwable],
+    errorClass: Option[String],
+    messageParameters: Map[String, String],
+    context: Array[QueryContext])
+  extends RuntimeException(message, cause.orNull)
+  with SparkThrowable {
+
+  def this(
    errorClass: String,
    messageParameters: Map[String, String],
    cause: Throwable = null,
    context: Array[QueryContext] = Array.empty,
-    summary: String = "")
-  extends RuntimeException(
-    SparkThrowableHelper.getMessage(errorClass, messageParameters, summary),
-    cause)
-  with SparkThrowable {
+    summary: String = "") = {
+    this(
+      SparkThrowableHelper.getMessage(errorClass, messageParameters, summary),
+      Option(cause),
+      Option(errorClass),
+      messageParameters,
+      context
+    )
+  }
+
+  def this(message: String, cause: Option[Throwable]) = {
+    this(
+      message,
+      cause = cause,
+      errorClass = None,
+      messageParameters = Map.empty,
+      context = Array.empty
+    )
+  }

  override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava

-  override def getErrorClass: String = errorClass
+  override def getErrorClass: String = errorClass.orNull
  override def getQueryContext: Array[QueryContext] = context
}

@@ -335,18 +480,39 @@ private[spark] class SparkSecurityException(
/**
 * Array index out of bounds exception thrown from Spark with an error class.
 */
-private[spark] class SparkArrayIndexOutOfBoundsException(
+private[spark] class SparkArrayIndexOutOfBoundsException private(
+    message: String,
+    errorClass: Option[String],
+    messageParameters: Map[String, String],
+    context: Array[QueryContext])
+  extends ArrayIndexOutOfBoundsException(message)
+  with SparkThrowable {
+
+  def this(
    errorClass: String,
    messageParameters: Map[String, String],
    context: Array[QueryContext],
-    summary: String)
-  extends ArrayIndexOutOfBoundsException(
-    SparkThrowableHelper.getMessage(errorClass, messageParameters, summary))
-  with SparkThrowable {
+    summary: String) = {
+    this(
+      SparkThrowableHelper.getMessage(errorClass, messageParameters, summary),
+      Option(errorClass),
+      messageParameters,
+      context
+    )
+  }
+
+  def this(message: String) = {
+    this(
+      message,
+      errorClass = None,
+      messageParameters = Map.empty,
+      context = Array.empty
+    )
+  }

  override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava

-  override def getErrorClass: String = errorClass
+  override def getErrorClass: String = errorClass.orNull
  override def getQueryContext: Array[QueryContext] = context
}
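For contrast with the message-only path shown earlier, the error-class constructors still carry the structured metadata through unchanged. The snippet below is illustrative, assumes it runs inside Spark's own code, and assumes the `DIVIDE_BY_ZERO` error class still takes a single `config` message parameter as defined in `error-classes.json`:

```scala
// Error-class construction: the error class, parameters, and query context stay available.
val divideByZero = new SparkArithmeticException(
  errorClass = "DIVIDE_BY_ZERO",
  messageParameters = Map("config" -> "\"spark.sql.ansi.enabled\""),
  context = Array.empty,
  summary = "")
assert(divideByZero.getErrorClass == "DIVIDE_BY_ZERO")
assert(divideByZero.getQueryContext.isEmpty)

// Message-only construction: the text survives, but the metadata does not.
val rebuilt = new SparkArithmeticException(divideByZero.getMessage)
assert(rebuilt.getErrorClass == null)
assert(rebuilt.getMessageParameters.isEmpty)
```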

@@ -21,7 +21,7 @@ import java.util.Random

import org.scalatest.matchers.must.Matchers._

-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkIllegalArgumentException}
import org.apache.spark.sql.connect.client.util.RemoteSparkSession

class ClientDataFrameStatSuite extends RemoteSparkSession {
@@ -87,7 +87,7 @@ class ClientDataFrameStatSuite extends RemoteSparkSession {

    val results = df.stat.cov("singles", "doubles")
    assert(math.abs(results - 55.0 / 3) < 1e-12)
-    intercept[SparkException] {
+    intercept[SparkIllegalArgumentException] {
      df.stat.cov("singles", "letters") // doesn't accept non-numerical dataTypes
    }
    val decimalData = Seq.tabulate(6)(i => (BigDecimal(i % 3), BigDecimal(i % 2))).toDF("a", "b")
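The test change above reflects the user-visible effect on the Connect Scala client: the failure from `df.stat.cov` on a non-numeric column now surfaces as the typed `SparkIllegalArgumentException` rather than only as a generic `SparkException`. A small sketch of what that enables, reusing the `df` defined in the surrounding test:

```scala
try {
  df.stat.cov("singles", "letters")
} catch {
  case e: SparkIllegalArgumentException =>
    // Typed, client-side handling; before this change only SparkException could be matched here.
    println(s"covariance rejected a non-numeric column: ${e.getMessage}")
}
```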
