diff --git a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
index 844858b09e3c0..b4cfce5f3b050 100644
--- a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
@@ -236,18 +236,25 @@ class PluginContainerSuite extends SparkFunSuite with LocalSparkContext {
       .set(SparkLauncher.SPARK_MASTER, "local-cluster[2,1,1024]")
       .set(PLUGINS, Seq(classOf[MemoryOverridePlugin].getName()))
 
-    val sc = new SparkContext(conf)
-    val memoryManager = sc.env.memoryManager
+    var sc: SparkContext = null
+    try {
+      sc = new SparkContext(conf)
+      val memoryManager = sc.env.memoryManager
 
-    assert(memoryManager.tungstenMemoryMode == MemoryMode.OFF_HEAP)
-    assert(memoryManager.maxOffHeapStorageMemory == MemoryOverridePlugin.offHeapMemory)
+      assert(memoryManager.tungstenMemoryMode == MemoryMode.OFF_HEAP)
+      assert(memoryManager.maxOffHeapStorageMemory == MemoryOverridePlugin.offHeapMemory)
 
-    // Ensure all executors has started
-    TestUtils.waitUntilExecutorsUp(sc, 1, 60000)
+      // Ensure all executors has started
+      TestUtils.waitUntilExecutorsUp(sc, 1, 60000)
 
-    // Check executor memory is also updated
-    val execInfo = sc.statusTracker.getExecutorInfos.head
-    assert(execInfo.totalOffHeapStorageMemory() == MemoryOverridePlugin.offHeapMemory)
+      // Check executor memory is also updated
+      val execInfo = sc.statusTracker.getExecutorInfos.head
+      assert(execInfo.totalOffHeapStorageMemory() == MemoryOverridePlugin.offHeapMemory)
+    } finally {
+      if (sc != null) {
+        sc.stop()
+      }
+    }
   }
 }
 
@@ -257,9 +264,8 @@ class MemoryOverridePlugin extends SparkPlugin {
       override def init(sc: SparkContext, pluginContext: PluginContext): JMap[String, String] = {
         // Take the original executor memory, and set `spark.memory.offHeap.size` to be the
         // same value. Also set `spark.memory.offHeap.enabled` to true.
-        val originalExecutorMemBytes = {
+        val originalExecutorMemBytes =
           sc.conf.getSizeAsMb(EXECUTOR_MEMORY.key, EXECUTOR_MEMORY.defaultValueString)
-        }
         sc.conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
         sc.conf.set(MEMORY_OFFHEAP_SIZE.key, s"${originalExecutorMemBytes}M")
         MemoryOverridePlugin.offHeapMemory = sc.conf.getSizeAsBytes(MEMORY_OFFHEAP_SIZE.key)
@@ -274,8 +280,8 @@ class MemoryOverridePlugin extends SparkPlugin {
 }
 
 object MemoryOverridePlugin {
+  var driverContext: PluginContext = _
   var offHeapMemory: Long = _
-  var totalExecutorMemory: Long = _
 }
 
 class NonLocalModeSparkPlugin extends SparkPlugin {