Skip to content

Commit

Permalink
sc-5792: wrap pending queue counter into Arc
Browse files Browse the repository at this point in the history
  • Loading branch information
fsdvh committed Oct 20, 2022
1 parent 0fdc8b8 commit 53cdc2b
Showing 1 changed file with 2 additions and 17 deletions.
19 changes: 2 additions & 17 deletions ballista/rust/scheduler/src/state/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use tonic::transport::Channel;
type ExecutorClients = Arc<RwLock<HashMap<String, ExecutorGrpcClient<Channel>>>>;
type ExecutionGraphCache = Arc<RwLock<HashMap<String, Arc<RwLock<ExecutionGraph>>>>>;

#[derive(Clone)]
pub struct TaskManager<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> {
state: Arc<dyn StateBackendClient>,
#[allow(dead_code)]
Expand All @@ -62,22 +63,6 @@ pub struct TaskManager<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
pending_task_queue_size: Arc<AtomicUsize>,
}

impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> Clone
for TaskManager<T, U>
{
fn clone(&self) -> Self {
Self {
state: self.state.clone(),
clients: self.clients.clone(),
session_builder: self.session_builder,
codec: self.codec.clone(),
scheduler_id: self.scheduler_id.clone(),
active_job_cache: self.active_job_cache.clone(),
pending_task_queue_size: AtomicUsize::new(self.get_pending_task_queue_size()),
}
}
}

impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U> {
pub fn new(
state: Arc<dyn StateBackendClient>,
Expand All @@ -92,7 +77,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
codec,
scheduler_id,
active_job_cache: Arc::new(RwLock::new(HashMap::new())),
pending_task_queue_size: AtomicUsize::new(0),
pending_task_queue_size: Arc::new(AtomicUsize::new(0)),
}
}

Expand Down

0 comments on commit 53cdc2b

Please sign in to comment.