Skip to content

Commit

Permalink
Bridge: Use jemalloc as the global allocator, except for windows (#1051)
Browse files Browse the repository at this point in the history
This PR changes the global allocator for svix-bridge to jemalloc. Said
allocator is only enabled on non-Windows targets.

This also includes some runtime memory metrics exposed via
opentelemetry.
  • Loading branch information
svix-nick authored Aug 24, 2023
1 parent 6095d5a commit 287c57b
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 1 deletion.
39 changes: 39 additions & 0 deletions bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion bridge/svix-bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,20 @@ deno_runtime = "0.119.0"
deno_ast = "0.27.2"
deadpool = { version = "0.9.5", features = ["unmanaged", "rt_tokio_1"] }

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = { version = "0.5", optional = true }
tikv-jemalloc-ctl = { version = "0.5", optional = true, features = ["use_std"] }

[dev-dependencies]
chrono = "0.4"
tower = "0.4"

[features]
default = ["gcp-pubsub", "rabbitmq", "redis", "sqs"]
default = ["gcp-pubsub", "rabbitmq", "redis", "sqs", "jemalloc"]

gcp-pubsub = ["generic-queue"]
generic-queue = ["dep:svix-bridge-plugin-queue"]
rabbitmq = ["generic-queue"]
redis = ["generic-queue"]
sqs = ["generic-queue"]
jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl"]
67 changes: 67 additions & 0 deletions bridge/svix-bridge/src/allocator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//! Allocator stats are only available when we're using jemalloc, and jemalloc doesn't work on Windows (MSVC targets).
//!
//! Two implementations of the helper functions are therefore provided: a stub set that never
//! yields any stats (used on MSVC targets, or whenever the `jemalloc` feature is disabled) and
//! a working set for builds where jemalloc is in use.
//!
//! Care should be taken to keep the signatures aligned between these two so the call sites can be
//! used consistently regardless of whether jemalloc is in use or not.

#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
pub use supported::*;
#[cfg(any(target_env = "msvc", not(feature = "jemalloc")))]
pub use unsupported::*;

#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
mod supported {
    use std::sync::Arc;
    use tikv_jemalloc_ctl::{epoch, stats};

    /// Bundle of pre-resolved jemalloc MIB handles: the epoch plus the `allocated` and
    /// `resident` stats.
    ///
    /// Built once by [`get_allocator_stat_mibs`] and then handed to [`get_allocator_stats`]
    /// for every sample.
    pub struct AllocatorStatMibs {
        epoch: tikv_jemalloc_ctl::epoch_mib,
        allocated: stats::allocated_mib,
        resident: stats::resident_mib,
    }

    /// Reads the current `(allocated, resident)` pair through the handles in `mibs`.
    ///
    /// With `bust_cache` set, the epoch is advanced before reading. On success this
    /// implementation always yields `Some`.
    ///
    /// # Errors
    ///
    /// Fails if advancing the epoch or reading either stat fails.
    pub fn get_allocator_stats(
        bust_cache: bool,
        mibs: Arc<AllocatorStatMibs>,
    ) -> anyhow::Result<Option<(usize, usize)>> {
        if bust_cache {
            // Stats are cached internally and advancing the epoch is a way to invalidate
            // those caches.
            mibs.epoch.advance()?;
        }

        // Tuple fields are evaluated left to right: `allocated` is read before `resident`.
        Ok(Some((mibs.allocated.read()?, mibs.resident.read()?)))
    }

    /// Resolves the epoch, `allocated` and `resident` MIB handles (in that order) and returns
    /// them wrapped in an `Arc`.
    ///
    /// # Errors
    ///
    /// Fails on the first handle that cannot be resolved.
    pub fn get_allocator_stat_mibs() -> anyhow::Result<Arc<AllocatorStatMibs>> {
        let handles = AllocatorStatMibs {
            epoch: epoch::mib()?,
            allocated: stats::allocated::mib()?,
            resident: stats::resident::mib()?,
        };

        Ok(Arc::new(handles))
    }
}

#[cfg(any(target_env = "msvc", not(feature = "jemalloc")))]
mod unsupported {
use anyhow::anyhow;
use std::sync::Arc;
pub struct AllocatorStatMibs;

pub fn get_allocator_stats(
_bust_cache: bool,
_mibs: Arc<AllocatorStatMibs>,
) -> anyhow::Result<Option<(usize, usize)>> {
Ok(None)
}

pub fn get_allocator_stat_mibs() -> anyhow::Result<Arc<AllocatorStatMibs>> {
Err(anyhow!("metric collection is not supported"))
}
}
66 changes: 66 additions & 0 deletions bridge/svix-bridge/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,36 @@ use self::config::Config;
use clap::Parser;
use lazy_static::lazy_static;
use opentelemetry::runtime::Tokio;
use opentelemetry::sdk::export::metrics::aggregation::delta_temporality_selector;
use opentelemetry::sdk::metrics::controllers::BasicController;
use opentelemetry::sdk::metrics::selectors;
use opentelemetry_otlp::WithExportConfig;
use std::io::{Error, ErrorKind, Result};
use std::path::PathBuf;
use std::time::Duration;
use svix_bridge_types::{SenderInput, TransformerJob};
use svix_ksuid::{KsuidLike as _, KsuidMs};
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
use tikv_jemallocator::Jemalloc;
use tracing::Instrument;
use tracing_subscriber::prelude::*;

mod allocator;
mod config;
mod metrics;
mod runtime;
mod webhook_receiver;

use crate::allocator::{get_allocator_stat_mibs, get_allocator_stats};
use crate::metrics::CommonMetrics;

// Install jemalloc as the global allocator, but only for non-MSVC targets built with the
// `jemalloc` feature enabled.
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

// Reject the MSVC + `jemalloc` feature combination at build time with an explicit message.
// NOTE(review): `jemalloc` appears to be part of the crate's default feature set, so MSVC
// builds presumably have to opt out of default features to get past this — confirm.
#[cfg(all(target_env = "msvc", feature = "jemalloc"))]
compile_error!("jemalloc cannot be enabled on msvc");

lazy_static! {
// Seems like it would be useful to be able to configure this.
// In some docker setups, hostname is sometimes the container id, and advertising this can be
Expand Down Expand Up @@ -105,6 +123,25 @@ fn setup_tracing(cfg: &Config) {
};
}

/// Builds the OTLP metrics pipeline described by `cfg`.
///
/// Returns `None` when `cfg.opentelemetry` is not set; otherwise returns the controller
/// produced by the pipeline builder.
///
/// # Panics
///
/// Panics if building the pipeline fails.
pub fn setup_metrics(cfg: &Config) -> Option<BasicController> {
    // No OpenTelemetry section configured: nothing to set up.
    let otel_cfg = cfg.opentelemetry.as_ref()?;

    let exporter = opentelemetry_otlp::new_exporter()
        .tonic()
        .with_endpoint(&otel_cfg.address);

    let controller = opentelemetry_otlp::new_pipeline()
        .metrics(
            selectors::simple::inexpensive(),
            delta_temporality_selector(),
            Tokio,
        )
        .with_exporter(exporter)
        .with_resource(get_svc_identifiers(cfg))
        .build()
        .unwrap();

    Some(controller)
}

async fn supervise_senders(inputs: Vec<Box<dyn SenderInput>>) -> Result<()> {
let mut set = tokio::task::JoinSet::new();
for input in inputs {
Expand Down Expand Up @@ -190,8 +227,37 @@ async fn main() -> Result<()> {
let vars = std::env::vars().collect();
let cfg = Config::from_src(&cfg_source, Some(vars).as_ref())?;
setup_tracing(&cfg);
let _metrics = setup_metrics(&cfg);
tracing::info!("starting");

// Background task that samples allocator stats on a fixed interval and records them as
// metrics. The handle returned by `tokio::spawn` is not kept.
// NOTE(review): the span below declares `instance_id` as an empty field and nothing in this
// task records a value for it — confirm whether it is meant to be filled in.
tokio::spawn(
async move {
// One tick every 15 seconds.
let mut interval = tokio::time::interval(Duration::from_secs(15));
let metrics = CommonMetrics::new(&opentelemetry::global::meter("svix.com"));
// Resolve the stat handles once up front. If that fails, the error is logged and the
// task finishes without ever entering the sampling loop.
match get_allocator_stat_mibs() {
Ok(mibs) => {
tracing::debug!("Common Metrics Collection: Started");

loop {
interval.tick().await;

// `true` is the `bust_cache` argument. An `Err` or `Ok(None)` result is skipped
// without logging; the loop just waits for the next tick.
if let Ok(Some((allocated, resident))) =
get_allocator_stats(true, mibs.clone())
{
metrics.record_mem_allocated(allocated as _);
metrics.record_mem_resident(resident as _);
}
}
}
Err(e) => tracing::error!("Unable to get allocator stats mibs: {e}"),
}
}
.instrument(tracing::error_span!(
"common_metrics_collector",
instance_id = tracing::field::Empty
)),
);

let (xform_tx, mut xform_rx) = tokio::sync::mpsc::unbounded_channel::<TransformerJob>();

// XXX: this is a bit nasty, but might be okay to start.
Expand Down
44 changes: 44 additions & 0 deletions bridge/svix-bridge/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use opentelemetry::metrics::{Meter, ObservableGauge};
use opentelemetry::Context;

fn init_metric<T, E: std::fmt::Display>(result: Result<T, E>) -> Option<T> {
match result {
Ok(t) => Some(t),
Err(e) => {
tracing::error!("Failed to initialize metric: {}", e);
None
}
}
}

/// Process-wide memory gauges.
///
/// Each gauge is optional: it is `None` when creating the instrument failed, in which case
/// the matching `record_*` method does nothing.
#[derive(Clone)]
pub struct CommonMetrics {
/// Gauge registered as `svix.mem_allocated`.
mem_allocated_recorder: Option<ObservableGauge<u64>>,
/// Gauge registered as `svix.mem_resident`.
mem_resident_recorder: Option<ObservableGauge<u64>>,
}

impl CommonMetrics {
pub fn new(meter: &Meter) -> Self {
let mem_resident_recorder =
init_metric(meter.u64_observable_gauge("svix.mem_resident").try_init());
let mem_allocated_recorder =
init_metric(meter.u64_observable_gauge("svix.mem_allocated").try_init());

Self {
mem_allocated_recorder,
mem_resident_recorder,
}
}

pub fn record_mem_allocated(&self, value: u64) {
if let Some(ref recorder) = self.mem_allocated_recorder {
recorder.observe(&Context::current(), value, &[]);
}
}

pub fn record_mem_resident(&self, value: u64) {
if let Some(ref recorder) = self.mem_resident_recorder {
recorder.observe(&Context::current(), value, &[]);
}
}
}

0 comments on commit 287c57b

Please sign in to comment.