Skip to content

Commit

Permalink
Revert changes for enabling native memory management
Browse files Browse the repository at this point in the history
  • Loading branch information
Kontinuation committed Oct 16, 2024
1 parent 5dc8ce0 commit 4607e8d
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,6 @@ class CometTPCDSQuerySuite
override val tpcdsQueries: Seq[String] = tpcdsAllQueries

override val tpcdsQueriesV2_7_0: Seq[String] = tpcdsAllQueriesV2_7_0

private val useOffHeapMemory = sys.env.get("USE_OFFHEAP_MEMORY")
}
with CometTPCDSQueryTestSuite
with ShimCometTPCDSQuerySuite {
Expand All @@ -190,10 +188,8 @@ class CometTPCDSQuerySuite
conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "15g")
conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
if (useOffHeapMemory.getOrElse("false").toBoolean) {
conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
conf.set(MEMORY_OFFHEAP_SIZE.key, "15g")
}
conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
conf.set(MEMORY_OFFHEAP_SIZE.key, "15g")
conf
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ import org.apache.comet.shims.ShimCometTPCHQuerySuite
class CometTPCHQuerySuite extends QueryTest with TPCBase with ShimCometTPCHQuerySuite {

private val tpchDataPath = sys.env.get("SPARK_TPCH_DATA")
private val useOffHeapMemory = sys.env.get("USE_OFFHEAP_MEMORY")

val tpchQueries: Seq[String] = Seq(
"q1",
Expand Down Expand Up @@ -92,11 +91,8 @@ class CometTPCHQuerySuite extends QueryTest with TPCBase with ShimCometTPCHQuery
conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
conf.set(CometConf.COMET_SHUFFLE_MODE.key, "jvm")
if (useOffHeapMemory.getOrElse("false").toBoolean) {
conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
conf.set(MEMORY_OFFHEAP_SIZE.key, "2g")
}
conf
conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
conf.set(MEMORY_OFFHEAP_SIZE.key, "2g")
}

protected override def createSparkSession: TestSparkSession = {
Expand Down
8 changes: 2 additions & 6 deletions spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,15 @@ abstract class CometTestBase
protected val shuffleManager: String =
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"

private val useOffHeapMemory = sys.env.get("USE_OFFHEAP_MEMORY")

protected def sparkConf: SparkConf = {
val conf = new SparkConf()
conf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
conf.set("spark.ui.enabled", "false")
conf.set(SQLConf.SHUFFLE_PARTITIONS, 10) // reduce parallelism in tests
conf.set(SQLConf.ANSI_ENABLED.key, "false")
conf.set(SHUFFLE_MANAGER, shuffleManager)
if (useOffHeapMemory.getOrElse("false").toBoolean) {
conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
conf.set(MEMORY_OFFHEAP_SIZE.key, "2g")
}
conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
conf.set(MEMORY_OFFHEAP_SIZE.key, "2g")
conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "1g")
conf.set(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key, "1g")
conf.set(CometConf.COMET_ENABLED.key, "true")
Expand Down

0 comments on commit 4607e8d

Please sign in to comment.