Skip to content

Commit f8776f4

Browse files
committed
Merge pull request #312 from reem/acceptor-pool
feat(server): Rewrite the accept loop into a custom thread pool.
2 parents 27b4573 + 3528fb9 commit f8776f4

File tree

5 files changed

+158
-80
lines changed

5 files changed

+158
-80
lines changed

examples/hello.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ fn hello(_: Request, res: Response) {
1313
}
1414

1515
fn main() {
16-
hyper::Server::http(Ipv4Addr(127, 0, 0, 1), 3000).listen(hello).unwrap();
16+
let _listening = hyper::Server::http(Ipv4Addr(127, 0, 0, 1), 3000)
17+
.listen(hello).unwrap();
1718
println!("Listening on http://127.0.0.1:3000");
1819
}

examples/server.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ fn echo(mut req: Request, mut res: Response) {
5151

5252
fn main() {
5353
let server = Server::http(Ipv4Addr(127, 0, 0, 1), 1337);
54-
let mut listening = server.listen(echo).unwrap();
54+
let _guard = server.listen(echo).unwrap();
5555
println!("Listening on http://127.0.0.1:1337");
56-
listening.await();
5756
}

src/lib.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#![feature(core, collections, hash, io, os, path, std_misc,
2-
slicing_syntax, box_syntax)]
2+
slicing_syntax, box_syntax, unsafe_destructor)]
33
#![deny(missing_docs)]
44
#![cfg_attr(test, deny(warnings))]
55
#![cfg_attr(test, feature(alloc, test))]
@@ -130,12 +130,16 @@ extern crate "rustc-serialize" as serialize;
130130
extern crate time;
131131
extern crate url;
132132
extern crate openssl;
133-
#[macro_use] extern crate log;
134-
#[cfg(test)] extern crate test;
135133
extern crate "unsafe-any" as uany;
136134
extern crate cookie;
137135
extern crate unicase;
138136

137+
#[macro_use]
138+
extern crate log;
139+
140+
#[cfg(test)]
141+
extern crate test;
142+
139143
pub use std::old_io::net::ip::{SocketAddr, IpAddr, Ipv4Addr, Ipv6Addr, Port};
140144
pub use mimewrapper::mime;
141145
pub use url::Url;

src/server/acceptor.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
use std::thread::{Thread, JoinGuard};
2+
use std::sync::Arc;
3+
use std::sync::mpsc;
4+
use net::NetworkAcceptor;
5+
6+
pub struct AcceptorPool<A: NetworkAcceptor> {
7+
acceptor: A
8+
}
9+
10+
impl<A: NetworkAcceptor> AcceptorPool<A> {
11+
/// Create a thread pool to manage the acceptor.
12+
pub fn new(acceptor: A) -> AcceptorPool<A> {
13+
AcceptorPool { acceptor: acceptor }
14+
}
15+
16+
/// Runs the acceptor pool. Blocks until the acceptors are closed.
17+
///
18+
/// ## Panics
19+
///
20+
/// Panics if threads == 0.
21+
pub fn accept<F: Fn(A::Stream) + Send + Sync>(self,
22+
work: F,
23+
threads: usize) -> JoinGuard<'static, ()> {
24+
assert!(threads != 0, "Can't accept on 0 threads.");
25+
26+
// Replace with &F when Send changes land.
27+
let work = Arc::new(work);
28+
29+
let (super_tx, supervisor_rx) = mpsc::channel();
30+
31+
let spawn =
32+
move || spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone());
33+
34+
// Go
35+
for _ in 0..threads { spawn() }
36+
37+
// Spawn the supervisor
38+
Thread::scoped(move || for () in supervisor_rx.iter() { spawn() })
39+
}
40+
}
41+
42+
fn spawn_with<A, F>(supervisor: mpsc::Sender<()>, work: Arc<F>, mut acceptor: A)
43+
where A: NetworkAcceptor,
44+
F: Fn(<A as NetworkAcceptor>::Stream) + Send + Sync {
45+
use std::old_io::EndOfFile;
46+
47+
Thread::spawn(move || {
48+
let sentinel = Sentinel::new(supervisor, ());
49+
50+
loop {
51+
match acceptor.accept() {
52+
Ok(stream) => work(stream),
53+
Err(ref e) if e.kind == EndOfFile => {
54+
debug!("Server closed.");
55+
sentinel.cancel();
56+
return;
57+
},
58+
59+
Err(e) => {
60+
error!("Connection failed: {}", e);
61+
}
62+
}
63+
}
64+
});
65+
}
66+
67+
struct Sentinel<T: Send> {
68+
value: Option<T>,
69+
supervisor: mpsc::Sender<T>,
70+
active: bool
71+
}
72+
73+
impl<T: Send> Sentinel<T> {
74+
fn new(channel: mpsc::Sender<T>, data: T) -> Sentinel<T> {
75+
Sentinel {
76+
value: Some(data),
77+
supervisor: channel,
78+
active: true
79+
}
80+
}
81+
82+
fn cancel(mut self) { self.active = false; }
83+
}
84+
85+
#[unsafe_destructor]
86+
impl<T: Send> Drop for Sentinel<T> {
87+
fn drop(&mut self) {
88+
// If we were cancelled, get out of here.
89+
if !self.active { return; }
90+
91+
// Respawn ourselves
92+
let _ = self.supervisor.send(self.value.take().unwrap());
93+
}
94+
}
95+

