Skip to content

Conversation

@wbo4958
Copy link
Contributor

@wbo4958 wbo4958 commented Oct 11, 2023

What changes were proposed in this pull request?

This PR is a follow-up of #37268 which supports stage-level task resource profile for standalone cluster when dynamic allocation is disabled. This PR enables stage-level task resource profile for the Kubernetes cluster.

Why are the changes needed?

Users who work on spark ML/DL cases running on Kubernetes would expect stage-level task resource profile feature.

Does this PR introduce any user-facing change?

No

How was this patch tested?

The current tests of #37268 can also cover this PR since both Kubernetes and standalone cluster share the same TaskSchedulerImpl class which implements this feature. Apart from that, modifying the existing test to cover the Kubernetes cluster. Apart from that, I also performed some manual tests which have been updated in the comments.

Was this patch authored or co-authored using generative AI tooling?

No

@wbo4958
Copy link
Contributor Author

wbo4958 commented Oct 11, 2023

This PR is similar to the #43030. And I did some manual tests and the results can be found in the following comments.

@wbo4958
Copy link
Contributor Author

wbo4958 commented Oct 11, 2023

Manual tests

Due to the challenges of conducting Kubernetes application tests within Spark unit tests, I took the initiative to manually perform several tests on the local Kubernetes cluster.

I followed https://jaceklaskowski.github.io/spark-kubernetes-book/demo/spark-shell-on-minikube/ tutorial to setup a local Kubernetes cluster.

minikube delete
minikube start
eval $(minikube -p minikube docker-env)
cd $SPARK_HOME
./bin/docker-image-tool.sh \
  -m \
  -t pr_k8s \
  build

eval $(minikube -p minikube docker-env)
kubectl create ns spark-demo
kubens spark-demo
cd $SPARK_HOME

K8S_SERVER=$(kubectl config view --output=jsonpath='{.clusters[].cluster.server}')

With dynamic allocation disabled.

./bin/spark-shell --master k8s://$K8S_SERVER   \
  --conf spark.kubernetes.container.image=spark:pr_k8s \
  --conf spark.kubernetes.context=minikube  \
  --conf spark.kubernetes.namespace=spark-demo   \
  --verbose \
  --num-executors=1 \
  --conf spark.executor.cores=4  \
  --conf spark.task.cpus=1  \
  --conf spark.dynamicAllocation.enabled=fasle

The above command requires 1 executor with 4 CPU cores, and the default task.cpus = 1, so the default tasks parallelism is 4 at a time.

  1. task.cores=1

Test code:

import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)

val treqs = new TaskResourceRequests().cpus(1)
val rp = new ResourceProfileBuilder().require(treqs).build

rdd1 = rdd1.withResources(rp)
rdd1.collect()

When the required task.cpus=1, executor.cores=4 (No executor resource specified, use the default one), there will be 4 tasks running for rp.

The entire Spark application consists of a single Spark job that will be divided into two stages. The first shuffle stage comprises four tasks, all of which will be executed simultaneously. And the second ResultStage comprises 3 tasks, and all of which will be executed simultaneously since the required task.cpus is 1.

task_1

  1. task.cores=2

Test code,

import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)

val treqs = new TaskResourceRequests().cpus(2)
val rp = new ResourceProfileBuilder().require(treqs).build

rdd1 = rdd1.withResources(rp)
rdd1.collect()

When the required task.cpus=2, executor.cores=4 (No executor resource specified, use the default one), there will be 2 tasks running for rp.

The first shuffle stage behaves the same as the first one.

The second ResultStage comprises 3 tasks, so the first 2 tasks will be running at a time, and then execute the last task.

task_2

  1. task.cores=3

Test code,

import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)

val treqs = new TaskResourceRequests().cpus(3)
val rp = new ResourceProfileBuilder().require(treqs).build

rdd1 = rdd1.withResources(rp)
rdd1.collect()

When the required task.cpus=3, executor.cores=4 (No executor resource specified, use the default one), there will be 1 task running for rp.

The first shuffle stage behaves the same as the first one.

The second ResultStage comprises 3 tasks, all of which will be running serially.

task_3

  1. task.cores=5

exception happened.

import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)
val treqs = new TaskResourceRequests().cpus(5)
val rp = new ResourceProfileBuilder().require(treqs).build

rdd1 = rdd1.withResources(rp)

rdd1.collect()
scala> import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}
     | 
     | val rdd = sc.range(0, 100, 1, 4)
     | var rdd1 = rdd.repartition(3)
     | val treqs = new TaskResourceRequests().cpus(5)
     | val rp = new ResourceProfileBuilder().require(treqs).build
     | 
     | rdd1 = rdd1.withResources(rp)
     | 
     | rdd1.collect()
