[SPARK-49542][SQL] Partition transform exception evaluate error
### What changes were proposed in this pull request?
Add a user-facing error for improper use of partition transform expressions (`years`, `months`, `days`, `hours`, `bucket`) outside of `partitionedBy`.

### Why are the changes needed?
Evaluating a partition transform expression outside of `partitionedBy` currently fails with an internal error; replace it with a user-facing one.

### Does this PR introduce _any_ user-facing change?
Yes, a new error condition, `PARTITION_TRANSFORM_EXPRESSION_NOT_IN_PARTITIONED_BY` (SQLSTATE `42S23`).
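A minimal before/after sketch (Scala; the sample data and the `testcat.tbl` v2 table name are illustrative, not part of this patch):

```scala
import org.apache.spark.sql.functions._

val df = spark.sql("SELECT 1 AS id, TIMESTAMP'2000-01-01' AS ts")

// Unsupported: a partition transform used as an ordinary expression.
// This used to fail with INTERNAL_ERROR; it now raises
// PARTITION_TRANSFORM_EXPRESSION_NOT_IN_PARTITIONED_BY.
df.select(years(col("ts"))).collect()

// Supported: the transform passed to partitionedBy on the v2 writer.
df.writeTo("testcat.tbl").partitionedBy(years(col("ts"))).create()
```

The same applies to `months`, `days`, `hours`, and `bucket`.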

### How was this patch tested?
Added tests to `QueryExecutionErrorsSuite`, along with Python tests in `test_readwriter.py` and its Spark Connect parity suite.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#48387 from dusantism-db/partition-transform-exception-evaluate-error.

Authored-by: Dušan Tišma <dusan.tisma@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
dusantism-db authored and MaxGekk committed Oct 10, 2024
1 parent f0498f0 commit f003638
Showing 7 changed files with 76 additions and 4 deletions.
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -3824,6 +3824,12 @@
     ],
     "sqlState" : "42000"
   },
+  "PARTITION_TRANSFORM_EXPRESSION_NOT_IN_PARTITIONED_BY" : {
+    "message" : [
+      "The expression <expression> must be inside 'partitionedBy'."
+    ],
+    "sqlState" : "42S23"
+  },
   "PATH_ALREADY_EXISTS" : {
     "message" : [
       "Path <outputPath> already exists. Set mode as \"overwrite\" to overwrite the existing path."
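With the `<expression>` parameter substituted, the condition renders roughly as follows (a sketch assuming Spark's standard `[CONDITION] <message> SQLSTATE: <state>` template and the quoting applied by `toSQLExpr`):

```
[PARTITION_TRANSFORM_EXPRESSION_NOT_IN_PARTITIONED_BY] The expression "years(ts)" must be inside 'partitionedBy'. SQLSTATE: 42S23
```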
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-states.json
@@ -4913,6 +4913,12 @@
     "standard": "N",
     "usedBy": ["SQL Server"]
   },
+  "42S23": {
+    "description": "Partition transform expression not in 'partitionedBy'",
+    "origin": "Spark",
+    "standard": "N",
+    "usedBy": ["Spark"]
+  },
   "44000": {
     "description": "with check option violation",
     "origin": "SQL/Foundation",
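The SQLSTATE registered here is what a caller observes on the thrown `SparkException`. A test-style sketch (ScalaTest `intercept`, mirroring the `QueryExecutionErrorsSuite` test added below):

```scala
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.expressions.{Literal, Years}

// Evaluating a partition transform directly raises the new condition.
val e = intercept[SparkException] { Years(Literal("foo")).eval() }
assert(e.getErrorClass == "PARTITION_TRANSFORM_EXPRESSION_NOT_IN_PARTITIONED_BY")
assert(e.getSqlState == "42S23")
```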
1 change: 1 addition & 0 deletions python/pyspark/sql/tests/connect/test_parity_readwriter.py
@@ -33,6 +33,7 @@ def test_api(self):
 
     def test_partitioning_functions(self):
         self.check_partitioning_functions(DataFrameWriterV2)
+        self.partitioning_functions_user_error()
 
 
 if __name__ == "__main__":
30 changes: 30 additions & 0 deletions python/pyspark/sql/tests/test_readwriter.py
@@ -255,6 +255,7 @@ def check_api(self, tpe):
 
     def test_partitioning_functions(self):
         self.check_partitioning_functions(DataFrameWriterV2)
+        self.partitioning_functions_user_error()
 
     def check_partitioning_functions(self, tpe):
         import datetime
@@ -274,6 +275,35 @@ def check_partitioning_functions(self, tpe):
         self.assertIsInstance(writer.partitionedBy(bucket(11, col("id"))), tpe)
         self.assertIsInstance(writer.partitionedBy(bucket(3, "id"), hours(col("ts"))), tpe)
 
+    def partitioning_functions_user_error(self):
+        import datetime
+        from pyspark.sql.functions.partitioning import years, months, days, hours, bucket
+
+        df = self.spark.createDataFrame(
+            [(1, datetime.datetime(2000, 1, 1), "foo")], ("id", "ts", "value")
+        )
+
+        with self.assertRaisesRegex(
+            Exception, "PARTITION_TRANSFORM_EXPRESSION_NOT_IN_PARTITIONED_BY"
+        ):
+            df.select(years("ts")).collect()
+        with self.assertRaisesRegex(
+            Exception, "PARTITION_TRANSFORM_EXPRESSION_NOT_IN_PARTITIONED_BY"
+        ):
+            df.select(months("ts")).collect()
+        with self.assertRaisesRegex(
+            Exception, "PARTITION_TRANSFORM_EXPRESSION_NOT_IN_PARTITIONED_BY"
+        ):
+            df.select(days("ts")).collect()
+        with self.assertRaisesRegex(
+            Exception, "PARTITION_TRANSFORM_EXPRESSION_NOT_IN_PARTITIONED_BY"
+        ):
+            df.select(hours("ts")).collect()
+        with self.assertRaisesRegex(
+            Exception, "PARTITION_TRANSFORM_EXPRESSION_NOT_IN_PARTITIONED_BY"
+        ):
+            df.select(bucket(2, "ts")).collect()
+
     def test_create(self):
         df = self.df
         with self.table("test_table"):
4 changes: 2 additions & 2 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -383,10 +383,10 @@ abstract class Expression extends TreeNode[Expression] {
 trait FoldableUnevaluable extends Expression {
   override def foldable: Boolean = true
 
-  final override def eval(input: InternalRow = null): Any =
+  override def eval(input: InternalRow = null): Any =
     throw QueryExecutionErrors.cannotEvaluateExpressionError(this)
 
-  final override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
     throw QueryExecutionErrors.cannotGenerateCodeForExpressionError(this)
 }

19 changes: 18 additions & 1 deletion sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PartitionTransforms.scala
@@ -17,7 +17,11 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLExpr
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types.{DataType, IntegerType}
@@ -37,8 +41,21 @@ import org.apache.spark.sql.types.{DataType, IntegerType}
 abstract class PartitionTransformExpression extends Expression with Unevaluable
   with UnaryLike[Expression] {
   override def nullable: Boolean = true
-}
 
+  override def eval(input: InternalRow): Any =
+    throw new SparkException(
+      errorClass = "PARTITION_TRANSFORM_EXPRESSION_NOT_IN_PARTITIONED_BY",
+      messageParameters = Map("expression" -> toSQLExpr(this)),
+      cause = null
+    )
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
+    throw new SparkException(
+      errorClass = "PARTITION_TRANSFORM_EXPRESSION_NOT_IN_PARTITIONED_BY",
+      messageParameters = Map("expression" -> toSQLExpr(this)),
+      cause = null
+    )
+}
 /**
  * Expression for the v2 partition transform years.
  */
14 changes: 13 additions & 1 deletion sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -35,11 +35,12 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, Encoder, Kry
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NamedParameter, UnresolvedGenerator}
 import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Concat, CreateArray, EmptyRow, Expression, Flatten, Grouping, Literal, RowNumber, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Concat, CreateArray, EmptyRow, Expression, Flatten, Grouping, Literal, RowNumber, UnaryExpression, Years}
 import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.expressions.objects.InitializeJavaBean
 import org.apache.spark.sql.catalyst.rules.RuleIdCollection
+import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLExpr
 import org.apache.spark.sql.execution.datasources.jdbc.{DriverRegistry, JDBCOptions}
 import org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider
 import org.apache.spark.sql.execution.datasources.orc.OrcTest
@@ -1006,6 +1007,17 @@ class QueryExecutionErrorsSuite
       sqlState = "XX000")
   }
 
+  test("PartitionTransformExpression error on eval") {
+    val expr = Years(Literal("foo"))
+    val e = intercept[SparkException] {
+      expr.eval()
+    }
+    checkError(
+      exception = e,
+      condition = "PARTITION_TRANSFORM_EXPRESSION_NOT_IN_PARTITIONED_BY",
+      parameters = Map("expression" -> toSQLExpr(expr)))
+  }
+
   test("INTERNAL_ERROR: Calling doGenCode on unresolved") {
     val e = intercept[SparkException] {
       val ctx = new CodegenContext