-
Notifications
You must be signed in to change notification settings - Fork 0
/
thread_pool.rs
86 lines (69 loc) · 2.03 KB
/
thread_pool.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
// ==================== Types ====================
/// A unit of work for the pool: a heap-allocated closure that is called at most once
/// and can be moved to another thread. This is the message type carried by the
/// `mpsc` channel between `ThreadPool` and its workers.
type Job = Box<dyn FnOnce() + Send + 'static>;
// ==================== Structs ====================
/// A group of worker threads that run closures submitted through `execute`.
///
/// Jobs are sent over an `mpsc` channel whose receiving end is shared by the workers.
/// When the pool is dropped, the sender is dropped first and every worker thread is
/// then joined.
pub struct ThreadPool {
/// The workers created by `new`; each one holds the join handle of its thread.
workers: Vec<Worker>,
/// Sending half of the job channel. Wrapped in `Option` so that `Drop` can `take()`
/// it and drop it before joining the workers.
sender: Option<mpsc::Sender<Job>>,
}
/// One thread of the pool together with the id used to label it in log output.
struct Worker {
/// Index assigned by `ThreadPool::new` (0-based); printed in the shutdown and
/// disconnect messages.
id: usize,
/// Join handle of the worker's thread. Wrapped in `Option` so that the pool's `Drop`
/// can `take()` the handle and join it, since `join` consumes the handle.
work: Option<thread::JoinHandle<()>>,
}
// ==================== Impl(ementations) ====================
impl ThreadPool {
    /// Builds a pool of `size` workers that all pull jobs from one shared channel.
    ///
    /// Workers are given the ids `0..size` in creation order.
    ///
    /// # Panics
    ///
    /// Panics if `size` is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (job_tx, job_rx) = mpsc::channel::<Job>();
        // `mpsc::Receiver` is single-consumer, so the workers share it behind a mutex.
        let shared_rx = Arc::new(Mutex::new(job_rx));

        let workers: Vec<Worker> = (0..size)
            .map(|id| Worker::new(id, Arc::clone(&shared_rx)))
            .collect();

        ThreadPool {
            workers,
            sender: Some(job_tx),
        }
    }

    /// Queues the closure `f` to be run by one of the pool's workers.
    ///
    /// # Panics
    ///
    /// Panics if the pool's sender has already been taken, or if the job cannot be
    /// sent because the receiving end of the channel no longer exists.
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job: Job = Box::new(f);
        let queue = self.sender.as_ref().unwrap();
        queue.send(job).unwrap();
    }
}
impl Drop for ThreadPool {
    /// Shuts the pool down: closes the job channel, then waits for every worker thread.
    ///
    /// A worker whose thread panicked is reported on stderr instead of being unwrapped.
    /// Panicking inside `drop` would abort the process if the pool is dropped during
    /// unwinding, and would leave the remaining workers unjoined.
    fn drop(&mut self) {
        // Dropping the sender disconnects the channel, which makes each worker's
        // `recv()` return `Err` and lets its loop exit. This must happen before the
        // joins below, otherwise they would block forever.
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            // `join` consumes the handle, so move it out of the `Option` first.
            if let Some(thread) = worker.work.take() {
                if thread.join().is_err() {
                    eprintln!("Worker {} terminated with a panic", worker.id);
                }
            }
        }
    }
}
impl Worker {
    /// Spawns a thread that repeatedly takes a job from the shared `receiver` and runs it.
    ///
    /// The loop ends when `recv()` fails, i.e. once the sending half of the channel has
    /// been dropped and no queued jobs remain.
    ///
    /// A job that panics is contained with `catch_unwind`: the failure is reported on
    /// stderr and the worker goes on to the next job. Without this, one panicking job
    /// would permanently remove a thread from the pool.
    ///
    /// # Panics
    ///
    /// The spawned thread panics if the receiver mutex is poisoned.
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let work = thread::spawn(move || loop {
            // Keep this as a standalone `let`: the mutex guard is a temporary that is
            // dropped at the end of the statement, so the lock is released before the
            // job runs and other workers can receive in the meantime.
            let message: Result<Job, mpsc::RecvError> = receiver
                .lock()
                .expect("receiver mutex poisoned")
                .recv();

            match message {
                Ok(job) => {
                    // The job is consumed by the call and nothing of it is observed
                    // afterwards, so asserting unwind safety is sound here.
                    let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(job));
                    if outcome.is_err() {
                        eprintln!("Worker {id}: job panicked; worker keeps running");
                    }
                }
                Err(_) => {
                    println!("Worker {id} disconnected");
                    break;
                }
            }
        });

        Worker {
            id,
            work: Some(work),
        }
    }
}