Skip to content

Commit 38e6b82

Browse files
Refactored the Reactor
1 parent d6f9447 commit 38e6b82

File tree

4 files changed

+446
-22
lines changed

4 files changed

+446
-22
lines changed

;

Lines changed: 324 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,324 @@
1+
use mio::tcp::*;
2+
use mio;
3+
use mio::{Token, Handler, EventLoop, EventSet, PollOpt, TryRead, TryWrite, Evented};
4+
use slab::Slab;
5+
use std::io::{self, Cursor, Write, Read};
6+
use std::net::{self, SocketAddr};
7+
use std::time::Duration;
8+
use std::mem;
9+
use std::thread;
10+
use std::sync::mpsc::{Receiver, Sender, channel};
11+
use result::{ThrustResult, ThrustError};
12+
use tangle::{Future, Async};
13+
use bytes::buf::Buf;
14+
use std::collections::HashMap;
15+
16+
pub enum Message {
17+
Connect(SocketAddr, Sender<Dispatch>, Sender<u64>),
18+
Bind(net::TcpListener, SocketAddr, Sender<Dispatch>),
19+
Rpc(u64, Vec<u8>),
20+
Shutdown,
21+
Close
22+
}
23+
24+
pub enum Dispatch {
25+
Data(Vec<u8>)
26+
}
27+
28+
pub enum Timeout {
29+
Reconnect(Token, SocketAddr)
30+
}
31+
32+
#[derive(Debug, PartialEq, Eq)]
33+
pub enum State {
34+
Reading,
35+
Writing,
36+
Closed
37+
}
38+
39+
pub struct Connection {
40+
stream: TcpStream,
41+
pub token: Token,
42+
state: State,
43+
chan: Sender<Dispatch>,
44+
rbuffer: Vec<u8>,
45+
wbuffer: Cursor<Vec<u8>>
46+
}
47+
48+
impl Connection {
49+
pub fn new(stream: TcpStream, token: Token, chan: Sender<Dispatch>) -> Self {
50+
Connection {
51+
stream: stream,
52+
token: token,
53+
state: State::Reading,
54+
chan: chan,
55+
rbuffer: vec![],
56+
wbuffer: Cursor::new(vec![])
57+
}
58+
}
59+
60+
pub fn ready(&mut self, event_loop: &mut EventLoop<Reactor>, events: EventSet) {
61+
match self.state {
62+
State::Reading if events.is_readable() => {
63+
self.readable();
64+
self.reregister(event_loop, self.token);
65+
},
66+
State::Writing if events.is_writable() => {
67+
self.writable();
68+
self.reregister(event_loop, self.token);
69+
},
70+
_ => {
71+
self.reregister(event_loop, self.token);
72+
}
73+
}
74+
}
75+
76+
pub fn read(&mut self) -> ThrustResult<Vec<u8>> {
77+
match self.stream.try_read_buf(&mut self.rbuffer) {
78+
Ok(Some(_)) => Ok(mem::replace(&mut self.rbuffer, vec![])),
79+
Ok(None) => Err(ThrustError::NotReady),
80+
Err(err) => Err(ThrustError::Other)
81+
}
82+
}
83+
84+
pub fn writable(&mut self) -> ThrustResult<()> {
85+
// Flush the whole buffer. The socket can, at any time, be unwritable. Thus, we
86+
// need to keep track of what we've written so far.
87+
while self.wbuffer.has_remaining() {
88+
self.flush();
89+
}
90+
91+
self.state = State::Reading;
92+
93+
Ok(())
94+
}
95+
96+
pub fn readable(&mut self) -> ThrustResult<()> {
97+
while let Ok(buf) = self.read() {
98+
self.chan.send(Dispatch::Data(buf));
99+
}
100+
101+
self.state = State::Writing;
102+
103+
Ok(())
104+
}
105+
106+
fn register(&mut self, event_loop: &mut EventLoop<Reactor>, token: Token) -> ThrustResult<()> {
107+
try!(event_loop.register(&self.stream, token, EventSet::readable(),
108+
PollOpt::edge() | PollOpt::oneshot()));
109+
Ok(())
110+
}
111+
112+
pub fn reregister(&self, event_loop: &mut EventLoop<Reactor>, token: Token) -> ThrustResult<()> {
113+
let event_set = match self.state {
114+
State::Reading => EventSet::readable(),
115+
State::Writing => EventSet::writable(),
116+
_ => EventSet::none()
117+
};
118+
119+
try!(event_loop.reregister(&self.stream, self.token, event_set, PollOpt::oneshot()));
120+
Ok(())
121+
}
122+
}
123+
124+
impl Write for Connection {
125+
fn write(&mut self, data: &[u8]) -> io::Result<usize> {
126+
try!(self.wbuffer.get_mut().write(data));
127+
try!(self.flush());
128+
Ok(0)
129+
}
130+
131+
fn flush(&mut self) -> io::Result<()> {
132+
match self.stream.try_write_buf(&mut self.wbuffer) {
133+
Ok(Some(_)) => Ok(()),
134+
Ok(None) => {
135+
println!("None");
136+
Ok(())
137+
},
138+
Err(err) => Err(err)
139+
}
140+
}
141+
}
142+
143+
/// XXX: Generalize the listener. Allow multiple listeners
144+
/// and multiple connections so we can multiplex a bunch
145+
/// of Thrift services in one `EventLoop`.
146+
pub struct Reactor {
147+
listeners: HashMap<Token, TcpListener>,
148+
connections: HashMap<Token, Connection>,
149+
sender: Sender<Dispatch>,
150+
servers: HashMap<Token, Sender<Dispatch>>,
151+
current_token: usize
152+
}
153+
154+
impl Reactor {
155+
pub fn new(sender: Sender<Dispatch>) -> Self {
156+
Reactor {
157+
listeners: HashMap::new(),
158+
connections: HashMap::new(),
159+
sender: sender,
160+
servers: HashMap::new(),
161+
current_token: 0
162+
}
163+
}
164+
165+
pub fn run(&mut self) -> ThrustResult<(EventLoop<Self>, mio::Sender<Message>)> {
166+
let mut event_loop = try!(EventLoop::new());
167+
168+
// let mut buf = mem::replace(&mut self.buf, vec![]);
169+
// for stream in buf.into_iter() {
170+
// let clone = self.sender.clone();
171+
// let token = self.connections.insert_with(|token| {
172+
// Connection::new(stream, token, clone)
173+
// }).expect("Failed to insert a new connection in the slab");
174+
175+
// self.connections[token].register(&mut event_loop, token);
176+
// }
177+
178+
let sender = event_loop.channel();
179+
Ok((event_loop, sender))
180+
}
181+
182+
pub fn accept_connection(&mut self, event_loop: &mut EventLoop<Self>, token: Token) {
183+
let mut listener = self.listeners.get_mut(&token).expect("Listener was not found.");
184+
match listener.accept() {
185+
Ok(Some(socket)) => {
186+
let (stream, _) = socket;
187+
let clone = self.servers[&token].clone();
188+
let new_token = Token(self.current_token);
189+
let mut conn = Connection::new(stream, new_token, clone);
190+
191+
self.connections.insert(new_token, conn);
192+
self.connections.get_mut(&new_token)
193+
.unwrap()
194+
.register(event_loop, new_token);
195+
196+
self.current_token += 1;
197+
},
198+
_ => {}
199+
}
200+
}
201+
}
202+
203+
impl Handler for Reactor {
204+
type Timeout = Timeout;
205+
type Message = Message;
206+
207+
fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token, events: EventSet) {
208+
println!("Token ready {:?}", token);
209+
if events.is_hup() {
210+
println!("Hup received. Socket disconnected.");
211+
if self.connections.contains_key(&token) {
212+
self.connections.remove(&token);
213+
}
214+
return;
215+
}
216+
217+
if events.is_error() {
218+
println!("Err: {:?}", events);
219+
return;
220+
}
221+
222+
if events.is_readable() && self.listeners.contains_key(&token) {
223+
self.accept_connection(event_loop, token);
224+
return;
225+
}
226+
227+
if self.connections.contains_key(&token) {
228+
println!("Connection ready.");
229+
self.connections.get_mut(&token).expect("connection was not found.").ready(event_loop, events);
230+
}
231+
}
232+
233+
fn timeout(&mut self, event_loop: &mut EventLoop<Self>, timeout: Timeout) {
234+
}
235+
236+
fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: Message) {
237+
match msg {
238+
Message::Rpc(id, data) => {
239+
println!("rpc");
240+
self.connections.get_mut(&Token(1)).expect("connection was not found.").write(&*data);
241+
},
242+
Message::Shutdown => {
243+
event_loop.shutdown();
244+
},
245+
Message::Connect(addr, tx) => {
246+
let mut mio_stream = TcpStream::connect(&addr).expect("MIO ERR");
247+
let new_token = Token(self.current_token);
248+
let mut conn = Connection::new(mio_stream, new_token, tx);
249+
250+
self.connections.insert(new_token, conn);
251+
252+
self.connections.get_mut(&new_token)
253+
.unwrap()
254+
.register(event_loop, new_token);
255+
256+
self.current_token += 1;
257+
},
258+
Message::Bind(listener, addr, tx) => {
259+
let token = Token(self.current_token);
260+
let mut lis = TcpListener::from_listener(listener, &addr).unwrap();
261+
self.servers.insert(token, tx);
262+
263+
event_loop.register(&lis, token, EventSet::readable(), PollOpt::edge()).unwrap();
264+
self.listeners.insert(token, lis);
265+
self.current_token += 1;
266+
},
267+
_ => {}
268+
}
269+
}
270+
}
271+
272+
#[cfg(test)]
273+
mod tests {
274+
use mio::EventLoop;
275+
use super::*;
276+
use std::io::Write;
277+
use std::sync::mpsc::{Receiver, Sender, channel};
278+
use tangle::{Future, Async};
279+
use std::thread;
280+
use std::time::Duration;
281+
use std::net::{TcpListener, TcpStream, SocketAddr};
282+
283+
#[test]
284+
fn create_reactor() {
285+
let (assert_tx, assert_rx) = channel();
286+
let (tx, rx) = channel();
287+
let mut reactor = Reactor::new(tx);
288+
let mut event_loop = EventLoop::new().unwrap();
289+
let sender = event_loop.channel();
290+
291+
let handle = thread::spawn(move || {
292+
event_loop.run(&mut reactor);
293+
});
294+
295+
// Establish a local TcpListener.
296+
let addr: SocketAddr = "127.0.0.1:5543".parse().unwrap();
297+
let listener = TcpListener::bind(addr.clone()).unwrap();
298+
let (s, r) = channel();
299+
300+
sender.send(Message::Bind(listener, addr.clone(), s.clone()));
301+
302+
thread::sleep(Duration::from_millis(50));
303+
304+
// Connect to that socket.
305+
let (rpc_id_tx, rpc_id_rx) = channel();
306+
sender.send(Message::Connect(addr, s, rpc_id_tx));
307+
let id = rpc_id_rx.recv().unwrap();
308+
sender.send(Message::Rpc(id, b"abc".to_vec()));
309+
310+
let server = thread::spawn(move || {
311+
for msg in r.iter() {
312+
match msg {
313+
Dispatch::Data(msg) => {
314+
println!("msg");
315+
assert_tx.send(msg).expect("Could not assert_tx");
316+
}
317+
}
318+
}
319+
});
320+
321+
let v = assert_rx.recv().expect("Error trying to assert reactor test.");
322+
assert_eq!(v.len(), 5);
323+
}
324+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#![allow(unused_imports, unused_variables, dead_code, unused_must_use, unused_mut)]
12
#![feature(associated_type_defaults)]
23

34
#[macro_use]

0 commit comments

Comments
 (0)