
Commit 1cec856

szehon-ho authored and cloud-fan committed
[SPARK-51906][SQL] Dsv2 expressions in alter table add columns
### What changes were proposed in this pull request?

Add DSV2 expressions to ALTER TABLE ADD COLUMNS. This harmonizes with the Create/Replace table default value code path (#50593) and changes the plan in two ways:

1. Model the default value as a `DefaultValueExpression` instead of a string.
2. Make its parent `QualifiedColType` an `Expression`, so that it gets handled by the analyzer's main resolution loop.

The analyzer then resolves the user-provided string to an `Expression`, which can be cleanly converted to a V2 expression. A side effect is that the exists-default now uses this path as well (instead of being analyzed/optimized by manually calling `ResolveDefaultColumns::analyze`), so the exception flow that originally lived in that method is now handled by the existing rules in the analyzer phases.

### Why are the changes needed?

Data sources implementing DSV2 need a V2 expression to interpret the default value.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added new tests in DataSourceV2DataFrameSuite.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #50701 from szehon-ho/add_column_default_val.

Authored-by: Szehon Ho <szehon.apache@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 6074fd9 · commit 1cec856
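For illustration, here is a hedged, connector-side sketch of what this change delivers: given the `TableChange.AddColumn` entries produced by `ALTER TABLE ... ADD COLUMNS`, a DSV2 catalog implementation can now read an analyzed V2 expression (plus the folded exists-default literal) from each `ColumnDefaultValue`. The helper name `describeAddedDefaults` is hypothetical and not part of this PR.

```scala
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.catalog.TableChange.AddColumn

// Hypothetical helper: inspect the defaults a connector receives from
// ALTER TABLE ... ADD COLUMNS after this change.
def describeAddedDefaults(changes: Array[TableChange]): Seq[String] = {
  changes.toSeq.collect {
    case add: AddColumn if add.defaultValue() != null =>
      val dv = add.defaultValue()
      // getExpression() is the analyzed V2 expression; it may be null when the
      // default (e.g. current_catalog()) has no V2 translation. getValue() is
      // the folded literal used as the exists default.
      s"${add.fieldNames().mkString(".")}: sql=${dv.getSql}, " +
        s"v2Expr=${Option(dv.getExpression).getOrElse("<none>")}, value=${dv.getValue}"
  }
}
```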

File tree

8 files changed, +125 -26 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -5145,7 +5145,7 @@ class AstBuilder extends DataTypeAstBuilder
     // Add the 'DEFAULT expression' clause in the column definition, if any, to the column metadata.
     val defaultExpr = defaultExpression.map(visitDefaultExpression).map { field =>
       if (conf.getConf(SQLConf.ENABLE_DEFAULT_COLUMNS)) {
-        field.originalSQL
+        field
       } else {
         throw QueryParsingErrors.defaultColumnNotEnabledError(ctx)
       }
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala

Lines changed: 12 additions & 0 deletions
```diff
@@ -164,6 +164,18 @@ object ColumnDefinition {
       }
     }
 
+    case cmd: AddColumns if cmd.columnsToAdd.exists(_.default.isDefined) =>
+      // Wrap analysis errors for default values in a more user-friendly message.
+      cmd.columnsToAdd.foreach { c =>
+        c.default.foreach { d =>
+          if (!d.resolved) {
+            throw QueryCompilationErrors.defaultValuesUnresolvedExprError(
+              "ALTER TABLE", c.colName, d.originalSQL, null)
+          }
+          validateDefaultValueExpr(d, "ALTER TABLE", c.colName, c.dataType)
+        }
+      }
+
     case _ =>
   }
 }
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala

Lines changed: 12 additions & 13 deletions
```diff
@@ -18,11 +18,9 @@
 package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition}
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Unevaluable}
 import org.apache.spark.sql.catalyst.trees.{LeafLike, UnaryLike}
-import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
 import org.apache.spark.sql.connector.catalog.ColumnDefaultValue
-import org.apache.spark.sql.connector.expressions.LiteralValue
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types.DataType
 
@@ -134,19 +132,20 @@ case class QualifiedColType(
     nullable: Boolean,
     comment: Option[String],
     position: Option[FieldPosition],
-    default: Option[String]) {
+    default: Option[DefaultValueExpression]) extends Expression with Unevaluable {
+  override lazy val resolved: Boolean = path.forall(_.resolved) && position.forall(_.resolved) &&
+    default.forall(_.resolved)
 
   def name: Seq[String] = path.map(_.name).getOrElse(Nil) :+ colName
 
-  def resolved: Boolean = path.forall(_.resolved) && position.forall(_.resolved)
+  def getV2Default(statement: String): ColumnDefaultValue =
+    default.map(_.toV2(statement, colName)).orNull
+
+  override def children: Seq[Expression] = default.toSeq
 
-  def getV2Default: ColumnDefaultValue = {
-    default.map { sql =>
-      val e = ResolveDefaultColumns.analyze(colName, dataType, sql, "ALTER TABLE")
-      assert(e.resolved && e.foldable,
-        "The existence default value must be a simple SQL string that is resolved and foldable, " +
-        "but got: " + sql)
-      new ColumnDefaultValue(sql, LiteralValue(e.eval(), dataType))
-    }.orNull
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[Expression]): Expression = {
+    copy(default = newChildren.headOption.map(_.asInstanceOf[DefaultValueExpression]))
   }
 }
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -123,7 +123,7 @@ case class AddColumns(
       col.nullable,
       col.comment.orNull,
       col.position.map(_.position).orNull,
-      col.getV2Default)
+      col.getV2Default("ALTER TABLE"))
   }
 }
 
@@ -159,7 +159,7 @@ case class ReplaceColumns(
       col.nullable,
       col.comment.orNull,
       null,
-      col.getV2Default)
+      col.getV2Default("ALTER TABLE"))
   }
   (deleteChanges ++ addChanges).toImmutableArraySeq
 }
```

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala

Lines changed: 3 additions & 2 deletions
```diff
@@ -2723,7 +2723,8 @@ class DDLParserSuite extends AnalysisTest {
     comparePlans(
       parsePlan("ALTER TABLE t1 ADD COLUMN x int NOT NULL DEFAULT 42"),
       AddColumns(UnresolvedTable(Seq("t1"), "ALTER TABLE ... ADD COLUMN"),
-        Seq(QualifiedColType(None, "x", IntegerType, false, None, None, Some("42")))))
+        Seq(QualifiedColType(None, "x", IntegerType, false, None, None,
+          Some(DefaultValueExpression(Literal(42), "42"))))))
     comparePlans(
       parsePlan("ALTER TABLE t1 ALTER COLUMN a.b.c SET DEFAULT 42"),
       AlterColumns(
@@ -3138,7 +3139,7 @@ class DDLParserSuite extends AnalysisTest {
         nullable = false,
         comment = Some("a"),
         position = Some(UnresolvedFieldPosition(first())),
-        default = Some("'abc'")
+        default = Some(DefaultValueExpression(Literal("abc"), "'abc'"))
       )
     )
   )
```

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -699,7 +699,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       val builder = new MetadataBuilder
       col.comment.foreach(builder.putString("comment", _))
       col.default.map {
-        value: String => builder.putString(DefaultCols.CURRENT_DEFAULT_COLUMN_METADATA_KEY, value)
+        value: DefaultValueExpression => builder.putString(
+          DefaultCols.CURRENT_DEFAULT_COLUMN_METADATA_KEY, value.originalSQL)
       }
       StructField(col.name.head, col.dataType, nullable = true, builder.build())
     }
```

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala

Lines changed: 88 additions & 1 deletion
```diff
@@ -24,10 +24,11 @@ import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
 import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, Identifier, InMemoryTableCatalog}
+import org.apache.spark.sql.connector.catalog.TableChange.AddColumn
 import org.apache.spark.sql.connector.expressions.{Cast => V2Cast, GeneralScalarExpression, LiteralValue, Transform}
 import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
 import org.apache.spark.sql.execution.ExplainUtils.stripAQEPlan
-import org.apache.spark.sql.execution.datasources.v2.{CreateTableExec, DataSourceV2Relation, ReplaceTableExec}
+import org.apache.spark.sql.execution.datasources.v2.{AlterTableExec, CreateTableExec, DataSourceV2Relation, ReplaceTableExec}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BooleanType, CalendarIntervalType, IntegerType, StringType}
 import org.apache.spark.sql.util.QueryExecutionListener
@@ -438,6 +439,47 @@ class DataSourceV2DataFrameSuite
     }
   }
 
+  test("alter table with complex foldable default values") {
+    val tableName = "testcat.ns1.ns2.tbl"
+    withTable(tableName) {
+      sql(
+        s"""
+           |CREATE TABLE $tableName (
+           |  dummy INT
+           |) USING foo
+           |""".stripMargin)
+
+      val alterExec = executeAndKeepPhysicalPlan[AlterTableExec] {
+        sql(s"ALTER TABLE $tableName ADD COLUMNS (" +
+          s"salary INT DEFAULT (100 + 23), " +
+          s"dep STRING DEFAULT ('h' || 'r'), " +
+          s"active BOOLEAN DEFAULT CAST(1 AS BOOLEAN))")
+      }
+
+      checkDefaultValues(
+        alterExec.changes.map(_.asInstanceOf[AddColumn]).toArray,
+        Array(
+          new ColumnDefaultValue(
+            "(100 + 23)",
+            new GeneralScalarExpression(
+              "+",
+              Array(LiteralValue(100, IntegerType), LiteralValue(23, IntegerType))),
+            LiteralValue(123, IntegerType)),
+          new ColumnDefaultValue(
+            "('h' || 'r')",
+            new GeneralScalarExpression(
+              "CONCAT",
+              Array(
+                LiteralValue(UTF8String.fromString("h"), StringType),
+                LiteralValue(UTF8String.fromString("r"), StringType))),
+            LiteralValue(UTF8String.fromString("hr"), StringType)),
+          new ColumnDefaultValue(
+            "CAST(1 AS BOOLEAN)",
+            new V2Cast(LiteralValue(1, IntegerType), IntegerType, BooleanType),
+            LiteralValue(true, BooleanType))))
+    }
+  }
+
   test("create/replace table with current like default values") {
     val tableName = "testcat.ns1.ns2.tbl"
     withTable(tableName) {
@@ -483,6 +525,37 @@ class DataSourceV2DataFrameSuite
     }
   }
 
+  test("alter table with current like default values") {
+    val tableName = "testcat.ns1.ns2.tbl"
+    withTable(tableName) {
+      sql(
+        s"""
+           |CREATE TABLE $tableName (
+           |  dummy INT
+           |) USING foo
+           |""".stripMargin)
+
+      val alterExec = executeAndKeepPhysicalPlan[AlterTableExec] {
+        sql(s"ALTER TABLE $tableName ADD COLUMNS (cat STRING DEFAULT current_catalog())")
+      }
+
+      checkDefaultValues(
+        alterExec.changes.map(_.asInstanceOf[AddColumn]).toArray,
+        Array(
+          new ColumnDefaultValue(
+            "current_catalog()",
+            null, /* no V2 expression */
+            LiteralValue(UTF8String.fromString("spark_catalog"), StringType))))
+
+      val df1 = Seq(1).toDF("dummy")
+      df1.writeTo(tableName).append()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableName"),
+        Seq(Row(1, "spark_catalog")))
+    }
+  }
+
   private def executeAndKeepPhysicalPlan[T <: SparkPlan](func: => Unit): T = {
     val qe = withQueryExecutionsCaptured(spark) {
       func
@@ -503,4 +576,18 @@ class DataSourceV2DataFrameSuite
         s"expected $expectedDefault but found ${column.defaultValue}")
     }
   }
+
+  private def checkDefaultValues(
+      columns: Array[AddColumn],
+      expectedDefaultValues: Array[ColumnDefaultValue]): Unit = {
+    assert(columns.length == expectedDefaultValues.length)
+
+    columns.zip(expectedDefaultValues).foreach {
+      case (column, expectedDefault) =>
+        assert(
+          column.defaultValue == expectedDefault,
+          s"Default value mismatch for column '${column.toString}': " +
+            s"expected $expectedDefault but found ${column.defaultValue}")
+    }
+  }
 }
```
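For readers following along, a hedged sketch of the user-facing flow these tests cover, assuming an in-memory DSV2 catalog registered as `testcat` (as this suite configures); the comments summarize the assertions above rather than additional tested behavior:

```scala
// End-to-end flow mirrored from the tests above.
spark.sql("CREATE TABLE testcat.ns1.ns2.tbl (dummy INT) USING foo")
spark.sql(
  """ALTER TABLE testcat.ns1.ns2.tbl ADD COLUMNS (
    |  salary INT DEFAULT (100 + 23),        -- passed as V2 expr 100 + 23, folded to 123
    |  cat STRING DEFAULT current_catalog()  -- no V2 translation; exists default 'spark_catalog'
    |)""".stripMargin)
```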

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 5 additions & 6 deletions
```diff
@@ -3552,19 +3552,18 @@ class DataSourceV2SQLSuiteV1Filter
     }
   }
 
-  test("SPARK-48286: Add new column with default value which is not foldable") {
+  test("SPARK-48286: Add new column with default value which is not deterministic") {
     val foldableExpressions = Seq("1", "2 + 1")
     withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> v2Source) {
       withTable("tab") {
         spark.sql(s"CREATE TABLE tab (col1 INT DEFAULT 100) USING $v2Source")
         val exception = analysisException(
           // Rand function is not foldable
           s"ALTER TABLE tab ADD COLUMN col2 DOUBLE DEFAULT rand()")
-        assert(exception.getSqlState == "42623")
-        assert(exception.errorClass.get == "INVALID_DEFAULT_VALUE.NOT_CONSTANT")
-        assert(exception.messageParameters("colName") == "`col2`")
-        assert(exception.messageParameters("defaultValue") == "rand()")
-        assert(exception.messageParameters("statement") == "ALTER TABLE")
+        assert(exception.getSqlState == "42K0E")
+        assert(exception.errorClass.get == "INVALID_NON_DETERMINISTIC_EXPRESSIONS")
+        assert(exception.messageParameters("sqlExprs") ==
+          "\"qualifiedcoltype(defaultvalueexpression(rand()))\"")
       }
       foldableExpressions.foreach(expr => {
         withTable("tab") {
```