forked from apache/spark
-
Notifications
You must be signed in to change notification settings - Fork 49
Ky 945, sync from apache spark #401
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
fishcus
merged 169 commits into
Kyligence:kyspark-3.1.1.x-4.x-xuanwu
from
fishcus:KY-945
Feb 16, 2022
Merged
Ky 945, sync from apache spark #401
fishcus
merged 169 commits into
Kyligence:kyspark-3.1.1.x-4.x-xuanwu
from
fishcus:KY-945
Feb 16, 2022
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
…already known by the scheduler backend in the pod allocator ### What changes were proposed in this pull request? This PR modifies the POD allocator to use the scheduler backend to get the known executors and remove those from the pending and newly created list. This is different from the normal `ExecutorAllocationManager` requested killing of executors where the `spark.dynamicAllocation.executorIdleTimeout` is used. In this case POD allocator kills the executors which should be only responsible for terminating not satisfied POD allocations (new requests where no POD state is received yet and PODs in pending state). ### Why are the changes needed? Because there is race between executor POD allocator and cluster scheduler backend. Running several experiment during downscaling we experienced a lot of killed fresh executors wich has already running task on them. The pattern in the log was the following (see executor 312 and TID 2079): ``` 21/02/01 15:12:03 INFO ExecutorMonitor: New executor 312 has registered (new total is 138) ... 21/02/01 15:12:03 INFO TaskSetManager: Starting task 247.0 in stage 4.0 (TID 2079, 100.100.18.138, executor 312, partition 247, PROCESS_LOCAL, 8777 bytes) 21/02/01 15:12:03 INFO ExecutorPodsAllocator: Deleting 3 excess pod requests (408,312,307). ... 21/02/01 15:12:04 ERROR TaskSchedulerImpl: Lost executor 312 on 100.100.18.138: The executor with id 312 was deleted by a user or the framework. 21/02/01 15:12:04 INFO TaskSetManager: Task 2079 failed because while it was being computed, its executor exited for a reason unrelated to the task. Not counting this failure towards the maximum number of failures for the task. ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? #### Manually With this change there was no executor lost with running task on it. ##### With unit test A new test is added and existing test is modified to check these cases. Closes apache#31513 from attilapiros/SPARK-34361. Authored-by: “attilapiros” <piros.attila.zsolt@gmail.com> Signed-off-by: Holden Karau <hkarau@apple.com> (cherry picked from commit 6c5322d) Signed-off-by: Holden Karau <hkarau@apple.com>
…akages This PR aims to add `ownerReference` to the executor ConfigMap to fix leakage. SPARK-30985 maintains the executor config map explicitly inside Spark. However, this config map can be leaked when Spark drivers die accidentally or are killed by K8s. We need to add `ownerReference` to make K8s do the garbage collection these automatically. The number of ConfigMap is one of the resource quota. So, the leaked configMaps currently cause Spark jobs submission failures. No. Pass the CIs and check manually. K8s IT is tested manually. ``` KubernetesSuite: - Run SparkPi with no resources - Run SparkPi with a very long application name. - Use SparkLauncher.NO_RESOURCE - Run SparkPi with a master URL without a scheme. - Run SparkPi with an argument. - Run SparkPi with custom labels, annotations, and environment variables. - All pods have the same service account by default - Run extraJVMOptions check on driver - Run SparkRemoteFileTest using a remote data file - Verify logging configuration is picked from the provided SPARK_CONF_DIR/log4j.properties - Run SparkPi with env and mount secrets. - Run PySpark on simple pi.py example - Run PySpark to test a pyfiles example - Run PySpark with memory customization - Run in client mode. - Start pod creation from template - PVs with local storage - Launcher client dependencies - SPARK-33615: Launcher client archives - SPARK-33748: Launcher python client respecting PYSPARK_PYTHON - SPARK-33748: Launcher python client respecting spark.pyspark.python and spark.pyspark.driver.python - Launcher python client dependencies using a zip file - Test basic decommissioning - Test basic decommissioning with shuffle cleanup - Test decommissioning with dynamic allocation & shuffle cleanups - Test decommissioning timeouts - Run SparkR on simple dataframe.R example Run completed in 19 minutes, 2 seconds. Total number of tests run: 27 Suites: completed 2, aborted 0 Tests: succeeded 27, failed 0, canceled 0, ignored 0, pending 0 All tests passed. ``` **BEFORE** ``` $ k get cm spark-exec-450b417895b3b2c7-conf-map -oyaml | grep ownerReferences ``` **AFTER** ``` $ k get cm spark-exec-bb37a27895b1c26c-conf-map -oyaml | grep ownerReferences f:ownerReferences: ``` Closes apache#32042 from dongjoon-hyun/SPARK-34948. Authored-by: Dongjoon Hyun <dhyun@apple.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com> (cherry picked from commit a42dc93) Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
….blockmanager.port` in BasicExecutorFeatureStep ### What changes were proposed in this pull request? most spark conf keys are case sensitive, including `spark.blockManager.port`, we can not get the correct port number with `spark.blockmanager.port`. This PR changes the wrong key to `spark.blockManager.port` in `BasicExecutorFeatureStep`. This PR also ensures a fast fail when the port value is invalid for executor containers. When 0 is specified(it is valid as random port, but invalid as a k8s request), it should not be put in the `containerPort` field of executor pod desc. We do not expect executor pods to continuously fail to create because of invalid requests. ### Why are the changes needed? bugfix ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? new tests Closes apache#32621 from yaooqinn/SPARK-35482. Authored-by: Kent Yao <yao@apache.org> Signed-off-by: Dongjoon Hyun <dhyun@apple.com> (cherry picked from commit d957426) Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
….driver.blockManager.port` as same as other cluster managers ### What changes were proposed in this pull request? `spark.blockManager.port` does not work for k8s driver pods now, we should make it work as other cluster managers. ### Why are the changes needed? `spark.blockManager.port` should be able to work for spark driver pod ### Does this PR introduce _any_ user-facing change? yes, `spark.blockManager.port` will be respect iff it is present && `spark.driver.blockManager.port` is absent ### How was this patch tested? new tests Closes apache#32639 from yaooqinn/SPARK-35493. Authored-by: Kent Yao <yao@apache.org> Signed-off-by: Dongjoon Hyun <dhyun@apple.com> (cherry picked from commit 96b0548) Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…cutors start ### What changes were proposed in this pull request? Add a new config that controls the timeout of waiting for driver pod's readiness before allocating executor pods. This wait only happens once on application start. ### Why are the changes needed? The driver's headless service can be resolved by DNS only after the driver pod is ready. If the executor tries to connect to the headless service before driver pod is ready, it will hit UnkownHostException and get into error state but will not be restarted. **This case usually happens when the driver pod has sidecar containers but hasn't finished their creation when executors start.** So basically there is a race condition. This issue can be mitigated by tweaking this config. ### Does this PR introduce _any_ user-facing change? A new config `spark.kubernetes.allocation.driver.readinessTimeout` added. ### How was this patch tested? Exisiting tests. Closes apache#32752 from cchriswu/SPARK-32975-fix. Lead-authored-by: Chris Wu <wucaowei19@gmail.com> Co-authored-by: Chris Wu <wcaowei@vmware.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com> (cherry picked from commit 497c80a) Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request? A follow-up for SPARK-32975 to avoid unexpected the `None.get` exception Run SparkPi with docker desktop, as podName is an option, we will got ```logtalk 21/06/09 01:09:12 ERROR Utils: Uncaught exception in thread main java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:529) at scala.None$.get(Option.scala:527) at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$1(ExecutorPodsAllocator.scala:110) at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1417) at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.start(ExecutorPodsAllocator.scala:111) at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.start(KubernetesClusterSchedulerBackend.scala:99) at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:220) at org.apache.spark.SparkContext.<init>(SparkContext.scala:581) at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2686) at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:948) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:942) at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:30) at org.apache.spark.examples.SparkPi.main(SparkPi.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) ``` ### Why are the changes needed? fix a regression ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? Manual. Closes apache#32830 from yaooqinn/SPARK-32975. Authored-by: Kent Yao <yao@apache.org> Signed-off-by: Dongjoon Hyun <dhyun@apple.com> (cherry picked from commit b4b78ce) Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Print the driver pod name instead of Some(name) if absent fix error hint no new test Closes apache#32889 from yaooqinn/minork8s. Authored-by: Kent Yao <yao@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 1125afd) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…p` instead of `startTime` SPARK-33099 added the support to respect `spark.dynamicAllocation.executorIdleTimeout` in `ExecutorPodsAllocator`. However, when it checks if a pending executor pod is timed out, it checks against the pod's [startTime](https://github.com/kubernetes/api/blob/2a5dae08c42b1e8fdc1379432d8898efece65363/core/v1/types.go#L3664-L3667), see code [here](https://github.com/apache/spark/blob/c2ba498ff678ddda034cedf45cc17fbeefe922fd/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala#L459). A pending pod `startTime` is empty, and this causes the function `isExecutorIdleTimedOut()` always return true for pending pods. This can be reproduced locally, run the following job ``` ${SPARK_HOME}/bin/spark-submit --master k8s://http://localhost:8001 --deploy-mode cluster --name spark-group-example \ --master k8s://http://localhost:8001 --deploy-mode cluster \ --class org.apache.spark.examples.GroupByTest \ --conf spark.executor.instances=1 \ --conf spark.kubernetes.namespace=spark-test \ --conf spark.kubernetes.executor.request.cores=1 \ --conf spark.dynamicAllocation.enabled=true \ --conf spark.shuffle.service.enabled=true \ --conf spark.dynamicAllocation.shuffleTracking.enabled=true \ --conf spark.shuffle.service.enabled=false \ --conf spark.kubernetes.container.image=local/spark:3.3.0 \ --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \ local:///opt/spark/examples/jars/spark-examples_2.12-3.3.0-SNAPSHOT.jar \ 1000 1000 100 1000 ``` the local cluster doesn't have enough resources to run more than 4 executors, the rest of the executor pods will be pending. The job will have task backlogs and triggers to request more executors from K8s: ``` 21/10/19 22:51:45 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 1 running: 0. 21/10/19 22:51:51 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 2 running: 1. 21/10/19 22:51:52 INFO ExecutorPodsAllocator: Going to request 2 executors from Kubernetes for ResourceProfile Id: 0, target: 4 running: 2. 21/10/19 22:51:53 INFO ExecutorPodsAllocator: Going to request 4 executors from Kubernetes for ResourceProfile Id: 0, target: 8 running: 4. ... 21/10/19 22:52:14 INFO ExecutorPodsAllocator: Deleting 39 excess pod requests (23,59,32,41,50,68,35,44,17,8,53,62,26,71,11,56,29,38,47,20,65,5,14,46,64,73,55,49,40,67,58,13,22,31,7,16,52,70,43). 21/10/19 22:52:18 INFO ExecutorPodsAllocator: Deleting 28 excess pod requests (25,34,61,37,10,19,28,60,69,63,45,54,72,36,18,9,27,21,57,12,48,30,39,66,15,42,24,33). ``` At `22:51:45`, it starts to request executors; and at `22:52:14` it starts to delete excess executor pods. This is 29s but spark.dynamicAllocation.executorIdleTimeout is set to 60s. The config was not honored. ### What changes were proposed in this pull request? Change the check from using pod's `startTime` to `creationTimestamp`. [creationTimestamp](https://github.com/kubernetes/apimachinery/blob/e6c90c4366be1504309a6aafe0d816856450f36a/pkg/apis/meta/v1/types.go#L193-L201) is the timestamp when a pod gets created on K8s: ``` // CreationTimestamp is a timestamp representing the server time when this object was // created. It is not guaranteed to be set in happens-before order across separate operations. // Clients may not set this value. It is represented in RFC3339 form and is in UTC. ``` [startTime](https://github.com/kubernetes/api/blob/2a5dae08c42b1e8fdc1379432d8898efece65363/core/v1/types.go#L3664-L3667) is the timestamp when pod gets started: ``` // RFC 3339 date and time at which the object was acknowledged by the Kubelet. // This is before the Kubelet pulled the container image(s) for the pod. // +optional ``` a pending pod's startTime is empty. Here is a example of a pending pod: ``` NAMESPACE NAME READY STATUS RESTARTS AGE default pending-pod-example 0/1 Pending 0 2s kubectl get pod pending-pod-example -o yaml | grep creationTimestamp ---> creationTimestamp: "2021-10-19T16:17:52Z" // pending pod has no startTime kubectl get pod pending-pod-example -o yaml | grep startTime ---> // empty // running pod has startTime set to the timestamp when the pod gets started kubectl get pod coredns-558bd4d5db-6qrtx -n kube-system -o yaml | grep startTime f:startTime: {} ---> startTime: "2021-08-04T23:44:44Z" ``` ### Why are the changes needed? This fixed the issue that `spark.dynamicAllocation.executorIdleTimeout` currently is not honored by pending executor pods. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? The PR includes the UT changes, that has the testing coverage for this issue. Closes apache#34319 from yangwwei/SPARK-37049. Authored-by: Weiwei Yang <wyang@cloudera.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 041cd5d) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…t]Source` to DeveloperApi ### What changes were proposed in this pull request? This PR aims to promote `ExecutorPodsWatchSnapshotSource` and `ExecutorPodsPollingSnapshotSource` as **stable** `DeveloperApi` in order to maintain it officially in a backward compatible way at Apache Spark 3.3.0. ### Why are the changes needed? - Since SPARK-24248 at Apache Spark 2.4.0, `ExecutorPodsWatchSnapshotSource` and `ExecutorPodsPollingSnapshotSource` have been used to monitor executor pods without any interface changes for over 3 years. - Apache Spark 3.1.1 makes `Kubernetes` module GA and provides an extensible external cluster manager framework. New `ExternalClusterManager` for K8s environment need to depend on this to monitor pods. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual review. Closes apache#34751 from dongjoon-hyun/SPARK-37497. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 2b04496) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…shutting down ### What changes were proposed in this pull request? This PR prevents reregistering BlockManager when a Executor is shutting down. It is achieved by checking `executorShutdown` before calling `env.blockManager.reregister()`. ### Why are the changes needed? This change is required since Spark reports executors as active, even they are removed. I was testing Dynamic Allocation on K8s with about 300 executors. While doing so, when the executors were torn down due to `spark.dynamicAllocation.executorIdleTimeout`, I noticed all the executor pods being removed from K8s, however, under the "Executors" tab in SparkUI, I could see some executors listed as alive. [spark.sparkContext.statusTracker.getExecutorInfos.length](https://github.com/apache/spark/blob/65da9287bc5112564836a555cd2967fc6b05856f/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala#L105) also returned a value greater than 1. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a new test. ## Logs Following are the logs of the executor(Id:303) which re-registers `BlockManager` ``` 21/04/02 21:33:28 INFO CoarseGrainedExecutorBackend: Got assigned task 1076 21/04/02 21:33:28 INFO Executor: Running task 4.0 in stage 3.0 (TID 1076) 21/04/02 21:33:28 INFO MapOutputTrackerWorker: Updating epoch to 302 and clearing cache 21/04/02 21:33:28 INFO TorrentBroadcast: Started reading broadcast variable 3 21/04/02 21:33:28 INFO TransportClientFactory: Successfully created connection to /100.100.195.227:33703 after 76 ms (62 ms spent in bootstraps) 21/04/02 21:33:28 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.4 KB, free 168.0 MB) 21/04/02 21:33:28 INFO TorrentBroadcast: Reading broadcast variable 3 took 168 ms 21/04/02 21:33:28 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.9 KB, free 168.0 MB) 21/04/02 21:33:29 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 1, fetching them 21/04/02 21:33:29 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTrackerda-lite-test-4-7a57e478947d206d-driver-svc.dex-app-n5ttnbmg.svc:7078) 21/04/02 21:33:29 INFO MapOutputTrackerWorker: Got the output locations 21/04/02 21:33:29 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks including 1 local blocks and 1 remote blocks 21/04/02 21:33:30 INFO TransportClientFactory: Successfully created connection to /100.100.80.103:40971 after 660 ms (528 ms spent in bootstraps) 21/04/02 21:33:30 INFO ShuffleBlockFetcherIterator: Started 1 remote fetches in 1042 ms 21/04/02 21:33:31 INFO Executor: Finished task 4.0 in stage 3.0 (TID 1076). 1276 bytes result sent to driver . . . 21/04/02 21:34:16 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown 21/04/02 21:34:16 INFO Executor: Told to re-register on heartbeat 21/04/02 21:34:16 INFO BlockManager: BlockManager BlockManagerId(303, 100.100.122.34, 41265, None) re-registering with master 21/04/02 21:34:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(303, 100.100.122.34, 41265, None) 21/04/02 21:34:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(303, 100.100.122.34, 41265, None) 21/04/02 21:34:16 INFO BlockManager: Reporting 0 blocks to the master. 21/04/02 21:34:16 INFO MemoryStore: MemoryStore cleared 21/04/02 21:34:16 INFO BlockManager: BlockManager stopped 21/04/02 21:34:16 INFO FileDataSink: Closing sink with output file = /tmp/safari-events/.des_analysis/safari-events/hdp_spark_monitoring_random-container-037caf27-6c77-433f-820f-03cd9c7d9b6e-spark-8a492407d60b401bbf4309a14ea02ca2_events.tsv 21/04/02 21:34:16 INFO HonestProfilerBasedThreadSnapshotProvider: Stopping agent 21/04/02 21:34:16 INFO HonestProfilerHandler: Stopping honest profiler agent 21/04/02 21:34:17 INFO ShutdownHookManager: Shutdown hook called 21/04/02 21:34:17 INFO ShutdownHookManager: Deleting directory /var/data/spark-d886588c-2a7e-491d-bbcb-4f58b3e31001/spark-4aa337a0-60c0-45da-9562-8c50eaff3cea ``` Closes apache#32043 from sumeetgajjar/SPARK-34949. Authored-by: Sumeet Gajjar <sumeetgajjar93@gmail.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> (cherry picked from commit a9ca197) Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
… finished ### What changes were proposed in this pull request? Close SparkContext after the Main method has finished, to allow SparkApplication on K8S to complete. This is fixed version of [merged and reverted PR](apache#32081). ### Why are the changes needed? if I don't call the method sparkContext.stop() explicitly, then a Spark driver process doesn't terminate even after its Main method has been completed. This behaviour is different from spark on yarn, where the manual sparkContext stopping is not required. It looks like, the problem is in using non-daemon threads, which prevent the driver jvm process from terminating. So I have inserted code that closes sparkContext automatically. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manually on the production AWS EKS environment in my company. Closes apache#32283 from kotlovs/close-spark-context-on-exit-2. Authored-by: skotlov <skotlov@joom.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com> (cherry picked from commit b17a0e6) Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…ext in non-K8s env ### What changes were proposed in this pull request? According to the discussion on apache#32283 , this PR aims to limit the feature of SPARK-34674 to K8s environment only. ### Why are the changes needed? To reduce the behavior change in non-K8s environment. ### Does this PR introduce _any_ user-facing change? The change behavior is consistent with 3.1.1 and older Spark releases. ### How was this patch tested? N/A Closes apache#33403 from dongjoon-hyun/SPARK-36193. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit fd3e9ce) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…lectFetchRequests ### What changes were proposed in this pull request? This PR fixes perf regression at the executor side when creating fetch requests with large initial partitions  In NetEase, we had an online job that took `45min` to "fetch" about 100MB of shuffle data, which actually turned out that it was just collecting fetch requests slowly. Normally, such a task should finish in seconds. See the `DEBUG` log ``` 21/06/22 11:52:26 DEBUG BlockManagerStorageEndpoint: Sent response: 0 to kyuubi.163.org: 21/06/22 11:53:05 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 3941440 at BlockManagerId(12, .., 43559, None) with 19 blocks 21/06/22 11:53:44 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 3716400 at BlockManagerId(20, .., 38287, None) with 18 blocks 21/06/22 11:54:41 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 4559280 at BlockManagerId(6, .., 39689, None) with 22 blocks 21/06/22 11:55:08 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 3120160 at BlockManagerId(33, .., 39449, None) with 15 blocks ``` I also create a test case locally with my local laptop docker env to give some reproducible cases. ``` bin/spark-sql --conf spark.kubernetes.file.upload.path=./ --master k8s://https://kubernetes.docker.internal:6443 --conf spark.kubernetes.container.image=yaooqinn/spark:v20210624-5 -c spark.kubernetes.context=docker-for-desktop_1 --num-executors 5 --driver-memory 5g --conf spark.kubernetes.executor.podNamePrefix=sparksql ``` ```sql SET spark.sql.adaptive.enabled=true; SET spark.sql.shuffle.partitions=3000; SELECT /*+ REPARTITION */ 1 as pid, id from range(1, 1000000, 1, 500); SELECT /*+ REPARTITION(pid, id) */ 1 as pid, id from range(1, 1000000, 1, 500); ``` ### Why are the changes needed? fix perf regression which was introduced by SPARK-29292 (3ad4863) in v3.1.0. 3ad4863 is for support compilation with scala 2.13 but the performance losses is huge. We need to consider backporting this PR to branch 3.1. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? Mannully, #### before ```log 21/06/23 13:54:22 DEBUG ShuffleBlockFetcherIterator: maxBytesInFlight: 50331648, targetRemoteRequestSize: 10066329, maxBlocksInFlightPerAddress: 2147483647 21/06/23 13:54:38 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 2314708 at BlockManagerId(2, 10.1.3.114, 36423, None) with 86 blocks 21/06/23 13:54:59 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 2636612 at BlockManagerId(3, 10.1.3.115, 34293, None) with 87 blocks 21/06/23 13:55:18 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 2508706 at BlockManagerId(4, 10.1.3.116, 41869, None) with 90 blocks 21/06/23 13:55:34 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 2350854 at BlockManagerId(5, 10.1.3.117, 45787, None) with 85 blocks 21/06/23 13:55:34 INFO ShuffleBlockFetcherIterator: Getting 438 (11.8 MiB) non-empty blocks including 90 (2.5 MiB) local and 0 (0.0 B) host-local and 348 (9.4 MiB) remote blocks 21/06/23 13:55:34 DEBUG ShuffleBlockFetcherIterator: Sending request for 87 blocks (2.5 MiB) from 10.1.3.115:34293 21/06/23 13:55:34 INFO TransportClientFactory: Successfully created connection to /10.1.3.115:34293 after 1 ms (0 ms spent in bootstraps) 21/06/23 13:55:34 DEBUG ShuffleBlockFetcherIterator: Sending request for 90 blocks (2.4 MiB) from 10.1.3.116:41869 21/06/23 13:55:34 INFO TransportClientFactory: Successfully created connection to /10.1.3.116:41869 after 2 ms (0 ms spent in bootstraps) 21/06/23 13:55:34 DEBUG ShuffleBlockFetcherIterator: Sending request for 85 blocks (2.2 MiB) from 10.1.3.117:45787 ``` ```log 21/06/23 14:00:45 INFO MapOutputTracker: Broadcast outputstatuses size = 411, actual size = 828997 21/06/23 14:00:45 INFO MapOutputTrackerWorker: Got the map output locations 21/06/23 14:00:45 DEBUG ShuffleBlockFetcherIterator: maxBytesInFlight: 50331648, targetRemoteRequestSize: 10066329, maxBlocksInFlightPerAddress: 2147483647 21/06/23 14:00:55 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 1894389 at BlockManagerId(2, 10.1.3.114, 36423, None) with 99 blocks 21/06/23 14:01:04 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 1919993 at BlockManagerId(3, 10.1.3.115, 34293, None) with 100 blocks 21/06/23 14:01:14 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 1977186 at BlockManagerId(5, 10.1.3.117, 45787, None) with 103 blocks 21/06/23 14:01:23 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 1938336 at BlockManagerId(4, 10.1.3.116, 41869, None) with 101 blocks 21/06/23 14:01:23 INFO ShuffleBlockFetcherIterator: Getting 500 (9.1 MiB) non-empty blocks including 97 (1820.3 KiB) local and 0 (0.0 B) host-local and 403 (7.4 MiB) remote blocks 21/06/23 14:01:23 DEBUG ShuffleBlockFetcherIterator: Sending request for 101 blocks (1892.9 KiB) from 10.1.3.116:41869 21/06/23 14:01:23 DEBUG ShuffleBlockFetcherIterator: Sending request for 103 blocks (1930.8 KiB) from 10.1.3.117:45787 21/06/23 14:01:23 DEBUG ShuffleBlockFetcherIterator: Sending request for 99 blocks (1850.0 KiB) from 10.1.3.114:36423 21/06/23 14:01:23 DEBUG ShuffleBlockFetcherIterator: Sending request for 100 blocks (1875.0 KiB) from 10.1.3.115:34293 21/06/23 14:01:23 INFO ShuffleBlockFetcherIterator: Started 4 remote fetches in 37889 ms ``` #### After ```log 21/06/24 13:01:16 DEBUG ShuffleBlockFetcherIterator: maxBytesInFlight: 50331648, targetRemoteRequestSize: 10066329, maxBlocksInFlightPerAddress: 2147483647 21/06/24 13:01:16 INFO ShuffleBlockFetcherIterator: ==> Call blockInfos.map(_._2).sum: 40 ms 21/06/24 13:01:16 INFO ShuffleBlockFetcherIterator: ==> Call mergeFetchBlockInfo for shuffle_0_9_2990_2997/9: 0 ms 21/06/24 13:01:16 INFO ShuffleBlockFetcherIterator: ==> Call mergeFetchBlockInfo for shuffle_0_15_2395_2997/15: 0 ms ``` Closes apache#33063 from yaooqinn/SPARK-35879. Authored-by: Kent Yao <yao@apache.org> Signed-off-by: Kent Yao <yao@apache.org> (cherry picked from commit 14d4dec) Signed-off-by: Kent Yao <yao@apache.org>
### What changes were proposed in this pull request? This fixes a regression in `DataFrameReader.schema(StructType)`, to avoid NPE if the given `StructType` is null. Note that, passing null to Spark public APIs leads to undefined behavior. There is no document mentioning the null behavior, and it's just an accident that `DataFrameReader.schema(StructType)` worked before. So I think this is not a 3.1 blocker. ### Why are the changes needed? It fixes a 3.1 regression ### Does this PR introduce _any_ user-facing change? yea, now `df.read.schema(null: StructType)` is a noop as before, while in the current branch-3.1 it throws NPE. ### How was this patch tested? It's undefined behavior and is very obvious, so I didn't add a test. We should add tests when we clearly define and fix the null behavior for all public APIs. Closes apache#31593 from cloud-fan/minor. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 02c784c) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…table When resolving a view, we use the captured view name in `AnalysisContext` to distinguish whether a relation name is a view or a table. But if the resolution failed, other rules (e.g. `ResolveTables`) will try to resolve the relation again but without `AnalysisContext`. So, in this case, the resolution may be incorrect. For example, if the view refers to a dropped table while a view with the same name exists, the dropped table will be resolved as a view rather than an unresolved exception. bugfix no newly added test cases Closes apache#31606 from linhongliu-db/fix-temp-view-master. Lead-authored-by: Linhong Liu <linhong.liu@databricks.com> Co-authored-by: Linhong Liu <67896261+linhongliu-db@users.noreply.github.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit be675a0) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…titionsByFilter ### What changes were proposed in this pull request? Skip null value during rewrite `InSet` to `>= and <=` at getPartitionsByFilter. ### Why are the changes needed? Spark will convert `InSet` to `>= and <=` if it's values size over `spark.sql.hive.metastorePartitionPruningInSetThreshold` during pruning partition . At this case, if values contain a null, we will get such exception ``` java.lang.NullPointerException at org.apache.spark.unsafe.types.UTF8String.compareTo(UTF8String.java:1389) at org.apache.spark.unsafe.types.UTF8String.compareTo(UTF8String.java:50) at scala.math.LowPriorityOrderingImplicits$$anon$3.compare(Ordering.scala:153) at java.util.TimSort.countRunAndMakeAscending(TimSort.java:355) at java.util.TimSort.sort(TimSort.java:220) at java.util.Arrays.sort(Arrays.java:1438) at scala.collection.SeqLike.sorted(SeqLike.scala:659) at scala.collection.SeqLike.sorted$(SeqLike.scala:647) at scala.collection.AbstractSeq.sorted(Seq.scala:45) at org.apache.spark.sql.hive.client.Shim_v0_13.convert$1(HiveShim.scala:772) at org.apache.spark.sql.hive.client.Shim_v0_13.$anonfun$convertFilters$4(HiveShim.scala:826) at scala.collection.immutable.Stream.flatMap(Stream.scala:489) at org.apache.spark.sql.hive.client.Shim_v0_13.convertFilters(HiveShim.scala:826) at org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(HiveShim.scala:848) at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getPartitionsByFilter$1(HiveClientImpl.scala:750) ``` ### Does this PR introduce _any_ user-facing change? Yes, bug fix. ### How was this patch tested? Add test. Closes apache#31632 from ulysses-you/SPARK-34515. Authored-by: ulysses-you <ulyssesyou18@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 999d3b8) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request? This pr make DPP support LIKE ANY/ALL expression: ```sql SELECT date_id, product_id FROM fact_sk f JOIN dim_store s ON f.store_id = s.store_id WHERE s.country LIKE ANY ('%D%E%', '%A%B%') ``` ### Why are the changes needed? Improve query performance. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes apache#31563 from wangyum/SPARK-34436. Lead-authored-by: Yuming Wang <yumwang@apache.org> Co-authored-by: Yuming Wang <yumwang@ebay.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 4a3200b) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…etastore ### What changes were proposed in this pull request? Skip `InSet` null value during push filter to Hive metastore. ### Why are the changes needed? If `InSet` contains a null value, we should skip it and push other values to metastore. To keep same behavior with `In`. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Add test. Closes apache#31659 from ulysses-you/SPARK-34550. Authored-by: ulysses-you <ulyssesyou18@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> (cherry picked from commit 82267ac) Signed-off-by: HyukjinKwon <gurwls223@apache.org>
… respect case sensitive conf ### What changes were proposed in this pull request? This PR makes partition spec parsing respect case sensitive conf. ### Why are the changes needed? When parsing the partition spec, Spark will call `org.apache.spark.sql.catalyst.parser.ParserUtils.checkDuplicateKeys` to check if there are duplicate partition column names in the list. But this method is always case sensitive and doesn't detect duplicate partition column names when using different cases. ### Does this PR introduce _any_ user-facing change? Yep. This prevents users from writing incorrect queries such as `INSERT OVERWRITE t PARTITION (c='2', C='3') VALUES (1)` when they don't enable case sensitive conf. ### How was this patch tested? The new added test will fail without this change. Closes apache#31669 from zsxwing/SPARK-34556. Authored-by: Shixiong Zhu <zsxwing@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> (cherry picked from commit 62737e1) Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…fails for column name having a dot **What changes were proposed in this pull request?** This PR fixes dataframe.na.fillMap() for column having a dot in the name as mentioned in [SPARK-34417](https://issues.apache.org/jira/browse/SPARK-34417). Use resolved attributes of a column for replacing null values. **Why are the changes needed?** dataframe.na.fillMap() does not work for column having a dot in the name **Does this PR introduce any user-facing change?** None **How was this patch tested?** Added unit test for the same Closes apache#31545 from amandeep-sharma/master. Lead-authored-by: Amandeep Sharma <happyaman91@gmail.com> Co-authored-by: Amandeep Sharma <amandeep.sharma@oracle.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 4bda3c0) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…esort ### What changes were proposed in this pull request? Today, child expressions may be resolved based on "real" or metadata output attributes. We should prefer the real attribute during resolution if one exists. ### Why are the changes needed? Today, attempting to resolve an expression when there is a "real" output attribute and a metadata attribute with the same name results in resolution failure. This is likely unexpected, as the user may not know about the metadata attribute. ### Does this PR introduce _any_ user-facing change? Yes. Previously, the user would see an error message when resolving a column with the same name as a "real" output attribute and a metadata attribute as below: ``` org.apache.spark.sql.AnalysisException: Reference 'index' is ambiguous, could be: testcat.ns1.ns2.tableTwo.index, testcat.ns1.ns2.tableOne.index.; line 1 pos 71 at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:363) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:107) ``` Now, resolution succeeds and provides the "real" output attribute. ### How was this patch tested? Added a unit test. Closes apache#31654 from karenfeng/fallback-resolve-metadata. Authored-by: Karen Feng <karen.feng@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 2e54d68) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
… class name in NewInstance.doGenCode ### What changes were proposed in this pull request? Use `Utils.getSimpleName` to avoid hitting `Malformed class name` error in `NewInstance.doGenCode`. ### Why are the changes needed? On older JDK versions (e.g. JDK8u), nested Scala classes may trigger `java.lang.Class.getSimpleName` to throw an `java.lang.InternalError: Malformed class name` error. In this particular case, creating an `ExpressionEncoder` on such a nested Scala class would create a `NewInstance` expression under the hood, which will trigger the problem during codegen. Similar to apache#29050, we should use Spark's `Utils.getSimpleName` utility function in place of `Class.getSimpleName` to avoid hitting the issue. There are two other occurrences of `java.lang.Class.getSimpleName` in the same file, but they're safe because they're only guaranteed to be only used on Java classes, which don't have this problem, e.g.: ```scala // Make a copy of the data if it's unsafe-backed def makeCopyIfInstanceOf(clazz: Class[_ <: Any], value: String) = s"$value instanceof ${clazz.getSimpleName}? ${value}.copy() : $value" val genFunctionValue: String = lambdaFunction.dataType match { case StructType(_) => makeCopyIfInstanceOf(classOf[UnsafeRow], genFunction.value) case ArrayType(_, _) => makeCopyIfInstanceOf(classOf[UnsafeArrayData], genFunction.value) case MapType(_, _, _) => makeCopyIfInstanceOf(classOf[UnsafeMapData], genFunction.value) case _ => genFunction.value } ``` The Unsafe-* family of types are all Java types, so they're okay. ### Does this PR introduce _any_ user-facing change? Fixes a bug that throws an error when using `ExpressionEncoder` on some nested Scala types, otherwise no changes. ### How was this patch tested? Added a test case to `org.apache.spark.sql.catalyst.encoders.ExpressionEncoderSuite`. It'll fail on JDK8u before the fix, and pass after the fix. Closes apache#31709 from rednaxelafx/spark-34596-master. Authored-by: Kris Mok <kris.mok@databricks.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> (cherry picked from commit ecf4811) Signed-off-by: HyukjinKwon <gurwls223@apache.org>
… blocks ### What changes were proposed in this pull request? Fix a problems which can lead to data correctness after part blocks retry in `OneForOneBlockFetcher` when use `FetchShuffleBlocks` . ### Why are the changes needed? This is a data correctness bug, It's is no problems when use old protocol to send `OpenBlocks` before fetch chunks in `OneForOneBlockFetcher`; In latest branch, `OpenBlocks` has been replaced to `FetchShuffleBlocks`. Howerver, `FetchShuffleBlocks` read shuffle blocks order is not the same as `blockIds` in `OneForOneBlockFetcher`; the `blockIds` is used to match blockId with shuffle data with index, now it is out of order; It will lead to read wrong block chunk when some blocks fetch failed in `OneForOneBlockFetcher`, it will retry the rest of the blocks in `blockIds` based on the `blockIds`'s order. ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes apache#31643 from seayoun/yuhaiyang_fix_use_FetchShuffleBlocks_order. Lead-authored-by: yuhaiyang <yuhaiyang@yuhaiyangs-MacBook-Pro.local> Co-authored-by: yuhaiyang <yuhaiyang@172.19.25.126> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 4e43819) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Add metadataOutput as a fallback to resolution. Builds off apache#31654. The metadata columns could not be resolved via `df.col("metadataColName")` from the DataFrame API. Yes, the metadata columns can now be resolved as described above. Scala unit test. Closes apache#31668 from karenfeng/spark-34555. Authored-by: Karen Feng <karen.feng@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit b01dd12) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…tPolicy when insert into v2 tables ### What changes were proposed in this pull request? This is a followup of apache#27597 and simply apply the fix in the v2 table insertion code path. ### Why are the changes needed? bug fix ### Does this PR introduce _any_ user-facing change? yes, now v2 table insertion with static partitions also follow StoreAssignmentPolicy. ### How was this patch tested? moved the test from apache#27597 to the general test suite `SQLInsertTestSuite`, which covers DS v2, file source, and hive tables. Closes apache#31726 from cloud-fan/insert. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> (cherry picked from commit 8f1eec4) Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…upport partition columns containing dot for DSv2 ### What changes were proposed in this pull request? `ResolveInsertInto.staticDeleteExpression` should use `UnresolvedAttribute.quoted` to create the delete expression so that we will treat the entire `attr.name` as a column name. ### Why are the changes needed? When users use `dot` in a partition column name, queries like ```INSERT OVERWRITE $t1 PARTITION (`a.b` = 'a') (`c.d`) VALUES('b')``` is not working. ### Does this PR introduce _any_ user-facing change? Without this test, the above query will throw ``` [info] org.apache.spark.sql.AnalysisException: cannot resolve '`a.b`' given input columns: [a.b, c.d]; [info] 'OverwriteByExpression RelationV2[a.b#17, c.d#18] default.tbl, ('a.b <=> cast(a as string)), false [info] +- Project [a.b#19, ansi_cast(col1#16 as string) AS c.d#20] [info] +- Project [cast(a as string) AS a.b#19, col1#16] [info] +- LocalRelation [col1#16] ``` With the fix, the query will run correctly. ### How was this patch tested? The new added test. Closes apache#31713 from zsxwing/SPARK-34599. Authored-by: Shixiong Zhu <zsxwing@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 53e4dba) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request? For command `CreateTableAsSelect` we use `InsertIntoHiveTable`, `InsertIntoHadoopFsRelationCommand` to insert data. We will update metrics of `InsertIntoHiveTable`, `InsertIntoHadoopFsRelationCommand` in `FileFormatWriter.write()`, but we only show CreateTableAsSelectCommand in WebUI SQL Tab. We need to update `CreateTableAsSelectCommand`'s metrics too. Before this PR:  After this PR:   ### Why are the changes needed? Complete SQL Metrics ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? <!-- MT Closes apache#31679 from AngersZhuuuu/SPARK-34567. Authored-by: Angerszhuuuu <angers.zhu@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 401e270) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
….logicalPlan ### What changes were proposed in this pull request? Set the active SparkSession to `sparkSessionForStream` and diable AQE & CBO before initializing the `StreamExecution.logicalPlan`. ### Why are the changes needed? The active session should be `sparkSessionForStream`. Otherwise, settings like https://github.com/apache/spark/blob/6b34745cb9b294c91cd126c2ea44c039ee83cb84/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L332-L335 wouldn't take effect if callers access them from the active SQLConf, e.g., the rule of `InsertAdaptiveSparkPlan`. Besides, unlike `InsertAdaptiveSparkPlan` (which skips streaming plan), `CostBasedJoinReorder` seems to have the chance to take effect theoretically. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Tested manually. Before the fix, `InsertAdaptiveSparkPlan` would try to apply AQE on the plan(wouldn't take effect though). After this fix, the rule returns directly. Closes apache#31600 from Ngone51/active-session-for-stream. Authored-by: yi.wu <yi.wu@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit e7e0161) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…correct order ### What changes were proposed in this pull request? Make the "duration" column in standalone mode master UI sorted by numeric duration, hence the column can be sorted by the correct order. Before changes:  After changes:  ### Why are the changes needed? Fix a UI bug to make the sorting consistent across different pages. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Ran several apps with different durations and verified the duration column on the master page can be sorted correctly. Closes apache#31743 from baohe-zhang/SPARK-32924. Authored-by: Baohe Zhang <baohe.zhang@verizonmedia.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com> (cherry picked from commit 9ac5ee2) Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request? Add allow list to capture sql config for view. ### Why are the changes needed? Spark use origin text sql to store view then capture and store sql config into view metadata. Capture config will skip some config with some prefix, e.g. `spark.sql.optimizer.` but unfortunately `spark.sql.optimizer.disableHints` is start with `spark.sql.optimizer.`. We need a allow list to help capture the config. ### Does this PR introduce _any_ user-facing change? Yes bug fix. ### How was this patch tested? Add test. Closes apache#31732 from ulysses-you/SPARK-34613. Authored-by: ulysses-you <ulyssesyou18@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 43aacd5) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…s written in legacy mode ### What changes were proposed in this pull request? This PR fixes an issue when reading of a Parquet file written with legacy mode would fail due to incorrect Parquet LIST to ArrayType conversion. The issue arises when using schema evolution and utilising the parquet-mr reader. 2-level LIST annotated types could be parsed incorrectly as 3-level LIST annotated types because their underlying element type does not match the full inferred Catalyst schema. ### Why are the changes needed? It appears to be a long-standing issue with the legacy mode due to the imprecise check in ParquetRowConverter that was trying to determine Parquet backward compatibility using Catalyst schema: `DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType)` in https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala#L606. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a new test case in ParquetInteroperabilitySuite.scala. Closes apache#34044 from sadikovi/parquet-legacy-write-mode-list-issue. Authored-by: Ivan Sadikov <ivan.sadikov@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit ec26d94) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
….NaN ### What changes were proposed in this pull request? For query ``` select array_except(array(cast('nan' as double), 1d), array(cast('nan' as double))) ``` This returns [NaN, 1d], but it should return [1d]. This issue is caused by `OpenHashSet` can't handle `Double.NaN` and `Float.NaN` too. In this pr fix this based on apache#33955 ### Why are the changes needed? Fix bug ### Does this PR introduce _any_ user-facing change? ArrayExcept won't show handle equal `NaN` value ### How was this patch tested? Added UT Closes apache#33994 from AngersZhuuuu/SPARK-36753. Authored-by: Angerszhuuuu <angers.zhu@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit a7cbe69) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…ng UpdateBlockInfo ### What changes were proposed in this pull request? Delegate potentially blocking call to `mapOutputTracker.updateMapOutput` from within `UpdateBlockInfo` from `dispatcher-BlockManagerMaster` to the threadpool to avoid blocking the endpoint. This code path is only accessed for `ShuffleIndexBlockId`, other blocks are still executed on the `dispatcher-BlockManagerMaster` itself. Change `updateBlockInfo` to return `Future[Boolean]` instead of `Boolean`. Response will be sent to RPC caller upon successful completion of the future. Introduce a unit test that forces `MapOutputTracker` to make a broadcast as part of `MapOutputTracker.serializeOutputStatuses` when running decommission tests. ### Why are the changes needed? [SPARK-36782](https://issues.apache.org/jira/browse/SPARK-36782) describes a deadlock occurring if the `dispatcher-BlockManagerMaster` is allowed to block while waiting for write access to data structures. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test as introduced in this PR. --- Ping eejbyfeldt for notice. Closes apache#34043 from f-thiele/SPARK-36782. Lead-authored-by: Fabian A.J. Thiele <fabian.thiele@posteo.de> Co-authored-by: Emil Ejbyfeldt <eejbyfeldt@liveintent.com> Co-authored-by: Fabian A.J. Thiele <fthiele@liveintent.com> Signed-off-by: Gengliang Wang <gengliang@apache.org>
…thread pool ### What changes were proposed in this pull request? This's a follow-up of apache#34043. This PR proposes to only handle shuffle blocks in the separate thread pool and leave other blocks the same behavior as it is. ### Why are the changes needed? To avoid any potential overhead. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass existing tests. Closes apache#34076 from Ngone51/spark-36782-follow-up. Authored-by: yi.wu <yi.wu@databricks.com> Signed-off-by: Gengliang Wang <gengliang@apache.org> (cherry picked from commit 9d8ac7c) Signed-off-by: Gengliang Wang <gengliang@apache.org>
### What changes were proposed in this pull request? InSet should handle NaN ``` InSet(Literal(Double.NaN), Set(Double.NaN, 1d)) should return true, but return false. ``` ### Why are the changes needed? InSet should handle NaN ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added UT Closes apache#34033 from AngersZhuuuu/SPARK-36792. Authored-by: Angerszhuuuu <angers.zhu@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 64f4bf4) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…ld copy dataset_id tag to avoid ambiguous self join ### What changes were proposed in this pull request? This PR backports the change of SPARK-36874 (apache#34172) mainly, and SPARK-34634 (apache#31752) partially to care about the ambiguous self join for `ScriptTransformation`. This PR fixes an issue that ambiguous self join can't be detected if the left and right DataFrame are swapped. This is an example. ``` val df1 = Seq((1, 2, "A1"),(2, 1, "A2")).toDF("key1", "key2", "value") val df2 = df1.filter($"value" === "A2") df1.join(df2, df1("key1") === df2("key2")) // Ambiguous self join is detected and AnalysisException is thrown. df2.join(df1, df1("key1") === df2("key2)) // Ambiguous self join is not detected. ``` The root cause seems that an inner function `collectConflictPlans` in `ResolveReference.dedupRight.` doesn't copy the `dataset_id` tag when it copies a `LogicalPlan`. ### Why are the changes needed? Bug fix. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New tests. Closes apache#34205 from sarutak/backport-SPARK-36874. Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…ad incorrect behavior ### What changes were proposed in this pull request? Incorrect order of variable initialization may lead to incorrect behavior, related code: TorrentBroadcast.scala , TorrentBroadCast will get wrong checksumEnabled value after initialization, this may not be what we need, we can move L94 front of setConf(SparkEnv.get.conf) to avoid this. Supplement: Snippet 1 ```scala class Broadcast { def setConf(): Unit = { checksumEnabled = true } setConf() var checksumEnabled = false } println(new Broadcast().checksumEnabled) ``` output: ```scala false ``` Snippet 2 ```scala class Broadcast { var checksumEnabled = false def setConf(): Unit = { checksumEnabled = true } setConf() } println(new Broadcast().checksumEnabled) ``` output: ```scala true ``` ### Why are the changes needed? we can move L94 front of setConf(SparkEnv.get.conf) to avoid this. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? No Closes apache#33957 from daugraph/branch0. Authored-by: daugraph <daugraph@qq.com> Signed-off-by: Sean Owen <srowen@gmail.com> (cherry picked from commit 65f6a7c) Signed-off-by: Sean Owen <srowen@gmail.com>
… submitting job to DAGScheduler ### What changes were proposed in this pull request? This PR fixes a longstanding issue where the `DAGScheduler'`s single-threaded event processing loop could become blocked by slow `RDD.getPartitions()` calls, preventing other events (like task completions and concurrent job submissions) from being processed in a timely manner. With this patch's change, Spark will now call `.partitions` on every RDD in the DAG before submitting a job to the scheduler, ensuring that the expensive `getPartitions()` calls occur outside of the scheduler event loop. #### Background The `RDD.partitions` method lazily computes an RDD's partitions by calling `RDD.getPartitions()`. The `getPartitions()` method is invoked only once per RDD and its result is cached in the `RDD.partitions_` private field. Sometimes the `getPartitions()` call can be expensive: for example, `HadoopRDD.getPartitions()` performs file listing operations. The `.partitions` method is invoked at many different places in Spark's code, including many existing call sites that are outside of the scheduler event loop. As a result, it's _often_ the case that an RDD's partitions will have been computed before the RDD is submitted to the DAGScheduler. For example, [`submitJob` calls `rdd.partitions.length`](https://github.com/apache/spark/blob/3ba57f5edc5594ee676249cd309b8f0d8248462e/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L837), so the DAG root's partitions will be computed outside of the scheduler event loop. However, there's still some cases where `partitions` gets evaluated for the first time inside of the `DAGScheduler` internals. For example, [`ShuffledRDD.getPartitions`](https://github.com/apache/spark/blob/3ba57f5edc5594ee676249cd309b8f0d8248462e/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala#L92-L94) doesn't call `.partitions` on the RDD being shuffled, so a plan with a ShuffledRDD at the root won't necessarily result in `.partitions` having been called on all RDDs prior to scheduler job submission. #### Correctness: proving that we make no excess `.partitions` calls This PR adds code to traverse the DAG prior to job submission and call `.partitions` on every RDD encountered. I'd like to argue that this results in no _excess_ `.partitions` calls: in every case where the new code calls `.partitions` there is existing code which would have called `.partitions` at some point during a successful job execution: - Assume that this is the first time we are computing every RDD in the DAG. - Every RDD appears in some stage. - [`submitStage` will call `submitMissingTasks`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1438) on every stage root RDD. - [`submitStage` calls `getPreferredLocsInternal`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1687-L1696) on every stage root RDD. - [`getPreferredLocsInternal`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2995-L3043) visits the RDD and all of its parents RDDs that are computed in the same stage (via narrow dependencies) and calls `.partitions` on each RDD visited. - Therefore `.partitions` is invoked on every RDD in the DAG by the time the job has successfully completed. - Therefore this patch's change does not introduce any new calls to `.partitions` which would not have otherwise occurred (assuming the job succeeded). #### Ordering of `.partitions` calls I don't think the order in which `.partitions` calls occur matters for correctness: the DAGScheduler happens to invoke `.partitions` in a particular order today (defined by the DAG traversal order in internal scheduler methods), but there's many lots of out-of-order `.partition` calls occurring elsewhere in the codebase. #### Handling of exceptions in `.partitions` I've chosen **not** to add special error-handling for the new `.partitions` calls: if exceptions occur then they'll bubble up, unwrapped, to the user code submitting the Spark job. It's sometimes important to preserve exception wrapping behavior, but I don't think that concern is warranted in this particular case: whether `getPartitions` occurred inside or outside of the scheduler (impacting whether exceptions manifest in wrapped or unwrapped form, and impacting whether failed jobs appear in the Spark UI) was not crisply defined (and in some rare cases could even be [influenced by Spark settings in non-obvious ways](https://github.com/apache/spark/blob/10d5303174bf4a47508f6227bbdb1eaf4c92fcdb/core/src/main/scala/org/apache/spark/Partitioner.scala#L75-L79)), so I think it's both unlikely that users were relying on the old behavior and very difficult to preserve it. #### Should this have a configuration flag? Per discussion from a previous PR trying to solve this problem (apache#24438 (review)), I've decided to skip adding a configuration flag for this. ### Why are the changes needed? This fixes a longstanding scheduler performance problem which has been reported by multiple users. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? I added a regression test in `BasicSchedulerIntegrationSuite` to cover the regular job submission codepath (`DAGScheduler.submitJob`)This test uses CountDownLatches to simulate the submission of a job containing an RDD with a slow `getPartitions()` call and checks that a concurrently-submitted job is not blocked. I have **not** added separate integration tests for the `runApproximateJob` and `submitMapStage` codepaths (both of which also received the same fix). Closes apache#34265 from JoshRosen/SPARK-23626. Authored-by: Josh Rosen <joshrosen@databricks.com> Signed-off-by: Josh Rosen <joshrosen@databricks.com> (cherry picked from commit c4e975e) Signed-off-by: Josh Rosen <joshrosen@databricks.com>
This PR backport apache#34365 to branch-3.1 ### What changes were proposed in this pull request? Invalidate the table cache after alter table properties (set and unset). ### Why are the changes needed? The table properties can change the behavior of wriing. e.g. the parquet table with `parquet.compression`. If you execute the following SQL, we will get the file with snappy compression rather than zstd. ``` CREATE TABLE t (c int) STORED AS PARQUET; // cache table metadata SELECT * FROM t; ALTER TABLE t SET TBLPROPERTIES('parquet.compression'='zstd'); INSERT INTO TABLE t values(1); ``` So we should invalidate the table cache after alter table properties. ### Does this PR introduce _any_ user-facing change? yes, bug fix ### How was this patch tested? Add test Closes apache#34390 from ulysses-you/SOARK-37098-3.1. Authored-by: ulysses-you <ulyssesyou18@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…edImperativeAggregate Currently, ``` val namedObservation = Observation("named") val df = spark.range(100) val observed_df = df.observe( namedObservation, percentile_approx($"id", lit(0.5), lit(100)).as("percentile_approx_val")) observed_df.collect() namedObservation.get ``` throws exception as follows: ``` 15:16:27.994 ERROR org.apache.spark.util.Utils: Exception encountered java.io.NotSerializableException: org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile$PercentileDigest at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$2(TaskResult.scala:55) at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$2$adapted(TaskResult.scala:55) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$1(TaskResult.scala:55) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1434) at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:51) at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:616) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) ``` This PR will fix the issue. After the change, `assert(namedObservation.get === Map("percentile_approx_val" -> 49))` `java.io.NotSerializableException` will not happen. Fix `NotSerializableException` when observe with `TypedImperativeAggregate`. No. This PR change the implement of `AggregatingAccumulator` who uses serialize and deserialize of `TypedImperativeAggregate` now. New tests. Closes apache#34474 from beliefer/SPARK-37203. Authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 3f3201a) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
For case ``` withTempDir { dir => withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") { withTable("test_precision") { val df = sql("SELECT 'dummy' AS name, 1000000000000000000010.7000000000000010 AS value") df.write.mode("Overwrite").parquet(dir.getAbsolutePath) sql( s""" |CREATE EXTERNAL TABLE test_precision(name STRING, value DECIMAL(18,6)) |STORED AS PARQUET LOCATION '${dir.getAbsolutePath}' |""".stripMargin) checkAnswer(sql("SELECT * FROM test_precision"), Row("dummy", null)) } } } ``` We write a data with schema It's caused by you create a df with ``` root |-- name: string (nullable = false) |-- value: decimal(38,16) (nullable = false) ``` but create table schema ``` root |-- name: string (nullable = false) |-- value: decimal(18,6) (nullable = false) ``` This will cause enforcePrecisionScale return `null` ``` public HiveDecimal getPrimitiveJavaObject(Object o) { return o == null ? null : this.enforcePrecisionScale(((HiveDecimalWritable)o).getHiveDecimal()); } ``` Then throw NPE when call `toCatalystDecimal ` We should judge if the return value is `null` to avoid throw NPE Fix bug No Added UT Closes apache#34519 from AngersZhuuuu/SPARK-37196. Authored-by: Angerszhuuuu <angers.zhu@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit a4f8ffb) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
This PR fixes a `NullPointerException` in `WholeStageCodegenExec` caused by the expression `WidthBucket`. The cause of this NPE is that `WidthBucket` calls `WidthBucket.computeBucketNumber`, which can return `null`, but the generated code cannot deal with `null`s. This fixes a `NullPointerException` in Spark SQL. No Added tests to `MathExpressionsSuite`. This suite already had tests for `WidthBucket` with interval inputs, but lacked tests with double inputs. I checked that the tests failed without the fix, and succeed with the fix. Closes apache#34670 from tomvanbussel/SPARK-37388. Authored-by: Tom van Bussel <tom.vanbussel@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 77f3974) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
… between v3.1 and v2 This backports apache#34697 to 3.1 ### What changes were proposed in this pull request? We will store table schema in table properties for the read-side to restore. In Spark 3.1, we add char/varchar support natively. In some commands like `create table`, `alter table` with these types, the `char(x)` or `varchar(x)` will be stored directly to those properties. If a user uses Spark 2 to read such a table it will fail to parse the schema. FYI, https://github.com/apache/spark/blob/branch-2.4/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala#L136 A table can be a newly created one by Spark 3.1 and later or an existing one modified by Spark 3.1 and on. ### Why are the changes needed? backward compatibility ### Does this PR introduce _any_ user-facing change? That's not necessarily user-facing as a bugfix and only related to internal table properties. ### How was this patch tested? manully Closes apache#34736 from yaooqinn/PR_TOOL_PICK_PR_34697_BRANCH-3.1. Authored-by: Kent Yao <yao@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
**What changes were proposed in this pull request?** Change the deserialization mapping for primitive type void. **Why are the changes needed?** The void primitive type in Scala should be classOf[Unit] not classOf[Void]. Spark erroneously [map it](https://github.com/apache/spark/blob/v3.2.0/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala#L80) differently than all other primitive types. Here is the code: ``` private object JavaDeserializationStream { val primitiveMappings = Map[String, Class[_]]( "boolean" -> classOf[Boolean], "byte" -> classOf[Byte], "char" -> classOf[Char], "short" -> classOf[Short], "int" -> classOf[Int], "long" -> classOf[Long], "float" -> classOf[Float], "double" -> classOf[Double], "void" -> classOf[Void] ) } ``` Spark code is Here is the demonstration: ``` scala> classOf[Long] val res0: Class[Long] = long scala> classOf[Double] val res1: Class[Double] = double scala> classOf[Byte] val res2: Class[Byte] = byte scala> classOf[Void] val res3: Class[Void] = class java.lang.Void <--- this is wrong scala> classOf[Unit] val res4: Class[Unit] = void <---- this is right ``` It will result in Spark deserialization error if the Spark code contains void primitive type: `java.io.InvalidClassException: java.lang.Void; local class name incompatible with stream class name "void"` **Does this PR introduce any user-facing change?** no **How was this patch tested?** Changed test, also tested e2e with the code results deserialization error and it pass now. Closes apache#34816 from daijyc/voidtype. Authored-by: Daniel Dai <jdai@pinterest.com> Signed-off-by: Sean Owen <srowen@gmail.com> (cherry picked from commit fb40c0e) Signed-off-by: Sean Owen <srowen@gmail.com>
… for Generate This is a performance regression since Spark 3.1, caused by https://issues.apache.org/jira/browse/SPARK-32295 If you run the query in the JIRA ticket ``` Seq( (1, "x", "x", "x", "x", "x", "x", "x", "x", "x", "x", "x", "x", "x", "x", "x", "x", "x", "x", "x", "x") ).toDF() .checkpoint() // or save and reload to truncate lineage .createOrReplaceTempView("sub") session.sql(""" SELECT * FROM ( SELECT EXPLODE( ARRAY( * ) ) result FROM ( SELECT _1 a, _2 b, _3 c, _4 d, _5 e, _6 f, _7 g, _8 h, _9 i, _10 j, _11 k, _12 l, _13 m, _14 n, _15 o, _16 p, _17 q, _18 r, _19 s, _20 t, _21 u FROM sub ) ) WHERE result != '' """).show() ``` You will hit OOM. The reason is that: 1. We infer additional predicates with `Generate`. In this case, it's `size(array(cast(_1#21 as string), _2#22, _3#23, ...) > 0` 2. Because of the cast, the `ConstantFolding` rule can't optimize this `size(array(...))`. 3. We end up with a plan containing this part ``` +- Project [_1#21 AS a#106, _2#22 AS b#107, _3#23 AS c#108, _4#24 AS d#109, _5#25 AS e#110, _6#26 AS f#111, _7#27 AS g#112, _8#28 AS h#113, _9#29 AS i#114, _10#30 AS j#115, _11#31 AS k#116, _12#32 AS l#117, _13#33 AS m#118, _14#34 AS n#119, _15#35 AS o#120, _16#36 AS p#121, _17#37 AS q#122, _18#38 AS r#123, _19#39 AS s#124, _20#40 AS t#125, _21#41 AS u#126] +- Filter (size(array(cast(_1#21 as string), _2#22, _3#23, _4#24, _5#25, _6#26, _7#27, _8#28, _9#29, _10#30, _11#31, _12#32, _13#33, _14#34, _15#35, _16#36, _17#37, _18#38, _19#39, _20#40, _21#41), true) > 0) +- LogicalRDD [_1#21, _2#22, _3#23, _4#24, _5#25, _6#26, _7#27, _8#28, _9#29, _10#30, _11#31, _12#32, _13#33, _14#34, _15#35, _16#36, _17#37, _18#38, _19#39, _20#40, _21#41] ``` When calculating the constraints of the `Project`, we generate around 2^20 expressions, due to this code ``` var allConstraints = child.constraints projectList.foreach { case a Alias(l: Literal, _) => allConstraints += EqualNullSafe(a.toAttribute, l) case a Alias(e, _) => // For every alias in `projectList`, replace the reference in constraints by its attribute. allConstraints ++= allConstraints.map(_ transform { case expr: Expression if expr.semanticEquals(e) => a.toAttribute }) allConstraints += EqualNullSafe(e, a.toAttribute) case _ => // Don't change. } ``` There are 3 issues here: 1. We may infer complicated predicates from `Generate` 2. `ConstanFolding` rule is too conservative. At least `Cast` has no side effect with ANSI-off. 3. When calculating constraints, we should have a upper bound to avoid generating too many expressions. This fixes the first 2 issues, and leaves the third one for the future. fix a performance issue no new tests, and run the query in JIRA ticket locally. Closes apache#34823 from cloud-fan/perf. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 1fac7a9) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
….sql.legacy.allowNegativeScaleOfDecimal is enabled Backport apache#34811 ### What changes were proposed in this pull request? Fix cast string type to decimal type only if `spark.sql.legacy.allowNegativeScaleOfDecimal` is enabled. For example: ```scala import org.apache.spark.sql.types._ import org.apache.spark.sql.Row spark.conf.set("spark.sql.legacy.allowNegativeScaleOfDecimal", true) val data = Seq(Row("7.836725755512218E38")) val schema = StructType(Array(StructField("a", StringType, false))) val df =spark.createDataFrame(spark.sparkContext.parallelize(data), schema) df.select(col("a").cast(DecimalType(37,-17))).show ``` The result is null since [SPARK-32706](https://issues.apache.org/jira/browse/SPARK-32706). ### Why are the changes needed? Fix regression bug. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes apache#34851 from wangyum/SPARK-37451-branch-3.1. Authored-by: Yuming Wang <yumwang@ebay.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…sters ### What changes were proposed in this pull request? After an improvement in SPARK-31486, contributor uses 'asyncSendToMasterAndForwardReply' method instead of 'activeMasterEndpoint.askSync' to get the status of driver. Since the driver's status is only available in active master and the 'asyncSendToMasterAndForwardReply' method iterate over all of the masters, we have to handle the response from the backup masters in the client, which the developer did not consider in the SPARK-31486 change. So drivers running in cluster mode and on a cluster with multi masters affected by this bug. ### Why are the changes needed? We need to find if the response received from a backup master client must ignore it. ### Does this PR introduce _any_ user-facing change? No, It's only fixed a bug and brings back the ability to deploy in cluster mode on multi-master clusters. ### How was this patch tested? Closes apache#34911 from mohamadrezarostami/fix-a-bug-in-report-driver-status. Authored-by: Mohamadreza Rostami <mohamadrezarostami2@gmail.com> Signed-off-by: yi.wu <yi.wu@databricks.com>
### What changes were proposed in this pull request? Fix NPE ``` scala> Row(null).getSeq(0) java.lang.NullPointerException at org.apache.spark.sql.Row.getSeq(Row.scala:319) at org.apache.spark.sql.Row.getSeq$(Row.scala:319) at org.apache.spark.sql.catalyst.expressions.GenericRow.getSeq(rows.scala:166) ``` ### Why are the changes needed? bug fixing ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? new UT Closes apache#34928 from huaxingao/npe. Authored-by: Huaxin Gao <huaxin_gao@apple.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> (cherry picked from commit fcf636d) Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…State() ### What changes were proposed in this pull request? This PR fixes a correctness issue in the CodeGenerator.addBufferedState() helper method (which is used by the SortMergeJoinExec operator). The addBufferedState() method generates code for buffering values that come from a row in an operator's input iterator, performing any necessary copying so that the buffered values remain correct after the input iterator advances to the next row. The current logic does not correctly handle UDTs: these fall through to the match statement's default branch, causing UDT values to be buffered without copying. This is problematic if the UDT's underlying SQL type is an array, map, struct, or string type (since those types require copying). Failing to copy values can lead to correctness issues or crashes. This patch's fix is simple: when the dataType is a UDT, use its underlying sqlType for determining whether values need to be copied. I used an existing helper function to perform this type unwrapping. ### Why are the changes needed? Fix a correctness issue. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? I manually tested this change by re-running a workload which failed with a segfault prior to this patch. See JIRA for more details: https://issues.apache.org/jira/browse/SPARK-37784 So far I have been unable to come up with a CI-runnable regression test which would have failed prior to this change (my only working reproduction runs in a pre-production environment and does not fail in my development environment). Closes apache#35066 from JoshRosen/SPARK-37784. Authored-by: Josh Rosen <joshrosen@databricks.com> Signed-off-by: Josh Rosen <joshrosen@databricks.com> (cherry picked from commit eeef48f) Signed-off-by: Josh Rosen <joshrosen@databricks.com>
…n a thread out of RpcEnv ### What changes were proposed in this pull request? This PR proposes to let `WorkerWatcher` run `System.exit` in a separate thread instead of some thread of `RpcEnv`. ### Why are the changes needed? `System.exit` will trigger the shutdown hook to run `executor.stop`, which will result in the same deadlock issue with SPARK-14180. But note that since Spark upgrades to Hadoop 3 recently, each hook now will have a [timeout threshold](https://github.com/apache/hadoop/blob/d4794dd3b2ba365a9d95ad6aafcf43a1ea40f777/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ShutdownHookManager.java#L205-L209) which forcibly interrupt the hook execution once reaches timeout. So, the deadlock issue doesn't really exist in the master branch. However, it's still critical for previous releases and is a wrong behavior that should be fixed. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Tested manually. Closes apache#35069 from Ngone51/fix-workerwatcher-exit. Authored-by: yi.wu <yi.wu@databricks.com> Signed-off-by: yi.wu <yi.wu@databricks.com> (cherry picked from commit 639d6f4) Signed-off-by: yi.wu <yi.wu@databricks.com>
### What changes were proposed in this pull request? This reverts commit 450b415. ### Why are the changes needed? In apache#32888, shahidki31 change taskInfo.index to taskInfo.taskId. However, we generally use `index.attempt` or `taskId` to distinguish tasks within a stage, not `taskId.attempt`. Thus apache#32888 was a wrong fix issue, we should revert it. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? origin test suites Closes apache#35160 from stczwd/SPARK-37860. Authored-by: stczwd <qcsd2011@163.com> Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com> (cherry picked from commit 3d2fde5) Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
…c`'s process output iterator Backport apache#35368 to 3.1. ### What changes were proposed in this pull request? Fix hasNext in HiveScriptTransformationExec's process output iterator to always return false if it had previously returned false. ### Why are the changes needed? When hasNext on the process output iterator returns false, it leaves the iterator in a state (i.e., scriptOutputWritable is not null) such that the next call returns true. The Guava Ordering used in TakeOrderedAndProjectExec will call hasNext on the process output iterator even after an earlier call had returned false. This results in fake rows when script transform is used with `order by` and `limit`. For example: ``` create or replace temp view t as select * from values (1), (2), (3) as t(a); select transform(a) USING 'cat' AS (a int) FROM t order by a limit 10; ``` This returns: ``` NULL NULL NULL 1 2 3 ``` ### Does this PR introduce _any_ user-facing change? No, other than removing the correctness issue. ### How was this patch tested? New unit test. Closes apache#35375 from bersprockets/SPARK-38075_3.1. Authored-by: Bruce Robbins <bersprockets@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
(Please fill in changes proposed in this fix)
How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review http://spark.apache.org/contributing.html before opening a pull request.