[SPARK-51906][SQL] Dsv2 expressions in alter table add columns #50701
Conversation
Note: this is dependent on the same changes as #50593 and will be rebased once that is in.
@cloud-fan @aokolnychyi can you guys take a look when you have time? Thanks
r.copy(columns = storeDefaultForColumnDefinition(r.columns))
case a: AddColumns =>
  // Wrap analysis errors for default values in a more user-friendly message.
  a.columnsToAdd.foreach { c =>
note: This is to throw the previously expected exception (defaultValueUnresolved). I tried to move it to CheckAnalysis, but it's a bit too late there: the more generic unresolved-expression exception is already thrown for any still-unresolved Expression before it reaches the checkAlterTable part.
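A minimal sketch of the ordering problem, using catalyst classes directly (the plan/column wiring is omitted, and the expression here is illustrative):

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Add, Literal}

// A default such as `DEFAULT badref + 1` is still an unresolved Expression
// after parsing; if nothing intercepts it earlier, CheckAnalysis raises the
// generic unresolved-expression error before any ALTER TABLE specific check.
val unresolvedDefault = Add(UnresolvedAttribute("badref"), Literal(1))
assert(!unresolvedDefault.resolved) // trips the generic path in CheckAnalysis
```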
My gut tells me to look for another way and keep this rule generic, as it was before. Let me think.
I think it should work as is but I am not sure about unresolved expressions. When can we hit this?
I updated it to a better place
colsToAdd.foreach { col =>
  col.default.foreach { d =>
    if (!d.deterministic) {
      throw QueryCompilationErrors.defaultValueNotConstantError(
but the check is for deterministic, not constant?
this is to pass the test: https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala#L3564
Sorry, do you have a thought on what check I could do instead? If I change it to `!d.foldable`, it won't work because the optimizer hasn't run yet and we have expressions like 1 + 2.
If we don't have this here, we get the generic 'INVALID_NON_DETERMINISTIC_EXPRESSIONS' error from CheckAnalysis (https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L887)
BTW, I moved this to ColumnDefinition to centralize the check code, but as is, I didn't find a good way to change the check.
But 1 + 2 is a foldable expression, and previously we also used the .foldable check: https://github.com/apache/spark/pull/50701/files#diff-e28f12190a8d2f42328550508b1021fb22222ac924065aa2a769da68a51bb3beL145
Whoops, I think I tried d.foldable; it should be d.child.foldable. I made the change, let me try it and see if it's better.
Actually, this one fails: https://github.com/apache/spark/pull/50701/files#diff-1ca52084cb1c8f816d6002d507797e46ca495db46e0328220a168b6a59b4af60R528 (current_catalog() as the default value).
I think the previous version ran 'FinishAnalysis', which replaced the 'current_xxx' expressions before the foldable check.
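A quick sketch of the deterministic vs. foldable distinction being debated here, checked directly on catalyst expressions (a sketch against my understanding of the stock Expression API, not code from this PR):

```scala
import org.apache.spark.sql.catalyst.expressions.{Add, CurrentCatalog, Literal, Rand}

// 1 + 2 is deterministic AND foldable even before the optimizer runs:
// foldable means "can be constant-folded", not "already folded".
val sum = Add(Literal(1), Literal(2))
assert(sum.deterministic && sum.foldable)

// rand() is neither deterministic nor foldable.
val rnd = Rand(Literal(0L))
assert(!rnd.deterministic && !rnd.foldable)

// current_catalog() is deterministic but NOT foldable until FinishAnalysis
// (the ReplaceCurrentLike rule) swaps in a literal, so a pre-optimizer
// foldable check would wrongly reject it.
val cur = CurrentCatalog()
assert(cur.deterministic && !cur.foldable)
```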
OK, making another try: now I remove both checks (this one, and bypass the existing 'INVALID_NON_DETERMINISTIC_EXPRESSIONS' check from CheckAnalysis (https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L887)), and rely on the later runtime toV2Col check.
Let's make sure that other commands like REPLACE column are handled as well in a separate PR.
thanks, merging to master!
@@ -438,6 +439,47 @@ class DataSourceV2DataFrameSuite
    }
  }

  test("alter table with complex foldable default values") {
The new test fails when executed in non-ANSI mode. We can reproduce the issue locally with the following command:
SPARK_ANSI_SQL_MODE=false build/sbt clean "sql/testOnly org.apache.spark.sql.connector.DataSourceV2DataFrameSuite"
[info] - alter table with complex foldable default values *** FAILED *** (27 milliseconds)
[info] ColumnDefaultValue{sql=(100 + 23), expression=null, value=123} did not equal ColumnDefaultValue{sql=(100 + 23), expression=100 + 23, value=123} Default value mismatch for column 'org.apache.spark.sql.connector.catalog.TableChange$AddColumn@c1d818d1': expected ColumnDefaultValue{sql=(100 + 23), expression=100 + 23, value=123} but found ColumnDefaultValue{sql=(100 + 23), expression=null, value=123} (DataSourceV2DataFrameSuite.scala:587)
[info] org.scalatest.exceptions.TestFailedException:
[info] at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info] at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info] at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info] at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info] at org.apache.spark.sql.connector.DataSourceV2DataFrameSuite.$anonfun$checkDefaultValues$2(DataSourceV2DataFrameSuite.scala:587)
[info] at scala.collection.ArrayOps$.foreach$extension(ArrayOps.scala:1324)
[info] at org.apache.spark.sql.connector.DataSourceV2DataFrameSuite.checkDefaultValues(DataSourceV2DataFrameSuite.scala:585)
[info] at org.apache.spark.sql.connector.DataSourceV2DataFrameSuite.$anonfun$new$52(DataSourceV2DataFrameSuite.scala:479)
[info]   ...
@szehon-ho Do you have time to take a look? Thanks
also cc @cloud-fan @aokolnychyi
Interesting, let me take a look
Looks like it's a common problem for both tests, 'create/replace table with complex foldable default values' and this one. I can continue looking tomorrow.
@LuciferYang I discussed offline with @aokolnychyi; it looks like this is expected, because Catalyst => V2Expression conversion is only enabled for many operators if ANSI is enabled. I think V2Expression is used for table formats like Iceberg, so it only makes sense to accept the ANSI dialect and not the Spark one. I made the test fix in #50851, if you want to take a look and see if it solves it?
### What changes were proposed in this pull request?
Limit the tests added in #50701 and #50593 to ANSI_ENABLED mode only.

### Why are the changes needed?
These tests fail in non-ANSI mode. The reason is that https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala only converts the majority of Catalyst => V2 expressions in ANSI mode, so we do not get any V2Expression in the non-ANSI case.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing unit tests run with SPARK_ANSI_SQL_MODE=false

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #50851 from szehon-ho/add_column_default_val_follow.

Authored-by: Szehon Ho <szehon.apache@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
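For illustration, a minimal sketch of the gating pattern the fix uses, pinning ANSI mode on inside the test; the suite name, table name, and parquet provider are stand-ins, not the PR's actual test code:

```scala
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

class DefaultValueAnsiGatingSuite extends QueryTest with SharedSparkSession {
  test("complex foldable default values (ANSI only)") {
    // Pin ANSI on: Catalyst => V2Expression conversion is gated on ANSI mode.
    withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
      withTable("t") {
        sql("CREATE TABLE t (id INT) USING parquet")
        sql("ALTER TABLE t ADD COLUMN price INT DEFAULT 100 + 23")
        sql("INSERT INTO t (id) VALUES (1)")
        // The column default (100 + 23, folded to 123) fills the omitted column.
        checkAnswer(sql("SELECT price FROM t"), Seq(Row(123)))
      }
    }
  }
}
```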
…lues

### What changes were proposed in this pull request?
Add a new exception subclass, 'INVALID_DEFAULT_VALUE.NON_DETERMINISTIC', for CREATE/REPLACE/ALTER TABLE on a column with a non-deterministic default value.

### Why are the changes needed?
It was pointed out in #50701 that the existing test was wrongly asserting 'NOT_CONSTANT' for non-deterministic functions like rand(). Non-constant is not the same as non-deterministic: some functions like current_xxx() are deterministic but not constant. These are actually resolved later in FinishAnalysis (the ReplaceCurrentLike rule), so they are constant in the end. In #50701 we temporarily reverted to the generic INVALID_NON_DETERMINISTIC_EXPRESSIONS exception in this case, and in this PR we make a more user-friendly one for default values, so we should have another exception type for this case.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added a unit test to DataSourceV2SQLSuite

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #50879 from szehon-ho/default_val_deterministic_error.

Authored-by: Szehon Ho <szehon.apache@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
"ALTER TABLE ALTER COLUMN" now passes a V2 Expression to DSV2. Like the similar changes (#50593 and #50701), the existing logic is rewritten to use the main Analyzer loop to get this expression, instead of a manual call to ResolveDefaultColumns to analyze. We enhance UpdateColumnValue (TableChanges API) to return a DefaultValue (which contains the V2 Expression), in addition to the existing API returning the String representation of the default value.

### Why are the changes needed?
DSV2 data sources (for example, Iceberg/Delta) should get a modeled V2 Expression to set the default value.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added tests in DataSourceV2DataFrameSuite

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #50864 from szehon-ho/replace_col_default_val.

Authored-by: Szehon Ho <szehon.apache@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
Add DSV2 expression to alter table add column.
This harmonizes with the Create/Replace table default value code path (#50593) and now changes the plan to:
This will then analyze/resolve the user-provided string to an Expression, which can neatly be converted to a V2Expression; see the sketch below.
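To make the new shape concrete, a hedged sketch of the V2 default value a connector would receive, matching the ColumnDefaultValue{sql=..., expression=..., value=...} shape seen in the test output above; the three-argument ColumnDefaultValue constructor and the "+" scalar-expression name are assumptions, not verified against the final API:

```scala
import org.apache.spark.sql.connector.catalog.ColumnDefaultValue
import org.apache.spark.sql.connector.expressions.{Expression, GeneralScalarExpression, LiteralValue}
import org.apache.spark.sql.types.IntegerType

// For `ALTER TABLE t ADD COLUMN price INT DEFAULT 100 + 23`, the connector
// sees the original SQL text, a structured V2 expression, and the folded
// default literal.
val v2Expr = new GeneralScalarExpression("+",
  Array[Expression](LiteralValue(100, IntegerType), LiteralValue(23, IntegerType)))
val defaultValue =
  new ColumnDefaultValue("100 + 23", v2Expr, LiteralValue(123, IntegerType))
```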
It also has the side effect that the existsDefault will use this as well (instead of being analyzed/optimized by a manual call to ResolveDefaultColumns::analyze); hence, the original exception flow in that method is now integrated into various other rules in the analyzer phases.
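For context on existsDefault, a small sketch of where the two defaults live on a column, assuming the standard metadata keys used by ResolveDefaultColumns (the column name and values are illustrative):

```scala
import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StructField}

val metadata = new MetadataBuilder()
  .putString("CURRENT_DEFAULT", "100 + 23") // applied to future INSERTs
  .putString("EXISTS_DEFAULT", "123")       // backfills rows that predate the column
  .build()
val field = StructField("price", IntegerType, nullable = true, metadata)
```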
Why are the changes needed?
Data sources implementing DSV2 need to get a V2 expression to interpret the default value.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Add new test in DataSourceV2DataFrameSuite
Was this patch authored or co-authored using generative AI tooling?
No