-
Notifications
You must be signed in to change notification settings - Fork 253
perf: Add memory profiling #1702
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
8a44d31
a339522
3b88a27
2c17a71
00e3e47
ad395fd
2aa4c5d
97e8cc7
077f5af
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -68,6 +68,10 @@ use crate::execution::shuffle::{read_ipc_compressed, CompressionCodec}; | |||||||||
| use crate::execution::spark_plan::SparkPlan; | ||||||||||
| use log::info; | ||||||||||
| use once_cell::sync::Lazy; | ||||||||||
| #[cfg(target_os = "linux")] | ||||||||||
| use procfs::process::Process; | ||||||||||
| #[cfg(feature = "jemalloc")] | ||||||||||
| use tikv_jemalloc_ctl::{epoch, stats}; | ||||||||||
|
|
||||||||||
| static TOKIO_RUNTIME: Lazy<Runtime> = Lazy::new(|| { | ||||||||||
| let mut builder = tokio::runtime::Builder::new_multi_thread(); | ||||||||||
|
|
@@ -126,6 +130,8 @@ struct ExecutionContext { | |||||||||
| pub explain_native: bool, | ||||||||||
| /// Memory pool config | ||||||||||
| pub memory_pool_config: MemoryPoolConfig, | ||||||||||
| /// Whether to log memory usage on each call to execute_plan | ||||||||||
| pub memory_profiling_enabled: bool, | ||||||||||
| } | ||||||||||
|
|
||||||||||
| /// Accept serialized query plan and return the address of the native query plan. | ||||||||||
|
|
@@ -151,6 +157,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( | |||||||||
| task_attempt_id: jlong, | ||||||||||
| debug_native: jboolean, | ||||||||||
| explain_native: jboolean, | ||||||||||
| memory_profiling_enabled: jboolean, | ||||||||||
| ) -> jlong { | ||||||||||
| try_unwrap_or_throw(&e, |mut env| { | ||||||||||
| // Init JVM classes | ||||||||||
|
|
@@ -231,6 +238,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( | |||||||||
| debug_native: debug_native == 1, | ||||||||||
| explain_native: explain_native == 1, | ||||||||||
| memory_pool_config, | ||||||||||
| memory_profiling_enabled: memory_profiling_enabled != JNI_FALSE, | ||||||||||
| }); | ||||||||||
|
|
||||||||||
| Ok(Box::into_raw(exec_context) as i64) | ||||||||||
|
|
@@ -359,6 +367,41 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( | |||||||||
| // Retrieve the query | ||||||||||
| let exec_context = get_execution_context(exec_context); | ||||||||||
|
|
||||||||||
| // memory profiling is only available on linux | ||||||||||
| if exec_context.memory_profiling_enabled { | ||||||||||
| #[cfg(target_os = "linux")] | ||||||||||
| { | ||||||||||
| let pid = std::process::id(); | ||||||||||
| let process = Process::new(pid as i32).unwrap(); | ||||||||||
| let statm = process.statm().unwrap(); | ||||||||||
| let page_size = procfs::page_size(); | ||||||||||
| println!( | ||||||||||
| "NATIVE_MEMORY: {{ resident: {:.0} }}", | ||||||||||
| (statm.resident * page_size) as f64 / (1024.0 * 1024.0) | ||||||||||
| ); | ||||||||||
|
Comment on lines
+378
to
+381
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
|
|
||||||||||
| #[cfg(feature = "jemalloc")] | ||||||||||
| { | ||||||||||
| // Obtain a MIB for the `epoch`, `stats.allocated`, and | ||||||||||
| // `atats.resident` keys: | ||||||||||
| let e = epoch::mib().unwrap(); | ||||||||||
| let allocated = stats::allocated::mib().unwrap(); | ||||||||||
| let resident = stats::resident::mib().unwrap(); | ||||||||||
|
|
||||||||||
| // Many statistics are cached and only updated | ||||||||||
| // when the epoch is advanced: | ||||||||||
| e.advance().unwrap(); | ||||||||||
|
|
||||||||||
| // Read statistics using MIB key: | ||||||||||
| let allocated = allocated.read().unwrap() as f64 / (1024.0 * 1024.0); | ||||||||||
| let resident = resident.read().unwrap() as f64 / (1024.0 * 1024.0); | ||||||||||
| println!( | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wondering should we feed the metrics to log or to plan metrics
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These metrics are for the overall process, so not specific to any particular plan. This feature just allows us to watch the executor logs and see when memory starts to approach the limit. I hope this will help us debug the cause of OOMs, although it will still primarily be a manual process. |
||||||||||
| "NATIVE_MEMORY_JEMALLOC: {{ allocated: {allocated:.0}, resident: {resident:.0} }}" | ||||||||||
| ); | ||||||||||
| } | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| let exec_context_id = exec_context.id; | ||||||||||
|
|
||||||||||
| // Initialize the execution stream. | ||||||||||
|
|
||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,8 @@ | |
|
|
||
| package org.apache.comet | ||
|
|
||
| import java.lang.management.ManagementFactory | ||
|
|
||
| import org.apache.spark._ | ||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.network.util.ByteUnit | ||
|
|
@@ -57,6 +59,7 @@ class CometExecIterator( | |
| extends Iterator[ColumnarBatch] | ||
| with Logging { | ||
|
|
||
| private val memoryProfilingEnabled = CometConf.COMET_MEMORY_PROFILING.get() | ||
| private val nativeLib = new Native() | ||
| private val nativeUtil = new NativeUtil() | ||
| private val cometBatchIterators = inputs.map { iterator => | ||
|
|
@@ -92,7 +95,8 @@ class CometExecIterator( | |
| memoryLimitPerTask = getMemoryLimitPerTask(conf), | ||
| taskAttemptId = TaskContext.get().taskAttemptId, | ||
| debug = COMET_DEBUG_ENABLED.get(), | ||
| explain = COMET_EXPLAIN_NATIVE_ENABLED.get()) | ||
| explain = COMET_EXPLAIN_NATIVE_ENABLED.get(), | ||
| memoryProfilingEnabled) | ||
| } | ||
|
|
||
| private var nextBatch: Option[ColumnarBatch] = None | ||
|
|
@@ -130,6 +134,22 @@ class CometExecIterator( | |
| def getNextBatch(): Option[ColumnarBatch] = { | ||
| assert(partitionIndex >= 0 && partitionIndex < numParts) | ||
|
|
||
| if (memoryProfilingEnabled) { | ||
| val memoryMXBean = ManagementFactory.getMemoryMXBean | ||
| val heap = memoryMXBean.getHeapMemoryUsage | ||
| val nonHeap = memoryMXBean.getNonHeapMemoryUsage | ||
|
|
||
| def mb(n: Long) = n / 1024 / 1024 | ||
|
|
||
| // scalastyle:off println | ||
| println( | ||
| "JVM_MEMORY: { " + | ||
| s"heapUsed: ${mb(heap.getUsed)}, heapCommitted: ${mb(heap.getCommitted)}, " + | ||
| s"nonHeapUsed: ${mb(nonHeap.getUsed)}, nonHeapCommitted: ${mb(nonHeap.getCommitted)} " + | ||
| "}") | ||
|
Comment on lines
+144
to
+149
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps |
||
| // scalastyle:on println | ||
| } | ||
|
|
||
| nativeUtil.getNextBatch( | ||
| numOutputCols, | ||
| (arrayAddrs, schemaAddrs) => { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should the if statement be inside cfg?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did that originally, but the compiler complained about the unused variable
memory_profiling_enabledon non-Linux platforms.