Skip to content

Commit bc12098

Browse files
committed
Fallback to Spark if off-heap config is not enabled
1 parent 4ed55c1 commit bc12098

File tree

1 file changed

+15
-1
lines changed

1 file changed

+15
-1
lines changed

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1162,8 +1162,22 @@ object CometSparkSessionExtensions extends Logging {
11621162
}
11631163
}
11641164

1165+
private[comet] def isOffHeapEnabled(conf: SQLConf): Boolean =
1166+
conf.contains("spark.memory.offHeap.enabled") &&
1167+
conf.getConfString("spark.memory.offHeap.enabled").toBoolean
1168+
1169+
// Copied from org.apache.spark.util.Utils which is private to Spark.
1170+
private[comet] def isTesting: Boolean = {
1171+
System.getenv("SPARK_TESTING") != null || System.getProperty("spark.testing") != null
1172+
}
1173+
1174+
// Check whether Comet shuffle is enabled:
1175+
// 1. `COMET_EXEC_SHUFFLE_ENABLED` is true
1176+
// 2. `spark.shuffle.manager` is set to `CometShuffleManager`
1177+
// 3. Off-heap memory is enabled || Spark/Comet unit testing
11651178
private[comet] def isCometShuffleEnabled(conf: SQLConf): Boolean =
1166-
COMET_EXEC_SHUFFLE_ENABLED.get(conf) && isCometShuffleManagerEnabled(conf)
1179+
COMET_EXEC_SHUFFLE_ENABLED.get(conf) && isCometShuffleManagerEnabled(conf) &&
1180+
(isOffHeapEnabled(conf) || isTesting)
11671181

11681182
private[comet] def getCometShuffleNotEnabledReason(conf: SQLConf): Option[String] = {
11691183
if (!COMET_EXEC_SHUFFLE_ENABLED.get(conf)) {

0 commit comments

Comments
 (0)