Skip to content

Conversation

@wbo4958
Copy link
Contributor

@wbo4958 wbo4958 commented Sep 21, 2023

What changes were proposed in this pull request?

This PR is a follow-up of #37268 which supports stage level task resource profile for standalone cluster when dynamic allocation disabled. This PR enables stage-level task resource profile for yarn cluster.

Why are the changes needed?

Users who work on spark ML/DL cases running on Yarn would expect stage-level task resource profile feature.

Does this PR introduce any user-facing change?

No

How was this patch tested?

The current tests of #37268 can also cover this PR since both yarn and standalone cluster share the same TaskSchedulerImpl class which implements this feature. Apart from that, modifying the existing test to cover yarn cluster. Apart from that, I also performed some manual tests which have been updated in the comments.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the CORE label Sep 21, 2023
@wbo4958
Copy link
Contributor Author

wbo4958 commented Sep 21, 2023

Hi @ivoson, I believe this PR is a follow-up of your previous PR #37268 since both yarn and standalone cluster share the same TaskSchedulerImpl. But I still would like to know what's your concern about this PR. Thx.

@tgravescs
Copy link
Contributor

Please update description to say what this PR does. Specifically this does introduce user facing changes because there is now a new feature available to users of yarn.

Was this tested on a real cluster, what all tests were run other then unit tests?

You also need to update the documentation: docs/configuration.md, docs/running-on-yarn.md

@wbo4958 wbo4958 marked this pull request as draft September 26, 2023 23:35
@wbo4958
Copy link
Contributor Author

wbo4958 commented Sep 26, 2023

Put it in draft until Kubernetes support is available. @tgravescs thx for the review.

@ivoson
Copy link
Contributor

ivoson commented Sep 27, 2023

Hi @ivoson, I believe this PR is a follow-up of your previous PR #37268 since both yarn and standalone cluster share the same TaskSchedulerImpl. But I still would like to know what's your concern about this PR. Thx.

Thanks @wbo4958 for ping me and work on this. No concerns about adding the support for yarn cluster. Please feel free to go ahead.

@wbo4958
Copy link
Contributor Author

wbo4958 commented Sep 27, 2023

Manual tests

Due to the challenges of conducting yarn application tests within Spark unit tests, I took the initiative to manually perform several tests on our internal Yarn cluster.

With dynamic allocation disabled.

spark-shell --master yarn --num-executors=1 --executor-cores=4 --conf spark.task.cpus=1 \
   --conf spark.dynamicAllocation.enabled=false

The above command requires 1 executor with 4 CPU cores, and the default task.cpus = 1, so the default tasks parallelism is 4 at a time.

  1. task.cores=1

Test code:

import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)

val treqs = new TaskResourceRequests().cpus(1)
val rp = new ResourceProfileBuilder().require(treqs).build

rdd1 = rdd1.withResources(rp)
rdd1.collect()

When the required task.cpus=1, executor.cores=4 (No executor resource specified, use the default one), there will be 4 tasks running for rp.

The entire Spark application consists of a single Spark job that will be divided into two stages. The first shuffle stage comprises four tasks, all of which will be executed simultaneously.

shuffle-stage

And the second ResultStage comprises 3 tasks, and all of which will be executed simultaneously since the required task.cpus is 1.

result-stage-task cores=1

  1. task.cores=2

Test code,

import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)

val treqs = new TaskResourceRequests().cpus(2)
val rp = new ResourceProfileBuilder().require(treqs).build

rdd1 = rdd1.withResources(rp)
rdd1.collect()

When the required task.cpus=2, executor.cores=4 (No executor resource specified, use the default one), there will be 2 tasks running for rp.

The first shuffle stage behaves the same as the first one.

The second ResultStage comprises 3 tasks, so the first 2 tasks will be running at a time, and then execute the last task.

result-stage-task cores=2

  1. task.cores=3

Test code,

import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)

val treqs = new TaskResourceRequests().cpus(3)
val rp = new ResourceProfileBuilder().require(treqs).build

rdd1 = rdd1.withResources(rp)
rdd1.collect()

When the required task.cpus=3, executor.cores=4 (No executor resource specified, use the default one), there will be 1 task running for rp.

The first shuffle stage behaves the same as the first one.

The second ResultStage comprises 3 tasks, all of which will be running serially.

result-stage-task cores=3

  1. task.cores=5
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)
val treqs = new TaskResourceRequests().cpus(5)
val rp = new ResourceProfileBuilder().require(treqs).build

rdd1 = rdd1.withResources(rp)

exception happened.

