Skip to content

Commit

Permalink
Auto merge of #710 - Mark-Simulacrum:track-worker-count, r=Mark-Simul…
Browse files Browse the repository at this point in the history
…acrum

In-memory tracking for active worker count
  • Loading branch information
bors committed Nov 5, 2023
2 parents 4d35849 + 8ed29a5 commit 8491684
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 2 deletions.
5 changes: 5 additions & 0 deletions src/agent/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,15 @@ impl ResponseExt for ::reqwest::blocking::Response {
/// Client-side handle holding what an agent needs to issue API requests.
pub struct AgentApi {
    /// Owned copy of the `url` argument given to `AgentApi::new`.
    url: String,
    /// Owned copy of the `token` argument given to `AgentApi::new`.
    token: String,
    /// Random identifier generated once in `AgentApi::new`; it is sent as the
    /// `id` field of the heartbeat request body.
    random_id: String,
}

impl AgentApi {
pub fn new(url: &str, token: &str) -> Self {
AgentApi {
url: url.to_string(),
token: token.to_string(),
random_id: format!("{:X}{:X}", rand::random::<u64>(), rand::random::<u64>()),
}
}

Expand Down Expand Up @@ -200,6 +202,9 @@ impl AgentApi {
self.retry(|this| {
let _: bool = this
.build_request(Method::POST, "heartbeat")
.json(&json!({
"id": self.random_id,
}))
.send()?
.to_api_response()?;
Ok(())
Expand Down
30 changes: 29 additions & 1 deletion src/server/agents.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use crate::prelude::*;
use crate::server::tokens::Tokens;
use chrono::Duration;
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Number of seconds without a heartbeat after which an agent should be considered unreachable.
const INACTIVE_AFTER: i64 = 300;
Expand Down Expand Up @@ -74,15 +76,41 @@ impl Agent {
/// Registry of agents backed by the database, plus an in-memory record of
/// the workers that have recently reported in.
#[derive(Clone)]
pub struct Agents {
    db: Database,
    /// Worker id -> (the worker's reported info, instant of its most recent
    /// `add_worker` call). Wrapped in `Arc<Mutex<..>>`, so every clone of
    /// `Agents` shares and can mutate the same map.
    current_workers: Arc<Mutex<HashMap<String, (WorkerInfo, std::time::Instant)>>>,
}

/// Identity of a single worker, deserialized from the JSON body of a
/// heartbeat request.
#[derive(Deserialize)]
pub struct WorkerInfo {
    /// Identifier supplied by the worker in the `id` field of the body; used
    /// as the key when tracking active workers.
    id: String,
}

impl Agents {
/// Creates the agent registry on top of `db` and synchronizes it with `tokens`.
///
/// The in-memory map of active workers starts out empty; it is only filled
/// by later `add_worker` calls.
///
/// # Errors
///
/// Returns any error produced by `synchronize`.
pub fn new(db: Database, tokens: &Tokens) -> Fallible<Self> {
    // Every field must be initialized here: a bare `Agents { db }` no longer
    // compiles now that the struct also carries `current_workers`.
    let agents = Agents {
        db,
        current_workers: Arc::new(Mutex::new(HashMap::new())),
    };
    agents.synchronize(tokens)?;
    Ok(agents)
}

/// Returns how many workers have reported in within the last 10 minutes.
///
/// As a side effect, workers whose most recent `add_worker` call is older
/// than that are removed from the in-memory map, so the map does not grow
/// without bound as workers come and go.
///
/// # Panics
///
/// Panics if the mutex guarding the worker map has been poisoned.
pub fn active_worker_count(&self) -> usize {
    // A worker not heard from for this long is no longer considered active.
    const WORKER_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60 * 10);

    let mut guard = self.current_workers.lock().unwrap();
    // `retain` KEEPS the entries for which the closure returns `true`, so the
    // predicate has to select the workers that are still fresh; everything
    // older than the timeout is dropped from the active list.
    guard.retain(|_, (_, timestamp)| timestamp.elapsed() <= WORKER_TIMEOUT);
    guard.len()
}

/// Records that the worker described by `id` has just been heard from.
///
/// The entry is keyed by the worker's id string; an existing entry for the
/// same id is overwritten, which refreshes its timestamp.
///
/// # Panics
///
/// Panics if the mutex guarding the worker map has been poisoned.
pub fn add_worker(&self, id: WorkerInfo) {
    let mut workers = self.current_workers.lock().unwrap();
    let key = id.id.clone();
    let heard_at = std::time::Instant::now();
    workers.insert(key, (id, heard_at));
}

fn synchronize(&self, tokens: &Tokens) -> Fallible<()> {
self.db.transaction(|trans| {
let mut real = tokens.agents.values().collect::<HashSet<&String>>();
Expand Down
10 changes: 10 additions & 0 deletions src/server/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const AGENT_WORK_METRIC: &str = "crater_agent_supposed_to_work";
const AGENT_FAILED: &str = "crater_agent_failure";
const LAST_CRATES_UPDATE_METRIC: &str = "crater_last_crates_update";
const ENDPOINT_TIME: &str = "crater_endpoint_time_seconds";
const WORKER_COUNT: &str = "crater_worker_count";

#[derive(Clone)]
pub struct Metrics {
Expand All @@ -19,6 +20,7 @@ pub struct Metrics {
crater_work_status: IntGaugeVec,
crater_last_crates_update: IntGauge,
pub crater_endpoint_time: HistogramVec,
crater_worker_count: IntGauge,
}

impl Metrics {
Expand Down Expand Up @@ -46,16 +48,24 @@ impl Metrics {
&["endpoint"]
)?;

let crater_worker_count = prometheus::opts!(WORKER_COUNT, "number of active workers");
let crater_worker_count = prometheus::register_int_gauge!(crater_worker_count)?;

Ok(Metrics {
crater_completed_jobs_total,
crater_bounced_record_progress,
crater_agent_failure,
crater_work_status,
crater_last_crates_update,
crater_endpoint_time,
crater_worker_count,
})
}

/// Publishes `count` as the current value of the `crater_worker_count` gauge.
pub fn record_worker_count(&self, count: usize) {
    // The gauge stores an `i64`, so the count is converted before being set.
    let gauge_value = count as i64;
    self.crater_worker_count.set(gauge_value);
}

pub fn record_error(&self, agent: &str, experiment: &str) {
self.crater_agent_failure
.with_label_values(&[agent, experiment])
Expand Down
11 changes: 10 additions & 1 deletion src/server/routes/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::agent::Capabilities;
use crate::experiments::{Assignee, Experiment};
use crate::prelude::*;
use crate::results::{DatabaseDB, EncodingType, ProgressData};
use crate::server::agents::WorkerInfo;
use crate::server::api_types::{AgentConfig, ApiResponse};
use crate::server::auth::{auth_filter, AuthDetails};
use crate::server::messages::Message;
Expand Down Expand Up @@ -68,6 +69,7 @@ pub fn routes(
let heartbeat = warp::post()
.and(warp::path("heartbeat"))
.and(warp::path::end())
.and(warp::body::json())
.and(data_filter)
.and(auth_filter(data.clone()))
.map(endpoint_heartbeat);
Expand Down Expand Up @@ -318,12 +320,19 @@ fn endpoint_record_progress(
ret
}

fn endpoint_heartbeat(data: Arc<Data>, auth: AuthDetails) -> Fallible<Response<Body>> {
fn endpoint_heartbeat(
id: WorkerInfo,
data: Arc<Data>,
auth: AuthDetails,
) -> Fallible<Response<Body>> {
data.agents.add_worker(id);
if let Some(rev) = auth.git_revision {
data.agents.set_git_revision(&auth.name, &rev)?;
}

data.agents.record_heartbeat(&auth.name)?;
data.metrics
.record_worker_count(data.agents.active_worker_count());
Ok(ApiResponse::Success { result: true }.into_response()?)
}

Expand Down

0 comments on commit 8491684

Please sign in to comment.