From a4f48e108e3b05487e23357db753eb5606052521 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Tue, 11 Oct 2022 15:43:14 +0300 Subject: [PATCH 01/15] sc-5792: pending queue variable --- .../rust/scheduler/src/state/task_manager.rs | 190 ++++++++++++------ 1 file changed, 124 insertions(+), 66 deletions(-) diff --git a/ballista/rust/scheduler/src/state/task_manager.rs b/ballista/rust/scheduler/src/state/task_manager.rs index 39b5446e4..77a807f02 100644 --- a/ballista/rust/scheduler/src/state/task_manager.rs +++ b/ballista/rust/scheduler/src/state/task_manager.rs @@ -22,7 +22,7 @@ use crate::state::execution_graph::{ExecutionGraph, Task}; use crate::state::executor_manager::{ExecutorManager, ExecutorReservation}; use crate::state::{decode_protobuf, encode_protobuf, with_lock}; use ballista_core::config::BallistaConfig; -#[cfg(not(test))] +// #[cfg(not(test))] use ballista_core::error::BallistaError; use ballista_core::error::Result; use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient; @@ -42,6 +42,7 @@ use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; use std::collections::HashMap; use std::default::Default; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use tokio::sync::RwLock; use tonic::transport::Channel; @@ -49,7 +50,6 @@ use tonic::transport::Channel; type ExecutorClients = Arc>>>; type ExecutionGraphCache = Arc>>>>; -#[derive(Clone)] pub struct TaskManager { state: Arc, #[allow(dead_code)] @@ -59,6 +59,25 @@ pub struct TaskManager scheduler_id: String, // Cache for active execution graphs curated by this scheduler active_job_cache: ExecutionGraphCache, + pending_task_queue_size: AtomicUsize, +} + +impl Clone + for TaskManager +{ + fn clone(&self) -> Self { + Self { + state: self.state.clone(), + clients: self.clients.clone(), + session_builder: self.session_builder, + codec: self.codec.clone(), + scheduler_id: self.scheduler_id.clone(), + active_job_cache: self.active_job_cache.clone(), + pending_task_queue_size: AtomicUsize::new( + self.pending_task_queue_size.load(Ordering::SeqCst), + ), + } + } } impl TaskManager { @@ -75,6 +94,7 @@ impl TaskManager codec, scheduler_id, active_job_cache: Arc::new(RwLock::new(HashMap::new())), + pending_task_queue_size: AtomicUsize::new(0), } } @@ -90,6 +110,7 @@ impl TaskManager ) -> Result<()> { let mut graph = ExecutionGraph::new(&self.scheduler_id, job_id, session_id, plan, queued_at)?; + info!("Submitting execution graph: {}", graph); self.state .put( @@ -195,21 +216,25 @@ impl TaskManager .collect(); let mut assignments: Vec<(String, Task)> = vec![]; - let mut pending_tasks = 0usize; let mut assign_tasks = 0usize; + let job_cache = self.active_job_cache.read().await; + for (_job_id, graph) in job_cache.iter() { let mut graph = graph.write().await; + for reservation in free_reservations.iter().skip(assign_tasks) { if let Some(task) = graph.pop_next_task(&reservation.executor_id)? { assignments.push((reservation.executor_id.clone(), task)); assign_tasks += 1; + self.decrease_pending_queue_size(1)?; } else { break; } } + if assign_tasks >= free_reservations.len() { - pending_tasks = graph.available_tasks(); + self.increase_pending_queue_size(graph.available_tasks())?; break; } } @@ -218,7 +243,7 @@ impl TaskManager for reservation in free_reservations.iter().skip(assign_tasks) { unassigned.push(reservation.clone()); } - Ok((assignments, unassigned, pending_tasks)) + Ok((assignments, unassigned, self.get_pending_task_queue_size())) } /// Mark a job as completed. 
This will create a key under the CompletedJobs keyspace @@ -252,54 +277,54 @@ impl TaskManager ) -> Result<()> { let lock = self.state.lock(Keyspace::ActiveJobs, "").await?; - let running_tasks = self - .get_execution_graph(job_id) - .await - .map(|graph| graph.running_tasks()) - .unwrap_or_else(|_| vec![]); + if let Ok(graph) = self.get_execution_graph(job_id).await { + let running_tasks = graph.running_tasks(); - info!( - "Cancelling {} running tasks for job {}", - running_tasks.len(), - job_id - ); + info!( + "Cancelling {} running tasks for job {}", + running_tasks.len(), + job_id + ); - self.fail_job_inner(lock, job_id, "Cancelled".to_owned()) - .await?; - - let mut tasks: HashMap<&str, Vec> = Default::default(); + let mut tasks: HashMap<&str, Vec> = Default::default(); - for (partition, executor_id) in &running_tasks { - if let Some(parts) = tasks.get_mut(executor_id.as_str()) { - parts.push(protobuf::PartitionId { - job_id: job_id.to_owned(), - stage_id: partition.stage_id as u32, - partition_id: partition.partition_id as u32, - }) - } else { - tasks.insert( - executor_id.as_str(), - vec![protobuf::PartitionId { + for (partition, executor_id) in &running_tasks { + if let Some(parts) = tasks.get_mut(executor_id.as_str()) { + parts.push(protobuf::PartitionId { job_id: job_id.to_owned(), stage_id: partition.stage_id as u32, partition_id: partition.partition_id as u32, - }], - ); + }) + } else { + tasks.insert( + executor_id.as_str(), + vec![protobuf::PartitionId { + job_id: job_id.to_owned(), + stage_id: partition.stage_id as u32, + partition_id: partition.partition_id as u32, + }], + ); + } } - } - for (executor_id, partitions) in tasks { - if let Ok(mut client) = executor_manager.get_client(executor_id).await { - client - .cancel_tasks(CancelTasksParams { - partition_id: partitions, - }) - .await?; - } else { - error!("Failed to get client for executor ID {}", executor_id) + for (executor_id, partitions) in tasks { + if let Ok(mut client) = executor_manager.get_client(executor_id).await { + client + .cancel_tasks(CancelTasksParams { + partition_id: partitions, + }) + .await?; + } else { + error!("Failed to get client for executor ID {}", executor_id) + } } + + self.decrease_pending_queue_size(graph.available_tasks())?; } + self.fail_job_inner(lock, job_id, "Cancelled".to_owned()) + .await?; + Ok(()) } @@ -349,8 +374,8 @@ impl TaskManager /// TODO this should be atomic pub async fn fail_running_job(&self, job_id: &str) -> Result<()> { if let Some(graph) = self.get_active_execution_graph(job_id).await { - let graph = graph.read().await.clone(); - let value = self.encode_execution_graph(graph)?; + let graph = graph.read().await; + let value = self.encode_execution_graph(graph.clone())?; debug!("Moving job {} from Active to Failed", job_id); let lock = self.state.lock(Keyspace::ActiveJobs, "").await?; @@ -358,6 +383,7 @@ impl TaskManager self.state .put(Keyspace::FailedJobs, job_id.to_owned(), value) .await?; + self.decrease_pending_queue_size(graph.available_tasks())? 
} else { warn!("Fail to find job {} in the cache", job_id); } @@ -370,11 +396,12 @@ impl TaskManager if let Some(graph) = self.get_active_execution_graph(job_id).await { let mut graph = graph.write().await; graph.revive(); - let graph = graph.clone(); - let value = self.encode_execution_graph(graph)?; + let value = self.encode_execution_graph(graph.clone())?; + self.state .put(Keyspace::ActiveJobs, job_id.to_owned(), value) .await?; + self.increase_pending_queue_size(graph.available_tasks())?; } else { warn!("Fail to find job {} in the cache", job_id); } @@ -383,22 +410,23 @@ impl TaskManager } pub async fn executor_lost(&self, executor_id: &str) -> Result<()> { - { - let job_cache = self.active_job_cache.read().await; - for (job_id, graph) in job_cache.iter() { - let graph = graph.read().await; - - for (_, running_task_executor_id) in graph.running_tasks().iter() { - if executor_id == running_task_executor_id { - self.fail_running_job(job_id).await? - } + let job_cache = self.active_job_cache.read().await; + for (job_id, graph) in job_cache.iter() { + let graph = graph.read().await; + for (_, running_task_executor_id) in graph.running_tasks().iter() { + if executor_id == running_task_executor_id { + info!( + "Failing job_id: {} as executor_id: {} is lost", + job_id, executor_id + ); + self.fail_running_job(job_id).await? } } } Ok(()) } - #[cfg(not(test))] + // #[cfg(not(test))] /// Launch the given task on the specified executor pub(crate) async fn launch_task( &self, @@ -424,16 +452,16 @@ impl TaskManager Ok(()) } - /// In unit tests, we do not have actual executors running, so it simplifies things to just noop. - #[cfg(test)] - pub(crate) async fn launch_task( - &self, - _executor: &ExecutorMetadata, - _task: Task, - _executor_manager: &ExecutorManager, - ) -> Result<()> { - Ok(()) - } + // /// In unit tests, we do not have actual executors running, so it simplifies things to just noop. + // #[cfg(test)] + // pub(crate) async fn launch_task( + // &self, + // _executor: &ExecutorMetadata, + // _task: Task, + // _executor_manager: &ExecutorManager, + // ) -> Result<()> { + // Ok(()) + // } /// Retrieve the number of available tasks for the given job. 
The value returned /// is strictly a point-in-time snapshot @@ -536,4 +564,34 @@ impl TaskManager .take(7) .collect() } + + pub fn increase_pending_queue_size(&self, num: usize) -> Result<()> { + match self.pending_task_queue_size.fetch_update( + Ordering::SeqCst, + Ordering::SeqCst, + |s| Some(s + num), + ) { + Ok(_) => Ok(()), + Err(_) => Err(BallistaError::Internal( + "Unable to update pending task counter".to_owned(), + )), + } + } + + pub fn decrease_pending_queue_size(&self, num: usize) -> Result<()> { + match self.pending_task_queue_size.fetch_update( + Ordering::SeqCst, + Ordering::SeqCst, + |s| Some(s - num), + ) { + Ok(_) => Ok(()), + Err(_) => Err(BallistaError::Internal( + "Unable to update pending task counter".to_owned(), + )), + } + } + + pub fn get_pending_task_queue_size(&self) -> usize { + self.pending_task_queue_size.load(Ordering::SeqCst) + } } From 6af6c884accad3bf7a6ccdf86f4ca704770225ef Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Tue, 11 Oct 2022 15:47:28 +0300 Subject: [PATCH 02/15] sc-5792: cleanup --- .../rust/scheduler/src/state/task_manager.rs | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/ballista/rust/scheduler/src/state/task_manager.rs b/ballista/rust/scheduler/src/state/task_manager.rs index 77a807f02..5ad053a6d 100644 --- a/ballista/rust/scheduler/src/state/task_manager.rs +++ b/ballista/rust/scheduler/src/state/task_manager.rs @@ -110,7 +110,6 @@ impl TaskManager ) -> Result<()> { let mut graph = ExecutionGraph::new(&self.scheduler_id, job_id, session_id, plan, queued_at)?; - info!("Submitting execution graph: {}", graph); self.state .put( @@ -222,7 +221,6 @@ impl TaskManager for (_job_id, graph) in job_cache.iter() { let mut graph = graph.write().await; - for reservation in free_reservations.iter().skip(assign_tasks) { if let Some(task) = graph.pop_next_task(&reservation.executor_id)? { assignments.push((reservation.executor_id.clone(), task)); @@ -426,7 +424,7 @@ impl TaskManager Ok(()) } - // #[cfg(not(test))] + #[cfg(not(test))] /// Launch the given task on the specified executor pub(crate) async fn launch_task( &self, @@ -452,16 +450,16 @@ impl TaskManager Ok(()) } - // /// In unit tests, we do not have actual executors running, so it simplifies things to just noop. - // #[cfg(test)] - // pub(crate) async fn launch_task( - // &self, - // _executor: &ExecutorMetadata, - // _task: Task, - // _executor_manager: &ExecutorManager, - // ) -> Result<()> { - // Ok(()) - // } + /// In unit tests, we do not have actual executors running, so it simplifies things to just noop. + #[cfg(test)] + pub(crate) async fn launch_task( + &self, + _executor: &ExecutorMetadata, + _task: Task, + _executor_manager: &ExecutorManager, + ) -> Result<()> { + Ok(()) + } /// Retrieve the number of available tasks for the given job. 
The value returned /// is strictly a point-in-time snapshot From 6b755af84eee30efde575c59862ad1117661006f Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Tue, 11 Oct 2022 15:52:58 +0300 Subject: [PATCH 03/15] sc-5792: cleanup --- .../rust/scheduler/src/state/task_manager.rs | 25 +++++++------------ 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/ballista/rust/scheduler/src/state/task_manager.rs b/ballista/rust/scheduler/src/state/task_manager.rs index 7690e33d6..a44847001 100644 --- a/ballista/rust/scheduler/src/state/task_manager.rs +++ b/ballista/rust/scheduler/src/state/task_manager.rs @@ -276,15 +276,6 @@ impl TaskManager ) -> Result<()> { let lock = self.state.lock(Keyspace::ActiveJobs, "").await?; - if let Ok(graph) = self.get_execution_graph(job_id).await { - let running_tasks = graph.running_tasks(); - - info!( - "Cancelling {} running tasks for job {}", - running_tasks.len(), - job_id - ); - let failed_at = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("Time went backwards") @@ -293,8 +284,15 @@ impl TaskManager self.fail_job_inner(lock, job_id, "Cancelled".to_owned(), failed_at) .await?; - let mut tasks: HashMap<&str, Vec> = Default::default(); - + if let Ok(graph) = self.get_execution_graph(job_id).await { + let running_tasks = graph.running_tasks(); + let mut tasks: HashMap<&str, Vec> = Default::default(); + + info!( + "Cancelling {} running tasks for job {}", + running_tasks.len(), + job_id + ); for (partition, executor_id) in &running_tasks { if let Some(parts) = tasks.get_mut(executor_id.as_str()) { parts.push(protobuf::PartitionId { @@ -325,13 +323,8 @@ impl TaskManager error!("Failed to get client for executor ID {}", executor_id) } } - self.decrease_pending_queue_size(graph.available_tasks())?; } - - self.fail_job_inner(lock, job_id, "Cancelled".to_owned()) - .await?; - Ok(()) } From 175f4f33c8b7f4fba8d3e598b43b32d569798e1b Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Tue, 11 Oct 2022 15:54:19 +0300 Subject: [PATCH 04/15] sc-5792: cleanup --- ballista/rust/scheduler/src/state/task_manager.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ballista/rust/scheduler/src/state/task_manager.rs b/ballista/rust/scheduler/src/state/task_manager.rs index a44847001..da38bdafd 100644 --- a/ballista/rust/scheduler/src/state/task_manager.rs +++ b/ballista/rust/scheduler/src/state/task_manager.rs @@ -22,7 +22,6 @@ use crate::state::execution_graph::{ExecutionGraph, Task}; use crate::state::executor_manager::{ExecutorManager, ExecutorReservation}; use crate::state::{decode_protobuf, encode_protobuf, with_lock}; use ballista_core::config::BallistaConfig; -// #[cfg(not(test))] use ballista_core::error::BallistaError; use ballista_core::error::Result; use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient; @@ -287,7 +286,7 @@ impl TaskManager if let Ok(graph) = self.get_execution_graph(job_id).await { let running_tasks = graph.running_tasks(); let mut tasks: HashMap<&str, Vec> = Default::default(); - + info!( "Cancelling {} running tasks for job {}", running_tasks.len(), From 5d9795ac110f4de147c90a95d2c5e0d4cb26aa47 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Tue, 11 Oct 2022 16:14:03 +0300 Subject: [PATCH 05/15] sc-5792: expose pending_task_queue size in external scaler --- .../rust/scheduler/src/scheduler_server/external_scaler.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ballista/rust/scheduler/src/scheduler_server/external_scaler.rs 
b/ballista/rust/scheduler/src/scheduler_server/external_scaler.rs index f55030609..cb4281e82 100644 --- a/ballista/rust/scheduler/src/scheduler_server/external_scaler.rs +++ b/ballista/rust/scheduler/src/scheduler_server/external_scaler.rs @@ -57,7 +57,8 @@ impl ExternalScaler Ok(Response::new(GetMetricsResponse { metric_values: vec![MetricValue { metric_name: INFLIGHT_TASKS_METRIC_NAME.to_string(), - metric_value: 10000000, // A very high number to saturate the HPA + metric_value: self.state.task_manager.get_pending_task_queue_size() + as i64, // A very high number to saturate the HPA }], })) } From ca7bc9261a9e8550906f26b43a4b7a63cfdf7e82 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Tue, 11 Oct 2022 16:21:10 +0300 Subject: [PATCH 06/15] sc-5792: remove comment --- ballista/rust/scheduler/src/scheduler_server/external_scaler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ballista/rust/scheduler/src/scheduler_server/external_scaler.rs b/ballista/rust/scheduler/src/scheduler_server/external_scaler.rs index cb4281e82..3306cb893 100644 --- a/ballista/rust/scheduler/src/scheduler_server/external_scaler.rs +++ b/ballista/rust/scheduler/src/scheduler_server/external_scaler.rs @@ -58,7 +58,7 @@ impl ExternalScaler metric_values: vec![MetricValue { metric_name: INFLIGHT_TASKS_METRIC_NAME.to_string(), metric_value: self.state.task_manager.get_pending_task_queue_size() - as i64, // A very high number to saturate the HPA + as i64, }], })) } From 7c7e36e10a7801498b5466f983d47091e5232a28 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Wed, 12 Oct 2022 12:02:45 +0300 Subject: [PATCH 07/15] sc-5792: add few tests --- ballista/rust/scheduler/src/state/mod.rs | 14 ++++++++++++++ ballista/rust/scheduler/src/state/task_manager.rs | 6 +++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs index ef98c909f..3d2fee014 100644 --- a/ballista/rust/scheduler/src/state/mod.rs +++ b/ballista/rust/scheduler/src/state/mod.rs @@ -351,24 +351,37 @@ mod test { let plan = test_graph(session_ctx.clone()).await; + assert_eq!(state.task_manager.get_pending_task_queue_size(), 0); + // Create 4 jobs so we have four pending tasks state .task_manager .submit_job("job-1", session_ctx.session_id().as_str(), plan.clone(), 0) .await?; + + assert_eq!(state.task_manager.get_pending_task_queue_size(), 1); + state .task_manager .submit_job("job-2", session_ctx.session_id().as_str(), plan.clone(), 0) .await?; + + assert_eq!(state.task_manager.get_pending_task_queue_size(), 2); + state .task_manager .submit_job("job-3", session_ctx.session_id().as_str(), plan.clone(), 0) .await?; + + assert_eq!(state.task_manager.get_pending_task_queue_size(), 3); + state .task_manager .submit_job("job-4", session_ctx.session_id().as_str(), plan.clone(), 0) .await?; + assert_eq!(state.task_manager.get_pending_task_queue_size(), 4); + let executors = test_executors(1, 4); let (executor_metadata, executor_data) = executors[0].clone(); @@ -381,6 +394,7 @@ mod test { let result = state.offer_reservation(reservations).await?; assert!(result.is_empty()); + assert_eq!(state.task_manager.get_pending_task_queue_size(), 0); // All task slots should be assigned so we should not be able to reserve more tasks let reservations = state.executor_manager.reserve_slots(4).await?; diff --git a/ballista/rust/scheduler/src/state/task_manager.rs b/ballista/rust/scheduler/src/state/task_manager.rs index da38bdafd..abd0492ce 100644 --- 
a/ballista/rust/scheduler/src/state/task_manager.rs +++ b/ballista/rust/scheduler/src/state/task_manager.rs @@ -122,9 +122,10 @@ impl TaskManager graph.revive(); let mut active_graph_cache = self.active_job_cache.write().await; - active_graph_cache.insert(job_id.to_owned(), Arc::new(RwLock::new(graph))); + active_graph_cache + .insert(job_id.to_owned(), Arc::new(RwLock::new(graph.clone()))); - Ok(()) + self.increase_pending_queue_size(graph.available_tasks()) } /// Get the status of of a job. First look in the active cache. @@ -232,7 +233,6 @@ impl TaskManager } if assign_tasks >= free_reservations.len() { - self.increase_pending_queue_size(graph.available_tasks())?; break; } } From a91378a37250e7c455a528fd720ec743934a8664 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Wed, 12 Oct 2022 12:53:39 +0300 Subject: [PATCH 08/15] sc-5792: small changes in code order --- .../rust/scheduler/src/state/task_manager.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/ballista/rust/scheduler/src/state/task_manager.rs b/ballista/rust/scheduler/src/state/task_manager.rs index abd0492ce..d05bf09de 100644 --- a/ballista/rust/scheduler/src/state/task_manager.rs +++ b/ballista/rust/scheduler/src/state/task_manager.rs @@ -120,12 +120,12 @@ impl TaskManager .await?; graph.revive(); + let available_tasks = graph.available_tasks(); let mut active_graph_cache = self.active_job_cache.write().await; - active_graph_cache - .insert(job_id.to_owned(), Arc::new(RwLock::new(graph.clone()))); + active_graph_cache.insert(job_id.to_owned(), Arc::new(RwLock::new(graph))); - self.increase_pending_queue_size(graph.available_tasks()) + self.increase_pending_queue_size(available_tasks) } /// Get the status of of a job. First look in the active cache. @@ -381,8 +381,9 @@ impl TaskManager /// TODO this should be atomic pub async fn fail_running_job(&self, job_id: &str) -> Result<()> { if let Some(graph) = self.get_active_execution_graph(job_id).await { - let graph = graph.read().await; - let value = self.encode_execution_graph(graph.clone())?; + let graph = graph.read().await.clone(); + let available_tasks = graph.available_tasks(); + let value = self.encode_execution_graph(graph)?; debug!("Moving job {} from Active to Failed", job_id); let lock = self.state.lock(Keyspace::ActiveJobs, "").await?; @@ -390,7 +391,7 @@ impl TaskManager self.state .put(Keyspace::FailedJobs, job_id.to_owned(), value) .await?; - self.decrease_pending_queue_size(graph.available_tasks())? + self.decrease_pending_queue_size(available_tasks)? 
} else { warn!("Fail to find job {} in the cache", job_id); } @@ -403,12 +404,14 @@ impl TaskManager if let Some(graph) = self.get_active_execution_graph(job_id).await { let mut graph = graph.write().await; graph.revive(); - let value = self.encode_execution_graph(graph.clone())?; + let graph = graph.clone(); + let available_tasks = graph.available_tasks(); + let value = self.encode_execution_graph(graph)?; self.state .put(Keyspace::ActiveJobs, job_id.to_owned(), value) .await?; - self.increase_pending_queue_size(graph.available_tasks())?; + self.increase_pending_queue_size(available_tasks)?; } else { warn!("Fail to find job {} in the cache", job_id); } From d7dba8e8bd5771bc0aadb390d1908ac0a34a498b Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Wed, 12 Oct 2022 14:31:08 +0300 Subject: [PATCH 09/15] sc-5792: more asserts --- ballista/rust/scheduler/src/state/mod.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs index 3d2fee014..54995871f 100644 --- a/ballista/rust/scheduler/src/state/mod.rs +++ b/ballista/rust/scheduler/src/state/mod.rs @@ -422,12 +422,16 @@ mod test { let plan = test_graph(session_ctx.clone()).await; + assert_eq!(state.task_manager.get_pending_task_queue_size(), 0); + // Create a job state .task_manager .submit_job("job-1", session_ctx.session_id().as_str(), plan.clone(), 0) .await?; + assert_eq!(state.task_manager.get_pending_task_queue_size(), 1); + let executors = test_executors(1, 4); let (executor_metadata, executor_data) = executors[0].clone(); @@ -464,6 +468,8 @@ mod test { ) .await?; + assert_eq!(state.task_manager.get_pending_task_queue_size(), 1); + state .executor_manager .register_executor(executor_metadata, executor_data, false) From e0846d0977c6deea03b52e4a73141efd6547e6bc Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Wed, 19 Oct 2022 14:33:55 +0300 Subject: [PATCH 10/15] sc-5792: fix reservation issue --- ballista/rust/scheduler/src/state/task_manager.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/ballista/rust/scheduler/src/state/task_manager.rs b/ballista/rust/scheduler/src/state/task_manager.rs index d05bf09de..48eddcd8c 100644 --- a/ballista/rust/scheduler/src/state/task_manager.rs +++ b/ballista/rust/scheduler/src/state/task_manager.rs @@ -233,6 +233,7 @@ impl TaskManager } if assign_tasks >= free_reservations.len() { + self.increase_pending_queue_size(graph.available_tasks())?; break; } } From f4331be61fd875c4a9d0c5dd4c032926789339e4 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Wed, 19 Oct 2022 14:51:20 +0300 Subject: [PATCH 11/15] sc-5792: another fix --- ballista/rust/scheduler/src/state/mod.rs | 4 ++++ ballista/rust/scheduler/src/state/task_manager.rs | 7 +++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs index 54995871f..1b965a061 100644 --- a/ballista/rust/scheduler/src/state/mod.rs +++ b/ballista/rust/scheduler/src/state/mod.rs @@ -479,12 +479,16 @@ mod test { assert_eq!(reservations.len(), 1); + assert_eq!(state.task_manager.get_pending_task_queue_size(), 1); + // Offer the reservation. It should be filled with one of the 4 pending tasks. 
The other 3 should // be reserved for the other 3 tasks, emitting another offer event let reservations = state.offer_reservation(reservations).await?; assert_eq!(reservations.len(), 3); + assert_eq!(state.task_manager.get_pending_task_queue_size(), 0); + // Remaining 3 task slots should be reserved for pending tasks let reservations = state.executor_manager.reserve_slots(4).await?; diff --git a/ballista/rust/scheduler/src/state/task_manager.rs b/ballista/rust/scheduler/src/state/task_manager.rs index 48eddcd8c..ea2e4277c 100644 --- a/ballista/rust/scheduler/src/state/task_manager.rs +++ b/ballista/rust/scheduler/src/state/task_manager.rs @@ -217,6 +217,7 @@ impl TaskManager let mut assignments: Vec<(String, Task)> = vec![]; let mut assign_tasks = 0usize; + let mut pending_tasts = 0usize; let job_cache = self.active_job_cache.read().await; @@ -226,6 +227,8 @@ impl TaskManager if let Some(task) = graph.pop_next_task(&reservation.executor_id)? { assignments.push((reservation.executor_id.clone(), task)); assign_tasks += 1; + + // descrease global counter self.decrease_pending_queue_size(1)?; } else { break; @@ -233,7 +236,7 @@ impl TaskManager } if assign_tasks >= free_reservations.len() { - self.increase_pending_queue_size(graph.available_tasks())?; + pending_tasts = graph.available_tasks(); break; } } @@ -242,7 +245,7 @@ impl TaskManager for reservation in free_reservations.iter().skip(assign_tasks) { unassigned.push(reservation.clone()); } - Ok((assignments, unassigned, self.get_pending_task_queue_size())) + Ok((assignments, unassigned, pending_tasts)) } /// Mark a job as completed. This will create a key under the CompletedJobs keyspace From eec1f5b2cc507bc53dcde952ebfcca4ea8cf4d58 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Thu, 20 Oct 2022 11:36:51 +0300 Subject: [PATCH 12/15] sc-5792: fmt --- ballista/rust/scheduler/src/state/task_manager.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/ballista/rust/scheduler/src/state/task_manager.rs b/ballista/rust/scheduler/src/state/task_manager.rs index ea2e4277c..5be8ab3ac 100644 --- a/ballista/rust/scheduler/src/state/task_manager.rs +++ b/ballista/rust/scheduler/src/state/task_manager.rs @@ -216,9 +216,8 @@ impl TaskManager .collect(); let mut assignments: Vec<(String, Task)> = vec![]; + let mut pending_tasks = 0usize; let mut assign_tasks = 0usize; - let mut pending_tasts = 0usize; - let job_cache = self.active_job_cache.read().await; for (_job_id, graph) in job_cache.iter() { @@ -227,8 +226,6 @@ impl TaskManager if let Some(task) = graph.pop_next_task(&reservation.executor_id)? { assignments.push((reservation.executor_id.clone(), task)); assign_tasks += 1; - - // descrease global counter self.decrease_pending_queue_size(1)?; } else { break; @@ -236,7 +233,7 @@ impl TaskManager } if assign_tasks >= free_reservations.len() { - pending_tasts = graph.available_tasks(); + pending_tasks = graph.available_tasks(); break; } } @@ -245,7 +242,7 @@ impl TaskManager for reservation in free_reservations.iter().skip(assign_tasks) { unassigned.push(reservation.clone()); } - Ok((assignments, unassigned, pending_tasts)) + Ok((assignments, unassigned, pending_tasks)) } /// Mark a job as completed. 
This will create a key under the CompletedJobs keyspace From 39c0f19a7a9d9e13bf35882971486fc508724fa9 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Thu, 20 Oct 2022 12:16:24 +0300 Subject: [PATCH 13/15] sc-5792: relax, take it easy --- ballista/rust/scheduler/src/state/task_manager.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/ballista/rust/scheduler/src/state/task_manager.rs b/ballista/rust/scheduler/src/state/task_manager.rs index 5be8ab3ac..8e91471e7 100644 --- a/ballista/rust/scheduler/src/state/task_manager.rs +++ b/ballista/rust/scheduler/src/state/task_manager.rs @@ -73,9 +73,7 @@ impl Clone codec: self.codec.clone(), scheduler_id: self.scheduler_id.clone(), active_job_cache: self.active_job_cache.clone(), - pending_task_queue_size: AtomicUsize::new( - self.pending_task_queue_size.load(Ordering::SeqCst), - ), + pending_task_queue_size: AtomicUsize::new(self.get_pending_task_queue_size()), } } } @@ -601,8 +599,8 @@ impl TaskManager pub fn increase_pending_queue_size(&self, num: usize) -> Result<()> { match self.pending_task_queue_size.fetch_update( - Ordering::SeqCst, - Ordering::SeqCst, + Ordering::Relaxed, + Ordering::Relaxed, |s| Some(s + num), ) { Ok(_) => Ok(()), @@ -614,8 +612,8 @@ impl TaskManager pub fn decrease_pending_queue_size(&self, num: usize) -> Result<()> { match self.pending_task_queue_size.fetch_update( - Ordering::SeqCst, - Ordering::SeqCst, + Ordering::Relaxed, + Ordering::Relaxed, |s| Some(s - num), ) { Ok(_) => Ok(()), From 0fdc8b8a061cc5107794ee4245e8a932534755d1 Mon Sep 17 00:00:00 2001 From: fsnlla <105630300+fsnlla@users.noreply.github.com> Date: Thu, 20 Oct 2022 13:54:42 +0300 Subject: [PATCH 14/15] Update ballista/rust/scheduler/src/state/task_manager.rs Co-authored-by: mpurins-coralogix <101556208+mpurins-coralogix@users.noreply.github.com> --- ballista/rust/scheduler/src/state/task_manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ballista/rust/scheduler/src/state/task_manager.rs b/ballista/rust/scheduler/src/state/task_manager.rs index 8e91471e7..ae0637d9b 100644 --- a/ballista/rust/scheduler/src/state/task_manager.rs +++ b/ballista/rust/scheduler/src/state/task_manager.rs @@ -59,7 +59,7 @@ pub struct TaskManager scheduler_id: String, // Cache for active execution graphs curated by this scheduler active_job_cache: ExecutionGraphCache, - pending_task_queue_size: AtomicUsize, + pending_task_queue_size: Arc, } impl Clone From 53cdc2b606e0a1f1263b00831886028bfc98c797 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Thu, 20 Oct 2022 13:59:20 +0300 Subject: [PATCH 15/15] sc-5792: wrap pending queue counter into Arc --- .../rust/scheduler/src/state/task_manager.rs | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/ballista/rust/scheduler/src/state/task_manager.rs b/ballista/rust/scheduler/src/state/task_manager.rs index ae0637d9b..47d223b6a 100644 --- a/ballista/rust/scheduler/src/state/task_manager.rs +++ b/ballista/rust/scheduler/src/state/task_manager.rs @@ -50,6 +50,7 @@ use tonic::transport::Channel; type ExecutorClients = Arc>>>; type ExecutionGraphCache = Arc>>>>; +#[derive(Clone)] pub struct TaskManager { state: Arc, #[allow(dead_code)] @@ -62,22 +63,6 @@ pub struct TaskManager pending_task_queue_size: Arc, } -impl Clone - for TaskManager -{ - fn clone(&self) -> Self { - Self { - state: self.state.clone(), - clients: self.clients.clone(), - session_builder: self.session_builder, - codec: self.codec.clone(), - scheduler_id: 
self.scheduler_id.clone(), - active_job_cache: self.active_job_cache.clone(), - pending_task_queue_size: AtomicUsize::new(self.get_pending_task_queue_size()), - } - } -} - impl TaskManager { pub fn new( state: Arc, @@ -92,7 +77,7 @@ impl TaskManager codec, scheduler_id, active_job_cache: Arc::new(RwLock::new(HashMap::new())), - pending_task_queue_size: AtomicUsize::new(0), + pending_task_queue_size: Arc::new(AtomicUsize::new(0)), } }
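
Note on the final design: patches 14 and 15 drop the hand-written Clone impl and go back to #[derive(Clone)] by wrapping the counter in Arc<AtomicUsize>, so every cloned TaskManager shares one pending-task count instead of starting from a point-in-time snapshot taken at clone time. The sketch below is a minimal, standalone illustration of that pattern only, not the Ballista code itself: the PendingCounter type, the saturating_sub underflow guard, and the main driver are illustrative assumptions added here and are not part of the patch series.

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Stand-in for TaskManager's pending_task_queue_size field: an Arc'd atomic,
// so cloned handles update and observe the same counter.
#[derive(Clone)]
struct PendingCounter {
    size: Arc<AtomicUsize>,
}

impl PendingCounter {
    fn new() -> Self {
        Self {
            size: Arc::new(AtomicUsize::new(0)),
        }
    }

    // Mirrors increase_pending_queue_size: add `num` pending tasks.
    fn increase(&self, num: usize) {
        // The closure always returns Some(..), so fetch_update cannot fail here.
        let _ = self
            .size
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |s| Some(s + num));
    }

    // Mirrors decrease_pending_queue_size; saturating_sub (an assumption, not in
    // the patch) keeps the counter from underflowing if a decrease races ahead.
    fn decrease(&self, num: usize) {
        let _ = self
            .size
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |s| {
                Some(s.saturating_sub(num))
            });
    }

    fn get(&self) -> usize {
        self.size.load(Ordering::Relaxed)
    }
}

fn main() {
    let counter = PendingCounter::new();
    let clone = counter.clone();

    counter.increase(4);
    clone.decrease(1);

    // Both handles see the same value because they share one Arc'd atomic,
    // which is the behavior the final patch restores for TaskManager clones.
    assert_eq!(counter.get(), 3);
    assert_eq!(clone.get(), 3);
    println!("pending tasks: {}", counter.get());
}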