Skip to content

Commit

Permalink
immutable runtimes
Browse files Browse the repository at this point in the history
  • Loading branch information
conradludgate committed Oct 26, 2023
1 parent a8805d8 commit ac0b314
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 17 deletions.
22 changes: 10 additions & 12 deletions libs/metrics/src/tokio_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
use std::sync::{Arc, Mutex};

use prometheus::{core::Desc, proto};

use crate::register_internal;

#[derive(Default, Clone)]
#[derive(Default)]
pub struct TokioCollector {
handles: Arc<Mutex<Vec<(tokio::runtime::Handle, String)>>>,
handles: Vec<(tokio::runtime::Handle, String)>,
}

impl TokioCollector {
pub fn add_runtime(&mut self, handle: tokio::runtime::Handle, name: String) -> &mut Self {
self.handles.lock().unwrap().push((handle, name));
pub fn add_runtime(mut self, handle: tokio::runtime::Handle, name: String) -> Self {
self.handles.push((handle, name));
self
}

pub fn register(&self) -> Result<(), prometheus::Error> {
register_internal(Box::new(self.clone()))
pub fn register(self) -> Result<(), prometheus::Error> {
register_internal(Box::new(self))
}

#[cfg(tokio_unstable)]
Expand Down Expand Up @@ -99,7 +97,7 @@ impl TokioCollector {
m.set_help(desc.help.clone());
m.set_field_type(typ);
let mut metrics = vec![];
for (rt, name) in &*self.handles.lock().unwrap() {
for (rt, name) in &self.handles {
let rt_metrics = rt.metrics();

let mut label_name = proto::LabelPair::default();
Expand Down Expand Up @@ -170,13 +168,13 @@ mod tests {
#[test]
fn gather_multiple_runtimes() {
let registry = Registry::new();
let mut collector = TokioCollector::default();

let runtime1 = runtime::Builder::new_current_thread().build().unwrap();
let runtime2 = runtime::Builder::new_current_thread().build().unwrap();

collector.add_runtime(runtime1.handle().clone(), "runtime1".to_owned());
collector.add_runtime(runtime2.handle().clone(), "runtime2".to_owned());
let collector = TokioCollector::default()
.add_runtime(runtime1.handle().clone(), "runtime1".to_owned())
.add_runtime(runtime2.handle().clone(), "runtime2".to_owned());

registry.register(Box::new(collector)).unwrap();

Expand Down
6 changes: 2 additions & 4 deletions pageserver/src/task_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,7 @@ pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(||
});

pub fn runtime_collector() -> TokioCollector {
let mut collector = TokioCollector::default();
collector
TokioCollector::default()
.add_runtime(
COMPUTE_REQUEST_RUNTIME.handle().clone(),
"compute".to_owned(),
Expand All @@ -162,8 +161,7 @@ pub fn runtime_collector() -> TokioCollector {
WALRECEIVER_RUNTIME.handle().clone(),
"walreceiver".to_owned(),
)
.add_runtime(BACKGROUND_RUNTIME.handle().clone(), "background".to_owned());
collector
.add_runtime(BACKGROUND_RUNTIME.handle().clone(), "background".to_owned())
}

#[derive(Debug, Clone, Copy)]
Expand Down
6 changes: 5 additions & 1 deletion safekeeper/src/bin/safekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ use safekeeper::defaults::{
DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
DEFAULT_PG_LISTEN_ADDR,
};
use safekeeper::wal_service;
use safekeeper::GlobalTimelines;
use safekeeper::SafeKeeperConf;
use safekeeper::{broker, WAL_SERVICE_RUNTIME};
use safekeeper::{control_file, BROKER_RUNTIME};
use safekeeper::{http, WAL_REMOVER_RUNTIME};
use safekeeper::{remove_wal, WAL_BACKUP_RUNTIME};
use safekeeper::{runtime_collector, wal_service};
use safekeeper::{wal_backup, HTTP_RUNTIME};
use storage_broker::DEFAULT_ENDPOINT;
use utils::auth::{JwtAuth, Scope};
Expand Down Expand Up @@ -340,6 +340,10 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
let timeline_collector = safekeeper::metrics::TimelineCollector::new();
metrics::register_internal(Box::new(timeline_collector))?;

runtime_collector()
.add_runtime(Handle::current(), "main".to_owned())
.register()?;

let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);

// Keep handles to main tasks to die if any of them disappears.
Expand Down
20 changes: 20 additions & 0 deletions safekeeper/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use ::metrics::tokio_metrics::TokioCollector;
use camino::Utf8PathBuf;
use once_cell::sync::Lazy;
use remote_storage::RemoteStorageConfig;
Expand Down Expand Up @@ -165,3 +166,22 @@ pub static METRICS_SHIFTER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
.build()
.expect("Failed to create broker runtime")
});

pub fn runtime_collector() -> TokioCollector {
TokioCollector::default()
.add_runtime(
WAL_SERVICE_RUNTIME.handle().clone(),
"wal service".to_owned(),
)
.add_runtime(HTTP_RUNTIME.handle().clone(), "http".to_owned())
.add_runtime(BROKER_RUNTIME.handle().clone(), "broker".to_owned())
.add_runtime(
WAL_REMOVER_RUNTIME.handle().clone(),
"wal remover".to_owned(),
)
.add_runtime(WAL_BACKUP_RUNTIME.handle().clone(), "wal backup".to_owned())
.add_runtime(
METRICS_SHIFTER_RUNTIME.handle().clone(),
"metric shifter".to_owned(),
)
}

0 comments on commit ac0b314

Please sign in to comment.