Skip to content

Commit

Permalink
refactor: profiling
Browse files Browse the repository at this point in the history
  • Loading branch information
chunshao90 committed Jun 12, 2023
1 parent 1852365 commit 76ab93d
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 52 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions components/profile/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ features = ["stats", "profiling", "unprefixed_malloc_on_supported_platforms"]
jemalloc-ctl = "0.3.2"
jemallocator = "0.3.2"
log = { workspace = true }
pprof = { version = "0.11.1", features = ["flamegraph"] }
40 changes: 33 additions & 7 deletions components/profile/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Memory profiler for running application based on jemalloc features.

Expand All @@ -9,6 +9,7 @@ use std::{
io::Read,
sync::{Mutex, MutexGuard},
thread, time,
time::Duration,
};

use jemalloc_ctl::{Access, AsName};
Expand Down Expand Up @@ -36,8 +37,9 @@ static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

const PROF_ACTIVE: &[u8] = b"prof.active\0";
const PROF_DUMP: &[u8] = b"prof.dump\0";
const PROFILE_OUTPUT_FILE_OS_PATH: &[u8] = b"/tmp/profile.out\0";
const PROFILE_OUTPUT_FILE_PATH: &str = "/tmp/profile.out";
const PROFILE_MEM_OUTPUT_FILE_OS_PATH: &[u8] = b"/tmp/profile_mem.out\0";
const PROFILE_MEM_OUTPUT_FILE_PATH: &str = "/tmp/profile_mem.out";
const PROFILE_CPU_OUTPUT_FILE_PATH: &str = "/tmp/flamegraph.svg";

