
[SPARK-51906][SQL] Dsv2 expressions in alter table add columns #50701


Closed
wants to merge 8 commits

Conversation


@szehon-ho szehon-ho commented Apr 25, 2025

What changes were proposed in this pull request?

Add DSV2 expressions to ALTER TABLE ADD COLUMNS.

This harmonizes with the CREATE/REPLACE TABLE default value code path (#50593) and changes the plan to:

  1. Model the default value as a DefaultValueExpression instead of a string
  2. Change its parent QualifiedColType to an Expression so that it gets handled by the main analyzer loop

The analyzer will then resolve the user-provided string to an Expression, which can be neatly converted to a V2Expression.

A side effect is that existsDefault now goes through this path as well (instead of being analyzed/optimized by a manual call to ResolveDefaultColumns::analyze), so the original exception flow in that method is now integrated into other rules in the analyzer phases.
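The shape of this change can be illustrated with a toy sketch (the names below are illustrative stand-ins, not Spark's actual classes): the default value is carried as an expression tree the analyzer can resolve, and a foldable result can then be handed to a DSV2 source as a V2-style literal value.

```python
# Toy sketch of the described flow; names are stand-ins for Spark's internals.
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class Literal:
    value: int

    @property
    def foldable(self) -> bool:
        return True


@dataclass
class Add:
    left: "Expr"
    right: "Expr"

    @property
    def foldable(self) -> bool:
        return self.left.foldable and self.right.foldable


Expr = Union[Literal, Add]


@dataclass
class DefaultValueExpression:
    """Stand-in for DefaultValueExpression: wraps the parsed default."""
    child: Expr


def to_v2_literal(e: Expr) -> Optional[int]:
    """Fold a resolved, foldable default into a value a V2 source can store."""
    if isinstance(e, Literal):
        return e.value
    if isinstance(e, Add) and e.foldable:
        return to_v2_literal(e.left) + to_v2_literal(e.right)
    return None
```

Under this toy model, a default like 100 + 23 folds to 123, which matches the ColumnDefaultValue{sql=(100 + 23), ..., value=123} shape seen in the test output later in this thread.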

Why are the changes needed?

Data sources implementing DSV2 need to get a V2 expression to interpret the default value.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Add new test in DataSourceV2DataFrameSuite

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the SQL label Apr 25, 2025
@szehon-ho (Contributor Author):

Note: this is dependent on the same changes as #50593 and will rebase once that is in.

@szehon-ho szehon-ho changed the title [SPARK-51906][SQL] Dsv2 expressions in alter table add columns [WIP][SPARK-51906][SQL] Dsv2 expressions in alter table add columns Apr 25, 2025
@szehon-ho szehon-ho force-pushed the add_column_default_val branch 2 times, most recently from 8197cba to 141285b Compare April 30, 2025 23:52
@szehon-ho szehon-ho changed the title [WIP][SPARK-51906][SQL] Dsv2 expressions in alter table add columns [SPARK-51906][SQL] Dsv2 expressions in alter table add columns Apr 30, 2025
@szehon-ho szehon-ho force-pushed the add_column_default_val branch from 477aeee to c897dc6 Compare May 1, 2025 19:02
@szehon-ho szehon-ho force-pushed the add_column_default_val branch from c897dc6 to 2409b2d Compare May 2, 2025 01:09
@szehon-ho (Contributor Author):

@cloud-fan @aokolnychyi can you guys take a look when you have time? Thanks

    r.copy(columns = storeDefaultForColumnDefinition(r.columns))
  case a: AddColumns =>
    // Wrap analysis errors for default values in a more user-friendly message.
    a.columnsToAdd.foreach { c =>
Contributor Author:

Note: this is to throw the previously expected exception (defaultValueUnresolved). I tried to move it to CheckAnalysis, but it's a bit too late: the more generic Unresolved exception is already thrown there if any Expression is still unresolved, before execution reaches the checkAlterTable part.

Contributor:

My gut tells me to look for another way and keep this rule generic, as it was before. Let me think.

Contributor:

I think it should work as is but I am not sure about unresolved expressions. When can we hit this?

Contributor Author:

I updated it to a better place

colsToAdd.foreach { col =>
  col.default.foreach { d =>
    if (!d.deterministic) {
      throw QueryCompilationErrors.defaultValueNotConstantError(
Contributor:

but the check is for deterministic, not constant?

Contributor Author:

This is to pass this test: https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala#L3564

Sorry, do you have a thought on what check I could do instead? If I change it to '!d.foldable' it won't work, because the optimizer hasn't run yet and we still have expressions like 1 + 2.

If we don't have this check here, we get the generic 'INVALID_NON_DETERMINISTIC_EXPRESSIONS' error from CheckAnalysis (https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L887).
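The deterministic-vs-foldable distinction under discussion can be sketched with a toy model (illustrative stand-ins, not Spark's actual expression classes): 1 + 2 is both deterministic and foldable, rand() is neither, and current_xxx() functions are deterministic but only become foldable after a later rule substitutes their values.

```python
# Toy model of the three kinds of default-value expressions discussed here.
from dataclasses import dataclass


@dataclass
class Expr:
    deterministic: bool
    foldable: bool


def lit(v):
    """A literal like 1 or 2: deterministic and foldable."""
    return Expr(deterministic=True, foldable=True)


def plus(l, r):
    """1 + 2: deterministic/foldable only if both sides are."""
    return Expr(l.deterministic and r.deterministic,
                l.foldable and r.foldable)


# rand(): non-deterministic, and never foldable.
rand = Expr(deterministic=False, foldable=False)

# current_catalog(): deterministic, but not foldable until a later phase
# (FinishAnalysis / ReplaceCurrentLike) replaces it with a literal.
current_catalog = Expr(deterministic=True, foldable=False)
```

So a '!foldable' check run before FinishAnalysis would wrongly reject current_catalog(), while a '!deterministic' check pairs awkwardly with the NOT_CONSTANT error wording for rand() — the tension discussed above.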

@szehon-ho (Contributor Author), May 7, 2025:

Btw, I moved this to ColumnDefinition to centralize the check code, but as is, I didn't find a good way to change the check.

Contributor:

but 1 + 2 is a foldable expression, and previously we also used the .foldable check: https://github.com/apache/spark/pull/50701/files#diff-e28f12190a8d2f42328550508b1021fb22222ac924065aa2a769da68a51bb3beL145

Contributor Author:

Whoops, I think I tried d.foldable; it should be d.child.foldable. I made the change, let me try it and see if it's better.

Contributor Author:

Actually, this one fails: https://github.com/apache/spark/pull/50701/files#diff-1ca52084cb1c8f816d6002d507797e46ca495db46e0328220a168b6a59b4af60R528 (current_catalog() as the default value).

I think the previous version ran 'FinishAnalysis', which replaced the 'current_xxx' functions before the foldable check.

Contributor Author:

OK, making another try: now I remove both checks (this one, and I bypass the existing 'INVALID_NON_DETERMINISTIC_EXPRESSIONS' check from CheckAnalysis, https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L887) and rely on the later runtime toV2Col check.

@aokolnychyi (Contributor):

Let's make sure that other commands like REPLACE COLUMNS are handled as well, in a separate PR.

@cloud-fan (Contributor):

thanks, merging to master!

@cloud-fan cloud-fan closed this in 1cec856 May 9, 2025
@@ -438,6 +439,47 @@ class DataSourceV2DataFrameSuite
}
}

test("alter table with complex foldable default values") {
@LuciferYang (Contributor), May 9, 2025:

The new test tends to fail when executed in non-ANSI mode. We can reproduce the issue locally with the following command:

SPARK_ANSI_SQL_MODE=false build/sbt clean "sql/testOnly org.apache.spark.sql.connector.DataSourceV2DataFrameSuite"
[info] - alter table with complex foldable default values *** FAILED *** (27 milliseconds)
[info]   ColumnDefaultValue{sql=(100 + 23), expression=null, value=123} did not equal ColumnDefaultValue{sql=(100 + 23), expression=100 + 23, value=123} Default value mismatch for column 'org.apache.spark.sql.connector.catalog.TableChange$AddColumn@c1d818d1': expected ColumnDefaultValue{sql=(100 + 23), expression=100 + 23, value=123} but found ColumnDefaultValue{sql=(100 + 23), expression=null, value=123} (DataSourceV2DataFrameSuite.scala:587)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.sql.connector.DataSourceV2DataFrameSuite.$anonfun$checkDefaultValues$2(DataSourceV2DataFrameSuite.scala:587)
[info]   at scala.collection.ArrayOps$.foreach$extension(ArrayOps.scala:1324)
[info]   at org.apache.spark.sql.connector.DataSourceV2DataFrameSuite.checkDefaultValues(DataSourceV2DataFrameSuite.scala:585)
[info]   at org.apache.spark.sql.connector.DataSourceV2DataFrameSuite.$anonfun$new$52(DataSourceV2DataFrameSuite.scala:479)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info]   at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
[info]   at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
[info]   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:100)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withTable(SQLTestUtils.scala:312)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withTable$(SQLTestUtils.scala:310)
[info]   at org.apache.spark.sql.connector.InsertIntoTests.withTable(InsertIntoTests.scala:42)
[info]   at org.apache.spark.sql.connector.DataSourceV2DataFrameSuite.$anonfun$new$51(DataSourceV2DataFrameSuite.scala:444)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info]   at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
[info]   at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
[info]   at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
[info]   at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.sql.connector.InsertIntoTests.org$scalatest$BeforeAndAfter$$super$runTest(InsertIntoTests.scala:42)
[info]   at org.scalatest.BeforeAndAfter.runTest(BeforeAndAfter.scala:213)
[info]   at org.scalatest.BeforeAndAfter.runTest$(BeforeAndAfter.scala:203)
[info]   at org.apache.spark.sql.connector.InsertIntoTests.runTest(InsertIntoTests.scala:42)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:334)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)
[info]   at org.scalatest.Suite.run$(Suite.scala:1096)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.sql.connector.InsertIntoTests.org$scalatest$BeforeAndAfter$$super$run(InsertIntoTests.scala:42)
[info]   at org.scalatest.BeforeAndAfter.run(BeforeAndAfter.scala:273)
[info]   at org.scalatest.BeforeAndAfter.run$(BeforeAndAfter.scala:271)
[info]   at org.apache.spark.sql.connector.InsertIntoTests.run(InsertIntoTests.scala:42)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info]   at java.base/java.lang.Thread.run(Thread.java:840)

@szehon-ho Do you have time to take a look? Thanks
also cc @cloud-fan @aokolnychyi

Contributor Author:

Interesting, let me take a look

Contributor Author:

Looks like it's a common problem for both tests: 'create/replace table with complex foldable default values' and this one. I can continue looking tomorrow.

Contributor Author:

@LuciferYang I discussed offline with @aokolnychyi; it looks like this is expected, because the Catalyst => V2Expression conversion is enabled for most operators only when ANSI is enabled. I think V2Expression is used by table formats like Iceberg, so it only makes sense to accept the ANSI dialect and not the Spark dialect. I made the test fix in #50851, if you want to take a look and see whether it solves it?

HyukjinKwon pushed a commit that referenced this pull request May 12, 2025
### What changes were proposed in this pull request?
Limit the tests added in #50701 and #50593 to ANSI-enabled mode only.

### Why are the changes needed?
These tests fail in non-ANSI mode. The reason is that https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala converts the majority of Catalyst => V2 expressions only in ANSI mode, so we do not get any V2Expression in the non-ANSI case.
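The gating behavior can be sketched as follows (an assumed, simplified shape for illustration; the real logic in V2ExpressionUtils covers many expression types): conversion to a V2 representation is attempted only when ANSI mode is on, so non-ANSI runs produce no V2Expression at all, which is why the test saw expression=null.

```python
# Minimal sketch of ANSI-gated Catalyst => V2 conversion (illustrative only).
def to_v2_expression(catalyst_sql, ansi_enabled):
    """Return a V2-style representation only when ANSI mode is on."""
    if not ansi_enabled:
        return None  # non-ANSI: no V2Expression is produced
    return {"type": "v2_expression", "sql": catalyst_sql}
```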

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing unit test run with SPARK_ANSI_SQL_MODE=false

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #50851 from szehon-ho/add_column_default_val_follow.

Authored-by: Szehon Ho <szehon.apache@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
cloud-fan pushed a commit that referenced this pull request May 14, 2025

### What changes were proposed in this pull request?
Add a new exception subclass 'INVALID_DEFAULT_VALUE.NON_DETERMINISTIC' for CREATE/REPLACE/ALTER TABLE on a column with a non-deterministic default value.

### Why are the changes needed?
It was pointed out in #50701 that the existing test was wrongly asserting 'NOT_CONSTANT' for non-deterministic functions like rand(). Non-constant is not the same as non-deterministic: some functions like current_xxx() are deterministic but not constant. These are resolved later in FinishAnalysis (the ReplaceCurrentLike rule), so they are constant in the end.

In #50701 we temporarily reverted to the generic INVALID_NON_DETERMINISTIC_EXPRESSIONS exception in this case; in this PR we add a more user-friendly exception type for default values.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit test to DataSourceV2SQLSuite

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #50879 from szehon-ho/default_val_deterministic_error.

Authored-by: Szehon Ho <szehon.apache@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan pushed a commit that referenced this pull request May 20, 2025
### What changes were proposed in this pull request?
Make ALTER TABLE ALTER COLUMN pass a V2 Expression to DSV2.
Like the similar changes (#50593 and #50701), the existing logic is rewritten to use the main Analyzer loop to obtain this expression, instead of a manual call to ResolveDefaultColumns.

We enhance UpdateColumnValue (in the TableChanges API) to return a DefaultValue (which contains the V2 Expression), in addition to the existing API that returns the String representation of the default value.
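The API enhancement described above might look roughly like this (a hypothetical sketch with illustrative names; the real TableChanges interfaces in Spark differ): the change exposes a structured default value carrying the V2 expression alongside the existing string form.

```python
# Hypothetical sketch of the enhanced TableChanges shape (names illustrative).
from dataclasses import dataclass


@dataclass
class DefaultValue:
    sql: str          # existing string representation of the default
    expression: str   # stand-in for the structured V2 expression


@dataclass
class UpdateColumnValue:
    field_names: tuple
    new_default: DefaultValue

    def new_default_sql(self) -> str:
        """Existing API: string form of the default value."""
        return self.new_default.sql

    def new_default_value(self) -> DefaultValue:
        """New API: structured default carrying the V2 expression."""
        return self.new_default
```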

### Why are the changes needed?
DSV2 sources (for example, Iceberg/Delta) should get a modeled V2 Expression to set the default value.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added tests in DataSourceV2DataFrameSuite

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #50864 from szehon-ho/replace_col_default_val.

Authored-by: Szehon Ho <szehon.apache@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

4 participants