Describe the bug
We easily run into this problem when running queries with spark.comet.exec.shuffle.mode=jvm:
org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 67108848 bytes of memory, got 65700208 bytes. Available: 65700208
at org.apache.spark.shuffle.comet.CometShuffleMemoryAllocator.allocate(CometShuffleMemoryAllocator.java:132)
at org.apache.spark.shuffle.comet.CometShuffleMemoryAllocator.allocatePage(CometShuffleMemoryAllocator.java:119)
at org.apache.spark.sql.comet.execution.shuffle.SpillWriter.initialCurrentPage(SpillWriter.java:158)
at org.apache.spark.shuffle.sort.CometShuffleExternalSorter.insertRecord(CometShuffleExternalSorter.java:368)
at org.apache.spark.sql.comet.execution.shuffle.CometUnsafeShuffleWriter.insertRecordIntoSorter(CometUnsafeShuffleWriter.java:278)
at org.apache.spark.sql.comet.execution.shuffle.CometUnsafeShuffleWriter.write(CometUnsafeShuffleWriter.java:206)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
We've observed this problem not only in our own workloads but also in TPC-H benchmarks. The exception above occurs when running TPC-H query 5 on Parquet files at scale factor 1000.
When we disabled the Comet shuffle manager and used Spark's own shuffle exchange instead, all TPC-H queries finished successfully.
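For reference, a minimal sketch of that workaround as applied when building the session (the config keys are the same ones listed under Additional context, and SortShuffleManager is Spark's default shuffle manager):

```scala
import org.apache.spark.sql.SparkSession

// Workaround sketch: keep Comet enabled for scan/exec, but fall back to
// Spark's built-in sort-based shuffle instead of the Comet shuffle manager.
val spark = SparkSession.builder()
  .config("spark.comet.exec.shuffle.enabled", "false")
  .config("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
  .getOrCreate()
```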
Steps to reproduce
Run TPC-H query 5 on a Spark cluster; a minimal sketch of how we run it is shown below. The detailed environment and Spark configurations are listed under Additional context.
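A sketch of the reproduction, assuming the TPC-H scale-factor-1000 Parquet tables have already been generated (the bucket path and view registration below are placeholders of ours; the SQL is standard TPC-H query 5 with substitution parameters 'ASIA' and 1994-01-01):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Hypothetical location of our TPC-H SF1000 Parquet data
val base = "s3://<our-bucket>/tpch/sf1000"

// Register the tables referenced by query 5
Seq("customer", "orders", "lineitem", "supplier", "nation", "region").foreach { t =>
  spark.read.parquet(s"$base/$t").createOrReplaceTempView(t)
}

// Standard TPC-H query 5
spark.sql("""
  SELECT n_name, SUM(l_extendedprice * (1 - l_discount)) AS revenue
  FROM customer, orders, lineitem, supplier, nation, region
  WHERE c_custkey = o_custkey
    AND l_orderkey = o_orderkey
    AND l_suppkey = s_suppkey
    AND c_nationkey = s_nationkey
    AND s_nationkey = n_nationkey
    AND n_regionkey = r_regionkey
    AND r_name = 'ASIA'
    AND o_orderdate >= DATE '1994-01-01'
    AND o_orderdate < DATE '1995-01-01'
  GROUP BY n_name
  ORDER BY revenue DESC
""").show()
```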
Expected behavior
All TPC-H queries should finish successfully.
Additional context
The problem was reproduced on a self-deployed Kubernetes Spark cluster on AWS.
- Driver/executor instance type: r7i.2xlarge (8 vCPUs, 64GB memory)
- Executor pod resource limit: 6 vCPUs, 48 GB memory (we intentionally reserve some of the node's resources)
- Number of executor instances: 48
- Spark version: 3.4.0
- Java version: 17
- Comet version: commit 9205f0d
Here are the relevant Spark configurations:
spark.executor.cores 6
spark.executor.memory 30719m
# Reserve native memory for comet, python and other stuff
spark.executor.memoryOverheadFactor 0.6
# Each executor core gets about 1.2 GB of memory for Comet, so all 6 cores together will use about 7.2 GB.
# I know this is small for Comet, but it should not prevent the query from finishing
spark.comet.memory.overhead.factor 0.04
spark.sql.extensions org.apache.comet.CometSparkSessionExtensions
spark.comet.enabled true
spark.comet.exec.enabled true
spark.comet.exec.all.enabled true
spark.comet.exec.shuffle.enabled true
spark.comet.exec.shuffle.mode jvm
spark.shuffle.manager org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager
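For context, a back-of-the-envelope calculation of the memory numbers involved, using only the values above and the error message (it does not assume anything about how Comet splits this budget across cores or tasks):

```scala
// Arithmetic only: values taken from the configuration above and the error message.
val executorMemoryMb = 30719.0                      // spark.executor.memory
val cometFactor      = 0.04                         // spark.comet.memory.overhead.factor

val cometBudgetMb = executorMemoryMb * cometFactor  // ≈ 1229 MB (~1.2 GB)
val requestedMb   = 67108848.0 / (1024 * 1024)      // ≈ 64.0 MB page the shuffle writer asked for
val availableMb   = 65700208.0 / (1024 * 1024)      // ≈ 62.7 MB reported as available

println(f"Comet budget: $cometBudgetMb%.0f MB, requested: $requestedMb%.1f MB, available: $availableMb%.1f MB")
```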