enhancement(observability): add tracking allocations (#14995)
* add allocations reporting

* checkpoint

* try separating allocs and deallocs

* checkpoint bug

* checkpoint with object size limit

* checkpoint 1024

* remove heuristic and try CachePadded

* more comments

* test

* unsafecell

* more inlines

* no zeroing

* back to refcell

* checkpoint

* added a gauge

* checkpoint

* fixed deadlock

* Update src/internal_telemetry/allocations/mod.rs

Co-authored-by: Jesse Szwedko <jesse@szwedko.me>

* remove group id from gauge

* try reducing atomic contention

* checkpoint

* init thread ids early

* checkpoint

* see if overhead is from thread_id

* checkpoint

* checkpoint

* try thread locals w/ const init

* checkpoint

* forgot the %

* go back to const thread locals

* do no bounds checking when tracing

* checkpoint

* checkpoint

* remove unnecessary branch

* add bounds checking

* enable macOS again

* groupstack update

* checkpoint

Co-authored-by: Jesse Szwedko <jesse@szwedko.me>
arshiyasolei and jszwedko authored Nov 9, 2022
1 parent 31383ef commit 225eac5
Showing 11 changed files with 204 additions and 47 deletions.
29 changes: 29 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -311,6 +311,8 @@ typetag = { version = "0.2.3", default-features = false }
url = { version = "2.3.1", default-features = false, features = ["serde"] }
uuid = { version = "1", default-features = false, features = ["serde", "v4"] }
warp = { version = "0.3.3", default-features = false }
arr_macro = { version = "0.1.3" }
crossbeam-utils = { version = "0.8.12", default-features = false }

# depending on fork for bumped nix dependency
# https://github.com/heim-rs/heim/pull/360
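
The two new dependencies back the per-thread statistics added in src/internal_telemetry/allocations/mod.rs: crossbeam-utils supplies CachePadded, which pads each counter out to its own cache line so threads updating neighbouring counters do not fight over the same line, and arr_macro supplies the arr! macro used to const-initialize the large counter arrays. A minimal sketch of the pattern (the COUNTERS static and record helper are illustrative, not part of the commit):

use arr_macro::arr;
use crossbeam_utils::CachePadded;
use std::sync::atomic::{AtomicI64, Ordering};

// 128 per-group counters, each padded to a full cache line to limit false
// sharing between threads that touch adjacent groups.
static COUNTERS: [CachePadded<AtomicI64>; 128] = arr![CachePadded::new(AtomicI64::new(0)); 128];

fn record(group: usize, delta: i64) {
    COUNTERS[group].fetch_add(delta, Ordering::Relaxed);
}

fn main() {
    record(0, 1024);
    println!("group 0: {} bytes", COUNTERS[0].load(Ordering::Relaxed));
}
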
7 changes: 7 additions & 0 deletions src/app.rs
@@ -117,6 +117,13 @@ impl Application {
let mut rt_builder = runtime::Builder::new_multi_thread();
rt_builder.enable_all().thread_name("vector-worker");

#[cfg(feature = "allocation-tracing")]
{
rt_builder.on_thread_start(|| {
crate::internal_telemetry::allocations::init_thread_id();
});
}

if let Some(threads) = root_opts.threads {
if threads < 1 {
#[allow(clippy::print_stderr)]
2 changes: 1 addition & 1 deletion src/internal_telemetry/allocations/allocator/mod.rs
@@ -17,7 +17,7 @@ pub use self::tracing_allocator::GroupedTraceableAllocator;
/// aggregating and processing the allocator events. While `GroupedTraceableAllocator` already
/// avoids reentrantly tracing (de)allocations, this method provides a way to do so from _outside_
/// of the `GlobalAlloc` codepath.
#[allow(dead_code)]
#[inline(always)]
pub fn without_allocation_tracing<F>(f: F)
where
F: FnOnce(),
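
This hunk only swaps the attributes on without_allocation_tracing (the function is now used, so #[allow(dead_code)] gives way to #[inline(always)]); its body is not shown here. A plausible shape, assuming the crate keeps a thread-local "suspended" flag next to the allocation group stack (the flag name below is illustrative):

use std::cell::Cell;

thread_local! {
    // Illustrative flag; the real allocator keeps equivalent suspension state.
    static TRACING_SUSPENDED: Cell<bool> = const { Cell::new(false) };
}

/// Runs `f` with allocation tracing suspended on the current thread, so work
/// done by the telemetry machinery itself is not traced.
#[inline(always)]
pub fn without_allocation_tracing<F>(f: F)
where
    F: FnOnce(),
{
    TRACING_SUSPENDED.with(|suspended| {
        let previous = suspended.replace(true);
        f();
        suspended.set(previous);
    });
}

fn main() {
    without_allocation_tracing(|| println!("tracing is suspended while this runs"));
}

In this commit the function wraps the reporting loop in init_allocation_tracing, so the gauge updates emitted there do not feed back into the statistics they report.
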
13 changes: 4 additions & 9 deletions src/internal_telemetry/allocations/allocator/stack.rs
@@ -26,28 +26,23 @@ impl<const N: usize> GroupStack<N> {
///
/// If the stack is empty, then the root allocation group is the de facto active allocation group, and is returned as such.
pub const fn current(&self) -> AllocationGroupId {
if self.current_top == 0 {
AllocationGroupId::ROOT
} else {
self.slots[self.current_top - 1]
}
self.slots[self.current_top]
}

/// Pushes an allocation group on to the stack, marking it as the active allocation group.
pub fn push(&mut self, group: AllocationGroupId) {
self.current_top += 1;
if self.current_top >= self.slots.len() {
panic!("tried to push new allocation group to the current stack, but hit the limit of {} entries", N);
}
self.slots[self.current_top] = group;
self.current_top += 1;
}

/// Pops and returns the previous allocation group that was on the stack.
pub fn pop(&mut self) -> AllocationGroupId {
/// Pops the previous allocation group that was on the stack.
pub fn pop(&mut self) {
if self.current_top == 0 {
panic!("tried to pop current allocation group from the stack but the stack is empty");
}
self.current_top -= 1;
self.slots[self.current_top]
}
}
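
Read as a whole, the updated stack keeps the root group in slot 0 and lets current_top index the active slot directly, which removes the branch from current(): push advances the index before writing, pop simply steps it back, and an empty stack naturally reports AllocationGroupId::ROOT. A self-contained sketch of the resulting type, with an illustrative stand-in for AllocationGroupId and a constructor inferred from the diff (the real initialisation is not shown in this hunk):

// Illustrative stand-in for the crate's `AllocationGroupId`.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct AllocationGroupId(usize);

impl AllocationGroupId {
    pub const ROOT: Self = Self(0);
}

/// Fixed-capacity stack of allocation groups. Slot 0 always holds the root
/// group and `current_top` indexes the currently active slot.
pub struct GroupStack<const N: usize> {
    current_top: usize,
    slots: [AllocationGroupId; N],
}

impl<const N: usize> GroupStack<N> {
    pub const fn new() -> Self {
        Self {
            current_top: 0,
            slots: [AllocationGroupId::ROOT; N],
        }
    }

    /// Returns the active allocation group (the root group when nothing has been pushed).
    pub const fn current(&self) -> AllocationGroupId {
        self.slots[self.current_top]
    }

    /// Pushes a group onto the stack, making it the active allocation group.
    pub fn push(&mut self, group: AllocationGroupId) {
        self.current_top += 1;
        if self.current_top >= self.slots.len() {
            panic!("tried to push new allocation group to the current stack, but hit the limit of {} entries", N);
        }
        self.slots[self.current_top] = group;
    }

    /// Pops the most recently pushed allocation group.
    pub fn pop(&mut self) {
        if self.current_top == 0 {
            panic!("tried to pop current allocation group from the stack but the stack is empty");
        }
        self.current_top -= 1;
    }
}

fn main() {
    let mut stack: GroupStack<4> = GroupStack::new();
    assert_eq!(stack.current(), AllocationGroupId::ROOT);
    stack.push(AllocationGroupId(1));
    assert_eq!(stack.current(), AllocationGroupId(1));
    stack.pop();
    assert_eq!(stack.current(), AllocationGroupId::ROOT);
}
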
2 changes: 1 addition & 1 deletion src/internal_telemetry/allocations/allocator/token.rs
@@ -15,7 +15,7 @@ thread_local! {
/// Any allocations which occur on this thread will be associated with whichever token is
/// present at the time of the allocation.
pub(crate) static LOCAL_ALLOCATION_GROUP_STACK: RefCell<GroupStack<256>> =
RefCell::new(GroupStack::new());
const { RefCell::new(GroupStack::new()) };
}

/// The identifier that uniquely identifies an allocation group.
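
The only change to the group-stack thread-local is the const { ... } initializer. A const-initialized thread_local! bakes the initial value in at compile time, so accesses avoid the lazy-initialization check that the default form performs, which matters when the access sits on the allocation hot path. A small standalone illustration of the two forms (not taken from the commit):

use std::cell::Cell;

thread_local! {
    // Default form: the value is initialised lazily, so each access first
    // checks whether initialisation has already happened.
    static LAZY_COUNTER: Cell<u64> = Cell::new(0);

    // Const form (as used for LOCAL_ALLOCATION_GROUP_STACK): the initial value
    // is known at compile time, so no runtime initialisation check is needed.
    static CONST_COUNTER: Cell<u64> = const { Cell::new(0) };
}

fn main() {
    LAZY_COUNTER.with(|c| c.set(c.get() + 1));
    CONST_COUNTER.with(|c| c.set(c.get() + 1));
    println!("lazy: {}, const: {}", LAZY_COUNTER.with(Cell::get), CONST_COUNTER.with(Cell::get));
}
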
20 changes: 6 additions & 14 deletions src/internal_telemetry/allocations/allocator/tracing_allocator.rs
@@ -22,6 +22,7 @@ impl<A, T> GroupedTraceableAllocator<A, T> {
}

impl<A: GlobalAlloc, T: Tracer> GroupedTraceableAllocator<A, T> {
#[inline(always)]
unsafe fn get_wrapped_allocation(
&self,
object_layout: Layout,
@@ -33,13 +34,10 @@ impl<A: GlobalAlloc, T: Tracer> GroupedTraceableAllocator<A, T> {
handle_alloc_error(actual_layout);
}

// Zero out the group ID field to make sure it's in the `None` state.
//
// SAFETY: We know that `actual_ptr` is at least aligned enough for casting it to `*mut usize` as the layout for
// the allocation backing this pointer ensures the first field in the layout is `usize.
#[allow(clippy::cast_ptr_alignment)]
let group_id_ptr = actual_ptr.cast::<usize>();
group_id_ptr.write(0);

// SAFETY: If the allocation succeeded and `actual_ptr` is valid, then it must be valid to advance by
// `offset_to_object` as it would land within the allocation.
@@ -59,13 +57,6 @@ unsafe impl<A: GlobalAlloc, T: Tracer> GlobalAlloc for GroupedTraceableAllocator
try_with_suspended_allocation_group(
#[inline(always)]
|group_id| {
// We only set the group ID in the wrapper header if we're tracing an allocation, because when it
// comes back to us during deallocation, we want to skip doing any checks at all if it's already
// zero.
//
// If we never trace the allocation, tracing the deallocation will only produce incorrect numbers,
// and that includes even if we just used the rule of "always attribute allocations to the root
// allocation group by default".
group_id_ptr.write(group_id.as_raw());
self.tracer.trace_allocation(object_size, group_id);
},
@@ -99,21 +90,22 @@ unsafe impl<A: GlobalAlloc, T: Tracer> GlobalAlloc for GroupedTraceableAllocator

try_with_suspended_allocation_group(
#[inline(always)]
|_| self.tracer.trace_deallocation(object_size, source_group_id),
|_| {
self.tracer.trace_deallocation(object_size, source_group_id);
},
);
}
}

#[inline(always)]
fn get_wrapped_layout(object_layout: Layout) -> (Layout, usize) {
static HEADER_LAYOUT: Layout = Layout::new::<usize>();

// We generate a new allocation layout that gives us a location to store the active allocation group ID ahead
// of the requested allocation, which lets us always attempt to retrieve it on the deallocation path. We'll
// always set this to zero, and conditionally update it to the actual allocation group ID if tracking is enabled.
// of the requested allocation, which lets us always attempt to retrieve it on the deallocation path.
let (actual_layout, offset_to_object) = HEADER_LAYOUT
.extend(object_layout)
.expect("wrapping requested layout resulted in overflow");
let actual_layout = actual_layout.pad_to_align();

(actual_layout, offset_to_object)
}
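
The header trick is easiest to see in isolation: every allocation is widened by a usize-sized header that records the owning allocation group, and Layout::extend does the alignment math and reports where the caller's object actually starts. A standalone sketch of the same arithmetic (the 24-byte object is just an example value):

use std::alloc::Layout;

fn main() {
    // A `usize` header is placed directly in front of the caller's object so
    // the group ID can be recovered on the deallocation path.
    let header = Layout::new::<usize>();
    let object = Layout::new::<[u8; 24]>();

    // `extend` appends `object` after `header`, inserting any padding needed
    // for alignment, and returns the byte offset at which the object begins.
    let (wrapped, offset_to_object) = header
        .extend(object)
        .expect("wrapping requested layout resulted in overflow");
    let wrapped = wrapped.pad_to_align();

    println!(
        "object: {} bytes, wrapped: {} bytes (align {}), object starts at byte {}",
        object.size(),
        wrapped.size(),
        wrapped.align(),
        offset_to_object
    );
}
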
134 changes: 128 additions & 6 deletions src/internal_telemetry/allocations/mod.rs
@@ -1,9 +1,74 @@
//! Allocation tracking exposed via internal telemetry.

mod allocator;
use std::{
cell::Cell,
sync::{
atomic::{AtomicI64, AtomicUsize, Ordering},
Mutex,
},
thread,
time::Duration,
};

use arr_macro::arr;
use metrics::gauge;
use rand_distr::num_traits::ToPrimitive;

use self::allocator::Tracer;
pub(crate) use self::allocator::{AllocationGroupId, AllocationLayer, GroupedTraceableAllocator};

pub(crate) use self::allocator::{
without_allocation_tracing, AllocationGroupId, AllocationLayer, GroupedTraceableAllocator,
};

use crossbeam_utils::CachePadded;

const NUM_GROUPS: usize = 128;
const NUM_BUCKETS: usize = 8;
/// These arrays represent the memory usage for each group per thread.
///
/// Each thread is meant to update its own group statistics, which significantly reduces atomic contention.
/// We pad each Atomic to reduce false sharing effects.
///
/// TODO:
///
/// Currently, we reach atomic contention once the number of threads exceeds 8. One potential solution
/// involves using thread locals to track memory stats.
static GROUP_MEM_STATS: [[CachePadded<AtomicI64>; NUM_GROUPS]; NUM_BUCKETS] = [
arr![CachePadded::new(AtomicI64::new(0)); 128],
arr![CachePadded::new(AtomicI64::new(0)); 128],
arr![CachePadded::new(AtomicI64::new(0)); 128],
arr![CachePadded::new(AtomicI64::new(0)); 128],
arr![CachePadded::new(AtomicI64::new(0)); 128],
arr![CachePadded::new(AtomicI64::new(0)); 128],
arr![CachePadded::new(AtomicI64::new(0)); 128],
arr![CachePadded::new(AtomicI64::new(0)); 128],
];

// TODO: Replace this with [`std::thread::ThreadId::as_u64`] once it is stabilized.
static THREAD_COUNTER: AtomicUsize = AtomicUsize::new(0);

thread_local! {
static THREAD_ID: Cell<usize> = const { Cell::new(0) };
}
// By using the Option type, we can define these statics without needing other crates such as lazy_static
struct GroupInfo {
component_kind: Option<String>,
component_type: Option<String>,
component_id: Option<String>,
}

impl GroupInfo {
const fn new() -> Self {
Self {
component_id: None,
component_kind: None,
component_type: None,
}
}
}

static GROUP_INFO: [Mutex<GroupInfo>; NUM_GROUPS] = arr![Mutex::new(GroupInfo::new()); 128];

pub type Allocator<A> = GroupedTraceableAllocator<A, MainTracer>;
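
The statics above fit together like this: each runtime thread is handed a small numeric ID when it starts (init_thread_id, wired up in app.rs and test_util), and every traced (de)allocation updates the counter at GROUP_MEM_STATS[thread_id % NUM_BUCKETS][group_id], so threads with different IDs mostly touch different cache-padded atomics; the reporter later sums a group across all eight buckets. A runnable sketch of just the thread-to-bucket mapping (the spawned threads and main are illustrative):

use std::cell::Cell;
use std::sync::atomic::{AtomicUsize, Ordering};

const NUM_BUCKETS: usize = 8;

static THREAD_COUNTER: AtomicUsize = AtomicUsize::new(0);

thread_local! {
    static THREAD_ID: Cell<usize> = const { Cell::new(0) };
}

/// Mirrors `init_thread_id`: hand each new runtime thread the next counter value.
fn init_thread_id() {
    THREAD_ID.with(|t| t.set(THREAD_COUNTER.fetch_add(1, Ordering::Relaxed)));
}

/// Mirrors the indexing in `MainTracer`: allocations traced on this thread for
/// a given group land in `GROUP_MEM_STATS[bucket_for_current_thread()][group_id]`.
fn bucket_for_current_thread() -> usize {
    THREAD_ID.with(Cell::get) % NUM_BUCKETS
}

fn main() {
    init_thread_id();
    let handles: Vec<_> = (0..4)
        .map(|_| {
            std::thread::spawn(|| {
                init_thread_id();
                println!("this thread writes into bucket {}", bucket_for_current_thread());
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    println!("main thread writes into bucket {}", bucket_for_current_thread());
}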

@@ -15,14 +80,50 @@ pub struct MainTracer;

impl Tracer for MainTracer {
#[inline(always)]
fn trace_allocation(&self, _object_size: usize, _group_id: AllocationGroupId) {}
fn trace_allocation(&self, object_size: usize, group_id: AllocationGroupId) {
GROUP_MEM_STATS[THREAD_ID.with(|t| t.get()) % 8][group_id.as_raw()]
.fetch_add(object_size as i64, Ordering::Relaxed);
}

#[inline(always)]
fn trace_deallocation(&self, _object_size: usize, _source_group_id: AllocationGroupId) {}
fn trace_deallocation(&self, object_size: usize, source_group_id: AllocationGroupId) {
GROUP_MEM_STATS[THREAD_ID.with(|t| t.get()) % 8][source_group_id.as_raw()]
.fetch_sub(object_size as i64, Ordering::Relaxed);
}
}

/// Initializes allocation tracing.
pub const fn init_allocation_tracing() {}
pub fn init_allocation_tracing() {
let alloc_processor = thread::Builder::new().name("vector-alloc-processor".to_string());
alloc_processor
.spawn(|| {
without_allocation_tracing(|| loop {
for group_idx in 0..NUM_GROUPS {
let mut mem_used = 0;
for bucket in &GROUP_MEM_STATS {
mem_used += bucket[group_idx].load(Ordering::Relaxed);
}
if mem_used == 0 {
continue;
}
let group_info = GROUP_INFO[group_idx].lock().unwrap();
gauge!(
"component_allocated_bytes",
mem_used.to_f64().expect("failed to convert mem_used from int to float"),
"component_kind" => group_info.component_kind.clone().unwrap_or_else(|| "root".to_string()),
"component_type" => group_info.component_type.clone().unwrap_or_else(|| "root".to_string()),
"component_id" => group_info.component_id.clone().unwrap_or_else(|| "root".to_string()));
}
thread::sleep(Duration::from_millis(5000));
})
})
.unwrap();
}

/// Initializes the thread local ID.
pub fn init_thread_id() {
THREAD_ID.with(|t| t.replace(THREAD_COUNTER.fetch_add(1, Ordering::Relaxed)));
}

/// Acquires an allocation group ID.
///
Expand All @@ -31,6 +132,27 @@ pub const fn init_allocation_tracing() {}
/// a [`tracing::Span`] to achieve this" we utilize the logical invariants provided by spans --
/// entering, exiting, and how spans exist as a stack -- in order to handle keeping the "current
/// allocation group" accurate across all threads.
pub fn acquire_allocation_group_id() -> AllocationGroupId {
AllocationGroupId::register().expect("failed to register allocation group token")
pub fn acquire_allocation_group_id(
component_id: String,
component_type: String,
component_kind: String,
) -> AllocationGroupId {
let group_id =
AllocationGroupId::register().expect("failed to register allocation group token");
let idx = group_id.as_raw();
match GROUP_INFO.get(idx) {
Some(mutex) => {
let mut writer = mutex.lock().unwrap();
*writer = GroupInfo {
component_id: Some(component_id),
component_kind: Some(component_kind),
component_type: Some(component_type),
};
group_id
}
None => {
info!("Maximum number of registrable allocation group IDs reached ({}). Allocations for component '{}' will be attributed to the root allocation group.", NUM_GROUPS, component_id);
AllocationGroupId::ROOT
}
}
}
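
Putting the new module together: acquire_allocation_group_id stores the component's metadata in GROUP_INFO[group_id] and falls back to the root group once all 128 slots are registered, the tracer adjusts GROUP_MEM_STATS, and the dedicated "vector-alloc-processor" thread wakes every five seconds, sums each group across the buckets, and publishes the result as the component_allocated_bytes gauge. A trimmed-down, runnable sketch of one reporting pass (bucket and group counts are shrunk, the cache-padded statics are replaced with plain atomics, and println! stands in for the gauge! call):

use std::sync::atomic::{AtomicI64, Ordering};

const NUM_GROUPS: usize = 4; // 128 in the commit
const NUM_BUCKETS: usize = 2; // 8 in the commit

// Simplified stand-in for GROUP_MEM_STATS (no CachePadded here).
static GROUP_MEM_STATS: [[AtomicI64; NUM_GROUPS]; NUM_BUCKETS] = [
    [AtomicI64::new(0), AtomicI64::new(0), AtomicI64::new(0), AtomicI64::new(0)],
    [AtomicI64::new(0), AtomicI64::new(0), AtomicI64::new(0), AtomicI64::new(0)],
];

fn main() {
    // Pretend threads in two different buckets attributed allocations to group 1.
    GROUP_MEM_STATS[0][1].fetch_add(4096, Ordering::Relaxed);
    GROUP_MEM_STATS[1][1].fetch_sub(1024, Ordering::Relaxed);

    // One pass of what the reporting thread does on each wakeup: sum each
    // group across buckets and skip groups with nothing currently allocated.
    for group_idx in 0..NUM_GROUPS {
        let mem_used: i64 = GROUP_MEM_STATS
            .iter()
            .map(|bucket| bucket[group_idx].load(Ordering::Relaxed))
            .sum();
        if mem_used == 0 {
            continue;
        }
        // The commit emits this value as the `component_allocated_bytes` gauge,
        // labelled with the component_kind/component_type/component_id held in
        // GROUP_INFO[group_idx] (or "root" when unset).
        println!("group {group_idx}: {mem_used} bytes currently allocated");
    }
}
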
11 changes: 2 additions & 9 deletions src/lib.rs
@@ -22,18 +22,11 @@ extern crate tracing;
#[macro_use]
extern crate derivative;

#[cfg(any(
all(feature = "tikv-jemallocator", target_os = "macos"),
all(feature = "tikv-jemallocator", not(feature = "allocation-tracing"))
))]
#[cfg(all(feature = "tikv-jemallocator", not(feature = "allocation-tracing")))]
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

#[cfg(all(
feature = "tikv-jemallocator",
feature = "allocation-tracing",
not(target_os = "macos")
))]
#[cfg(all(feature = "tikv-jemallocator", feature = "allocation-tracing",))]
#[global_allocator]
static ALLOC: self::internal_telemetry::allocations::Allocator<tikv_jemallocator::Jemalloc> =
self::internal_telemetry::allocations::get_grouped_tracing_allocator(
13 changes: 9 additions & 4 deletions src/test_util/mod.rs
@@ -420,10 +420,15 @@ pub fn lines_from_zstd_file<P: AsRef<Path>>(path: P) -> Vec<String> {
}

pub fn runtime() -> runtime::Runtime {
runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
let mut rt_builder = runtime::Builder::new_multi_thread();
rt_builder.enable_all();
#[cfg(feature = "allocation-tracing")]
{
rt_builder.on_thread_start(|| {
crate::internal_telemetry::allocations::init_thread_id();
});
}
rt_builder.build().unwrap()
}

// Wait for a Future to resolve, or the duration to elapse (will panic)