Skip to content

Commit 54db52b

Browse files
committed
comments
1 parent 0cfb2f7 commit 54db52b

File tree

5 files changed

+45
-28
lines changed

5 files changed

+45
-28
lines changed

core/src/execution/memory_pool.rs

Lines changed: 39 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,13 @@ use datafusion::{
3030
execution::memory_pool::{MemoryPool, MemoryReservation},
3131
};
3232

33-
use crate::jvm_bridge::{jni_call, JVMClasses};
33+
use crate::{
34+
errors::CometResult,
35+
jvm_bridge::{jni_call, JVMClasses},
36+
};
3437

38+
/// A DataFusion `MemoryPool` implementation for Comet. Internally this is
39+
/// implemented via delegating calls to [`crate::jvm_bridge::CometTaskMemoryManager`].
3540
pub struct CometMemoryPool {
3641
task_memory_manager_handle: Arc<GlobalRef>,
3742
used: AtomicUsize,
@@ -52,44 +57,56 @@ impl CometMemoryPool {
5257
used: AtomicUsize::new(0),
5358
}
5459
}
60+
61+
fn acquire(&self, additional: usize) -> CometResult<i64> {
62+
let mut env = JVMClasses::get_env();
63+
let handle = self.task_memory_manager_handle.as_obj();
64+
unsafe {
65+
jni_call!(&mut env,
66+
comet_task_memory_manager(handle).acquire_memory(additional as i64) -> i64)
67+
}
68+
}
69+
70+
fn release(&self, size: usize) -> CometResult<()> {
71+
let mut env = JVMClasses::get_env();
72+
let handle = self.task_memory_manager_handle.as_obj();
73+
unsafe {
74+
jni_call!(&mut env, comet_task_memory_manager(handle).release_memory(size as i64) -> ())
75+
}
76+
}
5577
}
5678

5779
unsafe impl Send for CometMemoryPool {}
5880
unsafe impl Sync for CometMemoryPool {}
5981

6082
impl MemoryPool for CometMemoryPool {
6183
fn grow(&self, _: &MemoryReservation, additional: usize) {
84+
self.acquire(additional)
85+
.unwrap_or_else(|_| panic!("Failed to acquire {} bytes", additional));
6286
self.used.fetch_add(additional, Relaxed);
6387
}
6488

6589
fn shrink(&self, _: &MemoryReservation, size: usize) {
66-
let mut env = JVMClasses::get_env();
67-
let handle = self.task_memory_manager_handle.as_obj();
68-
unsafe {
69-
jni_call!(&mut env, comet_task_memory_manager(handle).release_memory(size as i64) -> ())
70-
.unwrap();
71-
}
90+
self.release(size)
91+
.unwrap_or_else(|_| panic!("Failed to release {} bytes", size));
7292
self.used.fetch_sub(size, Relaxed);
7393
}
7494

7595
fn try_grow(&self, _: &MemoryReservation, additional: usize) -> Result<(), DataFusionError> {
7696
if additional > 0 {
77-
let mut env = JVMClasses::get_env();
78-
let handle = self.task_memory_manager_handle.as_obj();
79-
unsafe {
80-
let acquired = jni_call!(&mut env,
81-
comet_task_memory_manager(handle).acquire_memory(additional as i64) -> i64)?;
97+
let acquired = self.acquire(additional)?;
98+
// If the number of bytes we acquired is less than the requested, return an error,
99+
// and hopefully will trigger spilling from the caller side.
100+
if acquired < additional as i64 {
101+
// Release the acquired bytes before throwing error
102+
self.release(acquired as usize)?;
82103

83-
// If the number of bytes we acquired is less than the requested, return an error,
84-
// and hopefully will trigger spilling from the caller side.
85-
if acquired < additional as i64 {
86-
return Err(DataFusionError::Execution(format!(
87-
"Failed to acquire {} bytes, only got {}. Reserved: {}",
88-
additional,
89-
acquired,
90-
self.reserved(),
91-
)));
92-
}
104+
return Err(DataFusionError::Execution(format!(
105+
"Failed to acquire {} bytes, only got {}. Reserved: {}",
106+
additional,
107+
acquired,
108+
self.reserved(),
109+
)));
93110
}
94111
self.used.fetch_add(additional, Relaxed);
95112
}

core/src/jvm_bridge/comet_task_memory_manager.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ use jni::{
2424

2525
use crate::jvm_bridge::get_global_jclass;
2626

27-
/// A DataFusion `MemoryPool` implementation for Comet, which delegate to the JVM
28-
/// side `CometTaskMemoryManager`.
27+
/// A wrapper which delegate acquire/release memory calls to the
28+
/// JVM side `CometTaskMemoryManager`.
2929
#[derive(Debug)]
3030
pub struct CometTaskMemoryManager<'a> {
3131
pub class: JClass<'a>,

spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@
2020
package org.apache.spark.sql
2121

2222
import org.apache.spark.SparkConf
23-
import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE, SHUFFLE_MANAGER}
24-
import org.apache.spark.sql.internal.StaticSQLConf.SPARK_SESSION_EXTENSIONS
23+
import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE}
2524

2625
import org.apache.comet.CometConf
2726

spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,10 @@ import java.nio.file.{Files, Paths}
2525
import scala.collection.JavaConverters._
2626

2727
import org.apache.spark.{SparkConf, SparkContext}
28-
import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE, SHUFFLE_MANAGER}
28+
import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE}
2929
import org.apache.spark.sql.catalyst.TableIdentifier
3030
import org.apache.spark.sql.catalyst.util.{fileToString, resourceToString, stringToFile}
3131
import org.apache.spark.sql.internal.SQLConf
32-
import org.apache.spark.sql.internal.StaticSQLConf.SPARK_SESSION_EXTENSIONS
3332
import org.apache.spark.sql.test.{SharedSparkSession, TestSparkSession}
3433

3534
import org.apache.comet.CometConf

spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ abstract class CometTestBase
6868
conf.set(SHUFFLE_MANAGER, shuffleManager)
6969
conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
7070
conf.set(MEMORY_OFFHEAP_SIZE.key, "2g")
71+
conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "1g")
72+
conf.set(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key, "1g")
7173
conf.set(CometConf.COMET_ENABLED.key, "true")
7274
conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
7375
conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true")

0 commit comments

Comments
 (0)