Description
What is the problem the feature request solves?
After #595 was addressed, the DataFusion Comet tuning guide gained a memory tuning section. It looks simple at first glance, but I found that the actual behavior is more complex than I thought.
- `spark.comet.memory.overhead.factor` and `spark.comet.memoryOverhead` are per-operator limits, not per-executor/per-worker or per-core limits. When a Comet plan is created, it creates its own memory pool sized `spark.comet.memoryOverhead`. Usually we have `spark.executor.cores` equal to the number of vCPUs, so the actual amount of memory allocated for Comet on the worker instance will be (at least) `spark.executor.cores` * `spark.comet.memoryOverhead`.
- We have `CometPlugin` for configuring Comet memory overhead automatically, but `CometPlugin` does not account for the existence of multiple executor cores. The actual per-instance Comet memory consumption will exceed the configured memory overhead when `spark.executor.cores` > 1.
- Even assuming `spark.executor.cores = 1`, i.e. only a single task runs on each executor instance, multiple Comet operators can still allocate multiple memory pools, so the actual memory limit will be a multiple of `spark.comet.memoryOverhead`. The following figure shows the DAG of a Spark job: Stage 205 has 3 `CometSort` nodes, and each of them may consume `spark.comet.memoryOverhead` of memory. This is a conservative estimate, since it assumes that no other node in this stage reserves a significant amount of memory (see the worked example below).

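For example (illustrative numbers only, not recommendations): with `spark.comet.memoryOverhead = 1g`, a single task running Stage 205 can already reserve up to 3 × 1 GiB = 3 GiB for the three `CometSort` nodes alone; with `spark.executor.cores = 8` and `spark.task.cpus = 1`, eight such tasks can run concurrently, so the worst case grows to 8 × 3 GiB = 24 GiB on one executor instance.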
The conclusion is that the actual memory limit for Comet depends on:
- `spark.comet.memoryOverhead`
- The number of cores per executor, as well as related configurations such as `spark.task.cpus`
- The maximum number of memory-intensive Comet nodes in one stage
This makes Comet hard to tune, and the behavior is hard to estimate (it depends on the actual queries). We should either make this clear in the tuning guide or revamp the memory-related configurations so they are easier to tune and reason about. A sketch combining the factors above follows below.
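To make the dependency explicit, here is a minimal sketch of the worst-case calculation under the current model. The function and parameter names are made up for illustration and are not Comet APIs; it just encodes the three factors listed above.

```rust
/// Hypothetical helper, not a Comet API: the worst-case amount of memory the
/// native side can reserve on one executor instance under the current model.
fn worst_case_comet_memory_per_executor(
    comet_memory_overhead_bytes: u64, // spark.comet.memoryOverhead
    executor_cores: u64,              // spark.executor.cores
    task_cpus: u64,                   // spark.task.cpus
    max_comet_nodes_per_stage: u64,   // memory-intensive Comet operators in the widest stage
) -> u64 {
    // Number of tasks that can run concurrently on one executor.
    let task_slots = executor_cores / task_cpus;
    // Each running task may create one pool per memory-intensive Comet operator,
    // and each pool is sized spark.comet.memoryOverhead.
    comet_memory_overhead_bytes * task_slots * max_comet_nodes_per_stage
}
```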
Describe the potential solution
Ideally, `spark.comet.memory.overhead.factor` and `spark.comet.memoryOverhead` would configure the per-executor-instance memory limit. I have the following ideas to achieve this:
1. Use the unified memory manager introduced by "feat: Introduce `CometTaskMemoryManager` and native side memory pool" (#83). This requires enabling off-heap memory in Spark. I'm not sure why it does not appear in the tuning guide (perhaps due to its maturity). The downside is that Comet operators cannot trigger the spilling of other memory consumers, which makes it easy to run into issues similar to "SparkOutOfMemoryError happens when running CometColumnarExchange" (#886) due to its greedy/unfair nature.
2. Make all Comet operators in the same task share the same `FairSpillPool`, with the memory limit of the fair spill pool set to `spark.comet.memoryOverhead / numTaskSlots` (see the sketch after this list). This ensures that each operator gets a minimum amount of memory, which matters especially while we only support self-spilling. The downside is memory under-utilization when the memory requirements of the operators are very uneven (see "Memory manager triggers unnecessary spills", datafusion#2829).
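A minimal sketch of idea 2, assuming DataFusion's `FairSpillPool` and `RuntimeConfig::with_memory_pool` APIs. The function below is illustrative, not existing Comet code, and it leaves out how the overhead and the number of task slots are plumbed in from the JVM side:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};

/// Build one RuntimeEnv per task whose FairSpillPool is shared by every
/// Comet operator (native plan) created for that task.
fn task_runtime_env(
    comet_memory_overhead: usize, // spark.comet.memoryOverhead, passed from the JVM side
    num_task_slots: usize,        // spark.executor.cores / spark.task.cpus
) -> Result<Arc<RuntimeEnv>> {
    let per_task_limit = comet_memory_overhead / num_task_slots;
    // A fair spill pool divides its budget evenly among spillable consumers,
    // so no single operator in the task can starve the others.
    let pool = Arc::new(FairSpillPool::new(per_task_limit));
    let config = RuntimeConfig::new().with_memory_pool(pool);
    Ok(Arc::new(RuntimeEnv::new(config)?))
}
```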
I'm not sure whether it is feasible to implement non-self-spill memory reclaiming on top of 1 or 2, but I think it would help a lot in handling various kinds of workloads efficiently.
Additional context
No response