forked from apache/spark
[pull] master from apache:master #41
Merged
Conversation
### What changes were proposed in this pull request?
Make `DataFrame.select` support `a.*`
### Why are the changes needed?
bugfix:
```
Traceback (most recent call last):
File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/tests/connect/test_connect_basic.py", line 1377, in test_select_star
cdf.select("a", "b.*").collect(),
File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/dataframe.py", line 1305, in collect
table = self._session.client.to_table(query)
File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/client.py", line 445, in to_table
table, _ = self._execute_and_fetch(req)
File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/client.py", line 639, in _execute_and_fetch
self._handle_error(rpc_error)
File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/client.py", line 675, in _handle_error
raise SparkConnectAnalysisException(
pyspark.errors.exceptions.SparkConnectAnalysisException: [FIELD_NOT_FOUND] No such struct field `*` in `c`, `d`.
```
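A minimal sketch of the usage this fix enables, using a plain local PySpark session for brevity (with the fix, the equivalent Spark Connect call behaves the same way); the column names mirror the traceback above and are illustrative:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# A DataFrame with a top-level column `a` and a struct column `b` with fields `c` and `d`.
df = spark.sql("SELECT 1 AS a, named_struct('c', 2, 'd', 3) AS b")
# Selecting `b.*` expands the struct into its fields instead of failing with FIELD_NOT_FOUND.
df.select("a", "b.*").collect()
# [Row(a=1, c=2, d=3)]
```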
### Does this PR introduce _any_ user-facing change?
Yes.
### How was this patch tested?
Added a unit test.
Closes #39934 from zhengruifeng/connect_select_star.
Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
… JoinType string
### What changes were proposed in this pull request?
Standardize the JoinType string to be consistent with PySpark: https://github.com/apache/spark/blob/05c0fa573881b49d8ead9a5e16071190e5841e1b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala#L25
### Why are the changes needed?
```
>>> df = spark.range(1)
>>> df2 = spark.range(2)
>>> df.join(df2, how="left_outer")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/xinrong.meng/spark/python/pyspark/sql/connect/dataframe.py", line 438, in join
plan.Join(left=self._plan, right=other._plan, on=on, how=how),
File "/Users/xinrong.meng/spark/python/pyspark/sql/connect/plan.py", line 730, in __init__
raise NotImplementedError(
NotImplementedError: Unsupported join type: left_outer. Supported join types include: "inner", "outer", "full", "fullouter", "full_outer", "leftouter", "left", "left_outer", "rightouter", "right", "right_outer", "leftsemi", "left_semi", "semi", "leftanti", "left_anti", "anti", "cross",
```
### Does this PR introduce _any_ user-facing change?
Yes.
### How was this patch tested?
Updated the unit test.
Closes #39938 from zhengruifeng/connect_join_types.
Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
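For illustration, a small sketch of the aliases that now behave consistently, run here with a regular PySpark session (the point of the fix is that Spark Connect accepts the same strings); the renamed column `other_id` and the join condition are only there to make the snippet self-contained:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(1)
df2 = spark.range(2).withColumnRenamed("id", "other_id")

# "left", "leftouter" and "left_outer" are all aliases for the same join type.
for how in ("left", "leftouter", "left_outer"):
    df.join(df2, df.id == df2.other_id, how=how).collect()
```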
### What changes were proposed in this pull request?
`CreateDataFrame` should accept objects.
### Why are the changes needed?
For parity with vanilla PySpark.
### Does this PR introduce _any_ user-facing change?
Yes.
### How was this patch tested?
Enabled an existing unit test and added a new one.
Closes #39939 from zhengruifeng/connect_createDF_objects.
Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
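A minimal sketch of creating a DataFrame from objects, shown with a regular PySpark session (with this change, the Spark Connect `createDataFrame` accepts the same kind of input); the data is made up:
```python
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()
# Plain Row objects (rather than tuples or dicts) as input.
df = spark.createDataFrame([Row(name="Alice", age=2), Row(name="Bob", age=5)])
df.collect()
# [Row(name='Alice', age=2), Row(name='Bob', age=5)]
```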
…ilter_on_top_of_outer_join`
### What changes were proposed in this pull request?
Enable `test_udf_in_filter_on_top_of_outer_join`.
### Why are the changes needed?
For test coverage.
### Does this PR introduce _any_ user-facing change?
No, test-only.
### How was this patch tested?
Enabled test.
Closes #39940 from zhengruifeng/connect_enable_42267.
Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
### What changes were proposed in this pull request?
Introduces a base hierarchy for exceptions. As a common hierarchy for users, the base exception classes are subclasses of `PySparkException`. The concrete classes for both PySpark and Spark Connect inherit the base classes, which should not be exposed to users.
### Why are the changes needed?
Currently the exception class hierarchy is separated between PySpark and Spark Connect. If users want to check the exception type, they need to switch the error classes based on whether they are running on PySpark or Spark Connect, which is not ideal.
### Does this PR introduce _any_ user-facing change?
No. Users can still use the existing exception classes to check the exception type.
### How was this patch tested?
Updated tests.
Closes #39882 from ueshin/issues/SPARK-42342/exceptions.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
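A hedged sketch of what a common hierarchy allows, assuming an active `spark` session and the `pyspark.errors` exports: the same except clause handles the error regardless of whether the concrete exception class comes from PySpark or Spark Connect.
```python
from pyspark.errors import AnalysisException, PySparkException

try:
    spark.sql("SELECT * FROM nonexistent_table").collect()
except AnalysisException:
    # Caught whether the concrete class is the PySpark or the Spark Connect subclass.
    pass
except PySparkException:
    # Common base class for all PySpark errors.
    pass
```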
…tency
### What changes were proposed in this pull request?
This PR follows up #39815 to fix error messages that are not consistent with other error messages.
### Why are the changes needed?
To keep consistency across all error messages.
### Does this PR introduce _any_ user-facing change?
No, only error message changes.
### How was this patch tested?
The existing CI should pass.
Closes #39935 from itholic/SPARK-42244-followup.
Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This is a followup of #39610. The non-greedy mode does not work well if the ending class name appears more than once, like `file://myPath/.../clsName/.../clsName`. We should still use greedy mode, but match only characters that are not a space or comma (comma is used to combine multiple paths in `FileIndex.toString`).
### Why are the changes needed?
Make the test framework more stable.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
N/A
Closes #39924 from cloud-fan/minor.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
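To illustrate the regex point, a standalone Python sketch with a made-up path and class name (not the actual test-framework code):
```python
import re

s = "file://myPath/data/MyRelation/extra/MyRelation, file://otherPath"

# Non-greedy matching stops at the first occurrence of the class name:
re.search(r"file://.*?MyRelation", s).group()
# 'file://myPath/data/MyRelation'

# Greedy matching over characters that are neither a space nor a comma
# captures the whole path up to the last occurrence:
re.search(r"file://[^ ,]*MyRelation", s).group()
# 'file://myPath/data/MyRelation/extra/MyRelation'
```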
### What changes were proposed in this pull request?
This PR proposes to assign the name "CANNOT_MODIFY_CONFIG" to _LEGACY_ERROR_TEMP_1326.
### Why are the changes needed?
We should assign proper names to _LEGACY_ERROR_TEMP_*.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*"`
Closes #39873 from itholic/LEGACY_1326.
Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
…tement for JDBC dialect
### What changes were proposed in this pull request?
Currently, JDBCRDD uses a fixed format for the SELECT statement.
```
val sqlText = options.prepareQuery +
s"SELECT $columnList FROM ${options.tableOrQuery} $myTableSampleClause" +
s" $myWhereClause $getGroupByClause $getOrderByClause $myLimitClause $myOffsetClause"
```
But some databases have different syntax. For example, MS SQL Server uses the keyword TOP to express a LIMIT clause or a Top N query.
The LIMIT-equivalent clause of MS SQL Server is shown below.
```
SELECT TOP(1) Model, Color, Price
FROM dbo.Cars
WHERE Color = 'blue'
```
A Top N query in MS SQL Server is shown below.
```
SELECT TOP(1) Model, Color, Price
FROM dbo.Cars
WHERE Color = 'blue'
ORDER BY Price ASC
```
This PR lets each JDBC dialect define its own syntax, as sketched below.
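A purely illustrative, language-agnostic sketch of the idea in Python (the real change lives in the Scala JDBC dialect code; the class and method names here are made up): the default dialect builds a generic SELECT, while a SQL Server-style dialect overrides the builder to emit TOP instead of LIMIT.
```python
class DefaultDialect:
    def build_select(self, columns, table, where="", limit=None):
        limit_clause = f" LIMIT {limit}" if limit is not None else ""
        where_clause = f" {where}" if where else ""
        return f"SELECT {columns} FROM {table}{where_clause}{limit_clause}"


class MsSqlServerDialect(DefaultDialect):
    def build_select(self, columns, table, where="", limit=None):
        top_clause = f"TOP({limit}) " if limit is not None else ""
        where_clause = f" {where}" if where else ""
        return f"SELECT {top_clause}{columns} FROM {table}{where_clause}"


print(MsSqlServerDialect().build_select(
    "Model, Color, Price", "dbo.Cars", where="WHERE Color = 'blue'", limit=1))
# SELECT TOP(1) Model, Color, Price FROM dbo.Cars WHERE Color = 'blue'
```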
### Why are the changes needed?
Extract the function that constructs the SELECT statement so each JDBC dialect can override it.
### Does this PR introduce _any_ user-facing change?
No.
New feature.
### How was this patch tested?
N/A
Closes #39667 from beliefer/SPARK-42131.
Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…PRECISION_EXCEEDS_MAX_PRECISION`
### What changes were proposed in this pull request?
This PR proposes to integrate `_LEGACY_ERROR_TEMP_1229` into `DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION`.
**_LEGACY_ERROR_TEMP_1229**
```json
"_LEGACY_ERROR_TEMP_1229" : {
"message" : [
"<decimalType> can only support precision up to <precision>."
]
},
```
**DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION**
```json
"DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION" : {
"message" : [
"Decimal precision <precision> exceeds max precision <maxPrecision>."
],
"sqlState" : "22003"
},
```
### Why are the changes needed?
We should assign proper names to _LEGACY_ERROR_TEMP_*.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
`./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*"`
Closes #39875 from itholic/LEGACY_1229.
Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
### What changes were proposed in this pull request?
This PR proposes to assign the name "DUPLICATED_MAP_KEY" to _LEGACY_ERROR_TEMP_2127.
### Why are the changes needed?
We should assign proper names to _LEGACY_ERROR_TEMP_*.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*"`
Closes #39890 from itholic/LEGACY_2127.
Lead-authored-by: itholic <haejoon.lee@databricks.com>
Co-authored-by: Haejoon Lee <44108233+itholic@users.noreply.github.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
…23|2125)
### What changes were proposed in this pull request?
This PR proposes to assign the name "CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE" to _LEGACY_ERROR_TEMP_2123 and _LEGACY_ERROR_TEMP_2125.
### Why are the changes needed?
We should assign proper names to _LEGACY_ERROR_TEMP_*.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*"`
Closes #39891 from itholic/LEGACY_2125.
Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
…ers set it explicitly in CSV dataSource
### What changes were proposed in this pull request?
Pass the comment option through to univocity if users set it explicitly in the CSV data source.
### Why are the changes needed?
In #29516, in order to fix some bugs, univocity-parsers was upgraded from 2.8.3 to 2.9.0. The upgrade also brought in a new univocity-parsers feature that quotes values of the first column when they start with the comment character. That was a breaking change for downstream users that handle a whole row as input.
Before this change: #abc,1
After this change: "#abc",1
We change the related `isCommentSet` check logic so users can keep the previous behavior.
### Does this PR introduce _any_ user-facing change?
Yes, a little. If users set the comment option to '\u0000' explicitly, they should now remove it to keep the comment option unset.
### How was this patch tested?
Added a new test.
Closes #39878 from wayneguow/comment.
Authored-by: wayneguow <guow93@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
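A small sketch of the user-facing note, assuming an active `spark` session and a hypothetical file `/tmp/data.csv` whose first value starts with `#` (e.g. `#abc,1`):
```python
# Comment handling is disabled by default, so the row is read as data.
spark.read.csv("/tmp/data.csv").collect()

# Setting the option explicitly enables comment handling, so lines starting with '#' are skipped.
spark.read.option("comment", "#").csv("/tmp/data.csv").collect()

# Users who previously passed '\u0000' explicitly to mean "no comment character"
# should now simply leave the option unset.
```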
…ileManager.exists
### What changes were proposed in this pull request?
This PR proposes to use FileSystem.exists in FileSystemBasedCheckpointFileManager.exists, which is consistent with the other methods in FileSystemBasedCheckpointFileManager. This PR also removes the test case "QueryExecutionErrorsSuite.FAILED_RENAME_PATH: rename when destination path already exists" because the test relies on an incorrect custom file system instance with a non-symmetric implementation between `FileSystemBasedCheckpointFileManager.exists` and `FileSystem.exists`. (See the detailed explanation in #39936 (comment).)
### Why are the changes needed?
The other methods in FileSystemBasedCheckpointFileManager already use FileSystem.exists in all cases that check the existence of a path.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes #39936 from HeartSaVioR/MINOR-FileSystemBasedCheckpointFileManager-calls-fs-exists-in-exists.
Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
…onfiguration
As requested by HyukjinKwon in #38312. NB: this change needs to be backported.
### What changes were proposed in this pull request?
Update the version set for the "spark.sql.legacy.parquet.nanosAsLong" configuration in SqlConf. This update is required because the previous PR set the version to `3.2.3`, which has already been released. Updating to version `3.2.4` will correctly reflect when this configuration element was added.
### Why are the changes needed?
Correctness, and to complete SPARK-40819.
### Does this PR introduce _any_ user-facing change?
No, this is merely so this configuration element has the correct version.
### How was this patch tested?
N/A
Closes #39943 from awdavidson/SPARK-40819_sql-conf.
Authored-by: awdavidson <54780428+awdavidson@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…chema mismatch
### What changes were proposed in this pull request?
Improve the error messages when a Python method provided to `DataFrame.groupby(...).applyInPandas` / `DataFrame.groupby(...).cogroup(...).applyInPandas` returns a Pandas DataFrame that does not match the expected schema.
With
```Python
gdf = spark.range(2).join(spark.range(3).withColumnRenamed("id", "val")).groupby("id")
```
**Mismatching column names, matching number of columns:**
```Python
gdf.applyInPandas(lambda pdf: pdf.rename(columns={"val": "v"}), "id long, val long").show()
# was: KeyError: 'val'
# now: RuntimeError: Column names of the returned pandas.DataFrame do not match specified schema.
# Missing: val Unexpected: v
```
**Mismatching column names, different number of columns:**
```Python
gdf.applyInPandas(lambda pdf: pdf.assign(foo=[3, 3, 3]).rename(columns={"val": "v"}), "id long, val long").show()
# was: RuntimeError: Number of columns of the returned pandas.DataFrame doesn't match specified schema.
# Expected: 2 Actual: 3
# now: RuntimeError: Column names of the returned pandas.DataFrame do not match specified schema.
# Missing: val Unexpected: foo, v
```
**Expected schema matches but has duplicates (`id`) so that number of columns match:**
```Python
gdf.applyInPandas(lambda pdf: pdf.rename(columns={"val": "v"}), "id long, id long").show()
# was: java.lang.IllegalArgumentException: not all nodes and buffers were consumed.
# nodes: [ArrowFieldNode [length=3, nullCount=0]]
# buffers: [ArrowBuf[304], address:139860828549160, length:0, ArrowBuf[305], address:139860828549160, length:24]
# now: RuntimeError: Column names of the returned pandas.DataFrame do not match specified schema.
# Unexpected: v
```
**In case the returned Pandas DataFrame contains no column names (none of the column labels is a string):**
```Python
gdf.applyInPandas(lambda pdf: pdf.assign(foo=[3, 3, 3]).rename(columns={"id": 0, "val": 1, "foo": 2}), "id long, val long").show()
# was: RuntimeError: Number of columns of the returned pandas.DataFrame doesn't match specified schema.
# Expected: 2 Actual: 3
# now: RuntimeError: Number of columns of the returned pandas.DataFrame doesn't match specified schema.
# Expected: 2 Actual: 3
```
**Mismatching types (ValueError and TypeError):**
```Python
gdf.applyInPandas(lambda pdf: pdf, "id int, val string").show()
# was: pyarrow.lib.ArrowTypeError: Expected a string or bytes dtype, got int64
# now: pyarrow.lib.ArrowTypeError: Expected a string or bytes dtype, got int64
# The above exception was the direct cause of the following exception:
# TypeError: Exception thrown when converting pandas.Series (int64) with name 'val' to Arrow Array (string).
gdf.applyInPandas(lambda pdf: pdf.assign(val=pdf["val"].apply(str)), "id int, val double").show()
# was: pyarrow.lib.ArrowInvalid: Could not convert '0' with type str: tried to convert to double
# now: pyarrow.lib.ArrowInvalid: Could not convert '0' with type str: tried to convert to double
# The above exception was the direct cause of the following exception:
# ValueError: Exception thrown when converting pandas.Series (object) with name 'val' to Arrow Array (double).
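# (snippet from the test suite: `self.sql_conf` temporarily sets the given config and `safely` holds the config value being exercised)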
with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": safely}):
gdf.applyInPandas(lambda pdf: pdf.assign(val=pdf["val"].apply(str)), "id int, val double").show()
# was: ValueError: Exception thrown when converting pandas.Series (object) to Arrow Array (double).
# It can be caused by overflows or other unsafe conversions warned by Arrow. Arrow safe type check can be disabled
# by using SQL config `spark.sql.execution.pandas.convertToArrowArraySafely`.
# now: ValueError: Exception thrown when converting pandas.Series (object) with name 'val' to Arrow Array (double).
# It can be caused by overflows or other unsafe conversions warned by Arrow. Arrow safe type check can be disabled
# by using SQL config `spark.sql.execution.pandas.convertToArrowArraySafely`.
```
### Why are the changes needed?
Existing errors are generic (`KeyError`) or meaningless (`not all nodes and buffers were consumed`). The errors should help users in spotting the mismatching columns by naming them.
The schema of the returned Pandas DataFrames can only be checked during processing the DataFrame, so such errors are very expensive. Therefore, they should be expressive.
### Does this PR introduce _any_ user-facing change?
This only changes error messages, not behaviour.
### How was this patch tested?
Tests all cases of schema mismatch for `GroupedData.applyInPandas` and `PandasCogroupedOps.applyInPandas`.
Closes #38223 from EnricoMi/branch-pyspark-apply-in-pandas-schema-mismatch.
Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR is just a trivial fix: replace `get().getOrElse` with `getOrElse`.
### Why are the changes needed?
Code simplification.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass GitHub Actions.
Closes #39893 from LuciferYang/getOrElse.
Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
This PR aims to upgrade Maven plugins, including:
- maven-checkstyle-plugin: from 3.2.0 to 3.2.1
- maven-clean-plugin: from 3.1.0 to 3.2.0
- maven-dependency-plugin: from 3.3.0 to 3.5.0
- maven-source-plugin: from 3.1.0 to 3.2.1
- maven-surefire-plugin: from 3.0.0-M7 to 3.0.0-M8
- maven-jar-plugin: from 3.2.2 to 3.3.0
### Why are the changes needed?
- maven-checkstyle-plugin: apache/maven-checkstyle-plugin@maven-checkstyle-plugin-3.2.0...maven-checkstyle-plugin-3.2.1
- maven-clean-plugin: 3.2.0 includes a new feature: apache/maven-clean-plugin#6 https://github.com/apache/maven-clean-plugin/releases/tag/maven-clean-plugin-3.2.0
- maven-dependency-plugin: apache/maven-dependency-plugin@maven-dependency-plugin-3.3.0...maven-dependency-plugin-3.5.0
- maven-source-plugin: apache/maven-source-plugin@maven-source-plugin-3.1.0...maven-source-plugin-3.2.1
- maven-surefire-plugin: apache/maven-surefire@surefire-3.0.0-M7...surefire-3.0.0-M8
- maven-jar-plugin: https://github.com/apache/maven-jar-plugin/releases/tag/maven-jar-plugin-3.3.0
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- Pass GitHub Actions
- Manually check that these plugins work normally
Closes #39899 from LuciferYang/SPARK-42355.
Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
This PR aims to upgrade RoaringBitmap to 0.9.39.
### Why are the changes needed?
This version brings a bug fix:
- RoaringBitmap/RoaringBitmap#614
Other changes: RoaringBitmap/RoaringBitmap@0.9.38...0.9.39
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass GitHub Actions.
Closes #39948 from LuciferYang/SPARK-42385.
Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…UDFs
### What changes were proposed in this pull request?
Standardize registered pickled Python UDFs, specifically, implement `spark.udf.register()`.
### Why are the changes needed?
To reach parity with vanilla PySpark.
### Does this PR introduce _any_ user-facing change?
Yes. `spark.udf.register()` is added as shown below:
```py
>>> spark.udf
<pyspark.sql.connect.udf.UDFRegistration object at 0x7fbca0077dc0>
>>> f = spark.udf.register("f", lambda x: x+1, "int")
>>> f
<function <lambda> at 0x7fbc905e5e50>
>>> spark.sql("SELECT f(id) FROM range(2)").collect()
[Row(f(id)=1), Row(f(id)=2)]
```
### How was this patch tested?
Unit tests.
Closes #39860 from xinrong-meng/connect_registered_udf.
Lead-authored-by: Xinrong Meng <xinrong@apache.org>
Co-authored-by: Xinrong Meng <xinrong.apache@gmail.com>
Signed-off-by: Xinrong Meng <xinrong@apache.org>
### What changes were proposed in this pull request?
Output the cause in the `diagnoseCorruption` method.
### Why are the changes needed?
It makes it convenient to collect the reason for shuffle corruption from the shuffle service log deployed by the YARN NodeManager.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests.
Closes #39918 from cxzl25/SPARK-42366.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
…int directory
### What changes were proposed in this pull request?
When the RocksDB version.zip file gets overwritten (e.g. concurrent task execution, task/stage/batch reattempts) or the zip file doesn't get uploaded successfully, the associated SST and log files don't get garbage collected. We can clean up these SST files during periodic state store maintenance. The major concern is that SST files for the ongoing version also appear to be "orphan" because they are uploaded before the zip file, so we have to be careful not to delete them. To be conservative, we only delete orphan files that are older than all files tracked by the current metadata, and only when there are at least 2 versions.
### Why are the changes needed?
When the RocksDB version.zip file gets overwritten (e.g. concurrent task execution, task/stage/batch reattempts) or the zip file doesn't get uploaded successfully, the associated SST and log files don't get garbage collected ([https://github.com/databricks/runtime/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala](https://github.com/databricks/runtime/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala#L305-L309)). These files consume storage.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a unit test.
Closes #39897 from chaoqin-li1123/orphan_cleanup.
Authored-by: Chaoqin Li <chaoqin.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
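A hedged Python sketch of the conservative selection rule described above (not the actual Scala implementation; the function and parameter names are made up for illustration):
```python
def find_deletable_orphans(all_files, tracked_files, num_versions):
    """Return file names that are safe to delete during maintenance.

    all_files / tracked_files: dicts mapping file name -> modification time.
    """
    # Only clean up once at least two versions exist (and something is tracked).
    if num_versions < 2 or not tracked_files:
        return []
    oldest_tracked_time = min(tracked_files.values())
    # Delete untracked ("orphan") files only if they are older than every tracked file,
    # so SST files uploaded ahead of the ongoing version's zip are never touched.
    return [
        name for name, mtime in all_files.items()
        if name not in tracked_files and mtime < oldest_tracked_time
    ]
```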
…per exception in the Python client
### What changes were proposed in this pull request?
Adds details to non-fatal errors to raise a proper exception in the Python client, which makes `df.sample` raise `IllegalArgumentException` the same as PySpark, except that the timing is delayed until an action is called.
### Why are the changes needed?
Currently `SparkConnectService` does not add details for `NonFatal` exceptions to the RPC status, so the Python client can't detect the exception properly and raises `SparkConnectGrpcException` instead. The status should also carry the details for the Python client.
### Does this PR introduce _any_ user-facing change?
Users will see a proper exception when they call `df.sample` with illegal arguments, but at a different time.
### How was this patch tested?
Enabled `DataFrameParityTests.test_sample`.
Closes #39957 from ueshin/issues/SPARK-42338/sample.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
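A sketch of the described behavior, assuming a Spark Connect session bound to `spark` and that `IllegalArgumentException` is importable from `pyspark.errors`:
```python
from pyspark.errors import IllegalArgumentException

# A negative fraction is an illegal argument; on Spark Connect no error is raised here yet.
df = spark.range(10).sample(-0.5)

try:
    df.collect()  # with this change, the error surfaces as IllegalArgumentException when the action runs
except IllegalArgumentException as e:
    print(type(e).__name__)
```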
…n throwing an exception
### What changes were proposed in this pull request?
This PR proposes to take the super classes into account when throwing an exception from the server to the Python side, by adding more metadata about classes, causes, and tracebacks in the JVM. In addition, this PR matches the exceptions being thrown to the regular PySpark exceptions defined in: https://github.com/apache/spark/blob/04550edd49ee587656d215e59d6a072772d7d5ec/python/pyspark/errors/exceptions/captured.py#L108-L147
### Why are the changes needed?
Right now, many exceptions cannot be handled (e.g., `NoSuchDatabaseException`, which inherits `AnalysisException`) on the Python side.
### Does this PR introduce _any_ user-facing change?
No to end users. Yes in that it matches the exceptions to the regular PySpark exceptions.
### How was this patch tested?
Unit tests fixed.
Closes #39947 from HyukjinKwon/SPARK-41715.
Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This patch updates the API doc of `setTimeoutTimestamp` of `GroupState`.
### Why are the changes needed?
Update incorrect API doc.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Doc change only.
Closes #39958 from viirya/fix_group_state.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
…to connect server module shade configuration
### What changes were proposed in this pull request?
This PR aims to add the `ServicesResourceTransformer` rule to the connect server module shade configuration to make sure `grpc.ManagedChannelProvider` and `grpc.ServerProvider` can be used on the server side.
### Why are the changes needed?
Keep `grpc.ManagedChannelProvider`, `grpc.ServerProvider`, and other SPI classes usable after gRPC is shaded. sbt doesn't need a fix because `sbt-assembly` does this by default.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Pass GitHub Actions
- Manual test: build a Spark client and run as follows:
```
bin/spark-shell --jars spark-connect_2.12-3.5.0-SNAPSHOT.jar --driver-class-path spark-connect_2.12-3.5.0-SNAPSHOT.jar --conf spark.plugins=org.apache.spark.sql.connect.SparkConnectPlugin --conf spark.connect.grpc.binding.port=15102
```
then run some code in spark-shell
Before
```scala
23/02/01 20:44:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = local[*], app id = local-1675255501816).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.5.0-SNAPSHOT
/_/
Using Scala version 2.12.17 (OpenJDK 64-Bit Server VM, Java 11.0.17)
Type in expressions to have them evaluated.
Type :help for more information.
scala> org.sparkproject.connect.grpc.ServerProvider.provider
org.sparkproject.connect.grpc.ManagedChannelProvider$ProviderNotFoundException: No functional server found. Try adding a dependency on the grpc-netty or grpc-netty-shaded artifact
at org.sparkproject.connect.grpc.ServerProvider.provider(ServerProvider.java:44)
... 47 elided
scala> org.sparkproject.connect.grpc.ManagedChannelProvider.provider
org.sparkproject.connect.grpc.ManagedChannelProvider$ProviderNotFoundException: No functional channel service provider found. Try adding a dependency on the grpc-okhttp, grpc-netty, or grpc-netty-shaded artifact
at org.sparkproject.connect.grpc.ManagedChannelProvider.provider(ManagedChannelProvider.java:45)
... 47 elided
```
After
```scala
23/02/01 21:00:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = local[*], app id = local-1675256417224).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.5.0-SNAPSHOT
/_/
Using Scala version 2.12.17 (OpenJDK 64-Bit Server VM, Java 11.0.17)
Type in expressions to have them evaluated.
Type :help for more information.
scala> org.sparkproject.connect.grpc.ManagedChannelProvider.provider
res0: org.sparkproject.connect.grpc.ManagedChannelProvider = org.sparkproject.connect.grpc.netty.NettyChannelProvider@68aa505b
scala> org.sparkproject.connect.grpc.ServerProvider.provider
res2: org.sparkproject.connect.grpc.ServerProvider = org.sparkproject.connect.grpc.netty.NettyServerProvider@4a5d8ae4
```
Closes #39848 from LuciferYang/SPARK-42276.
Lead-authored-by: yangjie01 <yangjie01@baidu.com>
Co-authored-by: YangJie <yangjie01@baidu.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
pull bot pushed a commit that referenced this pull request on Jul 21, 2025
…ingBuilder`
### What changes were proposed in this pull request?
This PR aims to improve `toString` by `JEP-280` instead of `ToStringBuilder`. In addition, `Scalastyle` and `Checkstyle` rules are added to prevent a future regression.
### Why are the changes needed?
Since Java 9, `String Concatenation` has been handled better by default.
| ID | DESCRIPTION |
| - | - |
| JEP-280 | [Indify String Concatenation](https://openjdk.org/jeps/280) |
For example, this PR improves `OpenBlocks` like the following. Both the Java source code and the byte code are simplified a lot by utilizing JEP-280 properly.
**CODE CHANGE**
```java
- return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
-   .append("appId", appId)
-   .append("execId", execId)
-   .append("blockIds", Arrays.toString(blockIds))
-   .toString();
+ return "OpenBlocks[appId=" + appId + ",execId=" + execId + ",blockIds=" +
+   Arrays.toString(blockIds) + "]";
```
**BEFORE**
```
public java.lang.String toString();
  Code:
     0: new #39 // class org/apache/commons/lang3/builder/ToStringBuilder
     3: dup
     4: aload_0
     5: getstatic #41 // Field org/apache/commons/lang3/builder/ToStringStyle.SHORT_PREFIX_STYLE:Lorg/apache/commons/lang3/builder/ToStringStyle;
     8: invokespecial #47 // Method org/apache/commons/lang3/builder/ToStringBuilder."<init>":(Ljava/lang/Object;Lorg/apache/commons/lang3/builder/ToStringStyle;)V
    11: ldc #50 // String appId
    13: aload_0
    14: getfield #7 // Field appId:Ljava/lang/String;
    17: invokevirtual #51 // Method org/apache/commons/lang3/builder/ToStringBuilder.append:(Ljava/lang/String;Ljava/lang/Object;)Lorg/apache/commons/lang3/builder/ToStringBuilder;
    20: ldc #55 // String execId
    22: aload_0
    23: getfield #13 // Field execId:Ljava/lang/String;
    26: invokevirtual #51 // Method org/apache/commons/lang3/builder/ToStringBuilder.append:(Ljava/lang/String;Ljava/lang/Object;)Lorg/apache/commons/lang3/builder/ToStringBuilder;
    29: ldc #56 // String blockIds
    31: aload_0
    32: getfield #16 // Field blockIds:[Ljava/lang/String;
    35: invokestatic #57 // Method java/util/Arrays.toString:([Ljava/lang/Object;)Ljava/lang/String;
    38: invokevirtual #51 // Method org/apache/commons/lang3/builder/ToStringBuilder.append:(Ljava/lang/String;Ljava/lang/Object;)Lorg/apache/commons/lang3/builder/ToStringBuilder;
    41: invokevirtual #61 // Method org/apache/commons/lang3/builder/ToStringBuilder.toString:()Ljava/lang/String;
    44: areturn
```
**AFTER**
```
public java.lang.String toString();
  Code:
     0: aload_0
     1: getfield #7 // Field appId:Ljava/lang/String;
     4: aload_0
     5: getfield #13 // Field execId:Ljava/lang/String;
     8: aload_0
     9: getfield #16 // Field blockIds:[Ljava/lang/String;
    12: invokestatic #39 // Method java/util/Arrays.toString:([Ljava/lang/Object;)Ljava/lang/String;
    15: invokedynamic #43, 0 // InvokeDynamic #0:makeConcatWithConstants:(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;
    20: areturn
```
### Does this PR introduce _any_ user-facing change?
No. This is a `toString` implementation improvement.
### How was this patch tested?
Pass the CIs.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes apache#51572 from dongjoon-hyun/SPARK-52880.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
pull bot pushed a commit that referenced this pull request on Nov 1, 2025
### What changes were proposed in this pull request? This PR proposes to add `doCanonicalize` function for DataSourceV2ScanRelation. The implementation is similar to [the one in BatchScanExec](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L150), as well as the [the one in LogicalRelation](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala#L52). ### Why are the changes needed? Query optimization rules such as MergeScalarSubqueries check if two plans are identical by [comparing their canonicalized form](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala#L219). For DSv2, for physical plan, the canonicalization goes down in the child hierarchy to the BatchScanExec, which [has a doCanonicalize function](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L150); for logical plan, the canonicalization goes down to the DataSourceV2ScanRelation, which, however, does not have a doCanonicalize function. As a result, two logical plans who are semantically identical are not identified. Moreover, for reference, [DSv1 LogicalRelation](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala#L52) also has `doCanonicalize()`. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? A new unit test is added to show that `MergeScalarSubqueries` is working for DataSourceV2ScanRelation. For a query ```sql select (select max(i) from df) as max_i, (select min(i) from df) as min_i ``` Before introducing the canonicalization, the plan is ``` == Parsed Logical Plan == 'Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5] : :- 'Project [unresolvedalias('max('i))] : : +- 'UnresolvedRelation [df], [], false : +- 'Project [unresolvedalias('min('i))] : +- 'UnresolvedRelation [df], [], false +- OneRowRelation == Analyzed Logical Plan == max_i: int, min_i: int Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5] : :- Aggregate [max(i#0) AS max(i)#7] : : +- SubqueryAlias df : : +- View (`df`, [i#0, j#1]) : : +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5 : +- Aggregate [min(i#10) AS min(i)#9] : +- SubqueryAlias df : +- View (`df`, [i#10, j#11]) : +- RelationV2[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5 +- OneRowRelation == Optimized Logical Plan == Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5] : :- Aggregate [max(i#0) AS max(i)#7] : : +- Project [i#0] : : +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5 : +- Aggregate [min(i#10) AS min(i)#9] : +- Project [i#10] : +- RelationV2[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5 +- OneRowRelation == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == ResultQueryStage 0 +- *(1) Project [Subquery subquery#2, [id=#32] AS max_i#3, Subquery subquery#4, [id=#33] AS min_i#5] : :- Subquery subquery#2, [id=#32] : : +- AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == ResultQueryStage 1 +- *(2) HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7]) +- ShuffleQueryStage 0 +- Exchange SinglePartition, 
ENSURE_REQUIREMENTS, [plan_id=58] +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14]) +- *(1) Project [i#0] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- == Initial Plan == HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=19] +- HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14]) +- Project [i#0] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] : +- Subquery subquery#4, [id=#33] : +- AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == ResultQueryStage 1 +- *(2) HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9]) +- ShuffleQueryStage 0 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=63] +- *(1) HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15]) +- *(1) Project [i#10] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- == Initial Plan == HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=30] +- HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15]) +- Project [i#10] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- *(1) Scan OneRowRelation[] +- == Initial Plan == Project [Subquery subquery#2, [id=#32] AS max_i#3, Subquery subquery#4, [id=#33] AS min_i#5] : :- Subquery subquery#2, [id=#32] : : +- AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == ResultQueryStage 1 +- *(2) HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7]) +- ShuffleQueryStage 0 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=58] +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14]) +- *(1) Project [i#0] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- == Initial Plan == HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=19] +- HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14]) +- Project [i#0] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] : +- Subquery subquery#4, [id=#33] : +- AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == ResultQueryStage 1 +- *(2) HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9]) +- ShuffleQueryStage 0 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=63] +- *(1) HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15]) +- *(1) Project [i#10] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- == Initial Plan == HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=30] +- HashAggregate(keys=[], 
functions=[partial_min(i#10)], output=[min#15]) +- Project [i#10] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- Scan OneRowRelation[] ``` After introducing the canonicalization, the plan is as following, where you can see **ReusedSubquery** ``` == Parsed Logical Plan == 'Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5] : :- 'Project [unresolvedalias('max('i))] : : +- 'UnresolvedRelation [df], [], false : +- 'Project [unresolvedalias('min('i))] : +- 'UnresolvedRelation [df], [], false +- OneRowRelation == Analyzed Logical Plan == max_i: int, min_i: int Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5] : :- Aggregate [max(i#0) AS max(i)#7] : : +- SubqueryAlias df : : +- View (`df`, [i#0, j#1]) : : +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5 : +- Aggregate [min(i#10) AS min(i)#9] : +- SubqueryAlias df : +- View (`df`, [i#10, j#11]) : +- RelationV2[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5 +- OneRowRelation == Optimized Logical Plan == Project [scalar-subquery#2 [].max(i) AS max_i#3, scalar-subquery#4 [].min(i) AS min_i#5] : :- Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14] : : +- Aggregate [max(i#0) AS max(i)#7, min(i#0) AS min(i)#9] : : +- Project [i#0] : : +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5 : +- Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14] : +- Aggregate [max(i#0) AS max(i)#7, min(i#0) AS min(i)#9] : +- Project [i#0] : +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5 +- OneRowRelation == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == ResultQueryStage 0 +- *(1) Project [Subquery subquery#2, [id=#40].max(i) AS max_i#3, ReusedSubquery Subquery subquery#2, [id=#40].min(i) AS min_i#5] : :- Subquery subquery#2, [id=#40] : : +- AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == ResultQueryStage 1 +- *(2) Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14] +- *(2) HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9]) +- ShuffleQueryStage 0 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=71] +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17]) +- *(1) Project [i#0] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- == Initial Plan == Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14] +- HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=22] +- HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17]) +- Project [i#0] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] : +- ReusedSubquery Subquery subquery#2, [id=#40] +- *(1) Scan OneRowRelation[] +- == Initial Plan == Project [Subquery subquery#2, [id=#40].max(i) AS max_i#3, Subquery subquery#4, [id=#41].min(i) AS min_i#5] : :- Subquery subquery#2, [id=#40] : : +- AdaptiveSparkPlan 
isFinalPlan=true +- == Final Plan == ResultQueryStage 1 +- *(2) Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14] +- *(2) HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9]) +- ShuffleQueryStage 0 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=71] +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17]) +- *(1) Project [i#0] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- == Initial Plan == Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14] +- HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=22] +- HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17]) +- Project [i#0] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] : +- Subquery subquery#4, [id=#41] : +- AdaptiveSparkPlan isFinalPlan=false : +- Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14] : +- HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9]) : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=37] : +- HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17]) : +- Project [i#0] : +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- Scan OneRowRelation[] ``` ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#52529 from yhuang-db/scan-canonicalization. Authored-by: yhuang-db <itisyuchuan@gmail.com> Signed-off-by: Peter Toth <peter.toth@gmail.com>
huangxiaopingRD pushed a commit that referenced this pull request on Nov 25, 2025