Skip to content

Commit

Permalink
Record when job is queued in scheduler metrics (#28)
Browse files Browse the repository at this point in the history
* Record when job is queueud in scheduler metrics

* add additional buckets for exec times
  • Loading branch information
thinkharderdev authored and fsdvh committed Feb 15, 2023
1 parent 9cb8ed1 commit 13572f3
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 4 deletions.
5 changes: 5 additions & 0 deletions ballista/scheduler/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ use std::sync::Arc;
/// will be passed when constructing the `QueryStageScheduler` which is the core event loop of the scheduler.
/// The event loop will then record metric events through this trait.
pub trait SchedulerMetricsCollector: Send + Sync {
/// Record that job with `job_id` was queued. This will be invoked
/// when the job is queued for scheduling.
/// When invoked should specify the timestamp in milliseconds when the job was queued
fn record_queued(&self, job_id: &str, queued_at: u64);
/// Record that job with `job_id` was submitted. This will be invoked
/// after the job's `ExecutionGraph` is created and it is ready to be scheduled
/// on executors.
Expand Down Expand Up @@ -62,6 +66,7 @@ pub trait SchedulerMetricsCollector: Send + Sync {
pub struct NoopMetricsCollector {}

impl SchedulerMetricsCollector for NoopMetricsCollector {
fn record_queued(&self, _job_id: &str, _queued_at: u64) {}
fn record_submitted(&self, _job_id: &str, _queued_at: u64, _submitted_at: u64) {}
fn record_completed(&self, _job_id: &str, _queued_at: u64, _completed_att: u64) {}
fn record_failed(&self, _job_id: &str, _queued_at: u64, _failed_at: u64) {}
Expand Down
27 changes: 25 additions & 2 deletions ballista/scheduler/src/metrics/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,17 @@ pub struct PrometheusMetricsCollector {
completed: Counter,
submitted: Counter,
pending_queue_size: Gauge,
active_jobs: Gauge,
}

impl PrometheusMetricsCollector {
pub fn new(registry: &Registry) -> Result<Self> {
let execution_time = register_histogram_with_registry!(
"job_exec_time_seconds",
"Histogram of successful job execution time in seconds",
vec![0.5_f64, 1_f64, 5_f64, 30_f64, 60_f64],
vec![
0.5_f64, 1_f64, 5_f64, 30_f64, 60_f64, 120_f64, 180_f64, 240_f64, 300_f64
],
registry
)
.map_err(|e| {
Expand All @@ -63,7 +66,10 @@ impl PrometheusMetricsCollector {
let planning_time = register_histogram_with_registry!(
"planning_time_ms",
"Histogram of job planning time in milliseconds",
vec![1.0_f64, 5.0_f64, 25.0_f64, 100.0_f64, 500.0_f64],
vec![
1.0_f64, 5.0_f64, 25.0_f64, 100.0_f64, 200.0_f64, 300_f64, 400_f64,
500_f64, 1_000_f64, 2_000_f64, 3_000_f64
],
registry
)
.map_err(|e| {
Expand Down Expand Up @@ -115,6 +121,15 @@ impl PrometheusMetricsCollector {
BallistaError::Internal(format!("Error registering metric: {e:?}"))
})?;

let active_jobs = register_gauge_with_registry!(
"active_job_count",
"Number of active jobs on the scheduler",
registry
)
.map_err(|e| {
BallistaError::Internal(format!("Error registering metric: {:?}", e))
})?;

Ok(Self {
execution_time,
planning_time,
Expand All @@ -123,6 +138,7 @@ impl PrometheusMetricsCollector {
completed,
submitted,
pending_queue_size,
active_jobs,
})
}

Expand All @@ -138,6 +154,10 @@ impl PrometheusMetricsCollector {
}

impl SchedulerMetricsCollector for PrometheusMetricsCollector {
fn record_queued(&self, _job_id: &str, _queued_at: u64) {
self.active_jobs.inc()
}

fn record_submitted(&self, _job_id: &str, queued_at: u64, submitted_at: u64) {
self.submitted.inc();
self.planning_time
Expand All @@ -146,15 +166,18 @@ impl SchedulerMetricsCollector for PrometheusMetricsCollector {

fn record_completed(&self, _job_id: &str, queued_at: u64, completed_at: u64) {
self.completed.inc();
self.active_jobs.dec();
self.execution_time
.observe((completed_at - queued_at) as f64 / 1000_f64)
}

fn record_failed(&self, _job_id: &str, _queued_at: u64, _failed_at: u64) {
self.active_jobs.dec();
self.failed.inc()
}

fn record_cancelled(&self, _job_id: &str) {
self.active_jobs.dec();
self.cancelled.inc();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
} => {
info!("Job {} queued with name {:?}", job_id, job_name);

self.metrics_collector
.record_queued(&job_id, timestamp_millis());

self.state
.task_manager
.queue_job(&job_id, &job_name, queued_at)
Expand Down
17 changes: 15 additions & 2 deletions ballista/scheduler/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,7 @@ impl SchedulerTest {

#[derive(Clone)]
pub enum MetricEvent {
Queued(String, u64),
Submitted(String, u64, u64),
Completed(String, u64, u64),
Cancelled(String),
Expand All @@ -682,6 +683,7 @@ pub enum MetricEvent {
impl MetricEvent {
pub fn job_id(&self) -> &str {
match self {
MetricEvent::Queued(job, _) => job.as_str(),
MetricEvent::Submitted(job, _, _) => job.as_str(),
MetricEvent::Completed(job, _, _) => job.as_str(),
MetricEvent::Cancelled(job) => job.as_str(),
Expand Down Expand Up @@ -713,6 +715,11 @@ impl TestMetricsCollector {
}

impl SchedulerMetricsCollector for TestMetricsCollector {
fn record_queued(&self, job_id: &str, queued_at: u64) {
let mut guard = self.events.lock();
guard.push(MetricEvent::Queued(job_id.to_owned(), queued_at));
}

fn record_submitted(&self, job_id: &str, queued_at: u64, submitted_at: u64) {
let mut guard = self.events.lock();
guard.push(MetricEvent::Submitted(
Expand Down Expand Up @@ -749,12 +756,18 @@ impl SchedulerMetricsCollector for TestMetricsCollector {
}

pub fn assert_submitted_event(job_id: &str, collector: &TestMetricsCollector) {
let found = collector
let queued = collector
.job_events(job_id)
.iter()
.any(|ev| matches!(ev, MetricEvent::Queued(_, _)));

let submitted = collector
.job_events(job_id)
.iter()
.any(|ev| matches!(ev, MetricEvent::Submitted(_, _, _)));

assert!(found, "{}", "Expected submitted event for job {job_id}");
assert!(queued, "Expected queued event for job {}", job_id);
assert!(submitted, "Expected submitted event for job {}", job_id);
}

pub fn assert_no_submitted_event(job_id: &str, collector: &TestMetricsCollector) {
Expand Down

0 comments on commit 13572f3

Please sign in to comment.