@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal
 import org.apache.spark.sql.catalyst.expressions.{Expression, _}
 import org.apache.spark.sql.catalyst.expressions.objects._
 import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

@@ -1003,7 +1004,7 @@ trait ScalaReflection extends Logging {
       val primaryConstructorSymbol: Option[Symbol] = constructorSymbol.asTerm.alternatives.find(
         s => s.isMethod && s.asMethod.isPrimaryConstructor)
       if (primaryConstructorSymbol.isEmpty) {
-        sys.error("Internal SQL error: Product object did not have a primary constructor.")
+        throw QueryExecutionErrors.primaryConstructorNotFoundError(tpe.getClass)
       } else {
         primaryConstructorSymbol.get.asMethod.paramLists
       }
@@ -3338,7 +3338,7 @@ class Analyzer(override val catalogManager: CatalogManager)
         case _ : InnerLike =>
           (leftKeys ++ lUniqueOutput ++ rUniqueOutput, rightKeys)
         case _ =>
-          sys.error("Unsupported natural join type " + joinType)
+          throw QueryExecutionErrors.unsupportedNaturalJoinTypeError(joinType)
       }
       // use Project to hide duplicated common keys
       // propagate hidden columns from nested USING/NATURAL JOINs
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.expressions.objects.{AssertNotNull, InitializeJavaBean, Invoke, NewInstance}
 import org.apache.spark.sql.catalyst.optimizer.{ReassignLambdaVariableID, SimplifyCasts}
 import org.apache.spark.sql.catalyst.plans.logical.{CatalystSerde, DeserializeToObject, LeafNode, LocalRelation}
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ObjectType, StringType, StructField, StructType}
 import org.apache.spark.unsafe.types.UTF8String
@@ -404,7 +405,7 @@ case class ExpressionEncoder[T](
   def assertUnresolved(): Unit = {
     (deserializer +: serializer).foreach(_.foreach {
       case a: AttributeReference if a.name != "loopVar" =>
-        sys.error(s"Unresolved encoder expected, but $a was found.")
+        throw QueryExecutionErrors.notExpectedUnresolvedEncoderError(a)
       case _ =>
     })
   }
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst

 import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.errors.QueryExecutionErrors

 package object encoders {
   /**
@@ -30,6 +31,6 @@ package object encoders {
     case e: ExpressionEncoder[A] =>
       e.assertUnresolved()
       e
-    case _ => sys.error(s"Only expression encoders are supported today")
+    case _ => throw QueryExecutionErrors.unsupportedEncoderError()
   }
 }
@@ -485,7 +485,8 @@ abstract class UnaryExpression extends Expression with UnaryLike[Expression] {
    * of evaluation process, we should override [[eval]].
    */
   protected def nullSafeEval(input: Any): Any =
-    sys.error(s"UnaryExpressions must override either eval or nullSafeEval")
+    throw QueryExecutionErrors.notOverrideExpectedMethodsError("UnaryExpressions",
+      "eval", "nullSafeEval")

   /**
    * Called by unary expressions to generate a code block that returns null if its parent returns
@@ -578,7 +579,8 @@ abstract class BinaryExpression extends Expression with BinaryLike[Expression] {
    * of evaluation process, we should override [[eval]].
    */
   protected def nullSafeEval(input1: Any, input2: Any): Any =
-    sys.error(s"BinaryExpressions must override either eval or nullSafeEval")
+    throw QueryExecutionErrors.notOverrideExpectedMethodsError("BinaryExpressions",
+      "eval", "nullSafeEval")

   /**
    * Short hand for generating binary evaluation code.
@@ -722,7 +724,8 @@ abstract class TernaryExpression extends Expression with TernaryLike[Expression] {
    * of evaluation process, we should override [[eval]].
    */
   protected def nullSafeEval(input1: Any, input2: Any, input3: Any): Any =
-    sys.error(s"TernaryExpressions must override either eval or nullSafeEval")
+    throw QueryExecutionErrors.notOverrideExpectedMethodsError("TernaryExpressions",
+      "eval", "nullSafeEval")

   /**
    * Short hand for generating ternary evaluation code.
@@ -822,7 +825,8 @@ abstract class QuaternaryExpression extends Expression with QuaternaryLike[Expression] {
    * full control of evaluation process, we should override [[eval]].
    */
   protected def nullSafeEval(input1: Any, input2: Any, input3: Any, input4: Any): Any =
-    sys.error(s"QuaternaryExpressions must override either eval or nullSafeEval")
+    throw QueryExecutionErrors.notOverrideExpectedMethodsError("QuaternaryExpressions",
+      "eval", "nullSafeEval")

   /**
    * Short hand for generating quaternary evaluation code.
@@ -947,7 +951,8 @@ abstract class SeptenaryExpression extends Expression {
       input5: Any,
       input6: Any,
       input7: Option[Any]): Any = {
-    sys.error("SeptenaryExpression must override either eval or nullSafeEval")
+    throw QueryExecutionErrors.notOverrideExpectedMethodsError("SeptenaryExpression",
+      "eval", "nullSafeEval")
   }

   /**
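Note on the pattern above: the nullSafeEval default is code that should never execute. Concrete expressions either override eval for full control (including null handling) or override nullSafeEval and inherit the base class's null short-circuiting; reaching the default means neither was done. A minimal standalone sketch of that contract, with simplified types rather than Spark's actual Expression hierarchy:

// Minimal sketch of the eval/nullSafeEval contract (simplified; not Spark's classes).
abstract class MiniUnaryExpression {
  // Default mirrors the diff: reached only if a subclass overrides neither method.
  protected def nullSafeEval(input: Any): Any =
    throw new RuntimeException("MiniUnaryExpression must override either eval or nullSafeEval")

  // The base class centralizes the null check, so most subclasses never write it.
  def eval(input: Any): Any =
    if (input == null) null else nullSafeEval(input)
}

class MiniStringLength extends MiniUnaryExpression {
  override protected def nullSafeEval(input: Any): Any =
    input.asInstanceOf[String].length
}

object EvalDemo extends App {
  val expr = new MiniStringLength
  println(expr.eval("spark"))  // 5
  println(expr.eval(null))     // null -- nullSafeEval is never called
}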
@@ -209,11 +209,13 @@ abstract class BinaryArithmetic extends BinaryOperator with NullIntolerant {

   /** Name of the function for this expression on a [[Decimal]] type. */
   def decimalMethod: String =
-    sys.error("BinaryArithmetics must override either decimalMethod or genCode")
+    throw QueryExecutionErrors.notOverrideExpectedMethodsError("BinaryArithmetics",
+      "decimalMethod", "genCode")

   /** Name of the function for this expression on a [[CalendarInterval]] type. */
   def calendarIntervalMethod: String =
-    sys.error("BinaryArithmetics must override either calendarIntervalMethod or genCode")
+    throw QueryExecutionErrors.notOverrideExpectedMethodsError("BinaryArithmetics",
+      "calendarIntervalMethod", "genCode")

   // Name of the function for the exact version of this expression in [[Math]].
   // If the option "spark.sql.ansi.enabled" is enabled and there is corresponding
@@ -225,7 +225,8 @@ trait SimpleHigherOrderFunction extends HigherOrderFunction with BinaryLike[Expression] {
    * in order to save null-check code.
    */
   protected def nullSafeEval(inputRow: InternalRow, argumentValue: Any): Any =
-    sys.error(s"UnaryHigherOrderFunction must override either eval or nullSafeEval")
+    throw QueryExecutionErrors.notOverrideExpectedMethodsError("UnaryHigherOrderFunction",
+      "eval", "nullSafeEval")

   override def eval(inputRow: InternalRow): Any = {
     val value = argument.eval(inputRow)
@@ -491,7 +491,7 @@ case class NewInstance(
     val paramTypes = ScalaReflection.expressionJavaClasses(arguments)
     val getConstructor = (paramClazz: Seq[Class[_]]) => {
       ScalaReflection.findConstructor(cls, paramClazz).getOrElse {
-        sys.error(s"Couldn't find a valid constructor on $cls")
+        throw QueryExecutionErrors.constructorNotFoundError(cls.toString)
       }
     }
     outerPointer.map { p =>
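The getOrElse above is a lookup-or-throw over reflective constructor resolution. A rough standalone equivalent built on plain Java reflection; findConstructor here is a hypothetical stand-in written for illustration, not ScalaReflection's actual implementation:

import scala.util.Try

object CtorDemo extends App {
  // Hypothetical stand-in for ScalaReflection.findConstructor: None when no
  // public constructor matches the given parameter types.
  def findConstructor[T](cls: Class[T], paramTypes: Seq[Class[_]]): Option[Seq[AnyRef] => T] =
    Try(cls.getConstructor(paramTypes: _*)).toOption
      .map(ctor => (args: Seq[AnyRef]) => ctor.newInstance(args: _*))

  // Same lookup-or-throw shape as the NewInstance change above.
  val ctor = findConstructor(classOf[java.lang.StringBuilder], Seq(classOf[String]))
    .getOrElse(throw new RuntimeException(
      s"Couldn't find a valid constructor on ${classOf[java.lang.StringBuilder]}"))
  println(ctor(Seq("hello")).length)  // prints 5
}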
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types._

 /**
@@ -182,8 +183,7 @@ private[sql] class JacksonGenerator(
     case _ =>
       (row: SpecializedGetters, ordinal: Int) =>
         val v = row.get(ordinal, dataType)
-        sys.error(s"Failed to convert value $v (class of ${v.getClass}}) " +
-          s"with the type of $dataType to JSON.")
+        throw QueryExecutionErrors.failToConvertValueToJsonError(v, v.getClass, dataType)
   }

   private def writeObject(f: => Unit): Unit = {
@@ -27,8 +27,8 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.catalyst.trees.TreePattern.{EXISTS_SUBQUERY, FILTER, IN_SUBQUERY,
-  LIST_SUBQUERY, SCALAR_SUBQUERY}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{EXISTS_SUBQUERY, FILTER, IN_SUBQUERY, LIST_SUBQUERY, SCALAR_SUBQUERY}
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._

@@ -502,13 +502,14 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] with AliasHelper {
          bottomPart = child

        case Filter(_, op) =>
-          sys.error(s"Correlated subquery has unexpected operator $op below filter")
+          throw QueryExecutionErrors.unexpectedOperatorInCorrelatedSubquery(op, " below filter")

-        case op @ _ => sys.error(s"Unexpected operator $op in correlated subquery")
+        case op @ _ => throw QueryExecutionErrors.unexpectedOperatorInCorrelatedSubquery(op)
      }
    }

-    sys.error("This line should be unreachable")
+    throw QueryExecutionErrors.unreachableError()
+
  }

  // Name of generated column used in rewrite below
@@ -576,7 +577,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] with AliasHelper {
         subqueryRoot = Project(projList ++ havingInputs, subqueryRoot)
       case s @ SubqueryAlias(alias, _) =>
         subqueryRoot = SubqueryAlias(alias, subqueryRoot)
-      case op => sys.error(s"Unexpected operator $op in correlated subquery")
+      case op => throw QueryExecutionErrors.unexpectedOperatorInCorrelatedSubquery(op)
     }

     // CASE WHEN alwaysTrue IS NULL THEN resultOnZeroTups
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats
 import org.apache.spark.sql.catalyst.trees.{BinaryLike, LeafLike, UnaryLike}
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types.StructType


@@ -85,7 +86,7 @@ abstract class LogicalPlan
     schema.map { field =>
       resolve(field.name :: Nil, resolver).map {
         case a: AttributeReference => a
-        case _ => sys.error(s"can not handle nested schema yet... plan $this")
+        case _ => throw QueryExecutionErrors.resolveCannotHandleNestedSchema(this)
       }.getOrElse {
         throw new AnalysisException(
           s"Unable to resolve ${field.name} given [${output.map(_.name).mkString(", ")}]")
@@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.rules.UnknownRuleId
 import org.apache.spark.sql.catalyst.trees.TreePattern.TreePattern
 import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -674,7 +675,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with TreePatternBits {
     // Skip no-arg constructors that are just there for kryo.
     val ctors = allCtors.filter(allowEmptyArgs || _.getParameterTypes.size != 0)
     if (ctors.isEmpty) {
-      sys.error(s"No valid constructor for $nodeName")
+      throw QueryExecutionErrors.constructorNotFoundError(nodeName)
     }
     val allArgs: Array[AnyRef] = if (otherCopyArgs.isEmpty) {
       newArgs
@@ -776,7 +776,7 @@ object DateTimeUtils {
       case TRUNC_TO_YEAR => days - getDayInYear(days) + 1
       case _ =>
         // caller make sure that this should never be reached
-        sys.error(s"Invalid trunc level: $level")
+        throw QueryExecutionErrors.unreachableError(s": Invalid trunc level: $level")
     }
   }

@@ -31,7 +31,9 @@ import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.memory.SparkOutOfMemoryError
 import org.apache.spark.sql.catalyst.analysis.UnresolvedGenerator
 import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
-import org.apache.spark.sql.catalyst.expressions.{Expression, UnevaluableAggregate}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, UnevaluableAggregate}
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.expressions.Transform
@@ -247,6 +249,51 @@ object QueryExecutionErrors {
       "in any enclosing class nor any supertype")
   }

+  def constructorNotFoundError(cls: String): Throwable = {
+    new RuntimeException(s"Couldn't find a valid constructor on $cls")
+  }
+
+  def primaryConstructorNotFoundError(cls: Class[_]): Throwable = {
+    new RuntimeException(s"Couldn't find a primary constructor on $cls")
+  }
+
+  def unsupportedNaturalJoinTypeError(joinType: JoinType): Throwable = {
+    new RuntimeException("Unsupported natural join type " + joinType)
+  }
+
+  def notExpectedUnresolvedEncoderError(attr: AttributeReference): Throwable = {
+    new RuntimeException(s"Unresolved encoder expected, but $attr was found.")
+  }
+
+  def unsupportedEncoderError(): Throwable = {
+    new RuntimeException("Only expression encoders are supported for now.")
+  }
+
+  def notOverrideExpectedMethodsError(className: String, m1: String, m2: String): Throwable = {
+    new RuntimeException(s"$className must override either $m1 or $m2")
+  }
+
+  def failToConvertValueToJsonError(value: AnyRef, cls: Class[_], dataType: DataType): Throwable = {
+    new RuntimeException(s"Failed to convert value $value (class of $cls) " +
+      s"with the type of $dataType to JSON.")
+  }
+
+  def unexpectedOperatorInCorrelatedSubquery(op: LogicalPlan, pos: String = ""): Throwable = {
+    new RuntimeException(s"Unexpected operator $op in correlated subquery" + pos)
+  }
+
+  def unreachableError(err: String = ""): Throwable = {
+    new RuntimeException("This line should be unreachable" + err)
+  }
+
+  def unsupportedRoundingMode(roundMode: BigDecimal.RoundingMode.Value): Throwable = {
+    new RuntimeException(s"Not supported rounding mode: $roundMode")
+  }
+
+  def resolveCannotHandleNestedSchema(plan: LogicalPlan): Throwable = {
+    new RuntimeException(s"Can not handle nested schema yet... plan $plan")
+  }
+
   def inputExternalRowCannotBeNullError(): RuntimeException = {
     new RuntimeException("The input external row cannot be null.")
   }
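Taken together, these additions give runtime errors a single choke point: each former sys.error call site now invokes a named factory method, so the message template lives in exactly one place and callers stay uniform. A distilled sketch of the pattern, using hypothetical mini names rather than the full Spark object:

object MiniExecutionErrors {
  // One definition, many call sites; the optional suffix mirrors the `pos`
  // parameter of unexpectedOperatorInCorrelatedSubquery above.
  def unexpectedOperatorError(op: String, pos: String = ""): Throwable =
    new RuntimeException(s"Unexpected operator $op in correlated subquery" + pos)
}

object ErrorDemo extends App {
  // Before: sys.error(s"Correlated subquery has unexpected operator $op below filter")
  // After: both call sites share one message template.
  println(MiniExecutionErrors.unexpectedOperatorError("Sort").getMessage)
  println(MiniExecutionErrors.unexpectedOperatorError("Sort", " below filter").getMessage)
}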
@@ -414,7 +414,7 @@ final class Decimal extends Ordered[Decimal] with Serializable {
             longVal += (if (droppedDigits < 0) -1L else 1L)
           }
         case _ =>
-          sys.error(s"Not supported rounding mode: $roundMode")
+          throw QueryExecutionErrors.unsupportedRoundingMode(roundMode)
       }
     } else if (scale > _scale) {
       // We might be able to multiply longVal by a power of 10 and not overflow, but if not,
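For context on the code around this throw: when changePrecision reduces scale, the dropped low-order digits and the rounding mode decide how the kept value is adjusted, and an unhandled mode falls through to the error. A simplified standalone rendering of that dispatch, an illustration rather than Spark's actual Decimal internals:

import scala.math.BigDecimal.RoundingMode

object RoundingDemo extends App {
  // Rescale an unscaled long down by `diff` decimal digits under a rounding mode.
  def rescaleDown(unscaled: Long, diff: Int, mode: RoundingMode.Value): Long = {
    val pow10 = math.pow(10, diff).toLong
    val droppedDigits = unscaled % pow10
    var longVal = unscaled / pow10
    mode match {
      case RoundingMode.FLOOR =>
        if (droppedDigits < 0) longVal -= 1L
      case RoundingMode.CEILING =>
        if (droppedDigits > 0) longVal += 1L
      case RoundingMode.HALF_UP =>
        if (math.abs(droppedDigits) * 2 >= pow10) {
          longVal += (if (droppedDigits < 0) -1L else 1L)
        }
      case _ =>
        // The diff's point: a typed, centrally defined error instead of sys.error.
        throw new RuntimeException(s"Not supported rounding mode: $mode")
    }
    longVal
  }

  println(rescaleDown(12345L, 2, RoundingMode.HALF_UP))  // 123 (drops "45", rounds down)
  println(rescaleDown(12355L, 2, RoundingMode.HALF_UP))  // 124 (drops "55", rounds up)
}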
@@ -22,6 +22,7 @@ import java.util.Locale
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, NamedTransform, Transform}
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}


@@ -82,7 +83,8 @@ private[spark] object SchemaUtils {
     } else if (resolver == caseInsensitiveResolution) {
       false
     } else {
-      sys.error("A resolver to check if two identifiers are equal must be " +
+      throw QueryExecutionErrors.unreachableError(
+        ": A resolver to check if two identifiers are equal must be " +
         "`caseSensitiveResolution` or `caseInsensitiveResolution` in o.a.s.sql.catalyst.")
     }
   }