src/server/mod.rs

Lines changed: 53 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
//! HTTP Server
2-
use std::old_io::{Listener, EndOfFile, BufferedReader, BufferedWriter};
2+
use std::old_io::{Listener, BufferedReader, BufferedWriter};
33
use std::old_io::net::ip::{IpAddr, Port, SocketAddr};
44
use std::os;
5-
use std::sync::{Arc, TaskPool};
6-
use std::thread::{Builder, JoinGuard};
7-
5+
use std::thread::JoinGuard;
86

97
pub use self::request::Request;
108
pub use self::response::Response;
@@ -19,9 +17,13 @@ use net::{NetworkListener, NetworkStream, NetworkAcceptor,
1917
HttpAcceptor, HttpListener};
2018
use version::HttpVersion::{Http10, Http11};
2119

20+
use self::acceptor::AcceptorPool;
21+
2222
pub mod request;
2323
pub mod response;
2424

25+
mod acceptor;
26+
2527
/// A server can listen on a TCP socket.
2628
///
2729
/// Once listening, it will create a `Request`/`Response` pair for each
@@ -71,71 +73,14 @@ S: NetworkStream + Clone + Send> Server<L> {
7173
let acceptor = try!(self.listener.listen((self.ip, self.port)));
7274
let socket = try!(acceptor.socket_name());
7375

74-
let mut captured = acceptor.clone();
75-
let guard = Builder::new().name("hyper acceptor".to_string()).scoped(move || {
76-
let handler = Arc::new(handler);
77-
debug!("threads = {:?}", threads);
78-
let pool = TaskPool::new(threads);
79-
for conn in captured.incoming() {
80-
match conn {
81-
Ok(mut stream) => {
82-
debug!("Incoming stream");
83-
let handler = handler.clone();
84-
pool.execute(move || {
85-
let addr = match stream.peer_name() {
86-
Ok(addr) => addr,
87-
Err(e) => {
88-
error!("Peer Name error: {:?}", e);
89-
return;
90-
}
91-
};
92-
let mut rdr = BufferedReader::new(stream.clone());
93-
let mut wrt = BufferedWriter::new(stream);
94-
95-
let mut keep_alive = true;
96-
while keep_alive {
97-
let mut res = Response::new(&mut wrt);
98-
let req = match Request::new(&mut rdr, addr) {
99-
Ok(req) => req,
100-
Err(e@HttpIoError(_)) => {
101-
debug!("ioerror in keepalive loop = {:?}", e);
102-
return;
103-
}
104-
Err(e) => {
105-
//TODO: send a 400 response
106-
error!("request error = {:?}", e);
107-
return;
108-
}
109-
};
110-
111-
keep_alive = match (req.version, req.headers.get::<Connection>()) {
112-
(Http10, Some(conn)) if !conn.contains(&KeepAlive) => false,
113-
(Http11, Some(conn)) if conn.contains(&Close) => false,
114-
_ => true
115-
};
116-
res.version = req.version;
117-
handler.handle(req, res);
118-
debug!("keep_alive = {:?}", keep_alive);
119-
}
120-
121-
});
122-
},
123-
Err(ref e) if e.kind == EndOfFile => {
124-
debug!("server closed");
125-
break;
126-
},
127-
Err(e) => {
128-
error!("Connection failed: {}", e);
129-
continue;
130-
}
131-
}
132-
}
133-
});
76+
debug!("threads = {:?}", threads);
77+
let pool = AcceptorPool::new(acceptor.clone());
78+
let work = move |stream| handle_connection(stream, &handler);
13479

13580
Ok(Listening {
136-
acceptor: acceptor,
137-
guard: Some(guard),
81+
_guard: pool.accept(work, threads),
13882
socket: socket,
83+
acceptor: acceptor
13984
})
14085
}
14186

