forked from apache/spark
[pull] master from apache:master #40
Closed
Conversation
…eConf for its usage in RocksDBConf ### What changes were proposed in this pull request? Currently, StateStoreConf is used via [confs](https://github.com/huanliwang-db/spark/blob/7671bc975f2d88ab929e4982abfe3e166fa72e35/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala#L77-L78), which composes both SQL confs and extraOptions into one. The config names for extraOptions shouldn't have to follow the SQL conf name prefix, because they are not bound to the SQL conf context. ### Why are the changes needed? After differentiating SQL confs and extraOptions in StateStoreConf, we should be able to support more use cases for operator-level configs via extraOptions. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing UTs should cover the change Closes #39069 from huanliwang-db/SPARK-41524. Authored-by: Huanli Wang <huanli.wang@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
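A minimal, hedged sketch of the separation described above (class, field, and key names are assumptions, not the actual `StateStoreConf` API):
```scala
// Sketch only: keep SQL confs and operator-level extraOptions as two maps so that
// extraOptions keys need not carry a "spark.sql." prefix.
class StateStoreConfSketch(
    sqlConfs: Map[String, String],        // full "spark.sql...." keys
    extraOptions: Map[String, String]) {  // short operator-scoped keys, e.g. "rocksdb.compression" (hypothetical)

  def sqlConf(key: String): Option[String] = sqlConfs.get(key)
  def option(key: String): Option[String] = extraOptions.get(key)
}
```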
…in case of 0 snapshot ### What changes were proposed in this pull request? This PR is a follow-up to skip allocation code block (`splitSlots` and `requestNewExecutors`) completely when the notified snapshots are an empty array. ### Why are the changes needed? When there is no change from K8s control plane side, the notified snapshot is empty like the following. In that case, we still need to do executor clean up, but we don't need to try to create a new executor if we are waiting for new PVCs. ``` 22/12/15 18:08:24 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 15, known: 14, sharedSlotFromPendingPods: 2147483646. 22/12/15 18:08:24 INFO ExecutorPodsAllocator: Found 0 reusable PVCs from 15 PVCs 22/12/15 18:08:24 INFO ExecutorPodsAllocator: Wait to reuse one of the existing 15 PVCs. 22/12/15 18:08:25 INFO ExecutorPodsAllocator: Received 0 snapshots 22/12/15 18:08:26 INFO ExecutorPodsAllocator: Received 0 snapshots 22/12/15 18:08:27 INFO ExecutorPodsAllocator: Received 0 snapshots 22/12/15 18:08:28 INFO ExecutorPodsAllocator: Received 0 snapshots 22/12/15 18:08:29 INFO ExecutorPodsAllocator: Received 0 snapshots 22/12/15 18:08:30 INFO ExecutorPodsAllocator: Received 0 snapshots 22/12/15 18:08:31 INFO ExecutorPodsAllocator: Received 0 snapshots 22/12/15 18:08:32 INFO ExecutorPodsAllocator: Received 0 snapshots 22/12/15 18:08:33 INFO ExecutorPodsAllocator: Received 0 snapshots 22/12/15 18:08:34 INFO ExecutorPodsAllocator: Received 0 snapshots 22/12/15 18:08:35 INFO ExecutorPodsAllocator: Received 0 snapshots 22/12/15 18:08:36 INFO ExecutorPodsAllocator: Received 0 snapshots 22/12/15 18:08:37 INFO ExecutorPodsAllocator: Received 0 snapshots 22/12/15 18:08:38 INFO ExecutorPodsAllocator: Received 0 snapshots 22/12/15 18:08:39 INFO ExecutorPodsAllocator: Received 0 snapshots 22/12/15 18:08:40 INFO ExecutorPodsAllocator: Received 0 snapshots 22/12/15 18:08:41 INFO ExecutorPodsAllocator: Received 0 snapshots 22/12/15 18:08:42 INFO ExecutorPodsAllocator: Received 0 snapshots 22/12/15 18:08:43 INFO ExecutorPodsAllocator: Received 0 snapshots 22/12/15 18:08:44 INFO ExecutorPodsAllocator: Received 0 snapshots 22/12/15 18:08:45 INFO ExecutorPodsAllocator: Received 0 snapshots ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs. Closes #39079 from dongjoon-hyun/SPARK-41410. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
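A self-contained sketch of the guard described above (types and method bodies are stand-ins for the real `ExecutorPodsAllocator`): cleanup still runs on every notification, while allocation work is skipped when the notified snapshots are empty.
```scala
object AllocatorSketch {
  final case class Snapshot(podCount: Int)

  private def cleanUpExecutors(): Unit = println("clean up deleted/failed executors")
  private def splitSlots(): Unit = println("split pending pod slots")
  private def requestNewExecutors(): Unit = println("request new executors")

  def onNewSnapshots(snapshots: Seq[Snapshot]): Unit = {
    cleanUpExecutors()          // always needed, even for empty notifications
    if (snapshots.nonEmpty) {   // skip allocation entirely when K8s reported no change
      splitSlots()
      requestNewExecutors()
    }
  }
}
```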
…s Future Work section ### What changes were proposed in this pull request? This PR aims to remove `Dynamic Resource Allocation` from the K8s document because Apache Spark already supports it. In addition, `Dynamic Resource Allocation` can be used with 3rd-party alternatives like storing the shuffle data to S3 directly. Note that the remaining future item, `External Shuffle Service`, is also a good and independent feature. ### Why are the changes needed? Here is a brief history around `spark.dynamicAllocation.shuffleTracking.enabled`. - Apache Spark 3.0.0 added it via SPARK-27963 for the K8s environment. - Apache Spark 3.1.1 made K8s GA via SPARK-33005 and started to use it widely in K8s. - Apache Spark 3.2.0 started to support shuffle data recovery on reused PVCs via SPARK-35593. - Apache Spark 3.3.0 removed the `Experimental` tag from it via SPARK-39322. - Apache Spark 3.4.0 will enable it by default via SPARK-39846. - This PR removes `Dynamic Allocation` from `Future Work` to improve its visibility. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual review because this is a documentation change. Closes #39080 from dongjoon-hyun/SPARK-41536. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request? This PR works around several `ignore`s in the MyPy check. ### Why are the changes needed? To make MyPy happy. In addition, it fails in my local environment although I use exactly the same MyPy version. ``` annotations failed mypy checks: python/pyspark/pandas/frame.py:6371: error: unused "type: ignore" comment python/pyspark/pandas/frame.py:6428: error: unused "type: ignore" comment python/pyspark/pandas/frame.py:6430: error: unused "type: ignore" comment python/pyspark/pandas/resample.py:104: error: unused "type: ignore" comment python/pyspark/pandas/resample.py:137: error: unused "type: ignore" comment Found 5 errors in 2 files (checked 375 source files) ``` ### Does this PR introduce _any_ user-facing change? No, dev-only. ### How was this patch tested? Manually tested via `./dev/lint-python` in my local environment. Closes #39075 from HyukjinKwon/remove-unused-ignores. Authored-by: Hyukjin Kwon <gurwls223@apache.org> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…yarn environment ### What changes were proposed in this pull request? The Stages UI page fails to load behind a proxy in some specific YARN environments. ### Why are the changes needed? In my environment (CDH 5.8), when I click through to the Spark UI from the YARN Resource Manager page and visit the stage URI, it fails to load. The URI is http://<yarn-url>:8088/proxy/application_1669877165233_0021/stages/stage/?id=0&attempt=0 The issue is similar to [SPARK-32467](https://issues.apache.org/jira/browse/SPARK-32467) and [SPARK-33611](https://issues.apache.org/jira/browse/SPARK-33611); the final symptom is the same because the parameter is encoded twice. Those two issues addressed two scenarios to avoid double encoding: 1. an HTTPS redirect proxy, and 2. a reverse proxy (spark.ui.reverseProxy) enabled in Nginx. But if the parameter is encoded twice for other reasons, such as this issue (the YARN proxy), visiting the stage page will also fail. It is better to decode the parameter twice here, just like the fix in [SPARK-12708](https://issues.apache.org/jira/browse/SPARK-12708) ([codes](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/UIUtils.scala#L626)). ### Does this PR introduce any user-facing change? No ### How was this patch tested? Newly added UT Closes #38882 from yabola/fixui. Authored-by: chenliang.lu <marssss2929@gmail.com> Signed-off-by: Gengliang Wang <gengliang@apache.org>
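An illustrative sketch of the double-decoding idea (the actual change is in `org.apache.spark.ui.UIUtils`; the helper name here is hypothetical):
```scala
// Decode the parameter a second time when the first pass still leaves percent-encoded
// text, so a value that a proxy encoded twice round-trips back to the original.
import java.net.URLDecoder
import java.nio.charset.StandardCharsets

def decodeTwiceIfNeeded(value: String): String = {
  val once = URLDecoder.decode(value, StandardCharsets.UTF_8.name())
  if (once.contains("%")) URLDecoder.decode(once, StandardCharsets.UTF_8.name()) else once
}

// e.g. a twice-encoded space: "a b" was encoded to "a%2520b" by the proxy.
println(decodeTwiceIfNeeded("a%2520b"))  // prints: a b
```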
### What changes were proposed in this pull request?
This PR supports the data masking built-in function **mask**, which returns a masked version of the input string.
By default, upper case letters will be converted to "X", lower case letters will be converted to "x" and numbers will be converted to "n".
For example, mask("**abcd-EFGH-8765-4321**") results in **xxxx-XXXX-nnnn-nnnn**. Users are able to override the characters used in the mask by supplying additional arguments: the second argument controls the mask character for upper case letters, the third argument for lower case letters, and the fourth argument for numbers. For example, mask("**abcd-EFGH-8765-4321", "U", "l", "#**") should result in **llll-UUUU-####-####**
Examples:
```
> SELECT mask('abcd-EFGH-8765-4321');
xxxx-XXXX-nnnn-nnnn
> SELECT mask('abcd-EFGH-8765-4321', 'Q');
xxxx-QQQQ-nnnn-nnnn
> SELECT mask('AbCD123-$#', 'Q','q');
QqQQnnn-$#
> SELECT mask('AbCD123-$#');
XxXXnnn-$#
> SELECT mask('AbCD123-$#', 'Q');
QxQQnnn-$#
> SELECT mask('AbCD123-$#', 'Q','q');
QqQQnnn-$#
> SELECT mask('AbCD123-$#', 'Q','q', 'd');
QqQQddd-$#
> SELECT mask('AbCD123-$#', 'Q','q', 'd', 'o');
QqQQdddoooo
> SELECT mask('AbCD123-$#', -1, 'q', 'd', 'o');
AqCDdddoooo
> SELECT mask('AbCD123-$#', -1,-1, 'd', 'o');
AbCDdddoooo
> SELECT mask('AbCD123-$#', -1,-1, -1, 'o');
AbCD123oooo
> SELECT mask(NULL, -1,-1, -1, 'o');
NULL
> SELECT mask(NULL);
NULL
> SELECT mask('AbCD123-$#', -1, -1, -1, -1);
AbCD123-$#
```
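For reference, a hedged usage sketch from Scala (assuming an existing `SparkSession` named `spark`); the expected value follows the examples above:
```scala
// Call the new built-in through SQL from a Scala session.
spark.sql("SELECT mask('abcd-EFGH-8765-4321', 'U', 'l', '#') AS masked").show(false)
// masked: llll-UUUU-####-####
```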
### Why are the changes needed?
To support the data masking built-in function **mask**, which returns a masked version of the input string.
Ref : [Data masking functions](https://cwiki.apache.org/confluence/display/hive/languagemanual+udf#LanguageManualUDF-DataMaskingFunctions)
### Does this PR introduce _any_ user-facing change?
Yes, added a new built-in function named '**mask**'
### How was this patch tested?
Added test cases
Closes #38146 from vinodkc/br_supportMask.
Authored-by: Vinod KC <vinod.kc.in@gmail.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
…eePatterns ### What changes were proposed in this pull request? Removed `AND_OR` TreePattern, replaced with separate `AND` TreePattern and `OR` TreePattern. ### Why are the changes needed? This way we can be more fine-grained with how we match for AND/OR patterns, since in many cases we want to treat them separately. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? No tests were added as there are not any existing unit tests for TreePatterns. Instead, I just made sure the existing relevant tests (e.g. `BooleanSimplificationSuite`) passed. Closes #39064 from kelvinjian-db/SPARK-41520-and-or-treepattern. Authored-by: Kelvin Jiang <kelvin.jiang@databricks.com> Signed-off-by: Gengliang Wang <gengliang@apache.org>
… Function `when` with `UnresolvedFunction`
### What changes were proposed in this pull request?
1, Implement `Column.{when, otherwise}` and Function `when`
### Why are the changes needed?
For API coverage
### Does this PR introduce _any_ user-facing change?
yes
### How was this patch tested?
added UT
Closes #38956 from zhengruifeng/connect_column_case_when_with_function.
Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
### What changes were proposed in this pull request? This PR aims to add a new executor roll policy, `DISK_USED`, which chooses the executor with the largest used disk size. ### Why are the changes needed? To provide a new built-in policy to the users. ### Does this PR introduce _any_ user-facing change? No. This is a new policy. ### How was this patch tested? Pass the CIs. Closes #39083 from dongjoon-hyun/SPARK-41540. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
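A hedged configuration sketch of how the new policy could be selected; the plugin class and config key names below are assumptions about the existing K8s executor rolling feature, not something stated in this PR:
```scala
// Assumed keys/class for the K8s executor rolling plugin; verify against your Spark version.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.plugins", "org.apache.spark.scheduler.cluster.k8s.ExecutorRollPlugin")
  .set("spark.kubernetes.executor.rollInterval", "300s")
  .set("spark.kubernetes.executor.rollPolicy", "DISK_USED") // the new policy added here
```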
…t in Spark Connect ### What changes were proposed in this pull request? This PR proposes to set parallelism to 1 for the coverage report in Spark Connect. Currently, the coverage report consistently fails for Spark Connect, see https://github.com/apache/spark/actions/runs/3663716955/jobs/6193686439 ### Why are the changes needed? To make the Spark Connect build work (Spark Connect requires a parallelism of 1). Note that this is consistent with our PR builder, see https://github.com/apache/spark/blob/919e556d182f86b3e1e22cd05a546e0f2fc3e307/.github/workflows/build_and_test.yml#L405 ### Does this PR introduce _any_ user-facing change? No, dev-only. ### How was this patch tested? Will monitor the jobs and https://app.codecov.io/gh/apache/spark after merging this. Closes #39085 from HyukjinKwon/SPARK-41542. Authored-by: Hyukjin Kwon <gurwls223@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…TED_INPUT_TYPE` and remove `_LEGACY_ERROR_TEMP_1179` ### What changes were proposed in this pull request? The main changes of this PR are as follows: - Introduce `UNEXPECTED_INPUT_TYPE` instead of `_LEGACY_ERROR_TEMP_1180` and rename the method from `incompatibleRangeInputDataTypeError` to `unexpectedInputDataTypeError` - Refactor `Range#toLong` and `Range#toInt` to pass `paramIndex` to `UNEXPECTED_INPUT_TYPE` - Throw the existing `AnalysisException` in `TableFunctionBuilder` instead of throwing a new `AnalysisException` - Remove unused `_LEGACY_ERROR_TEMP_1179` and `_LEGACY_ERROR_TEMP_1180` ### Why are the changes needed? Proper names of error classes to improve user experience with Spark SQL. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GitHub Actions Closes #39063 from LuciferYang/SPARK-41508. Lead-authored-by: yangjie01 <yangjie01@baidu.com> Co-authored-by: YangJie <yangjie01@baidu.com> Signed-off-by: Max Gekk <max.gekk@gmail.com>
…rojection` for array of UDTs
### What changes were proposed in this pull request?
Change `InterpretedUnsafeProjection#getElementSize` to choose the appropriate element size for the underlying SQL type of a UDT, rather than simply using the default size of the underlying SQL type.
### Why are the changes needed?
Consider this query:
```
// create a file of vector data
import org.apache.spark.ml.linalg.{DenseVector, Vector}
case class TestRow(varr: Array[Vector])
val values = Array(0.1d, 0.2d, 0.3d)
val dv = new DenseVector(values).asInstanceOf[Vector]
val ds = Seq(TestRow(Array(dv, dv))).toDS
ds.coalesce(1).write.mode("overwrite").format("parquet").save("vector_data")
// this works
spark.read.format("parquet").load("vector_data").collect
sql("set spark.sql.codegen.wholeStage=false")
sql("set spark.sql.codegen.factoryMode=NO_CODEGEN")
// this will get an error
spark.read.format("parquet").load("vector_data").collect
```
The failures vary, including:
* `VectorUDT` attempting to deserialize to a `SparseVector` (rather than a `DenseVector`)
* negative array size (for one of the nested arrays)
* JVM crash (SIGBUS error).
This is because `InterpretedUnsafeProjection` initializes the outer-most array writer with an element size of 17 (the size of the UDT's underlying struct), rather than an element size of 8, which would be appropriate for an array of structs.
When the outer-most array is later accessed, `UnsafeArrayData` assumes an element size of 8, so it picks up a garbage offset/size tuple for the second element.
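For illustration, a simplified sketch of the idea behind the fix (the real logic lives in `InterpretedUnsafeProjection#getElementSize`; this is not the exact patch):
```scala
// For a UDT, recurse into its underlying SQL type; variable-length element types use
// the 8-byte offset-and-size word rather than their default size.
import org.apache.spark.sql.types._

def elementSizeSketch(dt: DataType): Int = dt match {
  case udt: UserDefinedType[_] => elementSizeSketch(udt.sqlType)
  case _: StructType | _: ArrayType | _: MapType | StringType | BinaryType => 8
  case other => other.defaultSize
}
```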
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New unit test.
Closes #39349 from bersprockets/udt_issue.
Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…dataframe ### What changes were proposed in this pull request? This PR proposes to enable doctests in pyspark.sql.connect.dataframe, which is virtually the same as pyspark.sql.dataframe. ### Why are the changes needed? To ensure PySpark compatibility and test coverage. ### Does this PR introduce any user-facing change? No, doctests only. ### How was this patch tested? New doctests added. Closes #39346 from techaddict/SPARK-41656-pyspark.sql.connect.dataframe. Authored-by: Sandeep Singh <sandeep@techaddict.me> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request? This PR proposes to automatically reformat `python/setup.py` too. ### Why are the changes needed? To make the development cycle easier. ### Does this PR introduce _any_ user-facing change? No, dev-only. ### How was this patch tested? I manually checked via: ```bash ./dev/reformat-python ./dev/lint-python ``` Closes #39352 from HyukjinKwon/SPARK-41854. Authored-by: Hyukjin Kwon <gurwls223@apache.org> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…URE.DISTRIBUTE_BY ### What changes were proposed in this pull request? This PR aims to remove the non-existent error class UNSUPPORTED_FEATURE.DISTRIBUTE_BY. ### Why are the changes needed? When I wanted to add a UT for the error class UNSUPPORTED_FEATURE.DISTRIBUTE_BY, I found that it is no longer in use. <img width="752" alt="image" src="https://user-images.githubusercontent.com/15246973/210191953-7d896bd7-b552-4a7e-bda8-72237c25297d.png"> ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA. Closes #39335 from panbingkun/SPARK-41807. Authored-by: panbingkun <pbk1982@gmail.com> Signed-off-by: Max Gekk <max.gekk@gmail.com>
…functions ### What changes were proposed in this pull request? This PR proposes to enable doctests in pyspark.sql.connect.functions, which is virtually the same as pyspark.sql.functions. ### Why are the changes needed? To ensure PySpark compatibility and test coverage. ### Does this PR introduce any user-facing change? No, doctests only. ### How was this patch tested? New doctests added. Closes #39347 from techaddict/SPARK-41658-pyspark.sql.connect.functions. Authored-by: Sandeep Singh <sandeep@techaddict.me> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…rigger the error from user space ### What changes were proposed in this pull request? This updates the test for error class RENAME_SRC_PATH_NOT_FOUND in QueryExecutionErrorsSuite to trigger the error with user-facing APIs. ### Why are the changes needed? To test the code more directly. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? The only change is the modification of the RENAME_SRC_PATH_NOT_FOUND test. ### License This contribution is my original work. I license the work to the project under the project’s open source license. Closes #39348 from ibuder/SPARK-41311. Authored-by: Immanuel Buder <immanuel_buder@intuit.com> Signed-off-by: Max Gekk <max.gekk@gmail.com>
…ests' comments ### What changes were proposed in this pull request? This PR is a followup of #39347 and #39347, which updates the invalid JIRAs linked in the comments. ### Why are the changes needed? To track the issues properly, and reenable skipped tests in the future. ### Does this PR introduce _any_ user-facing change? No, dev-only. ### How was this patch tested? Comment-only. Linter in CI should verify them. I also manually checked it in my local. Closes #39354 from HyukjinKwon/SPARK-41658-followup. Authored-by: Hyukjin Kwon <gurwls223@apache.org> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…time_functions, test_expr, test_math_functions, test_window_functions_cumulative_sum, test_corr, test_cov, test_crosstab, test_approxQuantile ### What changes were proposed in this pull request? This PR enables the reused PySpark tests in Spark Connect that pass now. ### Why are the changes needed? To ensure test coverage. ### Does this PR introduce any user-facing change? No, test-only. ### How was this patch tested? Enabling tests Closes #39359 from techaddict/SPARK-41857. Authored-by: Sandeep Singh <sandeep@techaddict.me> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
… Make `createDataFrame` handle None/NaN properly
### What changes were proposed in this pull request?
Make `createDataFrame` handle None/NaN properly
The existing implementation always converts local data to a Pandas DataFrame and then to a PyArrow Table; this approach cannot handle None/NaN properly:
1. local data -> Pandas DataFrame: `pd.DataFrame` always converts None in numeric columns into NaN, and there is no parameter to control this behavior;
2. Pandas DataFrame -> PyArrow Table: `pa.Table.from_pandas` always converts NaN into null, and there is no parameter to control this behavior.
```
In [72]: data = [Row(id=1, value=float("NaN"), s="x"), Row(id=2, value=42.0, s="y"), Row(id=3, value=None, s=None)]
In [73]: pdf = pd.DataFrame(data, columns=["id", "value", "s"])
In [74]: pdf
Out[74]:
id value s
0 1 NaN x
1 2 42.0 y
2 3 NaN None
In [75]: pa.Table.from_pandas(pdf)
Out[75]:
pyarrow.Table
id: int64
value: double
s: string
----
id: [[1,2,3]]
value: [[null,42,null]]
s: [["x","y",null]]
```
To correctly handle None/NaN, I found that `pa.Table.from_pylist` works.
```
In [76]: pa.Table.from_pylist([d.asDict() for d in data])
Out[76]:
pyarrow.Table
id: int64
value: double
s: string
----
id: [[1,2,3]]
value: [[nan,42,null]]
s: [["x","y",null]]
```
### Why are the changes needed?
to support None and NaN
### Does this PR introduce _any_ user-facing change?
yes
### How was this patch tested?
added ut
Closes #39360 from zhengruifeng/connect_fix_41814.
Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request? This PR is a major refactor of how Spark resolves columns. Today, the column resolution logic is placed in several rules, which makes it hard to understand. It's also very fragile to maintain the resolution precedence, as you have to carefully deal with the interactions between these rules. This PR centralizes the column resolution logic into a single rule: the existing `ResolveReferences` rule, so that we no longer need to worry about the interactions between multiple rules. The detailed resolution precedence is also documented. ### Why are the changes needed? code cleanup ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing tests Closes #38888 from cloud-fan/col. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…nReader ### What changes were proposed in this pull request? The use of `SortedMap` in `ErrorClassesJsonReader` was mostly for making tests easier to write. This PR replaces `SortedMap` with `Map` since `SortedMap` is slower than `Map`. ### Why are the changes needed? `SortedMap` is slower than `Map`. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing test suite. Closes #39351 from tedyu/json-reader-map. Authored-by: Ted Yu <yuzhihong@gmail.com> Signed-off-by: Sean Owen <srowen@gmail.com>
…e feature ### What changes were proposed in this pull request? This PR is a partial and logical revert of SPARK-39862, #37280, to fix the huge ORC reader perf regression (3x slower). SPARK-39862 should propose a fix without perf regression. ### Why are the changes needed? During Apache Spark 3.4.0 preparation, SPARK-41782 identified a perf regression. - #39301 (comment) ### Does this PR introduce _any_ user-facing change? After this PR, the regression is removed. However, the bug of DEFAULT value feature will remain. This should be handled separately. ### How was this patch tested? Pass the CI. Closes #39362 from dongjoon-hyun/SPARK-41858. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…he base class ### What changes were proposed in this pull request? This is a followup of #39248 , to add one more code cleanup. The expression initialization code is duplicated 6 times and we should put it in the base class. ### Why are the changes needed? code cleanup ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing tests Closes #39364 from cloud-fan/expr. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
…octests ### What changes were proposed in this pull request? This PR is a followup of #39360 that enables skipped doctests. ### Why are the changes needed? To ensure test coverage. ### Does this PR introduce _any_ user-facing change? No, test-only. ### How was this patch tested? Manually checked via: ```bash ./python/run-tests --testnames 'pyspark.sql.connect.functions' ./python/run-tests --testnames 'pyspark.sql.connect.column' ``` Closes #39363 from HyukjinKwon/SPARK-41814. Authored-by: Hyukjin Kwon <gurwls223@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request? Currently, the GitHub Actions Python linter job is broken. This PR recovers from the Python linter failures. ### Why are the changes needed? There are two kinds of failures. 1. https://github.com/apache/spark/actions/runs/3829330032/jobs/6524170799 ``` python/pyspark/pandas/sql_processor.py:221: error: unused "type: ignore" comment Found 1 error in 1 file (checked 380 source files) ``` 2. After fixing (1), we hit the following. ``` ModuleNotFoundError: No module named 'py._path'; 'py' is not a package ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the GitHub CI on this PR. Or, manually run the following. ``` $ dev/lint-python starting python compilation test... python compilation succeeded. starting black test... black checks passed. starting flake8 test... flake8 checks passed. starting mypy annotations test... annotations passed mypy checks. starting mypy examples test... examples passed mypy checks. starting mypy data test... annotations passed data checks. all lint-python tests passed! ``` Closes #39373 from dongjoon-hyun/SPARK-41864. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…d is not available ### What changes were proposed in this pull request? This PR aims to skip `flake8` tests if the command is not available. ### Why are the changes needed? Linters are optional modules and can be skipped on some systems, as is already done for `mypy`. ``` $ dev/lint-python starting python compilation test... python compilation succeeded. The Python library providing 'black' module was not found. Skipping black checks for now. The flake8 command was not found. Skipping for now. The mypy command was not found. Skipping for now. all lint-python tests passed! ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual tests. Closes #39372 from dongjoon-hyun/SPARK-41863. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…atorUpdates for Scala 2.13 ### What changes were proposed in this pull request? This PR is a followup of #39192 that excludes `StageData.rddIds` and `StageData.accumulatorUpdates` for Scala 2.13 ### Why are the changes needed? To recover the Scala 2.13 build. It is currently broken (https://github.com/apache/spark/actions/runs/3824617107/jobs/6506925003): ``` [error] spark-core: Failed binary compatibility check against org.apache.spark:spark-core_2.13:3.3.0! Found 3 potential problems (filtered 997) [error] * method rddIds()scala.collection.immutable.Seq in class org.apache.spark.status.api.v1.StageData has a different result type in current version, where it is scala.collection.Seq rather than scala.collection.immutable.Seq [error] filter with: ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.StageData.rddIds") [error] * method accumulatorUpdates()scala.collection.immutable.Seq in class org.apache.spark.status.api.v1.StageData has a different result type in current version, where it is scala.collection.Seq rather than scala.collection.immutable.Seq [error] filter with: ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.StageData.accumulatorUpdates") [error] * method this(org.apache.spark.status.api.v1.StageStatus,Int,Int,Int,Int,Int,Int,Int,Int,scala.Option,scala.Option,scala.Option,scala.Option,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,java.lang.String,scala.Option,java.lang.String,java.lang.String,scala.collection.immutable.Seq,scala.collection.immutable.Seq,scala.Option,scala.Option,scala.Option,scala.collection.immutable.Map,Int,scala.Option,scala.Option,scala.Option)Unit in class org.apache.spark.status.api.v1.StageData's type is different in current version, where it is (org.apache.spark.status.api.v1.StageStatus,Int,Int,Int,Int,Int,Int,Int,Int,scala.Option,scala.Option,scala.Option,scala.Option,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,java.lang.String,scala.Option,java.lang.String,java.lang.String,scala.collection.Seq,scala.collection.Seq,scala.Option,scala.Option,scala.Option,scala.collection.immutable.Map,Int,scala.Option,scala.Option,scala.Option)Unit instead of (org.apache.spark.status.api.v1.StageStatus,Int,Int,Int,Int,Int,Int,Int,Int,scala.Option,scala.Option,scala.Option,scala.Option,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,Long,java.lang.String,scala.Option,java.lang.String,java.lang.String,scala.collection.immutable.Seq,scala.collection.immutable.Seq,scala.Option,scala.Option,scala.Option,scala.collection.immutable.Map,Int,scala.Option,scala.Option,scala.Option)Unit [error] filter with: ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.StageData.this") ``` ### Does this PR introduce _any_ user-facing change? No, dev-only. ### How was this patch tested? Manually tested. Closes #39356 from HyukjinKwon/SPARK-41423. Lead-authored-by: Hyukjin Kwon <gurwls223@apache.org> Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…e, test_freqItems, test_input_files, test_to_pandas_required_pandas_not_found ### What changes were proposed in this pull request? This PR enables the reused PySpark tests in Spark Connect that pass now. ### Why are the changes needed? To ensure test coverage. ### Does this PR introduce any user-facing change? No, test-only. ### How was this patch tested? Manually ran it locally. Closes #39358 from techaddict/SPARK-41856. Authored-by: Sandeep Singh <sandeep@techaddict.me> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…rc reader ### What changes were proposed in this pull request? This PR fixes a correctness bug related to column DEFAULT values in Orc reader. * #37280 introduced a performance regression in the Orc reader. * #39362 fixed the performance regression, but stopped the column DEFAULT feature from working, causing a temporary correctness regression that we agreed for me to fix later. * This PR restores column DEFAULT functionality for Orc scans and fixes the correctness regression while not reintroducing the performance regression. ### Why are the changes needed? This PR fixes a correctness bug. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? This PR updates a unit test to exercise that the Orc scan functionality is correct. Closes #39370 from dtenedor/fix-perf-bug-orc-reader. Authored-by: Daniel Tenedorio <daniel.tenedorio@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request? In SSLOptions, the rest of the settings should be set only when SSL is enabled. ### Why are the changes needed? If spark.ssl.enabled is false, there is no point in setting the rest of the spark.ssl.* settings in SSLOptions, as this performs unnecessary operations. An additional implication is that if any error occurs while setting these properties, it will cause a job failure that otherwise would not have happened since SSL is disabled. For example, if the user doesn't have access to the keystore path which is set in hadoop.security.credential.provider.path of hive-site.xml, it can result in a failure while launching the Spark shell, since SSLOptions won't be initialized due to the error in accessing the keystore. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added new test. Closes #39221 from shrprasa/ssl_options_fix. Authored-by: Shrikant Prasad <shrprasa@visa.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
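A minimal self-contained sketch of the guard described above (not the real SSLOptions parser; names are illustrative):
```scala
// Only resolve the remaining spark.ssl.* settings when spark.ssl.enabled is true, so
// e.g. an unreadable keystore cannot break startup while SSL is disabled.
object SslOptionsSketch {
  def parse(settings: Map[String, String], ns: String = "spark.ssl"): Map[String, String] = {
    val enabled = settings.get(s"$ns.enabled").exists(_.toBoolean)
    if (!enabled) {
      Map(s"$ns.enabled" -> "false")  // skip keystore lookups and the rest entirely
    } else {
      settings.filter { case (key, _) => key.startsWith(s"$ns.") }
    }
  }
}
```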
huangxiaopingRD pushed a commit that referenced this pull request on Oct 12, 2023
…edExecutorBackend
### What changes were proposed in this pull request?
Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend where an executor process randomly gets stuck
### Why are the changes needed?
For each executor, the single-threaded dispatcher can run into an "infinite loop" (as explained in SPARK-45227). Once an executor process runs into such a state, it stops launching tasks from the driver or reporting task status back.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
```
$ build/mvn package -DskipTests -pl core
$ build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.executor.CoarseGrainedExecutorBackendSuite test
```
### Was this patch authored or co-authored using generative AI tooling?
No
******************************************************************************
**_Please feel free to skip reading unless you're interested in details_**
******************************************************************************
### Symptom
Our Spark 3 app running on EMR 6.10.0 with Spark 3.3.1 got stuck in the very last step of writing a data frame to S3 by calling `df.write`. Looking at the Spark UI, we saw that an executor process hung for over 1 hour. After we manually killed the executor process, the app succeeded. Note that the same EMR cluster with two worker nodes was able to run the same app without any issue before and after the incident.
Below is what's observed from relevant container logs and thread dump.
- A regular task that's sent to the executor, which also reported back to the driver upon the task completion.
```
$zgrep 'task 150' container_1694029806204_12865_01_000001/stderr.gz
23/09/12 18:13:55 INFO TaskSetManager: Starting task 150.0 in stage 23.0 (TID 923) (ip-10-0-185-107.ec2.internal, executor 3, partition 150, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()
23/09/12 18:13:55 INFO TaskSetManager: Finished task 150.0 in stage 23.0 (TID 923) in 126 ms on ip-10-0-185-107.ec2.internal (executor 3) (16/200)
$zgrep ' 923' container_1694029806204_12865_01_000004/stderr.gz
23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 923
$zgrep 'task 150' container_1694029806204_12865_01_000004/stderr.gz
23/09/12 18:13:55 INFO Executor: Running task 150.0 in stage 23.0 (TID 923)
23/09/12 18:13:55 INFO Executor: Finished task 150.0 in stage 23.0 (TID 923). 4495 bytes result sent to driver
```
- Another task that's sent to the executor but didn't get launched since the single-threaded dispatcher was stuck (presumably in an "infinite loop" as explained later).
```
$zgrep 'task 153' container_1694029806204_12865_01_000001/stderr.gz
23/09/12 18:13:55 INFO TaskSetManager: Starting task 153.0 in stage 23.0 (TID 924) (ip-10-0-185-107.ec2.internal, executor 3, partition 153, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()
$zgrep ' 924' container_1694029806204_12865_01_000004/stderr.gz
23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 924
$zgrep 'task 153' container_1694029806204_12865_01_000004/stderr.gz
>> note that the above command has no matching result, indicating that task 153.0 in stage 23.0 (TID 924) was never launched
```
- Thread dump shows that the dispatcher-Executor thread has the following stack trace.
```
"dispatcher-Executor" #40 daemon prio=5 os_prio=0 tid=0x0000ffff98e37800 nid=0x1aff runnable [0x0000ffff73bba000]
java.lang.Thread.State: RUNNABLE
at scala.runtime.BoxesRunTime.equalsNumObject(BoxesRunTime.java:142)
at scala.runtime.BoxesRunTime.equals2(BoxesRunTime.java:131)
at scala.runtime.BoxesRunTime.equals(BoxesRunTime.java:123)
at scala.collection.mutable.HashTable.elemEquals(HashTable.scala:365)
at scala.collection.mutable.HashTable.elemEquals$(HashTable.scala:365)
at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:44)
at scala.collection.mutable.HashTable.findEntry0(HashTable.scala:140)
at scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:169)
at scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167)
at scala.collection.mutable.HashMap.findOrAddEntry(HashMap.scala:44)
at scala.collection.mutable.HashMap.put(HashMap.scala:126)
at scala.collection.mutable.HashMap.update(HashMap.scala:131)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:200)
at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
at org.apache.spark.rpc.netty.Inbox$$Lambda$323/1930826709.apply$mcV$sp(Unknown Source)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
```
### Relevant code paths
Within an executor process, there's a [dispatcher thread](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L170) dedicated to CoarseGrainedExecutorBackend (a single RPC endpoint) that launches tasks scheduled by the driver. Each task is run on a TaskRunner thread backed by a thread pool created for the executor. The TaskRunner thread and the dispatcher thread are different. However, they read and write a common object (i.e., taskResources) that is a mutable hashmap without thread safety, in [Executor](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/Executor.scala#L561) and [CoarseGrainedExecutorBackend](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L189), respectively.
### What's going on?
Based on the above observations, our hypothesis is that the dispatcher thread runs into an "infinite loop" due to a race condition when two threads access the same hashmap object. For illustration purposes, let's consider the following scenario where two threads (Thread 1 and Thread 2) access a hash table without thread safety:
- Thread 1 sees A.next = B, but then yields execution to Thread 2
<img src="https://issues.apache.org/jira/secure/attachment/13063040/13063040_hashtable1.png" width="400">
- Thread 2 triggers a resize operation resulting in B.next = A (Note that hashmap doesn't care about ordering), and then yields execution to Thread 1.
<img src="https://issues.apache.org/jira/secure/attachment/13063041/13063041_hashtable2.png" width="400">
- After taking over CPU, Thread 1 would run into an "infinite loop" when traversing the list in the last bucket, given A.next = B and B.next = A in its view.
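For illustration, one general remedy for this kind of race (not necessarily the exact patch in this PR) is to make the shared map thread-safe:
```scala
// A resize in one thread can then no longer corrupt the bucket chains seen by another.
// Types are simplified relative to the real taskResources map.
import java.util.concurrent.ConcurrentHashMap

val taskResources = new ConcurrentHashMap[Long, Map[String, String]]()
taskResources.put(923L, Map("gpu" -> "0"))   // written by the dispatcher thread
taskResources.remove(923L)                   // read/removed by the TaskRunner thread
```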
Closes apache#43021 from xiongbo-sjtu/master.
Authored-by: Bo Xiong <xiongbo@amazon.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
pull bot pushed a commit that referenced this pull request on Jan 23, 2025
…IN-subquery ### What changes were proposed in this pull request? This PR adds code to `RewritePredicateSubquery#apply` to explicitly handle the case where an `Aggregate` node contains an aggregate expression in the left-hand operand of an IN-subquery expression. The explicit handler moves the IN-subquery expressions out of the `Aggregate` and into a parent `Project` node. The `Aggregate` will continue to perform the aggregations that were used as an operand to the IN-subquery expression, but will not include the IN-subquery expression itself. After pulling up IN-subquery expressions into a Project node, `RewritePredicateSubquery#apply` is called again to handle the `Project` as a `UnaryNode`. The `Join` will now be inserted between the `Project` and the `Aggregate` node, and the join condition will use an attribute rather than an aggregate expression, e.g.: ``` Project [col1#32, exists#42 AS (sum(col2) IN (listquery()))#40] +- Join ExistenceJoin(exists#42), (sum(col2)#41L = c2#39L) :- Aggregate [col1#32], [col1#32, sum(col2#33) AS sum(col2)#41L] : +- LocalRelation [col1#32, col2#33] +- LocalRelation [c2#39L] ``` `sum(col2)#41L` in the above join condition, despite how it looks, is the name of the attribute, not an aggregate expression. ### Why are the changes needed? The following query fails: ``` create or replace temp view v1(c1, c2) as values (1, 2), (1, 3), (2, 2), (3, 7), (3, 1); create or replace temp view v2(col1, col2) as values (1, 2), (1, 3), (2, 2), (3, 7), (3, 1); select col1, sum(col2) in (select c2 from v1) from v2 group by col1; ``` It fails with this error: ``` [INTERNAL_ERROR] Cannot generate code for expression: sum(input[1, int, false]) SQLSTATE: XX000 ``` With SPARK_TESTING=1, it fails with this error: ``` [PLAN_VALIDATION_FAILED_RULE_IN_BATCH] Rule org.apache.spark.sql.catalyst.optimizer.RewritePredicateSubquery in batch RewriteSubquery generated an invalid plan: Special expressions are placed in the wrong plan: Aggregate [col1#11], [col1#11, first(exists#20, false) AS (sum(col2) IN (listquery()))#19] +- Join ExistenceJoin(exists#20), (sum(col2#12) = c2#18L) :- LocalRelation [col1#11, col2#12] +- LocalRelation [c2#18L] ``` The issue is that `RewritePredicateSubquery` builds a `Join` operator where the join condition contains an aggregate expression. The bug is in the handler for `UnaryNode` in `RewritePredicateSubquery#apply`, which adds a `Join` below the `Aggregate` and assumes that the left-hand operand of IN-subquery can be used in the join condition. This works fine for most cases, but not when the left-hand operand is an aggregate expression. This PR moves the offending IN-subqueries to a `Project` node, with the aggregates replaced by attributes referring to the aggregate expressions. The resulting join condition now uses those attributes rather than the actual aggregate expressions. ### Does this PR introduce _any_ user-facing change? No, other than allowing this type of query to succeed. ### How was this patch tested? New unit tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#48627 from bersprockets/aggregate_in_set_issue. Authored-by: Bruce Robbins <bersprockets@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
huangxiaopingRD pushed a commit that referenced this pull request on Sep 2, 2025
…IN-subquery ### What changes were proposed in this pull request? This PR adds code to `RewritePredicateSubquery#apply` to explicitly handle the case where an `Aggregate` node contains an aggregate expression in the left-hand operand of an IN-subquery expression. The explicit handler moves the IN-subquery expressions out of the `Aggregate` and into a parent `Project` node. The `Aggregate` will continue to perform the aggregations that were used as an operand to the IN-subquery expression, but will not include the IN-subquery expression itself. After pulling up IN-subquery expressions into a Project node, `RewritePredicateSubquery#apply` is called again to handle the `Project` as a `UnaryNode`. The `Join` will now be inserted between the `Project` and the `Aggregate` node, and the join condition will use an attribute rather than an aggregate expression, e.g.: ``` Project [col1#32, exists#42 AS (sum(col2) IN (listquery()))#40] +- Join ExistenceJoin(exists#42), (sum(col2)#41L = c2#39L) :- Aggregate [col1#32], [col1#32, sum(col2#33) AS sum(col2)#41L] : +- LocalRelation [col1#32, col2#33] +- LocalRelation [c2#39L] ``` `sum(col2)#41L` in the above join condition, despite how it looks, is the name of the attribute, not an aggregate expression. ### Why are the changes needed? The following query fails: ``` create or replace temp view v1(c1, c2) as values (1, 2), (1, 3), (2, 2), (3, 7), (3, 1); create or replace temp view v2(col1, col2) as values (1, 2), (1, 3), (2, 2), (3, 7), (3, 1); select col1, sum(col2) in (select c2 from v1) from v2 group by col1; ``` It fails with this error: ``` [INTERNAL_ERROR] Cannot generate code for expression: sum(input[1, int, false]) SQLSTATE: XX000 ``` With SPARK_TESTING=1, it fails with this error: ``` [PLAN_VALIDATION_FAILED_RULE_IN_BATCH] Rule org.apache.spark.sql.catalyst.optimizer.RewritePredicateSubquery in batch RewriteSubquery generated an invalid plan: Special expressions are placed in the wrong plan: Aggregate [col1#11], [col1#11, first(exists#20, false) AS (sum(col2) IN (listquery()))#19] +- Join ExistenceJoin(exists#20), (sum(col2#12) = c2#18L) :- LocalRelation [col1#11, col2#12] +- LocalRelation [c2#18L] ``` The issue is that `RewritePredicateSubquery` builds a `Join` operator where the join condition contains an aggregate expression. The bug is in the handler for `UnaryNode` in `RewritePredicateSubquery#apply`, which adds a `Join` below the `Aggregate` and assumes that the left-hand operand of IN-subquery can be used in the join condition. This works fine for most cases, but not when the left-hand operand is an aggregate expression. This PR moves the offending IN-subqueries to a `Project` node, with the aggregates replaced by attributes referring to the aggregate expressions. The resulting join condition now uses those attributes rather than the actual aggregate expressions. ### Does this PR introduce _any_ user-facing change? No, other than allowing this type of query to succeed. ### How was this patch tested? New unit tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#48627 from bersprockets/aggregate_in_set_issue. Authored-by: Bruce Robbins <bersprockets@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit e02ff1c) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
huangxiaopingRD pushed a commit that referenced this pull request on Sep 2, 2025
…edExecutorBackend
### What changes were proposed in this pull request?
Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend where an executor process randomly gets stuck
### Why are the changes needed?
For each executor, the single-threaded dispatcher can run into an "infinite loop" (as explained in SPARK-45227). Once an executor process runs into such a state, it stops launching tasks from the driver or reporting task status back.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
```
$ build/mvn package -DskipTests -pl core
$ build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.executor.CoarseGrainedExecutorBackendSuite test
```
### Was this patch authored or co-authored using generative AI tooling?
No
******************************************************************************
**_Please feel free to skip reading unless you're interested in details_**
******************************************************************************
### Symptom
Our Spark 3 app running on EMR 6.10.0 with Spark 3.3.1 got stuck in the very last step of writing a data frame to S3 by calling `df.write`. Looking at the Spark UI, we saw that an executor process hung for over 1 hour. After we manually killed the executor process, the app succeeded. Note that the same EMR cluster with two worker nodes was able to run the same app without any issue before and after the incident.
Below is what's observed from relevant container logs and thread dump.
- A regular task that's sent to the executor, which also reported back to the driver upon the task completion.
```
$zgrep 'task 150' container_1694029806204_12865_01_000001/stderr.gz
23/09/12 18:13:55 INFO TaskSetManager: Starting task 150.0 in stage 23.0 (TID 923) (ip-10-0-185-107.ec2.internal, executor 3, partition 150, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()
23/09/12 18:13:55 INFO TaskSetManager: Finished task 150.0 in stage 23.0 (TID 923) in 126 ms on ip-10-0-185-107.ec2.internal (executor 3) (16/200)
$zgrep ' 923' container_1694029806204_12865_01_000004/stderr.gz
23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 923
$zgrep 'task 150' container_1694029806204_12865_01_000004/stderr.gz
23/09/12 18:13:55 INFO Executor: Running task 150.0 in stage 23.0 (TID 923)
23/09/12 18:13:55 INFO Executor: Finished task 150.0 in stage 23.0 (TID 923). 4495 bytes result sent to driver
```
- Another task that's sent to the executor but didn't get launched since the single-threaded dispatcher was stuck (presumably in an "infinite loop" as explained later).
```
$zgrep 'task 153' container_1694029806204_12865_01_000001/stderr.gz
23/09/12 18:13:55 INFO TaskSetManager: Starting task 153.0 in stage 23.0 (TID 924) (ip-10-0-185-107.ec2.internal, executor 3, partition 153, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()
$zgrep ' 924' container_1694029806204_12865_01_000004/stderr.gz
23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 924
$zgrep 'task 153' container_1694029806204_12865_01_000004/stderr.gz
>> note that the above command has no matching result, indicating that task 153.0 in stage 23.0 (TID 924) was never launched
```
- Thread dump shows that the dispatcher-Executor thread has the following stack trace.
```
"dispatcher-Executor" #40 daemon prio=5 os_prio=0 tid=0x0000ffff98e37800 nid=0x1aff runnable [0x0000ffff73bba000]
java.lang.Thread.State: RUNNABLE
at scala.runtime.BoxesRunTime.equalsNumObject(BoxesRunTime.java:142)
at scala.runtime.BoxesRunTime.equals2(BoxesRunTime.java:131)
at scala.runtime.BoxesRunTime.equals(BoxesRunTime.java:123)
at scala.collection.mutable.HashTable.elemEquals(HashTable.scala:365)
at scala.collection.mutable.HashTable.elemEquals$(HashTable.scala:365)
at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:44)
at scala.collection.mutable.HashTable.findEntry0(HashTable.scala:140)
at scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:169)
at scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167)
at scala.collection.mutable.HashMap.findOrAddEntry(HashMap.scala:44)
at scala.collection.mutable.HashMap.put(HashMap.scala:126)
at scala.collection.mutable.HashMap.update(HashMap.scala:131)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:200)
at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
at org.apache.spark.rpc.netty.Inbox$$Lambda$323/1930826709.apply$mcV$sp(Unknown Source)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
```
### Relevant code paths
Within an executor process, there's a [dispatcher thread](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L170) dedicated to CoarseGrainedExecutorBackend (a single RPC endpoint) that launches tasks scheduled by the driver. Each task is run on a TaskRunner thread backed by a thread pool created for the executor. The TaskRunner thread and the dispatcher thread are different. However, they read and write a common object (i.e., taskResources) that is a mutable hashmap without thread safety, in [Executor](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/Executor.scala#L561) and [CoarseGrainedExecutorBackend](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L189), respectively.
### What's going on?
Based on the above observations, our hypothesis is that the dispatcher thread runs into an "infinite loop" due to a race condition when two threads access the same hashmap object. For illustration purposes, let's consider the following scenario where two threads (Thread 1 and Thread 2) access a hash table without thread safety:
- Thread 1 sees A.next = B, but then yields execution to Thread 2
<img src="https://issues.apache.org/jira/secure/attachment/13063040/13063040_hashtable1.png" width="400">
- Thread 2 triggers a resize operation resulting in B.next = A (Note that hashmap doesn't care about ordering), and then yields execution to Thread 1.
<img src="https://issues.apache.org/jira/secure/attachment/13063041/13063041_hashtable2.png" width="400">
- After taking over CPU, Thread 1 would run into an "infinite loop" when traversing the list in the last bucket, given A.next = B and B.next = A in its view.
Closes apache#43021 from xiongbo-sjtu/master.
Authored-by: Bo Xiong <xiongbo@amazon.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 8e6b160)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
huangxiaopingRD pushed a commit that referenced this pull request on Sep 2, 2025
…edExecutorBackend
### What changes were proposed in this pull request?
Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend where an executor process randomly gets stuck
### Why are the changes needed?
For each executor, the single-threaded dispatcher can run into an "infinite loop" (as explained in SPARK-45227). Once an executor process runs into such a state, it stops launching tasks from the driver or reporting task status back.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
```
$ build/mvn package -DskipTests -pl core
$ build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.executor.CoarseGrainedExecutorBackendSuite test
```
### Was this patch authored or co-authored using generative AI tooling?
No
******************************************************************************
**_Please feel free to skip reading unless you're interested in details_**
******************************************************************************
### Symptom
Our Spark 3 app running on EMR 6.10.0 with Spark 3.3.1 got stuck in the very last step of writing a data frame to S3 by calling `df.write`. Looking at the Spark UI, we saw that an executor process hung for over 1 hour. After we manually killed the executor process, the app succeeded. Note that the same EMR cluster with two worker nodes was able to run the same app without any issue before and after the incident.
Below is what's observed from relevant container logs and thread dump.
- A regular task that's sent to the executor, which also reported back to the driver upon the task completion.
```
$zgrep 'task 150' container_1694029806204_12865_01_000001/stderr.gz
23/09/12 18:13:55 INFO TaskSetManager: Starting task 150.0 in stage 23.0 (TID 923) (ip-10-0-185-107.ec2.internal, executor 3, partition 150, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()
23/09/12 18:13:55 INFO TaskSetManager: Finished task 150.0 in stage 23.0 (TID 923) in 126 ms on ip-10-0-185-107.ec2.internal (executor 3) (16/200)
$zgrep ' 923' container_1694029806204_12865_01_000004/stderr.gz
23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 923
$zgrep 'task 150' container_1694029806204_12865_01_000004/stderr.gz
23/09/12 18:13:55 INFO Executor: Running task 150.0 in stage 23.0 (TID 923)
23/09/12 18:13:55 INFO Executor: Finished task 150.0 in stage 23.0 (TID 923). 4495 bytes result sent to driver
```
- Another task that's sent to the executor but didn't get launched since the single-threaded dispatcher was stuck (presumably in an "infinite loop" as explained later).
```
$zgrep 'task 153' container_1694029806204_12865_01_000001/stderr.gz
23/09/12 18:13:55 INFO TaskSetManager: Starting task 153.0 in stage 23.0 (TID 924) (ip-10-0-185-107.ec2.internal, executor 3, partition 153, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()
$zgrep ' 924' container_1694029806204_12865_01_000004/stderr.gz
23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 924
$zgrep 'task 153' container_1694029806204_12865_01_000004/stderr.gz
>> note that the above command has no matching result, indicating that task 153.0 in stage 23.0 (TID 924) was never launched
```
- The thread dump shows that the dispatcher-Executor thread has the following stack trace.
```
"dispatcher-Executor" #40 daemon prio=5 os_prio=0 tid=0x0000ffff98e37800 nid=0x1aff runnable [0x0000ffff73bba000]
java.lang.Thread.State: RUNNABLE
at scala.runtime.BoxesRunTime.equalsNumObject(BoxesRunTime.java:142)
at scala.runtime.BoxesRunTime.equals2(BoxesRunTime.java:131)
at scala.runtime.BoxesRunTime.equals(BoxesRunTime.java:123)
at scala.collection.mutable.HashTable.elemEquals(HashTable.scala:365)
at scala.collection.mutable.HashTable.elemEquals$(HashTable.scala:365)
at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:44)
at scala.collection.mutable.HashTable.findEntry0(HashTable.scala:140)
at scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:169)
at scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167)
at scala.collection.mutable.HashMap.findOrAddEntry(HashMap.scala:44)
at scala.collection.mutable.HashMap.put(HashMap.scala:126)
at scala.collection.mutable.HashMap.update(HashMap.scala:131)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:200)
at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
at org.apache.spark.rpc.netty.Inbox$$Lambda$323/1930826709.apply$mcV$sp(Unknown Source)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
```
### Relevant code paths
Within an executor process, there's a [dispatcher thread](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L170) dedicated to CoarseGrainedExecutorBackend (a single RPC endpoint) that launches tasks scheduled by the driver. Each task runs on a TaskRunner thread backed by a thread pool created for the executor. The TaskRunner thread and the dispatcher thread are different. However, they read and write a common object (i.e., taskResources) that is a mutable HashMap with no thread-safety guarantees, in [Executor](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/Executor.scala#L561) and [CoarseGrainedExecutorBackend](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L189), respectively.
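For intuition, here is a minimal sketch of that access pattern and of one conventional remedy, using `java.util.concurrent.ConcurrentHashMap`. The names below (`TaskResourceTracking`, `onLaunchTask`, `onTaskFinished`) and the simplified value type are hypothetical stand-ins, not the actual Spark code, and this is not necessarily the exact change made by this patch.
```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for the shared state described above: the dispatcher
// thread records per-task resources when a task is launched, while TaskRunner
// threads read and remove them when the task finishes. Using a ConcurrentHashMap
// (instead of an unsynchronized scala.collection.mutable.HashMap) makes these
// cross-thread accesses safe.
object TaskResourceTracking {
  private val taskResources = new ConcurrentHashMap[Long, Map[String, Seq[String]]]()

  // Called on the dispatcher thread when a LaunchTask message arrives.
  def onLaunchTask(taskId: Long, resources: Map[String, Seq[String]]): Unit =
    taskResources.put(taskId, resources)

  // Called on a TaskRunner thread when the task completes.
  def onTaskFinished(taskId: Long): Option[Map[String, Seq[String]]] =
    Option(taskResources.remove(taskId))
}
```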
### What's going on?
Based on the above observations, our hypothesis is that the dispatcher thread runs into an "infinite loop" due to a race condition when two threads access the same HashMap object. For illustration purposes, consider the following scenario, where two threads (Thread 1 and Thread 2) access a hash table that is not thread-safe.
- Thread 1 sees A.next = B, but then yields execution to Thread 2.
<img src="https://issues.apache.org/jira/secure/attachment/13063040/13063040_hashtable1.png" width="400">
- Thread 2 triggers a resize operation that results in B.next = A (note that a hash map does not guarantee ordering), and then yields execution to Thread 1.
<img src="https://issues.apache.org/jira/secure/attachment/13063041/13063041_hashtable2.png" width="400">
- After taking the CPU back, Thread 1 runs into an "infinite loop" when traversing the list in the last bucket, since it sees A.next = B and B.next = A (see the sketch below).
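To make the failure mode concrete, below is a small self-contained sketch (not Spark code) that hammers one unsynchronized `scala.collection.mutable.HashMap` from two threads. Because the corruption is timing-dependent, any given run may lose entries, throw, hang, or finish cleanly; it only illustrates the class of bug hypothesized above.
```scala
import scala.collection.mutable

// Two writer threads mutate the same unsynchronized HashMap. Their puts can
// interleave with an internal resize, corrupting the bucket chains; observed
// symptoms range from a wrong size to a thread spinning forever while walking
// a now-cyclic chain (the "infinite loop" described above).
object UnsafeHashMapRace extends App {
  val shared = mutable.HashMap.empty[Long, String]

  def writer(offset: Long): Thread = new Thread(() => {
    var i = 0L
    while (i < 1000000L) {
      shared(offset + i) = "v" // unsynchronized put; may trigger or observe a resize
      i += 1
    }
  })

  val t1 = writer(0L)
  val t2 = writer(10000000L)
  t1.start(); t2.start()
  t1.join(); t2.join()
  // With correct synchronization this would print 2000000; without it, the
  // result is undefined and the program may never even reach this line.
  println(s"observed size: ${shared.size}")
}
```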
Closes apache#43021 from xiongbo-sjtu/master.
Authored-by: Bo Xiong <xiongbo@amazon.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 8e6b160)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
pull bot pushed a commit that referenced this pull request on Nov 1, 2025
### What changes were proposed in this pull request?
This PR proposes to add a `doCanonicalize` function to DataSourceV2ScanRelation. The implementation is similar to [the one in BatchScanExec](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L150), as well as [the one in LogicalRelation](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala#L52).
### Why are the changes needed?
Query optimization rules such as MergeScalarSubqueries check whether two plans are identical by [comparing their canonicalized forms](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala#L219). For DSv2, canonicalization of the physical plan goes down the child hierarchy to BatchScanExec, which [has a doCanonicalize function](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L150); canonicalization of the logical plan goes down to DataSourceV2ScanRelation, which, however, does not have a doCanonicalize function. As a result, two logical plans that are semantically identical are not identified as such. For reference, the [DSv1 LogicalRelation](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala#L52) also has `doCanonicalize()`.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
A new unit test is added to show that `MergeScalarSubqueries` works for DataSourceV2ScanRelation. For the query
```sql
select (select max(i) from df) as max_i, (select min(i) from df) as min_i
```
Before introducing the canonicalization, the plan is:
```
== Parsed Logical Plan ==
'Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5] : :- 'Project [unresolvedalias('max('i))] : : +- 'UnresolvedRelation [df], [], false : +- 'Project [unresolvedalias('min('i))] : +- 'UnresolvedRelation [df], [], false +- OneRowRelation
== Analyzed Logical Plan ==
max_i: int, min_i: int
Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5] : :- Aggregate [max(i#0) AS max(i)#7] : : +- SubqueryAlias df : : +- View (`df`, [i#0, j#1]) : : +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5 : +- Aggregate [min(i#10) AS min(i)#9] : +- SubqueryAlias df : +- View (`df`, [i#10, j#11]) : +- RelationV2[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5 +- OneRowRelation
== Optimized Logical Plan ==
Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5] : :- Aggregate [max(i#0) AS max(i)#7] : : +- Project [i#0] : : +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5 : +- Aggregate [min(i#10) AS min(i)#9] : +- Project [i#10] : +- RelationV2[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5 +- OneRowRelation
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == ResultQueryStage 0 +- *(1) Project [Subquery subquery#2, [id=#32] AS max_i#3, Subquery subquery#4, [id=#33] AS min_i#5] : :- Subquery subquery#2, [id=#32] : : +- AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == ResultQueryStage 1 +- *(2) HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7]) +- ShuffleQueryStage 0 +- Exchange SinglePartition,
ENSURE_REQUIREMENTS, [plan_id=58] +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14]) +- *(1) Project [i#0] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- == Initial Plan == HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=19] +- HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14]) +- Project [i#0] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] : +- Subquery subquery#4, [id=#33] : +- AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == ResultQueryStage 1 +- *(2) HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9]) +- ShuffleQueryStage 0 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=63] +- *(1) HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15]) +- *(1) Project [i#10] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- == Initial Plan == HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=30] +- HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15]) +- Project [i#10] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- *(1) Scan OneRowRelation[] +- == Initial Plan == Project [Subquery subquery#2, [id=#32] AS max_i#3, Subquery subquery#4, [id=#33] AS min_i#5] : :- Subquery subquery#2, [id=#32] : : +- AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == ResultQueryStage 1 +- *(2) HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7]) +- ShuffleQueryStage 0 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=58] +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14]) +- *(1) Project [i#0] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- == Initial Plan == HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=19] +- HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14]) +- Project [i#0] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] : +- Subquery subquery#4, [id=#33] : +- AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == ResultQueryStage 1 +- *(2) HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9]) +- ShuffleQueryStage 0 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=63] +- *(1) HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15]) +- *(1) Project [i#10] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- == Initial Plan == HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=30] +- HashAggregate(keys=[], 
functions=[partial_min(i#10)], output=[min#15]) +- Project [i#10] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- Scan OneRowRelation[]
```
After introducing the canonicalization, the plan is as follows, where you can see **ReusedSubquery**:
```
== Parsed Logical Plan ==
'Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5] : :- 'Project [unresolvedalias('max('i))] : : +- 'UnresolvedRelation [df], [], false : +- 'Project [unresolvedalias('min('i))] : +- 'UnresolvedRelation [df], [], false +- OneRowRelation
== Analyzed Logical Plan ==
max_i: int, min_i: int
Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5] : :- Aggregate [max(i#0) AS max(i)#7] : : +- SubqueryAlias df : : +- View (`df`, [i#0, j#1]) : : +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5 : +- Aggregate [min(i#10) AS min(i)#9] : +- SubqueryAlias df : +- View (`df`, [i#10, j#11]) : +- RelationV2[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5 +- OneRowRelation
== Optimized Logical Plan ==
Project [scalar-subquery#2 [].max(i) AS max_i#3, scalar-subquery#4 [].min(i) AS min_i#5] : :- Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14] : : +- Aggregate [max(i#0) AS max(i)#7, min(i#0) AS min(i)#9] : : +- Project [i#0] : : +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5 : +- Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14] : +- Aggregate [max(i#0) AS max(i)#7, min(i#0) AS min(i)#9] : +- Project [i#0] : +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5 +- OneRowRelation
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == ResultQueryStage 0 +- *(1) Project [Subquery subquery#2, [id=#40].max(i) AS max_i#3, ReusedSubquery Subquery subquery#2, [id=#40].min(i) AS min_i#5] : :- Subquery subquery#2, [id=#40] : : +- AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == ResultQueryStage 1 +- *(2) Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14] +- *(2) HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9]) +- ShuffleQueryStage 0 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=71] +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17]) +- *(1) Project [i#0] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- == Initial Plan == Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14] +- HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=22] +- HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17]) +- Project [i#0] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] : +- ReusedSubquery Subquery subquery#2, [id=#40] +- *(1) Scan OneRowRelation[] +- == Initial Plan == Project [Subquery subquery#2, [id=#40].max(i) AS max_i#3, Subquery subquery#4, [id=#41].min(i) AS min_i#5] : :- Subquery subquery#2, [id=#40] : : +- AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == ResultQueryStage 1 +- *(2) Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14] +- *(2) HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9]) +- ShuffleQueryStage 0 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=71] +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17]) +- *(1) Project [i#0] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- == Initial Plan == Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14] +- HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=22] +- HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17]) +- Project [i#0] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] : +- Subquery subquery#4, [id=#41] : +- AdaptiveSparkPlan isFinalPlan=false : +- Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14] : +- HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9]) : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=37] : +- HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17]) : +- Project [i#0] : +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- Scan OneRowRelation[]
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes apache#52529 from yhuang-db/scan-canonicalization.
Authored-by: yhuang-db <itisyuchuan@gmail.com>
Signed-off-by: Peter Toth <peter.toth@gmail.com>
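As a self-contained illustration of why canonicalization helps here (a toy model, not Spark's actual classes): two scan relations that differ only in auto-generated attribute IDs, such as `i#0`/`j#1` versus `i#10`/`j#11` above, are semantically the same, but naive structural equality treats them as different. Normalizing the IDs by position, which is what a `doCanonicalize` override does, lets rules like MergeScalarSubqueries recognize the plans as identical.
```scala
// Toy model (not Spark code): Attr stands in for an attribute with an
// auto-generated expression ID, ScanRelation for a DSv2 scan node.
final case class Attr(name: String, id: Long)
final case class ScanRelation(source: String, output: Seq[Attr]) {
  // Canonicalize by rewriting each attribute ID to its position in the output,
  // mirroring the ID-normalization that doCanonicalize performs.
  def canonicalized: ScanRelation =
    copy(output = output.zipWithIndex.map { case (a, i) => a.copy(id = i.toLong) })
}

object CanonicalizationDemo extends App {
  val scan1 = ScanRelation("df", Seq(Attr("i", 0L), Attr("j", 1L)))
  val scan2 = ScanRelation("df", Seq(Attr("i", 10L), Attr("j", 11L)))

  println(scan1 == scan2)                             // false: IDs differ
  println(scan1.canonicalized == scan2.canonicalized) // true: IDs normalized by position
}
```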
huangxiaopingRD pushed a commit that referenced this pull request on Nov 25, 2025