[SPARK-45495][core] Support stage level task resource profile for k8s cluster when dynamic allocation disabled #43323
Conversation
This PR is similar to #43030. I did some manual tests, and the results can be found in the comments below.
Manual tests

Due to the challenges of running Kubernetes application tests within Spark unit tests, I manually performed several tests on a local Kubernetes cluster. I followed the https://jaceklaskowski.github.io/spark-kubernetes-book/demo/spark-shell-on-minikube/ tutorial to set up the cluster.

minikube delete
minikube start
eval $(minikube -p minikube docker-env)
cd $SPARK_HOME
./bin/docker-image-tool.sh \
-m \
-t pr_k8s \
build
eval $(minikube -p minikube docker-env)
kubectl create ns spark-demo
kubens spark-demo
cd $SPARK_HOME
K8S_SERVER=$(kubectl config view --output=jsonpath='{.clusters[].cluster.server}')

With dynamic allocation disabled

./bin/spark-shell --master k8s://$K8S_SERVER \
--conf spark.kubernetes.container.image=spark:pr_k8s \
--conf spark.kubernetes.context=minikube \
--conf spark.kubernetes.namespace=spark-demo \
--verbose \
--num-executors=1 \
--conf spark.executor.cores=4 \
--conf spark.task.cpus=1 \
--conf spark.dynamicAllocation.enabled=false

The above command requests 1 executor with 4 CPU cores, and the default ResourceProfile requires 1 CPU per task (spark.task.cpus=1), so up to 4 tasks can run concurrently on the executor.
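A small sanity check of the effective configuration in the spark-shell (not part of the original test steps; the expected values assume the submit command above):

println(sc.getConf.get("spark.executor.cores"))            // 4
println(sc.getConf.get("spark.task.cpus"))                 // 1
println(sc.getConf.get("spark.dynamicAllocation.enabled")) // false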
Test code:

import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}
val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)
val treqs = new TaskResourceRequests().cpus(1)
val rp = new ResourceProfileBuilder().require(treqs).build
rdd1 = rdd1.withResources(rp)
rdd1.collect()

When the required task CPUs is 1, the entire Spark application consists of a single Spark job that is divided into two stages. The first shuffle stage comprises 4 tasks, all of which are executed simultaneously. The second ResultStage comprises 3 tasks, which are also executed simultaneously, since 1 CPU per task allows up to 4 concurrent tasks on the 4-core executor.
Test code:

import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}
val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)
val treqs = new TaskResourceRequests().cpus(2)
val rp = new ResourceProfileBuilder().require(treqs).build
rdd1 = rdd1.withResources(rp)
rdd1.collect()

When the required task CPUs is 2, the first shuffle stage behaves the same as in the first test. The second ResultStage comprises 3 tasks, but only 2 tasks fit on the 4-core executor at a time, so the first 2 tasks run concurrently and the last task runs afterwards.
Test code:

import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}
val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)
val treqs = new TaskResourceRequests().cpus(3)
val rp = new ResourceProfileBuilder().require(treqs).build
rdd1 = rdd1.withResources(rp)
rdd1.collect()

When the required task CPUs is 3, the first shuffle stage behaves the same as in the first test. The second ResultStage comprises 3 tasks, which run serially, since only one 3-CPU task fits on the 4-core executor at a time.
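The concurrency observed in the three tests above follows from integer division of executor cores by the profile's task CPUs. A minimal sketch of that math (an illustrative helper, not Spark code):

// Tasks that can run concurrently on one executor = executor cores / task CPUs.
def maxConcurrentTasks(executorCores: Int, taskCpus: Int): Int =
  executorCores / taskCpus

maxConcurrentTasks(4, 1) // 4 -> all 4 shuffle tasks (and all 3 result tasks) run at once
maxConcurrentTasks(4, 2) // 2 -> the 3 result tasks run as 2 + 1
maxConcurrentTasks(4, 3) // 1 -> the 3 result tasks run serially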
When the required task CPUs (5) exceeds the executor cores (4), an exception happened.

scala> import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}
|
| val rdd = sc.range(0, 100, 1, 4)
| var rdd1 = rdd.repartition(3)
| val treqs = new TaskResourceRequests().cpus(5)
| val rp = new ResourceProfileBuilder().require(treqs).build
|
| rdd1 = rdd1.withResources(rp)
|
| rdd1.collect()
warning: 1 deprecation (since 2.13.3); for details, enable `:setting -deprecation` or `:replay -deprecation`
org.apache.spark.SparkException: The number of cores per executor (=4) has to be >= the number of cpus per task = 5.
at org.apache.spark.resource.ResourceUtils$.validateTaskCpusLargeEnough(ResourceUtils.scala:412)
at org.apache.spark.resource.ResourceProfile.calculateTasksAndLimitingResource(ResourceProfile.scala:182)
at org.apache.spark.resource.ResourceProfile.$anonfun$limitingResource$1(ResourceProfile.scala:152)
at scala.Option.getOrElse(Option.scala:201)
at org.apache.spark.resource.ResourceProfile.limitingResource(ResourceProfile.scala:151)
at org.apache.spark.resource.ResourceProfileManager.addResourceProfile(ResourceProfileManager.scala:141)
at org.apache.spark.rdd.RDD.withResources(RDD.scala:1829)
... 42 elided
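The failure comes from the validation referenced in the stack trace (ResourceUtils.validateTaskCpusLargeEnough). A paraphrased sketch of the condition it enforces, illustrative only and not the actual Spark source:

// With dynamic allocation disabled, a task-only profile must fit on the default
// executors, i.e. spark.executor.cores must be >= the task CPUs the profile asks for.
val executorCores = 4 // spark.executor.cores from the submit command
val taskCpus = 5      // cpus requested by the TaskResourceRequests above
if (executorCores < taskCpus) {
  throw new org.apache.spark.SparkException(
    s"The number of cores per executor (=$executorCores) has to be >= " +
      s"the number of cpus per task = $taskCpus.")
}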
With dynamic allocation enabled

./bin/spark-shell --master k8s://$K8S_SERVER \
--conf spark.kubernetes.container.image=spark:pr_k8s \
--conf spark.kubernetes.context=minikube \
--conf spark.kubernetes.namespace=spark-demo \
--verbose \
--num-executors=1 \
--conf spark.executor.cores=4 \
--conf spark.task.cpus=1 \
--conf spark.dynamicAllocation.enabled=true

The above command enables dynamic allocation, and the maximum number of executors is set to 1 in order to test.

TaskResourceProfile without any specific executor request information

Test code:

import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}
val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)
val treqs = new TaskResourceRequests().cpus(3)
val rp = new ResourceProfileBuilder().require(treqs).build
rdd1 = rdd1.withResources(rp)
rdd1.collect()

The rp here is a TaskResourceProfile without any specific executor request information, so the executor side uses the default values from the default ResourceProfile (executor.cores=4). The above code will request an extra executor with the same resources as the default one.

Different executor request information

import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}
val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)
val ereqs = new ExecutorResourceRequests().cores(6);
val treqs = new TaskResourceRequests().cpus(5)
val rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build
rdd1 = rdd1.withResources(rp)
rdd1.collect()

After running the above code, we can see that the executor with ID 2 has 6 CPU cores.
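A minimal sketch contrasting the two kinds of profiles used above (illustrative; the expected values assume the configuration from these tests):

import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Task-only profile: no executor resources attached, so executors for this
// profile reuse the default executor setup (spark.executor.cores=4 here).
val taskOnlyRp = new ResourceProfileBuilder()
  .require(new TaskResourceRequests().cpus(3))
  .build
println(taskOnlyRp.executorResources.isEmpty) // expected: true

// Profile with explicit executor requests: a new executor shape (6 cores) is requested.
val customRp = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequests().cores(6))
  .require(new TaskResourceRequests().cpus(5))
  .build
println(customRp.executorResources("cores").amount) // expected: 6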
Hi @tgravescs @mridulm, could you help review this PR, which is similar to #43030? Thanks very much.
tgravescs left a comment:
Looks good, minor nit on the exception text.
  }.getMessage
  assert(error === "TaskResourceProfiles are only supported for Standalone " +
-   "and Yarn cluster for now when dynamic allocation is disabled.")
+   "and Yarn and Kubernetes cluster for now when dynamic allocation is disabled.")
Nit: could change this to "Standalone, YARN, and Kubernetes".
Good suggestion, Done.
+1 looks good.
Merged as: [SPARK-45495][core] Support stage level task resource profile for k8s cluster when dynamic allocation disabled

Closes #43323 from wbo4958/k8s-stage-level.

Authored-by: Bobby Wang <wbo4958@gmail.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
(cherry picked from commit 632eabd)
Signed-off-by: Thomas Graves <tgraves@apache.org>
What changes were proposed in this pull request?
This PR is a follow-up of #37268, which supports stage-level task resource profiles for standalone clusters when dynamic allocation is disabled. This PR enables stage-level task resource profiles for Kubernetes clusters.
Why are the changes needed?
Users who run Spark ML/DL workloads on Kubernetes would expect the stage-level task resource profile feature.
Does this PR introduce any user-facing change?
No
How was this patch tested?
The existing tests from #37268 also cover this PR, since Kubernetes and standalone clusters share the same TaskSchedulerImpl class, which implements this feature. In addition, an existing test was modified to cover the Kubernetes cluster, and I performed some manual tests that are documented in the comments above.
Was this patch authored or co-authored using generative AI tooling?
No