Skip to content

Commit

Permalink
Support greedy_task_shared
Browse files Browse the repository at this point in the history
  • Loading branch information
Kontinuation committed Oct 17, 2024
1 parent 33f0424 commit 7f68c4f
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 8 deletions.
5 changes: 3 additions & 2 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,9 @@ object CometConf extends ShimCometConf {
val COMET_EXEC_MEMORY_POOL_TYPE: ConfigEntry[String] = conf("spark.comet.exec.memoryPool")
.doc(
"The type of memory pool to be used for Comet native execution. " +
"Available memory pool types are 'greedy', 'fair_spill', 'fair_spill_task_shared', " +
"'greedy_global' and 'fair_spill_global', By default, this config is 'greedy'.")
"Available memory pool types are 'greedy', 'fair_spill', 'greedy_task_shared', " +
"'fair_spill_task_shared', 'greedy_global' and 'fair_spill_global', By default, " +
"this config is 'greedy'.")
.stringConf
.createWithDefault("greedy")

Expand Down
2 changes: 1 addition & 1 deletion docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | true |
| spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true |
| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 |
| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. Available memory pool types are 'greedy', 'fair_spill', 'fair_spill_task_shared', 'greedy_global' and 'fair_spill_global', By default, this config is 'greedy'. | greedy |
| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. Available memory pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global' and 'fair_spill_global', By default, this config is 'greedy'. | greedy |
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |
| spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd |
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true |
Expand Down
23 changes: 18 additions & 5 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ enum MemoryPoolType {
Unified,
Greedy,
FairSpill,
GreedyTaskShared,
FairSpillTaskShared,
GreedyGlobal,
FairSpillGlobal,
Expand Down Expand Up @@ -317,6 +318,9 @@ fn parse_memory_pool_config(conf: &HashMap<String, String>) -> CometResult<Memor
"fair_spill_task_shared" => {
MemoryPoolConfig::new(MemoryPoolType::FairSpillTaskShared, pool_size_per_task?)
}
"greedy_task_shared" => {
MemoryPoolConfig::new(MemoryPoolType::GreedyTaskShared, pool_size_per_task?)
}
"fair_spill_global" => {
MemoryPoolConfig::new(MemoryPoolType::FairSpillGlobal, pool_size)
}
Expand Down Expand Up @@ -378,14 +382,23 @@ fn create_memory_pool(
});
Some(Arc::clone(memory_pool))
}
MemoryPoolType::FairSpillTaskShared => {
MemoryPoolType::GreedyTaskShared | MemoryPoolType::FairSpillTaskShared => {
let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap();
let per_task_memory_pool =
memory_pool_map.entry(task_attempt_id).or_insert_with(|| {
PerTaskMemoryPool::new(Arc::new(TrackConsumersPool::new(
FairSpillPool::new(memory_pool_config.pool_size),
NonZeroUsize::new(10).unwrap(),
)))
let pool: Arc<dyn MemoryPool> =
if memory_pool_config.pool_type == MemoryPoolType::GreedyTaskShared {
Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(memory_pool_config.pool_size),
NonZeroUsize::new(10).unwrap(),
))
} else {
Arc::new(TrackConsumersPool::new(
FairSpillPool::new(memory_pool_config.pool_size),
NonZeroUsize::new(10).unwrap(),
))
};
PerTaskMemoryPool::new(pool)
});
per_task_memory_pool.num_plans += 1;
Some(Arc::clone(&per_task_memory_pool.memory_pool))
Expand Down

0 comments on commit 7f68c4f

Please sign in to comment.