fn set_prof_active(active: bool) -> Result<()> {
let name = PROF_ACTIVE.name();
Expand All @@ -46,7 +48,7 @@ fn set_prof_active(active: bool) -> Result<()> {

fn dump_profile() -> Result<()> {
let name = PROF_DUMP.name();
name.write(PROFILE_OUTPUT_FILE_OS_PATH)
name.write(PROFILE_MEM_OUTPUT_FILE_OS_PATH)
.map_err(Error::Jemalloc)
}

Expand Down Expand Up @@ -109,7 +111,7 @@ impl Profiler {
.create(true)
.write(true)
.truncate(true)
.open(PROFILE_OUTPUT_FILE_PATH)
.open(PROFILE_MEM_OUTPUT_FILE_PATH)
.map_err(|e| {
error!("Failed to open prof data file, err:{}", e);
Error::IO(e)
Expand All @@ -119,13 +121,13 @@ impl Profiler {
dump_profile().map_err(|e| {
error!(
"Failed to dump prof to {}, err:{}",
PROFILE_OUTPUT_FILE_PATH, e
PROFILE_MEM_OUTPUT_FILE_PATH, e
);
e
})?;

// read the profile results into buffer
let mut f = File::open(PROFILE_OUTPUT_FILE_PATH).map_err(|e| {
let mut f = File::open(PROFILE_MEM_OUTPUT_FILE_PATH).map_err(|e| {
error!("Failed to open prof data file, err:{}", e);
Error::IO(e)
})?;
Expand All @@ -138,4 +140,28 @@ impl Profiler {

Ok(buffer)
}

pub fn dump_cpu_prof(&self, seconds: u64) -> Result<()> {
let guard = pprof::ProfilerGuardBuilder::default()
.frequency(100)
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.build()
.map_err(|e| Error::Internal {
msg: format!("Profiler guard, err:{e}"),
})?;

thread::sleep(Duration::from_secs(seconds));

let report = guard.report().build().map_err(|e| Error::Internal {
msg: format!("Report build, err:{e}"),
})?;
let file = File::create(PROFILE_CPU_OUTPUT_FILE_PATH).map_err(|e| {
error!("Failed to create cpu profile svg file, err:{}", e);
Error::IO(e)
})?;
report.flamegraph(file).map_err(|e| Error::Internal {
msg: format!("Flamegraph output, err:{e}"),
})?;
Ok(())
}
}
82 changes: 37 additions & 45 deletions server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
//! Http service

use std::{
collections::HashMap, convert::Infallible, error::Error as StdError, fs::File, net::IpAddr,
sync::Arc, thread, time::Duration,
collections::HashMap, convert::Infallible, error::Error as StdError, net::IpAddr, sync::Arc,
time::Duration,
};

use analytic_engine::setup::OpenedWals;
Expand Down Expand Up @@ -70,12 +70,14 @@ pub enum Error {
#[snafu(display("Missing proxy.\nBacktrace:\n{}", backtrace))]
MissingProxy { backtrace: Backtrace },

#[snafu(display(
"Fail to do heap profiling, err:{}.\nBacktrace:\n{}",
source,
backtrace
))]
ProfileHeap {
#[snafu(display("Fail to do mem profiling, err:{}.\nBacktrace:\n{}", source, backtrace))]
ProfileMem {
source: profile::Error,
backtrace: Backtrace,
},

#[snafu(display("Fail to do cpu profiling, err:{}.\nBacktrace:\n{}", source, backtrace))]
ProfileCPU {
source: profile::Error,
backtrace: Backtrace,
},
Expand Down Expand Up @@ -184,8 +186,8 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
// debug APIs
.or(self.flush_memtable())
.or(self.update_log_level())
.or(self.heap_profile())
.or(self.cpu_profile())
.or(self.profile_cpu())
.or(self.profile_mem())
.or(self.server_config())
.or(self.stats())
.with(warp::log("http_requests"))
Expand Down Expand Up @@ -393,62 +395,51 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
warp::path!("metrics").and(warp::get()).map(metrics::dump)
}

// GET /debug/heap_profile/{seconds}
fn heap_profile(
// GET /debug/profile/cpu/{seconds}
fn profile_cpu(
&self,
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
warp::path!("debug" / "heap_profile" / ..)
warp::path!("debug" / "profile" / "cpu" / ..)
.and(warp::path::param::<u64>())
.and(warp::get())
.and(self.with_profiler())
.and(self.with_runtime())
.and_then(
|duration_sec: u64, profiler: Arc<Profiler>, runtime: Arc<Runtime>| async move {
let handle = runtime.spawn_blocking(move || {
profiler.dump_mem_prof(duration_sec).context(ProfileHeap)
let handle = runtime.spawn_blocking(move || -> Result<()> {
profiler.dump_cpu_prof(duration_sec).context(ProfileCPU)
});
let result = handle.await.context(JoinAsyncTask);
match result {
Ok(Ok(prof_data)) => Ok(prof_data.into_response()),
Ok(Err(e)) => Err(reject::custom(e)),
Ok(_) => Ok("ok"),
Err(e) => Err(reject::custom(e)),
}
},
)
}

// GET /debug/cpu_profile/{seconds}
fn cpu_profile(
// GET /debug/profile/mem/{seconds}
fn profile_mem(
&self,
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
warp::path!("debug" / "cpu_profile" / ..)
warp::path!("debug" / "profile" / "mem" / ..)
.and(warp::path::param::<u64>())
.and(warp::get())
.and(self.with_profiler())
.and(self.with_runtime())
.and_then(|duration_sec: u64, runtime: Arc<Runtime>| async move {
let handle = runtime.spawn_blocking(move || -> Result<()> {
let guard = pprof::ProfilerGuardBuilder::default()
.frequency(100)
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.build()
.box_err()
.context(Internal)?;

thread::sleep(Duration::from_secs(duration_sec));

let report = guard.report().build().box_err().context(Internal)?;
let file = File::create("/tmp/flamegraph.svg")
.box_err()
.context(Internal)?;
report.flamegraph(file).box_err().context(Internal)?;
Ok(())
});
let result = handle.await.context(JoinAsyncTask);
match result {
Ok(_) => Ok("ok"),
Err(e) => Err(reject::custom(e)),
}
})
.and_then(
|duration_sec: u64, profiler: Arc<Profiler>, runtime: Arc<Runtime>| async move {
let handle = runtime.spawn_blocking(move || {
profiler.dump_mem_prof(duration_sec).context(ProfileMem)
});
let result = handle.await.context(JoinAsyncTask);
match result {
Ok(Ok(prof_data)) => Ok(prof_data.into_response()),
Ok(Err(e)) => Err(reject::custom(e)),
Err(e) => Err(reject::custom(e)),
}
},
)
}

// GET /debug/config
Expand Down Expand Up @@ -694,7 +685,8 @@ fn error_to_status_code(err: &Error) -> StatusCode {
| Error::MissingSchemaConfigProvider { .. }
| Error::MissingProxy { .. }
| Error::ParseIpAddr { .. }
| Error::ProfileHeap { .. }
| Error::ProfileMem { .. }
| Error::ProfileCPU { .. }
| Error::Internal { .. }
| Error::JoinAsyncTask { .. }
| Error::AlreadyStarted { .. }
Expand Down

0 comments on commit 76ab93d

Please sign in to comment.