[SPARK-45250][core] Support stage level task resource profile for yarn cluster when dynamic allocation disabled #43030
Conversation
---
Please update the description to say what this PR does. Specifically, this does introduce user-facing changes, because there is now a new feature available to users of YARN. Was this tested on a real cluster, and what tests were run other than unit tests? You also need to update the documentation: docs/configuration.md, docs/running-on-yarn.md
---
Put it in draft until Kubernetes support is available. @tgravescs thx for the review.
---
Thanks @wbo4958 for pinging me and working on this. No concerns about adding the support for YARN cluster. Please feel free to go ahead.
---
**Manual tests**

Due to the challenges of conducting YARN application tests within Spark unit tests, I took the initiative to manually perform several tests on our internal YARN cluster.

**With dynamic allocation disabled**

```shell
spark-shell --master yarn --num-executors=1 --executor-cores=4 --conf spark.task.cpus=1 \
  --conf spark.dynamicAllocation.enabled=false
```

The above command requires 1 executor with 4 CPU cores, and the default task CPUs is 1.

Test code:

```scala
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}
val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)
val treqs = new TaskResourceRequests().cpus(1)
val rp = new ResourceProfileBuilder().require(treqs).build
rdd1 = rdd1.withResources(rp)
rdd1.collect()
```

When the required task CPUs is 1, the entire Spark application consists of a single Spark job that is divided into two stages. The first shuffle stage comprises 4 tasks, all of which are executed simultaneously. The second ResultStage comprises 3 tasks, all of which are also executed simultaneously, since 1 CPU per task allows up to 4 concurrent tasks on the 4-core executor.

Test code:

```scala
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}
val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)
val treqs = new TaskResourceRequests().cpus(2)
val rp = new ResourceProfileBuilder().require(treqs).build
rdd1 = rdd1.withResources(rp)
rdd1.collect()
```

When the required task CPUs is 2, the first shuffle stage behaves the same as in the first test. The second ResultStage comprises 3 tasks, so the first 2 tasks run at a time, and then the last task is executed.

Test code:

```scala
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}
val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)
val treqs = new TaskResourceRequests().cpus(3)
val rp = new ResourceProfileBuilder().require(treqs).build
rdd1 = rdd1.withResources(rp)
rdd1.collect()
```

When the required task CPUs is 3, the first shuffle stage behaves the same as in the first test. The second ResultStage comprises 3 tasks, all of which run serially.

When the required task CPUs exceeds the executor cores (for example, 5 CPUs per task on a 4-core executor), an exception happened:

```
scala> rdd1 = rdd1.withResources(rp)
org.apache.spark.SparkException: The number of cores per executor (=4) has to be >= the number of cpus per task = 5.
  at org.apache.spark.resource.ResourceUtils$.validateTaskCpusLargeEnough(ResourceUtils.scala:412)
  at org.apache.spark.resource.ResourceProfile.calculateTasksAndLimitingResource(ResourceProfile.scala:182)
  at org.apache.spark.resource.ResourceProfile.$anonfun$limitingResource$1(ResourceProfile.scala:152)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.resource.ResourceProfile.limitingResource(ResourceProfile.scala:151)
  at org.apache.spark.resource.ResourceProfileManager.addResourceProfile(ResourceProfileManager.scala:141)
  at org.apache.spark.rdd.RDD.withResources(RDD.scala:1829)
  ... 50 elided

scala>
```
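The scheduling behavior observed in these tests can be summarized with some simple arithmetic. The sketch below is plain Scala, not Spark's actual implementation; the helper names `slotsPerExecutor` and `waves` are made up for illustration, though the `require` message mirrors the SparkException above.

```scala
// Plain-Scala sketch of the slot math behind the tests above: with dynamic
// allocation disabled, a task-only resource profile just changes how many
// task slots fit on the existing executors.

// Task slots available on one executor for a given per-task CPU request.
def slotsPerExecutor(executorCores: Int, taskCpus: Int): Int = {
  // Mirrors Spark's validation: cores per executor must cover cpus per task.
  require(executorCores >= taskCpus,
    s"The number of cores per executor (=$executorCores) has to be >= " +
      s"the number of cpus per task = $taskCpus.")
  executorCores / taskCpus
}

// Number of scheduling "waves" needed to run all tasks of a stage.
def waves(numTasks: Int, executorCores: Int, taskCpus: Int): Int = {
  val slots = slotsPerExecutor(executorCores, taskCpus)
  (numTasks + slots - 1) / slots // ceiling division
}

// With 4 executor cores, the 3-task ResultStage runs in:
println(waves(3, 4, taskCpus = 1)) // 1 wave: all 3 tasks concurrently
println(waves(3, 4, taskCpus = 2)) // 2 waves: 2 tasks, then 1
println(waves(3, 4, taskCpus = 3)) // 3 waves: tasks run serially
// taskCpus = 5 fails the require(), matching the SparkException above.
```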
**With dynamic allocation enabled**

```shell
spark-shell --master yarn --num-executors=1 --executor-cores=4 --conf spark.task.cpus=1 \
  --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.maxExecutors=1
```

The above command enables dynamic allocation, and the max number of executors is set to 1 in order to test.

**TaskResourceProfile without any specific executor request information**

Test code:

```scala
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}
val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)
val treqs = new TaskResourceRequests().cpus(3)
val rp = new ResourceProfileBuilder().require(treqs).build
rdd1 = rdd1.withResources(rp)
rdd1.collect()
```

The `rp` here refers to a TaskResourceProfile without any specific executor request information, so the executor information utilizes the default values from the default ResourceProfile (executor cores = 4). The above code requires an extra executor which has the same executor resources as the default one.

**Different executor request information**

```scala
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}
val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)
val ereqs = new ExecutorResourceRequests().cores(6)
val treqs = new TaskResourceRequests().cpus(5)
val rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build
rdd1 = rdd1.withResources(rp)
rdd1.collect()
```
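The distinction between the two cases above — a profile with only task requests falling back to the default executor resources versus one carrying its own executor requests — can be modeled in a few lines. This is a simplified plain-Scala sketch, not Spark's real `ResourceProfile` classes; `ProfileSketch` and `effectiveExecutorCores` are invented names for illustration.

```scala
// Simplified model of the fallback described above: a TaskResourceProfile
// that carries no executor requests inherits the executor resources of the
// default ResourceProfile.

case class ProfileSketch(taskCpus: Int, executorCores: Option[Int])

val defaultExecutorCores = 4 // from --executor-cores=4 in the command above

// Effective executor cores: the profile's own request, or the default.
def effectiveExecutorCores(p: ProfileSketch): Int =
  p.executorCores.getOrElse(defaultExecutorCores)

// TaskResourceProfile-style: only task requests, no executor requests.
val taskOnly = ProfileSketch(taskCpus = 3, executorCores = None)
// Full ResourceProfile-style: carries its own executor requests.
val fullProfile = ProfileSketch(taskCpus = 5, executorCores = Some(6))

println(effectiveExecutorCores(taskOnly))    // 4 (inherited from default)
println(effectiveExecutorCores(fullProfile)) // 6 (its own request)
```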
---
Hi @tgravescs @ivoson, would you please help to review it again? Since it's hard to do YARN end-to-end tests in Spark unit tests, I did some manual tests; please see the above comments. BTW, this PR only supports YARN, since I'm not familiar with k8s for now. I will put up another PR for k8s. Thx for the understanding.
---
You still need to update the documentation like I mentioned here: #43030 (comment) Also you need to look at the build failure; it doesn't look related to this code, so maybe it's something with the setup in your repo.
---
Overall the code looks good; we need to figure out why tests aren't running/passing.
---
Thx @tgravescs, I reran the build/tests, but they still kept failing. Let me rerun them again.
---
@HyukjinKwon would you have any idea on the build failure? It looks like the env is just not set up properly, but I've never seen that before.
Force-pushed: 5e01eae to 4f68533, then 4f68533 to f5a105f.
---
Hi @tgravescs, finally I rebased this PR and force-updated it, and now the CI has passed. Thx
---
Merged to master.
---
Thanks. @mridulm, any thoughts/objections to pulling this back into the 3.5 line?
---
I did not backport it, given it was an improvement, but I don't have objections as such - as you said, it is low risk.
---
…n cluster when dynamic allocation disabled

### What changes were proposed in this pull request?
This PR is a follow-up of #37268, which supports stage-level task resource profiles for standalone clusters when dynamic allocation is disabled. This PR enables stage-level task resource profiles for YARN clusters.

### Why are the changes needed?
Users who work on Spark ML/DL cases running on YARN would expect the stage-level task resource profile feature.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
The current tests of #37268 also cover this PR, since both YARN and standalone clusters share the same TaskSchedulerImpl class, which implements this feature. Apart from that, I modified the existing tests to cover the YARN cluster and also performed some manual tests, which have been described in the comments.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43030 from wbo4958/yarn-task-resoure-profile.

Authored-by: Bobby Wang <wbo4958@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 5b80639)
Signed-off-by: Thomas Graves <tgraves@apache.org>
---
Thanks, I merged it back into the 3.5 branch (3.5.1) as well.










### What changes were proposed in this pull request?
This PR is a follow-up of #37268, which supports stage-level task resource profiles for standalone clusters when dynamic allocation is disabled. This PR enables stage-level task resource profiles for YARN clusters.

### Why are the changes needed?
Users who work on Spark ML/DL cases running on YARN would expect the stage-level task resource profile feature.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
The current tests of #37268 also cover this PR, since both YARN and standalone clusters share the same TaskSchedulerImpl class, which implements this feature. Apart from that, I modified the existing tests to cover the YARN cluster and also performed some manual tests, which have been described in the comments above.

### Was this patch authored or co-authored using generative AI tooling?
No