Merge pull request #19 from coralogix/sc-5792
sc-5792: pending queue variable
fsdvh authored Oct 20, 2022
2 parents a7f1384 + 53cdc2b commit 08140ef
Showing 3 changed files with 103 additions and 41 deletions.
@@ -57,7 +57,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExternalScaler
Ok(Response::new(GetMetricsResponse {
metric_values: vec![MetricValue {
metric_name: INFLIGHT_TASKS_METRIC_NAME.to_string(),
metric_value: 10000000, // A very high number to saturate the HPA
metric_value: self.state.task_manager.get_pending_task_queue_size()
as i64,
}],
}))
}
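
In plain terms, the scaler stops reporting a constant chosen to keep the HPA permanently saturated and instead exposes the live pending-task count. A minimal sketch of that computation, for illustration only (the helper below is hypothetical; only the queue-size getter and the signed 64-bit metric field come from the diff):

```rust
/// Hypothetical helper mirroring the new metric: the autoscaler now sees the
/// actual number of pending tasks instead of a fixed 10_000_000.
fn inflight_tasks_metric(pending_task_queue_size: usize) -> i64 {
    // The external-scaler protobuf carries the value as a signed 64-bit
    // integer, hence the `as i64` cast in the change above.
    pending_task_queue_size as i64
}

fn main() {
    // With an empty queue the HPA is free to scale the executors back down.
    assert_eq!(inflight_tasks_metric(0), 0);
    assert_eq!(inflight_tasks_metric(42), 42);
}
```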
24 changes: 24 additions & 0 deletions ballista/rust/scheduler/src/state/mod.rs
@@ -351,24 +351,37 @@ mod test {

let plan = test_graph(session_ctx.clone()).await;

assert_eq!(state.task_manager.get_pending_task_queue_size(), 0);

// Create 4 jobs so we have four pending tasks
state
.task_manager
.submit_job("job-1", session_ctx.session_id().as_str(), plan.clone(), 0)
.await?;

assert_eq!(state.task_manager.get_pending_task_queue_size(), 1);

state
.task_manager
.submit_job("job-2", session_ctx.session_id().as_str(), plan.clone(), 0)
.await?;

assert_eq!(state.task_manager.get_pending_task_queue_size(), 2);

state
.task_manager
.submit_job("job-3", session_ctx.session_id().as_str(), plan.clone(), 0)
.await?;

assert_eq!(state.task_manager.get_pending_task_queue_size(), 3);

state
.task_manager
.submit_job("job-4", session_ctx.session_id().as_str(), plan.clone(), 0)
.await?;

assert_eq!(state.task_manager.get_pending_task_queue_size(), 4);

let executors = test_executors(1, 4);

let (executor_metadata, executor_data) = executors[0].clone();
@@ -381,6 +394,7 @@ mod test {
let result = state.offer_reservation(reservations).await?;

assert!(result.is_empty());
assert_eq!(state.task_manager.get_pending_task_queue_size(), 0);

// All task slots should be assigned so we should not be able to reserve more tasks
let reservations = state.executor_manager.reserve_slots(4).await?;
@@ -408,12 +422,16 @@ mod test {

let plan = test_graph(session_ctx.clone()).await;

assert_eq!(state.task_manager.get_pending_task_queue_size(), 0);

// Create a job
state
.task_manager
.submit_job("job-1", session_ctx.session_id().as_str(), plan.clone(), 0)
.await?;

assert_eq!(state.task_manager.get_pending_task_queue_size(), 1);

let executors = test_executors(1, 4);

let (executor_metadata, executor_data) = executors[0].clone();
@@ -450,6 +468,8 @@ mod test {
)
.await?;

assert_eq!(state.task_manager.get_pending_task_queue_size(), 1);

state
.executor_manager
.register_executor(executor_metadata, executor_data, false)
@@ -459,12 +479,16 @@ mod test {

assert_eq!(reservations.len(), 1);

assert_eq!(state.task_manager.get_pending_task_queue_size(), 1);

// Offer the reservation. It should be filled with one of the 4 pending tasks. The other 3 should
// be reserved for the other 3 tasks, emitting another offer event
let reservations = state.offer_reservation(reservations).await?;

assert_eq!(reservations.len(), 3);

assert_eq!(state.task_manager.get_pending_task_queue_size(), 0);

// Remaining 3 task slots should be reserved for pending tasks
let reservations = state.executor_manager.reserve_slots(4).await?;

117 changes: 77 additions & 40 deletions ballista/rust/scheduler/src/state/task_manager.rs
@@ -22,7 +22,6 @@ use crate::state::execution_graph::{ExecutionGraph, Task};
use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
use crate::state::{decode_protobuf, encode_protobuf, with_lock};
use ballista_core::config::BallistaConfig;
#[cfg(not(test))]
use ballista_core::error::BallistaError;
use ballista_core::error::Result;
use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
@@ -42,6 +41,7 @@ use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use std::collections::HashMap;
use std::default::Default;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;
@@ -60,6 +60,7 @@ pub struct TaskManager<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
scheduler_id: String,
// Cache for active execution graphs curated by this scheduler
active_job_cache: ExecutionGraphCache,
pending_task_queue_size: Arc<AtomicUsize>,
}

impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U> {
@@ -76,6 +77,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
codec,
scheduler_id,
active_job_cache: Arc::new(RwLock::new(HashMap::new())),
pending_task_queue_size: Arc::new(AtomicUsize::new(0)),
}
}

@@ -101,11 +103,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
.await?;

graph.revive();
let available_tasks = graph.available_tasks();

let mut active_graph_cache = self.active_job_cache.write().await;
active_graph_cache.insert(job_id.to_owned(), Arc::new(RwLock::new(graph)));

Ok(())
self.increase_pending_queue_size(available_tasks)
}

/// Get the status of a job. First look in the active cache.
@@ -199,16 +202,19 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
let mut pending_tasks = 0usize;
let mut assign_tasks = 0usize;
let job_cache = self.active_job_cache.read().await;

for (_job_id, graph) in job_cache.iter() {
let mut graph = graph.write().await;
for reservation in free_reservations.iter().skip(assign_tasks) {
if let Some(task) = graph.pop_next_task(&reservation.executor_id)? {
assignments.push((reservation.executor_id.clone(), task));
assign_tasks += 1;
self.decrease_pending_queue_size(1)?;
} else {
break;
}
}

if assign_tasks >= free_reservations.len() {
pending_tasks = graph.available_tasks();
break;
@@ -253,18 +259,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
) -> Result<()> {
let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;

let running_tasks = self
.get_execution_graph(job_id)
.await
.map(|graph| graph.running_tasks())
.unwrap_or_else(|_| vec![]);

info!(
"Cancelling {} running tasks for job {}",
running_tasks.len(),
job_id
);

let failed_at = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
@@ -273,39 +267,47 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
self.fail_job_inner(lock, job_id, "Cancelled".to_owned(), failed_at)
.await?;

let mut tasks: HashMap<&str, Vec<protobuf::PartitionId>> = Default::default();

for (partition, executor_id) in &running_tasks {
if let Some(parts) = tasks.get_mut(executor_id.as_str()) {
parts.push(protobuf::PartitionId {
job_id: job_id.to_owned(),
stage_id: partition.stage_id as u32,
partition_id: partition.partition_id as u32,
})
} else {
tasks.insert(
executor_id.as_str(),
vec![protobuf::PartitionId {
if let Ok(graph) = self.get_execution_graph(job_id).await {
let running_tasks = graph.running_tasks();
let mut tasks: HashMap<&str, Vec<protobuf::PartitionId>> = Default::default();

info!(
"Cancelling {} running tasks for job {}",
running_tasks.len(),
job_id
);
for (partition, executor_id) in &running_tasks {
if let Some(parts) = tasks.get_mut(executor_id.as_str()) {
parts.push(protobuf::PartitionId {
job_id: job_id.to_owned(),
stage_id: partition.stage_id as u32,
partition_id: partition.partition_id as u32,
}],
);
})
} else {
tasks.insert(
executor_id.as_str(),
vec![protobuf::PartitionId {
job_id: job_id.to_owned(),
stage_id: partition.stage_id as u32,
partition_id: partition.partition_id as u32,
}],
);
}
}
}

for (executor_id, partitions) in tasks {
if let Ok(mut client) = executor_manager.get_client(executor_id).await {
client
.cancel_tasks(CancelTasksParams {
partition_id: partitions,
})
.await?;
} else {
error!("Failed to get client for executor ID {}", executor_id)
for (executor_id, partitions) in tasks {
if let Ok(mut client) = executor_manager.get_client(executor_id).await {
client
.cancel_tasks(CancelTasksParams {
partition_id: partitions,
})
.await?;
} else {
error!("Failed to get client for executor ID {}", executor_id)
}
}
self.decrease_pending_queue_size(graph.available_tasks())?;
}

Ok(())
}

@@ -364,6 +366,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
pub async fn fail_running_job(&self, job_id: &str) -> Result<()> {
if let Some(graph) = self.get_active_execution_graph(job_id).await {
let graph = graph.read().await.clone();
let available_tasks = graph.available_tasks();
let value = self.encode_execution_graph(graph)?;

debug!("Moving job {} from Active to Failed", job_id);
@@ -372,6 +375,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
self.state
.put(Keyspace::FailedJobs, job_id.to_owned(), value)
.await?;
self.decrease_pending_queue_size(available_tasks)?
} else {
warn!("Fail to find job {} in the cache", job_id);
}
@@ -385,10 +389,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
let mut graph = graph.write().await;
graph.revive();
let graph = graph.clone();
let available_tasks = graph.available_tasks();
let value = self.encode_execution_graph(graph)?;

self.state
.put(Keyspace::ActiveJobs, job_id.to_owned(), value)
.await?;
self.increase_pending_queue_size(available_tasks)?;
} else {
warn!("Fail to find job {} in the cache", job_id);
}
@@ -574,4 +581,34 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
.take(7)
.collect()
}

pub fn increase_pending_queue_size(&self, num: usize) -> Result<()> {
match self.pending_task_queue_size.fetch_update(
Ordering::Relaxed,
Ordering::Relaxed,
|s| Some(s + num),
) {
Ok(_) => Ok(()),
Err(_) => Err(BallistaError::Internal(
"Unable to update pending task counter".to_owned(),
)),
}
}

pub fn decrease_pending_queue_size(&self, num: usize) -> Result<()> {
match self.pending_task_queue_size.fetch_update(
Ordering::Relaxed,
Ordering::Relaxed,
|s| Some(s - num),
) {
Ok(_) => Ok(()),
Err(_) => Err(BallistaError::Internal(
"Unable to update pending task counter".to_owned(),
)),
}
}

pub fn get_pending_task_queue_size(&self) -> usize {
self.pending_task_queue_size.load(Ordering::SeqCst)
}
}
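
For reference, here is a self-contained sketch of the counter pattern introduced above. It is an illustration, not the commit's code: the `PendingQueueSize` type is hypothetical and mirrors `increase_pending_queue_size`, `decrease_pending_queue_size`, and `get_pending_task_queue_size`. Because the closure passed to `fetch_update` always returns `Some`, the update cannot fail; the sketch also uses `saturating_sub` as one way to keep a decrement larger than the current count from underflowing.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical standalone counter mirroring the pending-task-queue bookkeeping.
#[derive(Clone, Default)]
struct PendingQueueSize(Arc<AtomicUsize>);

impl PendingQueueSize {
    fn increase(&self, num: usize) {
        // The closure always returns Some, so fetch_update never returns Err here.
        let _ = self
            .0
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |s| Some(s + num));
    }

    fn decrease(&self, num: usize) {
        // saturating_sub pins the counter at zero instead of underflowing when
        // the decrement exceeds the current value.
        let _ = self
            .0
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |s| {
                Some(s.saturating_sub(num))
            });
    }

    fn get(&self) -> usize {
        self.0.load(Ordering::SeqCst)
    }
}

fn main() {
    let queue = PendingQueueSize::default();
    queue.increase(4); // e.g. four tasks become runnable when a job is submitted
    queue.decrease(1); // one task handed to an executor reservation
    assert_eq!(queue.get(), 3);
}
```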
