Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
*/
private[spark] def isSupported(rp: ResourceProfile): Boolean = {
if (rp.isInstanceOf[TaskResourceProfile] && !dynamicEnabled) {
if ((notRunningUnitTests || testExceptionThrown) && !(isStandaloneOrLocalCluster || isYarn)) {
throw new SparkException("TaskResourceProfiles are only supported for Standalone and " +
"Yarn cluster for now when dynamic allocation is disabled.")
if ((notRunningUnitTests || testExceptionThrown) &&
!(isStandaloneOrLocalCluster || isYarn || isK8s)) {
throw new SparkException("TaskResourceProfiles are only supported for Standalone, " +
"Yarn and Kubernetes cluster for now when dynamic allocation is disabled.")
}
} else {
val isNotDefaultProfile = rp.id != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
val error = intercept[SparkException] {
rpmanager.isSupported(taskProf)
}.getMessage
assert(error === "TaskResourceProfiles are only supported for Standalone " +
"and Yarn cluster for now when dynamic allocation is disabled.")
assert(error === "TaskResourceProfiles are only supported for Standalone, " +
"Yarn and Kubernetes cluster for now when dynamic allocation is disabled.")

// Local cluster: supports task resource profile.
conf.setMaster("local-cluster[1, 1, 1024]")
Expand All @@ -149,6 +149,11 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
conf.setMaster("yarn")
rpmanager = new ResourceProfileManager(conf, listenerBus)
assert(rpmanager.isSupported(taskProf))

// K8s: supports task resource profile.
conf.setMaster("k8s://foo")
rpmanager = new ResourceProfileManager(conf, listenerBus)
assert(rpmanager.isSupported(taskProf))
}

test("isSupported task resource profiles with dynamic allocation enabled") {
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3668,7 +3668,7 @@ See your cluster manager specific page for requirements and details on each of -
# Stage Level Scheduling Overview

The stage level scheduling feature allows users to specify task and executor resource requirements at the stage level. This allows for different stages to run with executors that have different resources. A prime example of this is one ETL stage runs with executors with just CPUs, the next stage is an ML stage that needs GPUs. Stage level scheduling allows for user to request different executors that have GPUs when the ML stage runs rather then having to acquire executors with GPUs at the start of the application and them be idle while the ETL stage is being run.
This is only available for the RDD API in Scala, Java, and Python. It is available on YARN, Kubernetes and Standalone when dynamic allocation is enabled. When dynamic allocation is disabled, it allows users to specify different task resource requirements at stage level, and this is supported on YARN and Standalone cluster right now. See the [YARN](running-on-yarn.html#stage-level-scheduling-overview) page or [Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page or [Standalone](spark-standalone.html#stage-level-scheduling-overview) page for more implementation details.
This is only available for the RDD API in Scala, Java, and Python. It is available on YARN, Kubernetes and Standalone when dynamic allocation is enabled. When dynamic allocation is disabled, it allows users to specify different task resource requirements at stage level, and this is supported on YARN, Kubernetes and Standalone cluster right now. See the [YARN](running-on-yarn.html#stage-level-scheduling-overview) page or [Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page or [Standalone](spark-standalone.html#stage-level-scheduling-overview) page for more implementation details.

See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this feature. When dynamic allocation is disabled, tasks with different task resource requirements will share executors with `DEFAULT_RESOURCE_PROFILE`. While when dynamic allocation is enabled, the current implementation acquires new executors for each `ResourceProfile` created and currently has to be an exact match. Spark does not try to fit tasks into an executor that require a different ResourceProfile than the executor was created with. Executors that are not in use will idle timeout with the dynamic allocation logic. The default configuration for this feature is to only allow one ResourceProfile per stage. If the user associates more then 1 ResourceProfile to an RDD, Spark will throw an exception by default. See config `spark.scheduler.resource.profileMergeConflicts` to control that behavior. The current merge strategy Spark implements when `spark.scheduler.resource.profileMergeConflicts` is enabled is a simple max of each resource within the conflicting ResourceProfiles. Spark will create a new ResourceProfile with the max of each of the resources.

Expand Down
4 changes: 3 additions & 1 deletion docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -1949,5 +1949,7 @@ With the above configuration, the job will be scheduled by YuniKorn scheduler in

### Stage Level Scheduling Overview

Stage level scheduling is supported on Kubernetes when dynamic allocation is enabled. This also requires <code>spark.dynamicAllocation.shuffleTracking.enabled</code> to be enabled since Kubernetes doesn't support an external shuffle service at this time. The order in which containers for different profiles is requested from Kubernetes is not guaranteed. Note that since dynamic allocation on Kubernetes requires the shuffle tracking feature, this means that executors from previous stages that used a different ResourceProfile may not idle timeout due to having shuffle data on them. This could result in using more cluster resources and in the worst case if there are no remaining resources on the Kubernetes cluster then Spark could potentially hang. You may consider looking at config <code>spark.dynamicAllocation.shuffleTracking.timeout</code> to set a timeout, but that could result in data having to be recomputed if the shuffle data is really needed.
Stage level scheduling is supported on Kubernetes:
- When dynamic allocation is disabled: It allows users to specify different task resource requirements at the stage level and will use the same executors requested at startup.
- When dynamic allocation is enabled: It allows users to specify task and executor resource requirements at the stage level and will request the extra executors. This also requires <code>spark.dynamicAllocation.shuffleTracking.enabled</code> to be enabled since Kubernetes doesn't support an external shuffle service at this time. The order in which containers for different profiles is requested from Kubernetes is not guaranteed. Note that since dynamic allocation on Kubernetes requires the shuffle tracking feature, this means that executors from previous stages that used a different ResourceProfile may not idle timeout due to having shuffle data on them. This could result in using more cluster resources and in the worst case if there are no remaining resources on the Kubernetes cluster then Spark could potentially hang. You may consider looking at config <code>spark.dynamicAllocation.shuffleTracking.timeout</code> to set a timeout, but that could result in data having to be recomputed if the shuffle data is really needed.
Note, there is a difference in the way pod template resources are handled between the base default profile and custom ResourceProfiles. Any resources specified in the pod template file will only be used with the base default profile. If you create custom ResourceProfiles be sure to include all necessary resources there since the resources from the template file will not be propagated to custom ResourceProfiles.