warning: 1 deprecation (since 2.13.3); for details, enable `:setting -deprecation` or `:replay -deprecation`
org.apache.spark.SparkException: The number of cores per executor (=4) has to be >= the number of cpus per task = 5.
  at org.apache.spark.resource.ResourceUtils$.validateTaskCpusLargeEnough(ResourceUtils.scala:412)
  at org.apache.spark.resource.ResourceProfile.calculateTasksAndLimitingResource(ResourceProfile.scala:182)
  at org.apache.spark.resource.ResourceProfile.$anonfun$limitingResource$1(ResourceProfile.scala:152)
  at scala.Option.getOrElse(Option.scala:201)
  at org.apache.spark.resource.ResourceProfile.limitingResource(ResourceProfile.scala:151)
  at org.apache.spark.resource.ResourceProfileManager.addResourceProfile(ResourceProfileManager.scala:141)
  at org.apache.spark.rdd.RDD.withResources(RDD.scala:1829)
  ... 42 elided

@wbo4958
Copy link
Contributor Author

wbo4958 commented Oct 11, 2023

With dynamic allocation enabled.

./bin/spark-shell --master k8s://$K8S_SERVER   \
  --conf spark.kubernetes.container.image=spark:pr_k8s \
  --conf spark.kubernetes.context=minikube  \
  --conf spark.kubernetes.namespace=spark-demo   \
  --verbose \
  --num-executors=1 \
  --conf spark.executor.cores=4  \
  --conf spark.task.cpus=1  \
  --conf spark.dynamicAllocation.enabled=true

The above command enables the dynamic allocation and the max executors required is set to 1 in order to test.

TaskResourceProfile without any specific executor request information

Test code,

import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)

val treqs = new TaskResourceRequests().cpus(3)
val rp = new ResourceProfileBuilder().require(treqs).build

rdd1 = rdd1.withResources(rp)
rdd1.collect()

The rp refers to the TaskResourceProfile without any specific executor request information, thus the executor information will utilize the default values from Default ResourceProfile (executor.cores=4).

The above code will require an extra executor which will have the same executor.cores/memory as the default ResourceProfile.

executor_cores_4_4

task_3_executor_4

Different executor request information

import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)

val ereqs = new ExecutorResourceRequests().cores(6);
val treqs = new TaskResourceRequests().cpus(5)

val rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build

rdd1 = rdd1.withResources(rp)
rdd1.collect()

executor_cores_4_6

we can see the "Executor ID = 2" has the 6 CPU cores.

task_5_executor_cores_6

@wbo4958
Copy link
Contributor Author

wbo4958 commented Oct 11, 2023

Hi @tgravescs @mridulm, Could you help review this PR which is similar to #43030. Thx very much.

Copy link
Contributor

@tgravescs tgravescs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look good, minor nit on the exception text.

}.getMessage
assert(error === "TaskResourceProfiles are only supported for Standalone " +
"and Yarn cluster for now when dynamic allocation is disabled.")
"and Yarn and Kubernetes cluster for now when dynamic allocation is disabled.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit could change this to "Standalone, YARN, and Kubernetes"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion, Done.

@tgravescs
Copy link
Contributor

+1 looks good.

asfgit pushed a commit that referenced this pull request Oct 13, 2023
… cluster when dynamic allocation disabled

### What changes were proposed in this pull request?
This PR is a follow-up of #37268 which supports stage-level task resource profile for standalone cluster when dynamic allocation is disabled. This PR enables stage-level task resource profile for the Kubernetes cluster.

### Why are the changes needed?

Users who work on spark ML/DL cases running on Kubernetes would expect stage-level task resource profile feature.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

The current tests of #37268 can also cover this PR since both Kubernetes and standalone cluster share the same TaskSchedulerImpl class which implements this feature. Apart from that, modifying the existing test to cover the Kubernetes cluster. Apart from that, I also performed some manual tests which have been updated in the comments.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43323 from wbo4958/k8s-stage-level.

Authored-by: Bobby Wang <wbo4958@gmail.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
(cherry picked from commit 632eabd)
Signed-off-by: Thomas Graves <tgraves@apache.org>
@asfgit asfgit closed this in 632eabd Oct 13, 2023
@tgravescs
Copy link
Contributor

merged to master and branch-3.5. Thanks @wbo4958 @mridulm

@wbo4958 wbo4958 deleted the k8s-stage-level branch October 16, 2023 23:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants