feat: Introduce CometTaskMemoryManager and native side memory pool
#83
Conversation
Force-pushed from 76002b9 to 26f4802
cc @viirya

Thanks @sunchao. I will review this in the next few days (or week).
Wow, this is in great shape. Left some minor comments.
core/src/execution/jni_api.rs (outdated)

```rust
    iterators: jobjectArray,
    serialized_query: jbyteArray,
    metrics_node: JObject,
    task_memory_manager_obj: JObject,
```
How about renaming this to `comet_task_memory_manager_obj`? When I first read the code, I thought it was Spark's `TaskMemoryManager` object; however, it's Comet's `CometTaskMemoryManager`. It would be clearer to call it `comet_task_memory_manager_obj`.
Other occurrences could be renamed too.
Sure
core/src/execution/jni_api.rs (outdated)

```rust
    .get("use_unified_memory_manager")
    .ok_or(CometError::Internal(
        "Config 'use_unified_memory_manager' is not specified from Comet JVM side".to_string(),
    ))?
```
I believe a more permissive way would be to treat an unset `use_unified_memory_manager` as `false`?
Yeah, sure, although it's an internal error on the developer side if this is not set.
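As a sketch of the reviewer's suggestion (assuming a plain string map for the config; names here are illustrative, not the PR's actual code), an unset `use_unified_memory_manager` could simply default to `false`:

```rust
use std::collections::HashMap;

// Hypothetical sketch: treat a missing `use_unified_memory_manager`
// entry as `false` instead of returning an internal error.
fn use_unified_memory_manager(conf: &HashMap<String, String>) -> bool {
    conf.get("use_unified_memory_manager")
        .map(|v| v.as_str() == "true")
        .unwrap_or(false)
}

fn main() {
    let mut conf = HashMap::new();
    assert!(!use_unified_memory_manager(&conf)); // unset -> false
    conf.insert(
        "use_unified_memory_manager".to_string(),
        "true".to_string(),
    );
    assert!(use_unified_memory_manager(&conf));
    println!("ok");
}
```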
```java
 * share the same API as Spark and cannot trigger spill when acquire memory. Therefore, when
 * acquiring memory from native or JVM, spilling can only be triggered from JVM operators.
 */
private class NativeMemoryConsumer extends MemoryConsumer {
```
The consumer's `toString` might be used when debug logging is turned on.
It would be great if we could override `toString` in this class, and also add a unique flag/id to identify the corresponding consumer for the native plan/execution.
Sure, I can add a `toString` for now, and we can figure out how to use it for debugging purposes later.
```scala
      cometBatchIterators,
      protobufQueryPlan,
      nativeMetrics,
      new CometTaskMemoryManager)
```
I'm referring to this. I think we can pass an id to `CometTaskMemoryManager` and use that as the identity mark.
```scala
    conf.set("spark.shuffle.manager", shuffleManager)
    conf.set(SQLConf.ANSI_ENABLED.key, "false")
    conf.set(SHUFFLE_MANAGER, shuffleManager)
    conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
```
Are we still going to test the default memory pool implementation in DataFusion?
It seems like all the test code paths use the unified memory manager now.
In the long term, I'm thinking of only using the memory pool defined in Comet. This currently requires users to turn on off-heap mode in Spark and set the off-heap memory accordingly, so configuration changes are necessary when they want to use Comet. Ideally we should be able to use DriverPlugin to override the memory settings so Comet may just work out of the box (this needs changes to Spark in a few places).
The default memory manager path is kept for now, until we are able to do the override through DriverPlugin. Internally we still run all the Spark SQL tests using the default memory manager, and we can probably do the same here too.
> Ideally we should be able to use DriverPlugin to override the memory settings so Comet may just work out of the box (need to change Spark in a few places).

Oh, it seems that DriverPlugin is initialized before the task scheduler. Which places in Spark do we need to change for CometDriverPlugin to override memory settings? The memory overhead is already overridden by Comet.

> and can probably do the same here too.

Is this still in this PR's scope?
> Oh, it seems that DriverPlugin is initialized before the task scheduler. Which places in Spark do we need to change for CometDriverPlugin to override memory settings?

I already made one change in Spark for this: apache/spark#45052. We'll need a few more changes so we can completely override executor memory settings through DriverPlugin.

> Is this still in this PR's scope?

Not really. Will do that in #8
> I already made one change in Spark for this: apache/spark#45052. We'll need a few more changes so we can completely override executor memory settings through DriverPlugin.

Great work. Looking forward to being able to completely override memory settings through DriverPlugin.
Force-pushed from d58ab8d to 0cfb2f7
LGTM from my side, except the conflict should be resolved.
LGTM. Good catch on check_exception in jni_call.
```rust
    fn grow(&self, _: &MemoryReservation, additional: usize) {
        self.used.fetch_add(additional, Relaxed);
    }
```
Don't we need to update (acquire) the required memory from the JVM memory manager?
I think `grow` is not really used by DataFusion except in tests, which is why I didn't do it. But you are right, it's better to add it too, to be future-proof.
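A hedged sketch of what a JVM-aware `grow` could look like. The `JvmBridge` trait and all names here are illustrative stand-ins for the real JNI call into `CometTaskMemoryManager`, not the PR's actual code:

```rust
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

// Stand-in for the JNI call into the JVM-side memory manager.
trait JvmBridge {
    fn acquire_memory(&self, size: i64) -> i64;
}

struct CometPoolSketch<B: JvmBridge> {
    bridge: B,
    used: AtomicUsize,
}

impl<B: JvmBridge> CometPoolSketch<B> {
    fn grow(&self, additional: usize) {
        // Unlike the original no-op accounting, also inform the JVM side.
        // `grow` is infallible in DataFusion's MemoryPool trait, so we
        // record the bytes locally regardless of how much was granted.
        let _granted = self.bridge.acquire_memory(additional as i64);
        self.used.fetch_add(additional, Relaxed);
    }
}

struct FakeBridge; // test double that always grants the full request
impl JvmBridge for FakeBridge {
    fn acquire_memory(&self, size: i64) -> i64 {
        size
    }
}

fn main() {
    let pool = CometPoolSketch {
        bridge: FakeBridge,
        used: AtomicUsize::new(0),
    };
    pool.grow(1024);
    assert_eq!(pool.used.load(Relaxed), 1024);
    println!("ok");
}
```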
core/src/execution/memory_pool.rs (outdated)

```rust
        if acquired < additional as i64 {
            return Err(DataFusionError::Execution(format!(
                "Failed to acquire {} bytes, only got {}. Reserved: {}",
                additional,
                acquired,
                self.reserved(),
            )));
        }
```
When we fail to get the required amount and return an error, I think we should notify the JVM memory manager to release the partially acquired bytes.
Good point. Will add.
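A minimal sketch of the agreed fix, assuming hypothetical `acquire`/`release` stand-ins for the real JNI calls: when the JVM grants fewer bytes than requested, the partial grant is released back before returning the error.

```rust
use std::cell::Cell;

// Test double for the JVM-side manager (illustrative only).
struct FakeJvmManager {
    available: Cell<i64>,
}

impl FakeJvmManager {
    fn acquire(&self, size: i64) -> i64 {
        let granted = size.min(self.available.get());
        self.available.set(self.available.get() - granted);
        granted
    }
    fn release(&self, size: i64) {
        self.available.set(self.available.get() + size);
    }
}

fn try_grow(jvm: &FakeJvmManager, additional: usize) -> Result<(), String> {
    let acquired = jvm.acquire(additional as i64);
    if acquired < additional as i64 {
        // Give the partial grant back so JVM accounting stays balanced.
        jvm.release(acquired);
        return Err(format!(
            "Failed to acquire {} bytes, only got {}",
            additional, acquired
        ));
    }
    Ok(())
}

fn main() {
    let jvm = FakeJvmManager { available: Cell::new(100) };
    assert!(try_grow(&jvm, 200).is_err());
    // The partial grant was returned, so the full budget is still there.
    assert_eq!(jvm.available.get(), 100);
    assert!(try_grow(&jvm, 100).is_ok());
    println!("ok");
}
```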
```rust
use crate::jvm_bridge::get_global_jclass;

/// A DataFusion `MemoryPool` implementation for Comet, which delegate to the JVM
/// side `CometTaskMemoryManager`.
```
The comment doesn't look correct here, as `CometMemoryPool` is the one implementing the DataFusion `MemoryPool`. Maybe this should be moved to `CometMemoryPool`?
Yea let me move it.
```java
  // Called by Comet native through JNI.
  // Returns the actual amount of memory (in bytes) granted.
  public long acquireMemory(long size) {
    return internal.acquireExecutionMemory(size, nativeMemoryConsumer);
  }
```
As we claim `Send` and `Sync` for `CometMemoryPool`, should we make this a synchronized method?
I think TaskMemoryManager is already synchronized on the acquireExecutionMemory and releaseExecutionMemory, so it doesn't seem necessary.
Oh okay, then it is fine.
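For illustration only (not code from the PR): the pool's local accounting stays consistent under concurrency because it is a single atomic counter, while cross-boundary acquire/release is serialized inside Spark's `TaskMemoryManager` as discussed above. A tiny self-contained demo of the atomic-counter part:

```rust
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use std::sync::Arc;
use std::thread;

// Spawn `threads` workers that each bump the shared counter `per_thread`
// times, and return the final count. With an atomic counter no update is
// lost, so the result is exactly threads * per_thread.
fn concurrent_grow(threads: usize, per_thread: usize) -> usize {
    let used = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let used = Arc::clone(&used);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    used.fetch_add(1, Relaxed);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    used.load(Relaxed)
}

fn main() {
    assert_eq!(concurrent_grow(4, 1000), 4000);
    println!("ok");
}
```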
```scala
      "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
    conf.set(CometConf.COMET_ENABLED.key, "true")
    conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
    conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g")
```
We don't need it?
This is now replaced by

```scala
conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
conf.set(MEMORY_OFFHEAP_SIZE.key, "2g")
```

below.
```scala
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1g",
    SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "1g",
```
For broadcast join threshold configs, should we keep them?
Oops I accidentally removed these when rebasing. Let me add them back.
Force-pushed from 54db52b to dd67291
@sunchao Do you see us using less memory (e.g., no more extra memory overhead needed) than before this patch when running TPC-H/TPC-DS queries on EKS?

I tried this with columnar shuffle some time back, and in TPC-DS there were other issues that caused OOM. I plan to try benchmarking this again soon. Let me merge this PR first and address the remaining issues in follow-ups.

Thanks, merged
Which issue does this PR close?
Closes #34.
Rationale for this change
Currently Comet uses the default memory pool implementation in DataFusion, which is not aware of the memory manager on the JVM/Spark side. Therefore, when a Spark job has both Spark and Comet operators, we need to initialize two separate memory pools, one for each, and make sure each has a sufficient budget. In addition, since we cannot trigger spilling from native to JVM, or vice versa, each budget needs to be large enough on its own, which means Comet typically needs more memory than Spark does.

Since Spark already has a `UnifiedMemoryManager`, this PR proposes a new memory pool implementation that delegates calls to the JVM-side `UnifiedMemoryManager`, which serves as the single source of truth and handles memory acquisition and release from both the JVM and native sides.

What changes are included in this PR?

This PR introduces a `CometMemoryPool` class on the native side, overriding the default memory pool used by DataFusion. This memory pool dispatches calls to Spark's `TaskMemoryManager` for acquiring and releasing memory.

The newly added memory pool is only activated when `spark.memory.offHeap.enabled` is set to `true`. Otherwise, the behavior remains the same as before (and `spark.executor.memoryOverhead` needs to be large enough for native execution). In `TestBosonBase`, `spark.memory.offHeap.enabled` is enabled, so all the tests within Comet exercise the new feature.

How are these changes tested?
All the existing tests are updated to use the new memory manager implementation.