
Commit 998086c

Warren Zhu authored and tgravescs committed
[SPARK-30794][CORE] Stage Level scheduling: Add ability to set off heap memory
### What changes were proposed in this pull request?
Support setting off heap memory in `ExecutorResourceRequests`.

### Why are the changes needed?
Support stage level scheduling.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added UT in `ResourceProfileSuite` and `DAGSchedulerSuite`.

Closes #28972 from warrenzhu25/30794.

Authored-by: Warren Zhu <zhonzh@microsoft.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
1 parent a82aee0 commit 998086c

File tree

8 files changed: +102 −15 lines


core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala

Lines changed: 14 additions & 0 deletions
@@ -54,6 +54,20 @@ class ExecutorResourceRequests() extends Serializable {
     this
   }
 
+  /**
+   * Specify off heap memory. The value specified will be converted to MiB.
+   * This value only take effect when MEMORY_OFFHEAP_ENABLED is true.
+   *
+   * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
+   *               Default unit is MiB if not specified.
+   */
+  def offHeapMemory(amount: String): this.type = {
+    val amountMiB = JavaUtils.byteStringAsMb(amount)
+    val req = new ExecutorResourceRequest(OFFHEAP_MEM, amountMiB)
+    _executorResources.put(OFFHEAP_MEM, req)
+    this
+  }
+
   /**
    * Specify overhead memory. The value specified will be converted to MiB.
    *

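For reference, here is how the new method slots into a stage-level profile. This is a minimal sketch based on the builder API exercised in the test changes below, not code from the patch itself; the amount string follows JVM memory syntax, is stored in MiB, and only takes effect when `spark.memory.offHeap.enabled` is true.

```scala
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, ResourceProfileBuilder, TaskResourceRequests}

// Hypothetical usage sketch: request 2g of off-heap memory per executor in a
// stage-level ResourceProfile (only honored when spark.memory.offHeap.enabled=true).
val ereqs = new ExecutorResourceRequests()
  .cores(4)
  .memory("8g")
  .offHeapMemory("2g") // added by this commit; converted to MiB internally
val treqs = new TaskResourceRequests().cpus(1)

val profile = new ResourceProfileBuilder().require(ereqs).require(treqs).build

// The request is keyed by ResourceProfile.OFFHEAP_MEM ("offHeap"), in MiB:
// profile.executorResources(ResourceProfile.OFFHEAP_MEM).amount == 2048
```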
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala

Lines changed: 7 additions & 1 deletion
@@ -243,13 +243,15 @@ object ResourceProfile extends Logging {
   // task resources
   val CPUS = "cpus"
   // Executor resources
+  // Make sure add new executor resource in below allSupportedExecutorResources
   val CORES = "cores"
   val MEMORY = "memory"
+  val OFFHEAP_MEM = "offHeap"
   val OVERHEAD_MEM = "memoryOverhead"
   val PYSPARK_MEM = "pyspark.memory"
 
   // all supported spark executor resources (minus the custom resources like GPUs/FPGAs)
-  val allSupportedExecutorResources = Seq(CORES, MEMORY, OVERHEAD_MEM, PYSPARK_MEM)
+  val allSupportedExecutorResources = Seq(CORES, MEMORY, OVERHEAD_MEM, PYSPARK_MEM, OFFHEAP_MEM)
 
   val UNKNOWN_RESOURCE_PROFILE_ID = -1
   val DEFAULT_RESOURCE_PROFILE_ID = 0
@@ -295,6 +297,10 @@ object ResourceProfile extends Logging {
     ereqs.memory(conf.get(EXECUTOR_MEMORY).toString)
     conf.get(EXECUTOR_MEMORY_OVERHEAD).map(mem => ereqs.memoryOverhead(mem.toString))
     conf.get(PYSPARK_EXECUTOR_MEMORY).map(mem => ereqs.pysparkMemory(mem.toString))
+    if (conf.get(MEMORY_OFFHEAP_ENABLED)) {
+      // Explicitly add suffix b as default unit of offHeapMemory is Mib
+      ereqs.offHeapMemory(conf.get(MEMORY_OFFHEAP_SIZE).toString + "b")
+    }
     val execReq = ResourceUtils.parseAllResourceRequests(conf, SPARK_EXECUTOR_PREFIX)
     execReq.foreach { req =>
       val name = req.id.resourceName

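With this change the default profile picks up off-heap memory from the existing configuration. The sketch below mirrors the default-profile test further down and uses the config key strings behind `MEMORY_OFFHEAP_ENABLED` and `MEMORY_OFFHEAP_SIZE`; note that `getOrCreateDefaultProfile` is Spark-internal, so it can only be called like this from Spark's own code, as `ResourceProfileSuite` does.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.resource.ResourceProfile

// Sketch of the conf-driven path (Spark-internal API, as in ResourceProfileSuite):
// MEMORY_OFFHEAP_SIZE is a byte value, hence the "b" suffix added above before it
// is converted into an offHeapMemory request in MiB.
val conf = new SparkConf()
  .set("spark.executor.memory", "4g")
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "512m")

val defaultProf = ResourceProfile.getOrCreateDefaultProfile(conf)
// Expected: defaultProf.executorResources(ResourceProfile.OFFHEAP_MEM).amount == 512
```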
core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala

Lines changed: 44 additions & 6 deletions
@@ -18,7 +18,7 @@
 package org.apache.spark.resource
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.internal.config.{EXECUTOR_CORES, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
+import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
 import org.apache.spark.resource.TestResourceIDs._
 
@@ -55,6 +55,8 @@ class ResourceProfileSuite extends SparkFunSuite {
       "pyspark memory empty if not specified")
     assert(rprof.executorResources.get(ResourceProfile.OVERHEAD_MEM) == None,
       "overhead memory empty if not specified")
+    assert(rprof.executorResources.get(ResourceProfile.OFFHEAP_MEM) == None,
+      "offHeap memory empty if not specified")
     assert(rprof.taskResources.size === 1,
       "Task resources should just contain cpus by default")
     assert(rprof.taskResources(ResourceProfile.CPUS).amount === 1,
@@ -69,14 +71,16 @@ class ResourceProfileSuite extends SparkFunSuite {
     conf.set(EXECUTOR_MEMORY_OVERHEAD.key, "1g")
     conf.set(EXECUTOR_MEMORY.key, "4g")
     conf.set(EXECUTOR_CORES.key, "4")
+    conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
+    conf.set(MEMORY_OFFHEAP_SIZE.key, "3m")
     conf.set(TASK_GPU_ID.amountConf, "1")
     conf.set(EXECUTOR_GPU_ID.amountConf, "1")
     conf.set(EXECUTOR_GPU_ID.discoveryScriptConf, "nameOfScript")
     val rprof = ResourceProfile.getOrCreateDefaultProfile(conf)
     assert(rprof.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     val execResources = rprof.executorResources
-    assert(execResources.size === 5, s"Executor resources should contain cores, pyspark " +
-      s"memory, memory overhead, memory, and gpu $execResources")
+    assert(execResources.size === 6, s"Executor resources should contain cores, pyspark " +
+      s"memory, memory overhead, memory, offHeap memory and gpu $execResources")
     assert(execResources.contains("gpu"), "Executor resources should have gpu")
     assert(rprof.executorResources(ResourceProfile.CORES).amount === 4,
       "Executor resources should have 4 core")
@@ -88,6 +92,8 @@ class ResourceProfileSuite extends SparkFunSuite {
       "pyspark memory empty if not specified")
     assert(rprof.executorResources(ResourceProfile.OVERHEAD_MEM).amount == 1024,
       "overhead memory empty if not specified")
+    assert(rprof.executorResources(ResourceProfile.OFFHEAP_MEM).amount == 3,
+      "Executor resources should have 3 offHeap memory")
     assert(rprof.taskResources.size === 2,
       "Task resources should just contain cpus and gpu")
     assert(rprof.taskResources.contains("gpu"), "Task resources should have gpu")
@@ -172,14 +178,14 @@ class ResourceProfileSuite extends SparkFunSuite {
 
     val ereqs = new ExecutorResourceRequests()
     ereqs.cores(2).memory("4096")
-    ereqs.memoryOverhead("2048").pysparkMemory("1024")
+    ereqs.memoryOverhead("2048").pysparkMemory("1024").offHeapMemory("3072")
     val treqs = new TaskResourceRequests()
     treqs.cpus(1)
 
     rprof.require(treqs)
     rprof.require(ereqs)
 
-    assert(rprof.executorResources.size === 5)
+    assert(rprof.executorResources.size === 6)
     assert(rprof.executorResources(ResourceProfile.CORES).amount === 2,
       "Executor resources should have 2 cores")
     assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096,
@@ -188,6 +194,8 @@ class ResourceProfileSuite extends SparkFunSuite {
       "Executor resources should have 2048 overhead memory")
     assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount === 1024,
       "Executor resources should have 1024 pyspark memory")
+    assert(rprof.executorResources(ResourceProfile.OFFHEAP_MEM).amount === 3072,
+      "Executor resources should have 3072 offHeap memory")
 
     assert(rprof.taskResources.size === 2)
     assert(rprof.taskResources("cpus").amount === 1, "Task resources should have cpu")
@@ -217,7 +225,7 @@ class ResourceProfileSuite extends SparkFunSuite {
     val rprof = new ResourceProfileBuilder()
     val ereqs = new ExecutorResourceRequests()
     ereqs.memory("4g")
-    ereqs.memoryOverhead("2000m").pysparkMemory("512000k")
+    ereqs.memoryOverhead("2000m").pysparkMemory("512000k").offHeapMemory("1g")
     rprof.require(ereqs)
 
     assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096,
@@ -226,6 +234,8 @@ class ResourceProfileSuite extends SparkFunSuite {
       "Executor resources should have 2000 overhead memory")
     assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount === 500,
       "Executor resources should have 512 pyspark memory")
+    assert(rprof.executorResources(ResourceProfile.OFFHEAP_MEM).amount === 1024,
+      "Executor resources should have 1024 offHeap memory")
   }
 
   test("Test TaskResourceRequest fractional") {
@@ -256,4 +266,32 @@ class ResourceProfileSuite extends SparkFunSuite {
     }.getMessage()
     assert(taskError.contains("The resource amount 0.7 must be either <= 0.5, or a whole number."))
   }
+
+  test("ResourceProfile has correct custom executor resources") {
+    val rprof = new ResourceProfileBuilder()
+    val eReq = new ExecutorResourceRequests()
+      .cores(2).memory("4096")
+      .memoryOverhead("2048").pysparkMemory("1024").offHeapMemory("3072")
+      .resource("gpu", 2)
+    rprof.require(eReq)
+
+    // Update this if new resource type added
+    assert(ResourceProfile.allSupportedExecutorResources.size === 5,
+      "Executor resources should have 5 supported resources")
+    assert(ResourceProfile.getCustomExecutorResources(rprof.build).size === 1,
+      "Executor resources should have 1 custom resource")
+  }
+
+  test("ResourceProfile has correct custom task resources") {
+    val rprof = new ResourceProfileBuilder()
+    val taskReq = new TaskResourceRequests()
+      .resource("gpu", 1)
+    val eReq = new ExecutorResourceRequests()
+      .cores(2).memory("4096")
+      .memoryOverhead("2048").pysparkMemory("1024").offHeapMemory("3072")
+    rprof.require(taskReq).require(eReq)
+
+    assert(ResourceProfile.getCustomTaskResources(rprof.build).size === 1,
+      "Task resources should have 1 custom resource")
+  }
 }

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 3 additions & 2 deletions
@@ -3286,7 +3286,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assert(mergedRp.taskResources.get(GPU).get.amount == 1)
 
     val ereqs5 = new ExecutorResourceRequests().cores(1).memory("3g")
-      .memoryOverhead("1g").pysparkMemory("2g").resource(GPU, 1, "disc")
+      .memoryOverhead("1g").pysparkMemory("2g").offHeapMemory("4g").resource(GPU, 1, "disc")
     val treqs5 = new TaskResourceRequests().cpus(1).resource(GPU, 1)
     val rp5 = new ResourceProfile(ereqs5.requests, treqs5.requests)
     val ereqs6 = new ExecutorResourceRequests().cores(8).resource(FPGA, 2, "fdisc")
@@ -3296,7 +3296,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 
     assert(mergedRp.getTaskCpus.get == 2)
     assert(mergedRp.getExecutorCores.get == 8)
-    assert(mergedRp.executorResources.size == 6)
+    assert(mergedRp.executorResources.size == 7)
     assert(mergedRp.taskResources.size == 3)
     assert(mergedRp.executorResources.get(GPU).get.amount == 1)
     assert(mergedRp.executorResources.get(GPU).get.discoveryScript == "disc")
@@ -3307,6 +3307,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assert(mergedRp.executorResources.get(ResourceProfile.MEMORY).get.amount == 3072)
     assert(mergedRp.executorResources.get(ResourceProfile.PYSPARK_MEM).get.amount == 2048)
     assert(mergedRp.executorResources.get(ResourceProfile.OVERHEAD_MEM).get.amount == 1024)
+    assert(mergedRp.executorResources.get(ResourceProfile.OFFHEAP_MEM).get.amount == 4096)
 
     val ereqs7 = new ExecutorResourceRequests().cores(1).memory("3g")
       .resource(GPU, 4, "disc")

python/pyspark/resource/requests.py

Lines changed: 9 additions & 0 deletions
@@ -91,6 +91,7 @@ class ExecutorResourceRequests(object):
     _MEMORY = "memory"
     _OVERHEAD_MEM = "memoryOverhead"
     _PYSPARK_MEM = "pyspark.memory"
+    _OFFHEAP_MEM = "offHeap"
 
     def __init__(self, _jvm=None, _requests=None):
         from pyspark import SparkContext
@@ -139,6 +140,14 @@ def pysparkMemory(self, amount):
                 ExecutorResourceRequest(self._PYSPARK_MEM, _parse_memory(amount))
         return self
 
+    def offheapMemory(self, amount):
+        if self._java_executor_resource_requests is not None:
+            self._java_executor_resource_requests.offHeapMemory(amount)
+        else:
+            self._executor_resources[self._OFFHEAP_MEM] = \
+                ExecutorResourceRequest(self._OFFHEAP_MEM, _parse_memory(amount))
+        return self
+
     def cores(self, amount):
         if self._java_executor_resource_requests is not None:
             self._java_executor_resource_requests.cores(amount)

python/pyspark/resource/tests/test_resources.py

Lines changed: 3 additions & 2 deletions
@@ -25,15 +25,16 @@ class ResourceProfileTests(unittest.TestCase):
     def test_profile_before_sc(self):
         rpb = ResourceProfileBuilder()
         ereqs = ExecutorResourceRequests().cores(2).memory("6g").memoryOverhead("1g")
-        ereqs.pysparkMemory("2g").resource("gpu", 2, "testGpus", "nvidia.com")
+        ereqs.pysparkMemory("2g").offheapMemory("3g").resource("gpu", 2, "testGpus", "nvidia.com")
         treqs = TaskResourceRequests().cpus(2).resource("gpu", 2)
 
         def assert_request_contents(exec_reqs, task_reqs):
-            self.assertEqual(len(exec_reqs), 5)
+            self.assertEqual(len(exec_reqs), 6)
             self.assertEqual(exec_reqs["cores"].amount, 2)
             self.assertEqual(exec_reqs["memory"].amount, 6144)
             self.assertEqual(exec_reqs["memoryOverhead"].amount, 1024)
             self.assertEqual(exec_reqs["pyspark.memory"].amount, 2048)
+            self.assertEqual(exec_reqs["offHeap"].amount, 3072)
             self.assertEqual(exec_reqs["gpu"].amount, 2)
             self.assertEqual(exec_reqs["gpu"].discoveryScript, "testGpus")
             self.assertEqual(exec_reqs["gpu"].resourceName, "gpu")

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 2 additions & 1 deletion
@@ -308,7 +308,6 @@ private[yarn] class YarnAllocator(
     if (!rpIdToYarnResource.contains(rp.id)) {
       // Start with the application or default settings
       var heapMem = executorMemory.toLong
-      // Note we currently don't support off heap memory in ResourceProfile - SPARK-30794
      var offHeapMem = executorOffHeapMemory.toLong
      var overheadMem = memoryOverhead.toLong
      var pysparkMem = pysparkWorkerMemory.toLong
@@ -326,6 +325,8 @@ private[yarn] class YarnAllocator(
            overheadMem = execReq.amount
          case ResourceProfile.PYSPARK_MEM =>
            pysparkMem = execReq.amount
+          case ResourceProfile.OFFHEAP_MEM =>
+            offHeapMem = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf, execReq)
          case ResourceProfile.CORES =>
            cores = execReq.amount.toInt
          case "gpu" =>

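For context, the resolved per-component values feed the size of the YARN container request. A rough sketch under the assumption that the allocator sums these amounts (all in MiB); this is illustrative only, not the actual YarnAllocator code.

```scala
// Illustrative only: assumed YARN container memory once the profile's "offHeap"
// request has been resolved via executorOffHeapMemorySizeAsMb (all values in MiB).
def containerMemoryMiB(heapMem: Long, offHeapMem: Long, overheadMem: Long, pysparkMem: Long): Long =
  heapMem + offHeapMem + overheadMem + pysparkMem

// e.g. containerMemoryMiB(4096, 2048, 1024, 0) == 7168
```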
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala

Lines changed: 20 additions & 3 deletions
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.config._
 import org.apache.spark.launcher.YarnCommandBuilderUtils
+import org.apache.spark.resource.ExecutorResourceRequest
 import org.apache.spark.util.Utils
 
 object YarnSparkHadoopUtil {
@@ -187,11 +188,27 @@ object YarnSparkHadoopUtil {
    * Convert MEMORY_OFFHEAP_SIZE to MB Unit, return 0 if MEMORY_OFFHEAP_ENABLED is false.
    */
   def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf): Int = {
+    val sizeInMB = Utils.memoryStringToMb(sparkConf.get(MEMORY_OFFHEAP_SIZE).toString)
+    checkOffHeapEnabled(sparkConf, sizeInMB).toInt
+  }
+
+  /**
+   * Get offHeap memory size from [[ExecutorResourceRequest]]
+   * return 0 if MEMORY_OFFHEAP_ENABLED is false.
+   */
+  def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf,
+      execRequest: ExecutorResourceRequest): Long = {
+    checkOffHeapEnabled(sparkConf, execRequest.amount)
+  }
+
+  /**
+   * return 0 if MEMORY_OFFHEAP_ENABLED is false.
+   */
+  def checkOffHeapEnabled(sparkConf: SparkConf, offHeapSize: Long): Long = {
     if (sparkConf.get(MEMORY_OFFHEAP_ENABLED)) {
-      val sizeInMB = Utils.memoryStringToMb(sparkConf.get(MEMORY_OFFHEAP_SIZE).toString)
-      require(sizeInMB > 0,
+      require(offHeapSize > 0,
         s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true")
-      sizeInMB
+      offHeapSize
     } else {
       0
     }

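The refactor keeps the existing conf-based behavior while letting the new ResourceProfile path share the same check. Below is a behavior sketch with hypothetical inputs; the calls are commented out because YarnSparkHadoopUtil lives in the YARN module and is not a user-facing API.

```scala
import org.apache.spark.SparkConf

// Hypothetical inputs illustrating checkOffHeapEnabled(sparkConf, offHeapSize):
val disabled = new SparkConf().set("spark.memory.offHeap.enabled", "false")
val enabled = new SparkConf()
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "2g")

// YarnSparkHadoopUtil.checkOffHeapEnabled(disabled, 2048L) // == 0 (off-heap disabled)
// YarnSparkHadoopUtil.checkOffHeapEnabled(enabled, 2048L)  // == 2048 (size passes the > 0 check)
// YarnSparkHadoopUtil.checkOffHeapEnabled(enabled, 0L)     // throws: size must be > 0 when enabled
```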