Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: profiling #989

Merged
merged 2 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions components/profile/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ features = ["stats", "profiling", "unprefixed_malloc_on_supported_platforms"]
jemalloc-ctl = "0.3.2"
jemallocator = "0.3.2"
log = { workspace = true }
pprof = { version = "0.11.1", features = ["flamegraph"] }
65 changes: 47 additions & 18 deletions components/profile/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Memory profiler for running application based on jemalloc features.
//! Profiler for running application.

use std::{
fmt::Formatter,
Expand All @@ -9,6 +9,7 @@ use std::{
io::Read,
sync::{Mutex, MutexGuard},
thread, time,
time::Duration,
};

use jemalloc_ctl::{Access, AsName};
Expand Down Expand Up @@ -36,8 +37,9 @@ static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

const PROF_ACTIVE: &[u8] = b"prof.active\0";
const PROF_DUMP: &[u8] = b"prof.dump\0";
const PROFILE_OUTPUT_FILE_OS_PATH: &[u8] = b"/tmp/profile.out\0";
const PROFILE_OUTPUT_FILE_PATH: &str = "/tmp/profile.out";
const PROFILE_HEAP_OUTPUT_FILE_OS_PATH: &[u8] = b"/tmp/profile_heap.out\0";
const PROFILE_HEAP_OUTPUT_FILE_PATH: &str = "/tmp/profile_heap.out";
const PROFILE_CPU_OUTPUT_FILE_PATH: &str = "/tmp/flamegraph_cpu.svg";

fn set_prof_active(active: bool) -> Result<()> {
let name = PROF_ACTIVE.name();
Expand All @@ -46,15 +48,15 @@ fn set_prof_active(active: bool) -> Result<()> {

fn dump_profile() -> Result<()> {
let name = PROF_DUMP.name();
name.write(PROFILE_OUTPUT_FILE_OS_PATH)
name.write(PROFILE_HEAP_OUTPUT_FILE_OS_PATH)
.map_err(Error::Jemalloc)
}

struct ProfLockGuard<'a>(MutexGuard<'a, ()>);

/// ProfLockGuard hold the profile lock and take responsibilities for
/// (de)activating mem profiling. NOTE: Keeping mem profiling on may cause some
/// extra runtime cost so we choose to activating it dynamically.
/// (de)activating heap profiling. NOTE: Keeping heap profiling on may cause
/// some extra runtime cost so we choose to activate it dynamically.
impl<'a> ProfLockGuard<'a> {
pub fn new(guard: MutexGuard<'a, ()>) -> Result<Self> {
set_prof_active(true)?;
Expand All @@ -71,7 +73,7 @@ impl<'a> Drop for ProfLockGuard<'a> {
}

pub struct Profiler {
mem_prof_lock: Mutex<()>,
heap_prof_lock: Mutex<()>,
}

impl Default for Profiler {
Expand All @@ -83,19 +85,22 @@ impl Default for Profiler {
impl Profiler {
pub fn new() -> Self {
Self {
mem_prof_lock: Mutex::new(()),
heap_prof_lock: Mutex::new(()),
}
}

// dump_mem_prof collects mem profiling data in `seconds`.
// dump_heap_prof collects heap profiling data in `seconds`.
// TODO(xikai): limit the profiling duration
pub fn dump_mem_prof(&self, seconds: u64) -> Result<Vec<u8>> {
pub fn dump_heap_prof(&self, seconds: u64) -> Result<Vec<u8>> {
// concurrent profiling is disabled.
let lock_guard = self.mem_prof_lock.try_lock().map_err(|e| Error::Internal {
msg: format!("failed to acquire mem_prof_lock, err:{e}"),
})?;
let lock_guard = self
.heap_prof_lock
.try_lock()
.map_err(|e| Error::Internal {
msg: format!("failed to acquire heap_prof_lock, err:{e}"),
})?;
info!(
"Profiler::dump_mem_prof start memory profiling {} seconds",
"Profiler::dump_heap_prof start heap profiling {} seconds",
seconds
);

Expand All @@ -109,7 +114,7 @@ impl Profiler {
.create(true)
.write(true)
.truncate(true)
.open(PROFILE_OUTPUT_FILE_PATH)
.open(PROFILE_HEAP_OUTPUT_FILE_PATH)
.map_err(|e| {
error!("Failed to open prof data file, err:{}", e);
Error::IO(e)
Expand All @@ -119,13 +124,13 @@ impl Profiler {
dump_profile().map_err(|e| {
error!(
"Failed to dump prof to {}, err:{}",
PROFILE_OUTPUT_FILE_PATH, e
PROFILE_HEAP_OUTPUT_FILE_PATH, e
);
e
})?;

// read the profile results into buffer
let mut f = File::open(PROFILE_OUTPUT_FILE_PATH).map_err(|e| {
let mut f = File::open(PROFILE_HEAP_OUTPUT_FILE_PATH).map_err(|e| {
error!("Failed to open prof data file, err:{}", e);
Error::IO(e)
})?;
Expand All @@ -138,4 +143,28 @@ impl Profiler {

Ok(buffer)
}

pub fn dump_cpu_prof(&self, seconds: u64) -> Result<()> {
let guard = pprof::ProfilerGuardBuilder::default()
.frequency(100)
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.build()
.map_err(|e| Error::Internal {
msg: format!("Profiler guard, err:{e}"),
})?;

thread::sleep(Duration::from_secs(seconds));

let report = guard.report().build().map_err(|e| Error::Internal {
msg: format!("Report build, err:{e}"),
})?;
let file = File::create(PROFILE_CPU_OUTPUT_FILE_PATH).map_err(|e| {
error!("Failed to create cpu profile svg file, err:{}", e);
Error::IO(e)
})?;
report.flamegraph(file).map_err(|e| Error::Internal {
msg: format!("Flamegraph output, err:{e}"),
})?;
Ok(())
}
}
1 change: 0 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ meta_client = { workspace = true }
opensrv-mysql = "0.1.0"
partition_table_engine = { workspace = true }
paste = { workspace = true }
pprof = { version = "0.11.1", features = ["flamegraph"] }
profile = { workspace = true }
prom-remote-api = { workspace = true, features = ["warp"] }
prometheus = { workspace = true }
Expand Down
72 changes: 34 additions & 38 deletions server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
//! Http service

use std::{
collections::HashMap, convert::Infallible, error::Error as StdError, fs::File, net::IpAddr,
sync::Arc, thread, time::Duration,
collections::HashMap, convert::Infallible, error::Error as StdError, net::IpAddr, sync::Arc,
time::Duration,
};

use analytic_engine::setup::OpenedWals;
Expand Down Expand Up @@ -80,6 +80,12 @@ pub enum Error {
backtrace: Backtrace,
},

#[snafu(display("Fail to do cpu profiling, err:{}.\nBacktrace:\n{}", source, backtrace))]
ProfileCPU {
source: profile::Error,
backtrace: Backtrace,
},

#[snafu(display("Fail to join async task, err:{}.", source))]
JoinAsyncTask { source: common_util::runtime::Error },

Expand Down Expand Up @@ -184,8 +190,8 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
// debug APIs
.or(self.flush_memtable())
.or(self.update_log_level())
.or(self.heap_profile())
.or(self.cpu_profile())
.or(self.profile_cpu())
.or(self.profile_heap())
.or(self.server_config())
.or(self.stats())
.with(warp::log("http_requests"))
Expand Down Expand Up @@ -393,62 +399,51 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
warp::path!("metrics").and(warp::get()).map(metrics::dump)
}

// GET /debug/heap_profile/{seconds}
fn heap_profile(
// GET /debug/profile/cpu/{seconds}
fn profile_cpu(
&self,
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
warp::path!("debug" / "heap_profile" / ..)
warp::path!("debug" / "profile" / "cpu" / ..)
.and(warp::path::param::<u64>())
.and(warp::get())
.and(self.with_profiler())
.and(self.with_runtime())
.and_then(
|duration_sec: u64, profiler: Arc<Profiler>, runtime: Arc<Runtime>| async move {
let handle = runtime.spawn_blocking(move || {
profiler.dump_mem_prof(duration_sec).context(ProfileHeap)
let handle = runtime.spawn_blocking(move || -> Result<()> {
profiler.dump_cpu_prof(duration_sec).context(ProfileCPU)
});
let result = handle.await.context(JoinAsyncTask);
match result {
Ok(Ok(prof_data)) => Ok(prof_data.into_response()),
Ok(Err(e)) => Err(reject::custom(e)),
Ok(_) => Ok("ok"),
Err(e) => Err(reject::custom(e)),
}
},
)
}

// GET /debug/cpu_profile/{seconds}
fn cpu_profile(
// GET /debug/profile/heap/{seconds}
fn profile_heap(
&self,
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
warp::path!("debug" / "cpu_profile" / ..)
warp::path!("debug" / "profile" / "heap" / ..)
.and(warp::path::param::<u64>())
.and(warp::get())
.and(self.with_profiler())
.and(self.with_runtime())
.and_then(|duration_sec: u64, runtime: Arc<Runtime>| async move {
let handle = runtime.spawn_blocking(move || -> Result<()> {
let guard = pprof::ProfilerGuardBuilder::default()
.frequency(100)
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.build()
.box_err()
.context(Internal)?;

thread::sleep(Duration::from_secs(duration_sec));

let report = guard.report().build().box_err().context(Internal)?;
let file = File::create("/tmp/flamegraph.svg")
.box_err()
.context(Internal)?;
report.flamegraph(file).box_err().context(Internal)?;
Ok(())
});
let result = handle.await.context(JoinAsyncTask);
match result {
Ok(_) => Ok("ok"),
Err(e) => Err(reject::custom(e)),
}
})
.and_then(
|duration_sec: u64, profiler: Arc<Profiler>, runtime: Arc<Runtime>| async move {
let handle = runtime.spawn_blocking(move || {
profiler.dump_heap_prof(duration_sec).context(ProfileHeap)
});
let result = handle.await.context(JoinAsyncTask);
match result {
Ok(Ok(prof_data)) => Ok(prof_data.into_response()),
Ok(Err(e)) => Err(reject::custom(e)),
Err(e) => Err(reject::custom(e)),
}
},
)
}

// GET /debug/config
Expand Down Expand Up @@ -695,6 +690,7 @@ fn error_to_status_code(err: &Error) -> StatusCode {
| Error::MissingProxy { .. }
| Error::ParseIpAddr { .. }
| Error::ProfileHeap { .. }
| Error::ProfileCPU { .. }
| Error::Internal { .. }
| Error::JoinAsyncTask { .. }
| Error::AlreadyStarted { .. }
Expand Down