scala> rdd1 = rdd1.withResources(rp)
org.apache.spark.SparkException: The number of cores per executor (=4) has to be >= the number of cpus per task = 5.
  at org.apache.spark.resource.ResourceUtils$.validateTaskCpusLargeEnough(ResourceUtils.scala:412)
  at org.apache.spark.resource.ResourceProfile.calculateTasksAndLimitingResource(ResourceProfile.scala:182)
  at org.apache.spark.resource.ResourceProfile.$anonfun$limitingResource$1(ResourceProfile.scala:152)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.resource.ResourceProfile.limitingResource(ResourceProfile.scala:151)
  at org.apache.spark.resource.ResourceProfileManager.addResourceProfile(ResourceProfileManager.scala:141)
  at org.apache.spark.rdd.RDD.withResources(RDD.scala:1829)
  ... 50 elided

scala> 

@wbo4958 wbo4958 marked this pull request as ready for review September 27, 2023 07:50
@wbo4958
Copy link
Contributor Author

wbo4958 commented Sep 27, 2023

With dynamic allocation enabled.

spark-shell --master yarn --num-executors=1 --executor-cores=4 --conf spark.task.cpus=1 \
  --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.maxExecutors=1\

The above command enables the dynamic allocation and the max executors required is set to 1 in order to test.

TaskResourceProfile without any specific executor request information

Test code,

import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)

val treqs = new TaskResourceRequests().cpus(3)
val rp = new ResourceProfileBuilder().require(treqs).build

rdd1 = rdd1.withResources(rp)
rdd1.collect()

The rp refers to the TaskResourceProfile without any specific executor request information, thus the executor information will utilize the default values from Default ResourceProfile (executor.cores=4).

The above code will require an extra executor which will have the same executor.cores/memory as the default ResourceProfile.

dynamic-no-exec-events

dynamc-no-exec-executors

dynamic-no-exec-tasks

Different executor request information

import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)

val ereqs = new ExecutorResourceRequests().cores(6);
val treqs = new TaskResourceRequests().cpus(5)

val rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build

rdd1 = rdd1.withResources(rp)
rdd1.collect()

dynamice-events

dynamic-executors

dynamic-tasks

@wbo4958
Copy link
Contributor Author

wbo4958 commented Sep 27, 2023

Hi @tgravescs @ivoson, Would you please help to review it again? Since it's hard to do the Yarn end 2 end tests on spark unit tests. I did some manual tests, please see the above comments.

BTW, this PR only supports Yarn since I'm not familiar with k8s for now. I will put up another PR for k8s. Thx for the understanding.

@tgravescs
Copy link
Contributor

You still need to update the documentation like I mentioned here: #43030 (comment)

Also need to look at the build failure, doesn't look like this code so maybe something with setup in your repo.

@github-actions github-actions bot added the DOCS label Sep 28, 2023
@tgravescs
Copy link
Contributor

overall code looks good, we need to figure out why tests aren't running/passing.

@wbo4958
Copy link
Contributor Author

wbo4958 commented Sep 28, 2023

Thx @tgravescs, I reran the build/tests, but they still kept failing. Let me rerun them again.

@tgravescs
Copy link
Contributor

@HyukjinKwon would you have any idea on the build failure? Looks like env is just not setup properly but I've never seen that before.

@wbo4958 wbo4958 force-pushed the yarn-task-resoure-profile branch from 5e01eae to 4f68533 Compare October 1, 2023 00:23
@wbo4958 wbo4958 force-pushed the yarn-task-resoure-profile branch from 4f68533 to f5a105f Compare October 1, 2023 22:10
@wbo4958
Copy link
Contributor Author

wbo4958 commented Oct 2, 2023

Hi @tgravescs, Finally, I rebased this PR and forced-updated my PR, now the CI got passed. Thx

@mridulm mridulm closed this in 5b80639 Oct 3, 2023
@mridulm
Copy link
Contributor

mridulm commented Oct 3, 2023

Merged to master.
Thanks for working on this @wbo4958 !
Thanks for the reviews @tgravescs and @ivoson :-)

@tgravescs
Copy link
Contributor

thanks @mridulm any throught/objections to pulling this back into 3.5 line?
Seems fairly low risk improvement

@mridulm
Copy link
Contributor

mridulm commented Oct 4, 2023

I did not backport it given it was an improvement, but don't have objections as such - as you said, it is low risk

asfgit pushed a commit that referenced this pull request Oct 5, 2023
…n cluster when dynamic allocation disabled

### What changes were proposed in this pull request?
This PR is a follow-up of #37268 which supports stage level task resource profile for standalone cluster when dynamic allocation disabled. This PR enables stage-level task resource profile for yarn cluster.

### Why are the changes needed?

Users who work on spark ML/DL cases running on Yarn would expect stage-level task resource profile feature.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

The current tests of #37268 can also cover this PR since both yarn and standalone cluster share the same TaskSchedulerImpl class which implements this feature. Apart from that, modifying the existing test to cover yarn cluster. Apart from that, I also performed some manual tests which have been updated in the comments.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43030 from wbo4958/yarn-task-resoure-profile.

Authored-by: Bobby Wang <wbo4958@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 5b80639)
Signed-off-by: Thomas Graves <tgraves@apache.org>
@tgravescs
Copy link
Contributor

thanks, I merged back into 3.5 branch (3.5.1) as well.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants