Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,12 @@ object CometConf extends ShimCometConf {
defaultValue = true,
notes = Some("stddev is slower than Spark's implementation"))

// Internal boolean setting, off by default, that enables logging of JVM and native
// memory statistics.
val COMET_MEMORY_PROFILING: ConfigEntry[Boolean] = conf("spark.comet.memory.profiling")
.doc("Enable logging of JVM and native memory statistics.")
.internal()
.booleanConf
.createWithDefault(false)

val COMET_MEMORY_OVERHEAD: OptionalConfigEntry[Long] = conf("spark.comet.memoryOverhead")
.doc(
"The amount of additional memory to be allocated per executor process for Comet, in MiB, " +
Expand Down
38 changes: 38 additions & 0 deletions native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion native/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ parquet = { workspace = true, default-features = false, features = ["experimenta
futures = { workspace = true }
mimalloc = { version = "*", default-features = false, optional = true }
tikv-jemallocator = { version = "0.6.0", optional = true, features = ["disable_initial_exec_tls"] }
tikv-jemalloc-ctl = { version = "0.6.0", optional = true, features = ["disable_initial_exec_tls", "stats"] }
tokio = { version = "1", features = ["rt-multi-thread"] }
async-trait = { workspace = true }
log = "0.4"
Expand Down Expand Up @@ -70,6 +71,9 @@ url = { workspace = true }
parking_lot = "0.12.3"
datafusion-comet-objectstore-hdfs = { path = "../hdfs", optional = true, default-features = false, features = ["hdfs"] }

[target.'cfg(target_os = "linux")'.dependencies]
procfs = "0.17.0"

[dev-dependencies]
pprof = { version = "0.14.0", features = ["flamegraph"] }
criterion = { version = "0.5.1", features = ["async_tokio"] }
Expand All @@ -82,7 +86,7 @@ datafusion-functions-nested = { version = "47.0.0" }
[features]
default = []
hdfs = ["datafusion-comet-objectstore-hdfs"]
jemalloc = ["tikv-jemallocator"]
jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl"]

# exclude optional packages from cargo machete verifications
[package.metadata.cargo-machete]
Expand Down
43 changes: 43 additions & 0 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ use crate::execution::shuffle::{read_ipc_compressed, CompressionCodec};
use crate::execution::spark_plan::SparkPlan;
use log::info;
use once_cell::sync::Lazy;
#[cfg(target_os = "linux")]
use procfs::process::Process;
#[cfg(feature = "jemalloc")]
use tikv_jemalloc_ctl::{epoch, stats};

static TOKIO_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
let mut builder = tokio::runtime::Builder::new_multi_thread();
Expand Down Expand Up @@ -126,6 +130,8 @@ struct ExecutionContext {
pub explain_native: bool,
/// Memory pool config
pub memory_pool_config: MemoryPoolConfig,
/// Whether to log memory usage on each call to execute_plan
pub memory_profiling_enabled: bool,
}

/// Accept serialized query plan and return the address of the native query plan.
Expand All @@ -151,6 +157,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
task_attempt_id: jlong,
debug_native: jboolean,
explain_native: jboolean,
memory_profiling_enabled: jboolean,
) -> jlong {
try_unwrap_or_throw(&e, |mut env| {
// Init JVM classes
Expand Down Expand Up @@ -231,6 +238,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
debug_native: debug_native == 1,
explain_native: explain_native == 1,
memory_pool_config,
memory_profiling_enabled: memory_profiling_enabled != JNI_FALSE,
});

Ok(Box::into_raw(exec_context) as i64)
Expand Down Expand Up @@ -359,6 +367,41 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
// Retrieve the query
let exec_context = get_execution_context(exec_context);

// memory profiling is only available on linux
if exec_context.memory_profiling_enabled {
#[cfg(target_os = "linux")]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the if statement be inside cfg?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did that originally, but the compiler complained about the unused variable memory_profiling_enabled on non-Linux platforms.

{
let pid = std::process::id();
let process = Process::new(pid as i32).unwrap();
let statm = process.statm().unwrap();
let page_size = procfs::page_size();
println!(
"NATIVE_MEMORY: {{ resident: {:.0} }}",
(statm.resident * page_size) as f64 / (1024.0 * 1024.0)
);
Comment on lines +378 to +381
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
println!(
"NATIVE_MEMORY: {{ resident: {:.0} }}",
(statm.resident * page_size) as f64 / (1024.0 * 1024.0)
);


#[cfg(feature = "jemalloc")]
{
// Obtain a MIB for the `epoch`, `stats.allocated`, and
// `stats.resident` keys:
let e = epoch::mib().unwrap();
let allocated = stats::allocated::mib().unwrap();
let resident = stats::resident::mib().unwrap();

// Many statistics are cached and only updated
// when the epoch is advanced:
e.advance().unwrap();

// Read statistics using MIB key:
let allocated = allocated.read().unwrap() as f64 / (1024.0 * 1024.0);
let resident = resident.read().unwrap() as f64 / (1024.0 * 1024.0);
println!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering whether we should feed these metrics to the log or to the plan metrics.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These metrics are for the overall process, so not specific to any particular plan. This feature just allows us to watch the executor logs and see when memory starts to approach the limit. I hope this will help us debug the cause of OOMs, although it will still primarily be a manual process.

"NATIVE_MEMORY_JEMALLOC: {{ allocated: {allocated:.0}, resident: {resident:.0} }}"
);
}
}
}

let exec_context_id = exec_context.id;

// Initialize the execution stream.
Expand Down
22 changes: 21 additions & 1 deletion spark/src/main/scala/org/apache/comet/CometExecIterator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.comet

import java.lang.management.ManagementFactory

import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.ByteUnit
Expand Down Expand Up @@ -57,6 +59,7 @@ class CometExecIterator(
extends Iterator[ColumnarBatch]
with Logging {

private val memoryProfilingEnabled = CometConf.COMET_MEMORY_PROFILING.get()
private val nativeLib = new Native()
private val nativeUtil = new NativeUtil()
private val cometBatchIterators = inputs.map { iterator =>
Expand Down Expand Up @@ -92,7 +95,8 @@ class CometExecIterator(
memoryLimitPerTask = getMemoryLimitPerTask(conf),
taskAttemptId = TaskContext.get().taskAttemptId,
debug = COMET_DEBUG_ENABLED.get(),
explain = COMET_EXPLAIN_NATIVE_ENABLED.get())
explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
memoryProfilingEnabled)
}

private var nextBatch: Option[ColumnarBatch] = None
Expand Down Expand Up @@ -130,6 +134,22 @@ class CometExecIterator(
def getNextBatch(): Option[ColumnarBatch] = {
assert(partitionIndex >= 0 && partitionIndex < numParts)

if (memoryProfilingEnabled) {
val memoryMXBean = ManagementFactory.getMemoryMXBean
val heap = memoryMXBean.getHeapMemoryUsage
val nonHeap = memoryMXBean.getNonHeapMemoryUsage

def mb(n: Long) = n / 1024 / 1024

// scalastyle:off println
println(
"JVM_MEMORY: { " +
s"heapUsed: ${mb(heap.getUsed)}, heapCommitted: ${mb(heap.getCommitted)}, " +
s"nonHeapUsed: ${mb(nonHeap.getUsed)}, nonHeapCommitted: ${mb(nonHeap.getCommitted)} " +
"}")
Comment on lines +144 to +149
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps logInfo ?
If println is preferred, we may have to add back // scalastyle:on println

// scalastyle:on println
}

nativeUtil.getNextBatch(
numOutputCols,
(arrayAddrs, schemaAddrs) => {
Expand Down
3 changes: 2 additions & 1 deletion spark/src/main/scala/org/apache/comet/Native.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ class Native extends NativeBase {
memoryLimitPerTask: Long,
taskAttemptId: Long,
debug: Boolean,
explain: Boolean): Long
explain: Boolean,
memoryProfilingEnabled: Boolean): Long
// scalastyle:on

/**
Expand Down
Loading