
Conversation


@pull pull bot commented Oct 27, 2022

See Commits and Changes for more details.


Created by pull[bot]

Can you help keep this open source service alive? 💖 Please sponsor : )

amaliujia and others added 4 commits October 27, 2022 09:56
### What changes were proposed in this pull request?

This PR proposes to mark non-public APIs as package private, e.g. `private[connect]`.
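
For illustration, a minimal sketch of the visibility pattern (the object, package, and method below are hypothetical, not from this PR):

```scala
package org.apache.spark.sql.connect.common

// `private[connect]` makes the member visible only within the enclosing
// `connect` package (and its subpackages), so it is excluded from the
// public API surface while remaining usable by other connect internals.
private[connect] object InternalStringUtils {
  private[connect] def toUtf8Bytes(value: String): Array[Byte] =
    value.getBytes(java.nio.charset.StandardCharsets.UTF_8)
}
```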

### Why are the changes needed?

This is to control our API surface and avoid exposing non-public APIs.

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

UT

Closes #38392 from amaliujia/SPARK-40914.

Authored-by: Rui Wang <rui.wang@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
… to avoid wrong check result when `expectedMessageParameters.size <= 4`

### What changes were proposed in this pull request?
This PR refactors the `AnalysisTest#assertAnalysisErrorClass` method to use `e.messageParameters != expectedMessageParameters` instead of `!e.messageParameters.sameElements(expectedMessageParameters)`, to avoid a wrong check result when `expectedMessageParameters.size <= 4`.

### Why are the changes needed?
Avoids a wrong check result in `AnalysisTest#assertAnalysisErrorClass` when `expectedMessageParameters.size <= 4`: Scala's small immutable maps (`Map1`..`Map4`) preserve insertion order, so `sameElements` compares the entries pairwise in iteration order and can report two equal maps as different when they were built in different orders, whereas `!=` compares the maps by content.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Pass GitHub Actions
- Manual test:

```scala
Welcome to Scala 2.12.17 (OpenJDK 64-Bit Server VM, Java 1.8.0_352).
Type in expressions for evaluation. Or try :help.

scala> :paste
// Entering paste mode (ctrl-D to finish)

val messageParameters = Map(
      "exprName" -> "`window_duration`",
      "valueRange" -> s"(0, 9223372036854775807]",
      "currentValue" -> "-1000000L",
      "sqlExpr" -> "\"window(2016-01-01 01:01:01, -1000000, 1000000, 0)\""
    )
val expectedMessageParameters =  Map(
      "sqlExpr" -> "\"window(2016-01-01 01:01:01, -1000000, 1000000, 0)\"",
      "exprName" -> "`window_duration`",
      "valueRange" -> s"(0, 9223372036854775807]",
      "currentValue" -> "-1000000L"
    )

val tret = !messageParameters.sameElements(expectedMessageParameters)
val fret = messageParameters != expectedMessageParameters

// Exiting paste mode, now interpreting.

messageParameters: scala.collection.immutable.Map[String,String] = Map(exprName -> `window_duration`, valueRange -> (0, 9223372036854775807], currentValue -> -1000000L, sqlExpr -> "window(2016-01-01 01:01:01, -1000000, 1000000, 0)")
expectedMessageParameters: scala.collection.immutable.Map[String,String] = Map(sqlExpr -> "window(2016-01-01 01:01:01, -1000000, 1000000, 0)", exprName -> `window_duration`, valueRange -> (0, 9223372036854775807], currentValue -> -1000000L)
tret: Boolean = true
fret: Boolean = false
```

Closes #38396 from LuciferYang/SPARK-40919.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
…se in Scala side

### What changes were proposed in this pull request?

This PR adds `assume` in the Python test added in #33559.

### Why are the changes needed?

In some testing environments, Python does not exist. This is consistent with other tests in this file. Otherwise, the test would fail as below:

```
        java.lang.RuntimeException: Python availability: [true], pyspark availability: [false]
      at org.apache.spark.sql.IntegratedUDFTestUtils$.pythonFunc$lzycompute(IntegratedUDFTestUtils.scala:192)
      at org.apache.spark.sql.IntegratedUDFTestUtils$.org$apache$spark$sql$IntegratedUDFTestUtils$$pythonFunc(IntegratedUDFTestUtils.scala:172)
      at org.apache.spark.sql.IntegratedUDFTestUtils$TestPythonUDF$$anon$1.<init>(IntegratedUDFTestUtils.scala:337)
      at org.apache.spark.sql.IntegratedUDFTestUtils$TestPythonUDF.udf$lzycompute(IntegratedUDFTestUtils.scala:334)
      at org.apache.spark.sql.IntegratedUDFTestUtils$TestPythonUDF.udf(IntegratedUDFTestUtils.scala:334)
      at org.apache.spark.sql.IntegratedUDFTestUtils$TestPythonUDF.apply(IntegratedUDFTestUtils.scala:359)
      at org.apache.spark.sql.execution.python.PythonUDFSuite.$anonfun$new$11(PythonUDFSuite.scala:105)
...
```
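
For reference, a minimal sketch of the guard pattern (the suite name and the availability probe below are illustrative; the real test relies on the flags provided by `IntegratedUDFTestUtils`):

```scala
import org.scalatest.funsuite.AnyFunSuite

class PythonDependentSuite extends AnyFunSuite {
  // Hypothetical availability probe; the real suites consult IntegratedUDFTestUtils.
  private lazy val pythonAvailable: Boolean =
    scala.util.Try(new ProcessBuilder("python3", "--version").start().waitFor() == 0)
      .getOrElse(false)

  test("python-dependent behaviour") {
    // assume() cancels the test (reported as canceled, not failed) when the
    // precondition does not hold, so a missing Python no longer breaks the build.
    assume(pythonAvailable, "python3 is not available in this environment")
    // ... the actual Python UDF assertions would go here ...
  }
}
```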

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

Manually checked.

Closes #38407 from HyukjinKwon/SPARK-34265.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Upgrade actions/setup-python to v4

### Why are the changes needed?
- https://github.com/actions/setup-python/releases/tag/v3.0.0: upgrades to Node 16
- https://github.com/actions/setup-python/releases/tag/v4.3.0: cleans up the deprecated `set-output` warning

### Does this PR introduce _any_ user-facing change?
No, dev only

### How was this patch tested?
CI passed

Closes #38408 from Yikun/setup-python.

Authored-by: Yikun Jiang <yikunkero@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…ror classes

### What changes were proposed in this pull request?
This PR replaces `TypeCheckFailure` with `DataTypeMismatch` in the type checks of the time window expressions.
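
For illustration, a hedged sketch of the migration pattern (the check below is simplified and is not the exact diff of this PR; the parameter names mirror the `DATATYPE_MISMATCH.VALUE_OUT_OF_RANGE` message):

```scala
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch

// Simplified duration check: instead of TypeCheckFailure("free-form message"),
// return a structured DataTypeMismatch carrying an error sub-class and named
// message parameters, which plugs into the unified error-class framework.
def checkWindowDuration(duration: Long): TypeCheckResult = {
  if (duration > 0) {
    TypeCheckResult.TypeCheckSuccess
  } else {
    DataTypeMismatch(
      errorSubClass = "VALUE_OUT_OF_RANGE",
      messageParameters = Map(
        "exprName" -> "`window_duration`",
        "valueRange" -> s"(0, ${Long.MaxValue}]",
        "currentValue" -> s"${duration}L"))
  }
}
```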

### Why are the changes needed?
Migration onto error classes unifies Spark SQL error messages.

### Does this PR introduce _any_ user-facing change?
Yes. The PR changes user-facing error messages.

### How was this patch tested?

- Pass GitHub Actions
- Manual test:

```
build/sbt "catalyst/testOnly org.apache.spark.sql.catalyst.analysis.AnalysisErrorSuite" -Pscala-2.13
```
passed

Closes #38394 from LuciferYang/SPARK-40759.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
panbingkun and others added 2 commits October 27, 2022 13:47
…s onto error classes

### What changes were proposed in this pull request?
This PR aims to replace `TypeCheckFailure` with `DataTypeMismatch` in the type checks of the higher-order function expressions, including:
1. ArraySort (2):
https://github.com/apache/spark/blob/1431975723d8df30a25b2333eddcfd0bb6c57677/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala#L403-L407
2. ArrayAggregate (1):
https://github.com/apache/spark/blob/1431975723d8df30a25b2333eddcfd0bb6c57677/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala#L807
3. MapZipWith (1):
https://github.com/apache/spark/blob/1431975723d8df30a25b2333eddcfd0bb6c57677/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala#L1028

### Why are the changes needed?
Migration onto error classes unifies Spark SQL error messages.

### Does this PR introduce _any_ user-facing change?
Yes. The PR changes user-facing error messages.

### How was this patch tested?
- Updated existing UTs
- Pass GitHub Actions.

Closes #38359 from panbingkun/SPARK-40751.

Authored-by: panbingkun <pbk1982@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
…as.read_csv`

### What changes were proposed in this pull request?

as discussed in https://issues.apache.org/jira/browse/SPARK-40922:

> The path argument of `pyspark.pandas.read_csv(path, ...)` currently has type annotation `str` and is documented as
>
>       path : str
>           The path string storing the CSV file to be read.
> The implementation, however, uses `pyspark.sql.DataFrameReader.csv(path, ...)`, which does support multiple paths:
>
>        path : str or list
>            string, or list of strings, for input path(s),
>            or RDD of Strings storing CSV rows.
>

This PR updates the type annotation and documentation of the `path` argument of `pyspark.pandas.read_csv`.
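
For reference, the underlying reader is variadic on the Scala side as well; a minimal sketch (the file paths are hypothetical):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("multi-path-csv").getOrCreate()

// DataFrameReader.csv accepts several paths, which are read into a single DataFrame.
val df = spark.read
  .option("header", "true")
  .csv("data/sales_2022_01.csv", "data/sales_2022_02.csv")

df.printSchema()
```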

### Why are the changes needed?

Loading multiple CSV files at once is a useful feature to have and should be documented

### Does this PR introduce _any_ user-facing change?
It documents an existing feature.

### How was this patch tested?
No need for tests (so far): only type annotations and docblocks were changed

Closes #38399 from soxofaan/SPARK-40922-pyspark-pandas-read-csv-multiple-paths.

Authored-by: Stefaan Lippens <stefaan.lippens@vito.be>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
vitaliili-db and others added 2 commits October 27, 2022 14:12
### What changes were proposed in this pull request?

Fix a bug in the `Unhex` function when there is an odd number of symbols in the input string.
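
To make the odd-length case concrete, here is an illustrative reference decoder in plain Scala. It is not the Spark implementation; it only shows one common convention, treating an odd-length string as if it were left-padded with a `'0'`, e.g. `"ABC"` decoding like `"0ABC"` to the bytes `0x0A, 0xBC`:

```scala
// Illustrative reference only; the actual fix lives in Spark's hex/unhex codec.
def unhexReference(s: String): Option[Array[Byte]] = {
  // Odd number of hex symbols: assume an implicit leading zero nibble.
  val padded = if (s.length % 2 == 1) "0" + s else s
  if (padded.forall(c => Character.digit(c, 16) >= 0)) {
    Some(padded.grouped(2).map(pair => Integer.parseInt(pair, 16).toByte).toArray)
  } else {
    None // non-hex characters: no well-defined result
  }
}

// Example: unhexReference("ABC").map(_.map(b => f"$b%02X").mkString) == Some("0ABC")
```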

### Why are the changes needed?

The `Unhex` function and other functions depending on it (e.g. `ToBinary`) produce incorrect output for such inputs.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests

Closes #38402 from vitaliili-db/unhex.

Authored-by: Vitalii Li <vitalii.li@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
…supportedOperationException

### What changes were proposed in this pull request?
This PR aims to replace `UnsupportedOperationException` with `SparkUnsupportedOperationException`.
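
For illustration, a hedged sketch of the replacement pattern (the error class and parameter names below are hypothetical, not the ones introduced by this PR):

```scala
import org.apache.spark.SparkUnsupportedOperationException

// Before: a bare JVM exception with a free-form message.
//   throw new UnsupportedOperationException(s"Unsupported partition transform: $transform")
// After: a SparkThrowable carrying an error class and named parameters, so it fits
// the error-class framework and can be asserted against the error class in tests.
def unsupportedPartitionTransformError(transform: String): Throwable =
  new SparkUnsupportedOperationException(
    // Hypothetical error class; a real one must be registered in error-classes.json.
    errorClass = "UNSUPPORTED_FEATURE.PARTITION_TRANSFORM",
    messageParameters = Map("transform" -> transform))
```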

### Why are the changes needed?
1. While working on https://issues.apache.org/jira/browse/SPARK-40889,
I found that `QueryExecutionErrors.unsupportedPartitionTransformError` throws an **UnsupportedOperationException** (not a **SparkUnsupportedOperationException**), which does not fit into the new error framework.
https://github.com/apache/spark/blob/a27b459be3ca2ad2d50b9d793b939071ca2270e2/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala#L71-L72

2. `QueryExecutionErrors.unsupportedPartitionTransformError` throws `SparkUnsupportedOperationException`, but the UT catches `UnsupportedOperationException`.
https://github.com/apache/spark/blob/a27b459be3ca2ad2d50b9d793b939071ca2270e2/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala#L288-L301

https://github.com/apache/spark/blob/a27b459be3ca2ad2d50b9d793b939071ca2270e2/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala#L904-L909

https://github.com/apache/spark/blob/a27b459be3ca2ad2d50b9d793b939071ca2270e2/core/src/main/scala/org/apache/spark/SparkException.scala#L144-L154

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing UTs.

Closes #38387 from panbingkun/replace_UnsupportedOperationException.

Authored-by: panbingkun <pbk1982@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
@pull pull bot merged commit e0f4410 into huangxiaopingRD:master Oct 27, 2022
pull bot pushed a commit that referenced this pull request Feb 24, 2024
…n properly

### What changes were proposed in this pull request?
Make `ResolveRelations` handle plan id properly

### Why are the changes needed?
Bug fix for Spark Connect; it won't affect classic Spark SQL.

before this PR:
```
from pyspark.sql import functions as sf

spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1")
spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2")

df1 = spark.read.table("test_table_1")
df2 = spark.read.table("test_table_2")
df3 = spark.read.table("test_table_1")

join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2)
join2 = df3.join(join1, how="left", on=join1.index==df3.id)

join2.schema
```

fails with
```
AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704
```

That is because the existing plan caching in `ResolveRelations` doesn't work with Spark Connect:

```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
 '[#12]Join LeftOuter, '`==`('index, 'id)                     '[#12]Join LeftOuter, '`==`('index, 'id)
!:- '[#9]UnresolvedRelation [test_table_1], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!+- '[#11]Project ['index, 'value_2]                          :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!   +- '[#10]Join Inner, '`==`('id, 'index)                   +- '[#11]Project ['index, 'value_2]
!      :- '[#7]UnresolvedRelation [test_table_1], [], false      +- '[#10]Join Inner, '`==`('id, 'index)
!      +- '[#8]UnresolvedRelation [test_table_2], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!                                                                   :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!                                                                   +- '[#8]SubqueryAlias spark_catalog.default.test_table_2
!                                                                      +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false

Can not resolve 'id with plan 7
```

`[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one
```
:- '[#9]SubqueryAlias spark_catalog.default.test_table_1
   +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
```

### Does this PR introduce _any_ user-facing change?
Yes, bug fix.

### How was this patch tested?
Added UT.

### Was this patch authored or co-authored using generative AI tooling?
ci

Closes apache#45214 from zhengruifeng/connect_fix_read_join.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
pull bot pushed a commit that referenced this pull request Aug 19, 2025
…onicalized expressions

### What changes were proposed in this pull request?

Make `PullOutNonDeterministic` use canonicalized expressions to dedup grouping and aggregate expressions. This affects PySpark UDFs in particular. Example:

```
from pyspark.sql.functions import col, avg, udf

pythonUDF = udf(lambda x: x).asNondeterministic()

spark.range(10)\
.selectExpr("id", "id % 3 as value")\
.groupBy(pythonUDF(col("value")))\
.agg(avg("id"), pythonUDF(col("value")))\
.explain(extended=True)
```

Currently results in a plan like this:

```
Aggregate [_nondeterministic#15], [_nondeterministic#15 AS dummyNondeterministicUDF(value)#12, avg(id#0L) AS avg(id)#13, dummyNondeterministicUDF(value#6L)#8 AS dummyNondeterministicUDF(value)#14]
+- Project [id#0L, value#6L, dummyNondeterministicUDF(value#6L)#7 AS _nondeterministic#15]
   +- Project [id#0L, (id#0L % cast(3 as bigint)) AS value#6L]
      +- Range (0, 10, step=1, splits=Some(2))
```

and then it throws:

```
[MISSING_AGGREGATION] The non-aggregating expression "value" is based on columns which are not participating in the GROUP BY clause. Add the columns or the expression to the GROUP BY, aggregate the expression, or use "any_value(value)" if you do not care which of the values within a group is returned. SQLSTATE: 42803
```

- How canonicalization fixes this (see the sketch below):
  - Nondeterministic PythonUDF expressions always have distinct resultIds per UDF.
  - The fix is to canonicalize the expressions when matching; canonicalizing sets the resultIds to -1, allowing us to dedup the PythonUDF expressions.
- For deterministic UDFs, this rule does not apply and the "Post Analysis" batch extracts and deduplicates the expressions, as expected.
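
A generic sketch of the dedup-by-canonical-form idea (plain Scala, not the actual rule code): two expressions that differ only in an auto-generated id collapse to the same key once the id is normalized.

```scala
// Stand-in for an expression carrying an auto-generated result id.
case class FakeExpr(name: String, resultId: Long) {
  def canonicalized: FakeExpr = copy(resultId = -1) // normalize the id away
}

val first  = FakeExpr("pythonUDF(value)", resultId = 7)
val second = FakeExpr("pythonUDF(value)", resultId = 8)

// Keyed on the raw expression the two UDF calls stay distinct; keyed on the
// canonical form they dedup to a single entry, which is what the fixed rule relies on.
val rawKeys       = Seq(first, second).groupBy(identity).size        // 2
val canonicalKeys = Seq(first, second).groupBy(_.canonicalized).size // 1
```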

### Why are the changes needed?

- The output of the query with the fix applied still makes sense: the nondeterministic UDF is invoked only once, in the Project.

### Does this PR introduce _any_ user-facing change?

Yes, it's additive: it enables queries to run that previously threw errors.

### How was this patch tested?

- Added unit test.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#52061 from benrobby/adhoc-fix-pull-out-nondeterministic.

Authored-by: Ben Hurdelhey <ben.hurdelhey@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
huangxiaopingRD pushed a commit that referenced this pull request Sep 2, 2025
…plan properly

### What changes were proposed in this pull request?
Make `ResolveRelations` handle plan id properly

cherry-pick bugfix apache#45214 to 3.5

### Why are the changes needed?
Bug fix for Spark Connect; it won't affect classic Spark SQL.

before this PR:
```
from pyspark.sql import functions as sf

spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1")
spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2")

df1 = spark.read.table("test_table_1")
df2 = spark.read.table("test_table_2")
df3 = spark.read.table("test_table_1")

join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2)
join2 = df3.join(join1, how="left", on=join1.index==df3.id)

join2.schema
```

fails with
```
AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704
```

That is because the existing plan caching in `ResolveRelations` doesn't work with Spark Connect:

```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
 '[#12]Join LeftOuter, '`==`('index, 'id)                     '[#12]Join LeftOuter, '`==`('index, 'id)
!:- '[#9]UnresolvedRelation [test_table_1], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!+- '[#11]Project ['index, 'value_2]                          :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!   +- '[#10]Join Inner, '`==`('id, 'index)                   +- '[#11]Project ['index, 'value_2]
!      :- '[#7]UnresolvedRelation [test_table_1], [], false      +- '[#10]Join Inner, '`==`('id, 'index)
!      +- '[#8]UnresolvedRelation [test_table_2], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!                                                                   :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!                                                                   +- '[#8]SubqueryAlias spark_catalog.default.test_table_2
!                                                                      +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false

Can not resolve 'id with plan 7
```

`[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one
```
:- '[#9]SubqueryAlias spark_catalog.default.test_table_1
   +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
```

### Does this PR introduce _any_ user-facing change?
Yes, bug fix.

### How was this patch tested?
Added UT.

### Was this patch authored or co-authored using generative AI tooling?
ci

Closes apache#46291 from zhengruifeng/connect_fix_read_join_35.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
huangxiaopingRD pushed a commit that referenced this pull request Sep 2, 2025
…plan properly

### What changes were proposed in this pull request?
Make `ResolveRelations` handle plan id properly

cherry-pick bugfix apache#45214 to 3.4

### Why are the changes needed?
Bug fix for Spark Connect; it won't affect classic Spark SQL.

before this PR:
```
from pyspark.sql import functions as sf

spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1")
spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2")

df1 = spark.read.table("test_table_1")
df2 = spark.read.table("test_table_2")
df3 = spark.read.table("test_table_1")

join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2)
join2 = df3.join(join1, how="left", on=join1.index==df3.id)

join2.schema
```

fails with
```
AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704
```

That is because the existing plan caching in `ResolveRelations` doesn't work with Spark Connect:

```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
 '[#12]Join LeftOuter, '`==`('index, 'id)                     '[#12]Join LeftOuter, '`==`('index, 'id)
!:- '[#9]UnresolvedRelation [test_table_1], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!+- '[#11]Project ['index, 'value_2]                          :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!   +- '[#10]Join Inner, '`==`('id, 'index)                   +- '[#11]Project ['index, 'value_2]
!      :- '[#7]UnresolvedRelation [test_table_1], [], false      +- '[#10]Join Inner, '`==`('id, 'index)
!      +- '[#8]UnresolvedRelation [test_table_2], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!                                                                   :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!                                                                   +- '[#8]SubqueryAlias spark_catalog.default.test_table_2
!                                                                      +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false

Can not resolve 'id with plan 7
```

`[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one
```
:- '[#9]SubqueryAlias spark_catalog.default.test_table_1
   +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
```

### Does this PR introduce _any_ user-facing change?
Yes, bug fix.

### How was this patch tested?
Added UT.

### Was this patch authored or co-authored using generative AI tooling?
ci

Closes apache#46290 from zhengruifeng/connect_fix_read_join_34.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