@@ -146,22 +91,56 @@ S: NetworkStream + Clone + Send> Server<L> {
14691

14792
}
14893

94+
fn handle_connection<S, H>(mut stream: S, handler: &H)
95+
where S: NetworkStream + Clone, H: Handler {
96+
debug!("Incoming stream");
97+
let addr = match stream.peer_name() {
98+
Ok(addr) => addr,
99+
Err(e) => {
100+
error!("Peer Name error: {:?}", e);
101+
return;
102+
}
103+
};
104+
105+
let mut rdr = BufferedReader::new(stream.clone());
106+
let mut wrt = BufferedWriter::new(stream);
107+
108+
let mut keep_alive = true;
109+
while keep_alive {
110+
let mut res = Response::new(&mut wrt);
111+
let req = match Request::new(&mut rdr, addr) {
112+
Ok(req) => req,
113+
Err(e@HttpIoError(_)) => {
114+
debug!("ioerror in keepalive loop = {:?}", e);
115+
return;
116+
}
117+
Err(e) => {
118+
//TODO: send a 400 response
119+
error!("request error = {:?}", e);
120+
return;
121+
}
122+
};
123+
124+
keep_alive = match (req.version, req.headers.get::<Connection>()) {
125+
(Http10, Some(conn)) if !conn.contains(&KeepAlive) => false,
126+
(Http11, Some(conn)) if conn.contains(&Close) => false,
127+
_ => true
128+
};
129+
res.version = req.version;
130+
handler.handle(req, res);
131+
debug!("keep_alive = {:?}", keep_alive);
132+
}
133+
}
134+
149135
/// A listening server, which can later be closed.
150136
pub struct Listening<A = HttpAcceptor> {
151137
acceptor: A,
152-
guard: Option<JoinGuard<'static, ()>>,
138+
_guard: JoinGuard<'static, ()>,
153139
/// The socket addresses that the server is bound to.
154140
pub socket: SocketAddr,
155141
}
156142

157143
impl<A: NetworkAcceptor> Listening<A> {
158-
/// Causes the current thread to wait for this listening to complete.
159-
pub fn await(&mut self) {
160-
if let Some(guard) = self.guard.take() {
161-
let _ = guard.join();
162-
}
163-
}
164-
165144
/// Stop the server from listening to its socket address.
166145
pub fn close(&mut self) -> HttpResult<()> {
167146
debug!("closing server");

0 commit comments

Comments
 (0)