Skip to content

Commit

Permalink
Fix memory leak
Browse files (browse the repository at this point in the history)
  • Loading branch information
viirya committed May 4, 2024
1 parent 126675e commit b37070d
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 11 deletions.
11 changes: 1 addition & 10 deletions dev/diffs/3.4.2.diff
Original file line number Diff line number Diff line change
Expand Up @@ -1149,7 +1149,7 @@ index ac710c32296..88a5329e74e 100644
val df = spark.read.parquet(path).selectExpr(projection: _*)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 593bd7bb4ba..f39fe59f36b 100644
index 593bd7bb4ba..2518d715154 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -26,9 +26,11 @@ import org.scalatest.time.SpanSugar._
Expand Down Expand Up @@ -1268,15 +1268,6 @@ index 593bd7bb4ba..f39fe59f36b 100644
val localShuffleRDD0 = localReads(0).execute().asInstanceOf[ShuffledRowRDD]
val localShuffleRDD1 = localReads(1).execute().asInstanceOf[ShuffledRowRDD]
// the final parallelism is math.max(1, numReduces / numMappers): math.max(1, 5/2) = 2
@@ -298,7 +315,7 @@ class AdaptiveQueryExecSuite
.groupBy($"a").count()
checkAnswer(testDf, Seq())
val plan = testDf.queryExecution.executedPlan
- assert(find(plan)(_.isInstanceOf[SortMergeJoinExec]).isDefined)
+ assert(find(plan)(_.isInstanceOf[CometSortMergeJoinExec]).isDefined)
val coalescedReads = collect(plan) {
case r: AQEShuffleReadExec => r
}
@@ -322,7 +339,7 @@ class AdaptiveQueryExecSuite
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,9 @@ object CometSparkSessionExtensions extends Logging {
private[comet] def isCometShuffleEnabled(conf: SQLConf): Boolean =
COMET_EXEC_SHUFFLE_ENABLED.get(conf) &&
(conf.contains("spark.shuffle.manager") && conf.getConfString("spark.shuffle.manager") ==
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") &&
// TODO: AQE coalesce partitions feature causes Comet columnar shuffle memory leak
!conf.coalesceShufflePartitionsEnabled

private[comet] def isCometScanEnabled(conf: SQLConf): Boolean = {
COMET_SCAN_ENABLED.get(conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ abstract class CometTestBase
conf.set(MEMORY_OFFHEAP_SIZE.key, "2g")
conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "1g")
conf.set(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key, "1g")
conf.set(SQLConf.COALESCE_PARTITIONS_ENABLED.key, "false")
conf.set(CometConf.COMET_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true")
Expand Down

0 comments on commit b37070d

Please sign in to comment.