Skip to content

Commit

Permalink
stop SparkContext
Browse files Browse the repository at this point in the history
  • Loading branch information
sunchao committed Feb 15, 2024
1 parent b7f790c commit a6502e0
Showing 1 changed file with 18 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,18 +236,25 @@ class PluginContainerSuite extends SparkFunSuite with LocalSparkContext {
.set(SparkLauncher.SPARK_MASTER, "local-cluster[2,1,1024]")
.set(PLUGINS, Seq(classOf[MemoryOverridePlugin].getName()))

val sc = new SparkContext(conf)
val memoryManager = sc.env.memoryManager
var sc: SparkContext = null
try {
sc = new SparkContext(conf)
val memoryManager = sc.env.memoryManager

assert(memoryManager.tungstenMemoryMode == MemoryMode.OFF_HEAP)
assert(memoryManager.maxOffHeapStorageMemory == MemoryOverridePlugin.offHeapMemory)
assert(memoryManager.tungstenMemoryMode == MemoryMode.OFF_HEAP)
assert(memoryManager.maxOffHeapStorageMemory == MemoryOverridePlugin.offHeapMemory)

// Ensure all executors has started
TestUtils.waitUntilExecutorsUp(sc, 1, 60000)
// Ensure all executors has started
TestUtils.waitUntilExecutorsUp(sc, 1, 60000)

// Check executor memory is also updated
val execInfo = sc.statusTracker.getExecutorInfos.head
assert(execInfo.totalOffHeapStorageMemory() == MemoryOverridePlugin.offHeapMemory)
// Check executor memory is also updated
val execInfo = sc.statusTracker.getExecutorInfos.head
assert(execInfo.totalOffHeapStorageMemory() == MemoryOverridePlugin.offHeapMemory)
} finally {
if (sc != null) {
sc.stop()
}
}
}
}

Expand All @@ -257,9 +264,8 @@ class MemoryOverridePlugin extends SparkPlugin {
override def init(sc: SparkContext, pluginContext: PluginContext): JMap[String, String] = {
// Take the original executor memory, and set `spark.memory.offHeap.size` to be the
// same value. Also set `spark.memory.offHeap.enabled` to true.
val originalExecutorMemBytes = {
val originalExecutorMemBytes =
sc.conf.getSizeAsMb(EXECUTOR_MEMORY.key, EXECUTOR_MEMORY.defaultValueString)
}
sc.conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
sc.conf.set(MEMORY_OFFHEAP_SIZE.key, s"${originalExecutorMemBytes}M")
MemoryOverridePlugin.offHeapMemory = sc.conf.getSizeAsBytes(MEMORY_OFFHEAP_SIZE.key)
Expand All @@ -274,8 +280,8 @@ class MemoryOverridePlugin extends SparkPlugin {
}

object MemoryOverridePlugin {
var driverContext: PluginContext = _
var offHeapMemory: Long = _
var totalExecutorMemory: Long = _
}

class NonLocalModeSparkPlugin extends SparkPlugin {
Expand Down

0 comments on commit a6502e0

Please sign in to comment.