Skip to content

Commit a6f59d8

Browse files
committed
Run handlers in a different thread than io
1 parent 4fd95d2 commit a6f59d8

File tree

3 files changed

+43
-24
lines changed

3 files changed

+43
-24
lines changed

src/rpc/client.rs

Lines changed: 40 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ where
3434
{
3535
reader: Option<BufReader<R>>,
3636
writer: Arc<Mutex<BufWriter<W>>>,
37-
dispatch_guard: Option<JoinHandle<()>>,
37+
dispatch_guard: Option<(JoinHandle<()>, JoinHandle<()>)>,
3838
event_loop_started: bool,
3939
queue: Queue,
4040
msgid_counter: u64,
@@ -45,7 +45,7 @@ where
4545
R: Read + Send + 'static,
4646
W: Write + Send + 'static,
4747
{
48-
pub fn take_dispatch_guard(&mut self) -> JoinHandle<()> {
48+
pub fn take_dispatch_guard(&mut self) -> (JoinHandle<()>, JoinHandle<()>) {
4949
self.dispatch_guard
5050
.take()
5151
.expect("Can only take join handle after running event loop")
@@ -143,7 +143,7 @@ where
143143
}
144144
}
145145
Err(mpsc::TryRecvError::Disconnected) => {
146-
return Err(Value::from(format!("Channel disconnected ({})", method)))
146+
return Err(Value::from(format!("Channel disconnected ({})", method)));
147147
}
148148
Ok(val) => return val,
149149
};
@@ -226,27 +226,19 @@ where
226226
mut reader: BufReader<R>,
227227
writer: Arc<Mutex<BufWriter<W>>>,
228228
mut handler: H,
229-
) -> JoinHandle<()>
229+
) -> (JoinHandle<()>, JoinHandle<()>)
230230
where
231231
H: Handler + Send + 'static,
232232
{
233-
thread::spawn(move || loop {
234-
let msg = match model::decode(&mut reader) {
235-
Ok(msg) => msg,
236-
Err(e) => {
237-
error!("Error while reading: {}", e);
238-
Self::send_error_to_callers(&queue, &e);
239-
return;
240-
}
241-
};
242-
debug!("Get message {:?}", msg);
243-
match msg {
244-
model::RpcMessage::RpcRequest {
233+
let (io_to_handlers, handlers_from_io) = mpsc::channel();
234+
let rqjoin = thread::spawn(move || loop {
235+
match handlers_from_io.recv() {
236+
Ok(model::RpcMessage::RpcRequest {
245237
msgid,
246238
method,
247239
params,
248-
} => {
249-
let response = match handler.handle_request(&method, params) {
240+
}) => {
241+
let response = match handler.handle_request(method, params) {
250242
Ok(result) => model::RpcMessage::RpcResponse {
251243
msgid,
252244
result,
@@ -262,6 +254,31 @@ where
262254
let writer = &mut *writer.lock().unwrap();
263255
model::encode(writer, response).expect("Error sending RPC response");
264256
}
257+
Ok(model::RpcMessage::RpcNotification { method, params }) => {
258+
handler.handle_notify(&method, params);
259+
}
260+
Ok(_) => {
261+
error!("Handler threat does not handle notifications!");
262+
}
263+
Err(e) => {
264+
debug!("Error receiving request data: {:?}", e);
265+
}
266+
}
267+
});
268+
let iojoin = thread::spawn(move || loop {
269+
error!("Beginning of io-loop!");
270+
let msg = match model::decode(&mut reader) {
271+
Ok(msg) => msg,
272+
Err(e) => {
273+
error!("Error while reading: {}", e);
274+
Self::send_error_to_callers(&queue, &e);
275+
return;
276+
}
277+
};
278+
match msg {
279+
m @ model::RpcMessage::RpcRequest { .. } => {
280+
io_to_handlers.send(m).unwrap();
281+
}
265282
model::RpcMessage::RpcResponse {
266283
msgid,
267284
result,
@@ -274,11 +291,13 @@ where
274291
sender.send(Ok(result));
275292
}
276293
}
277-
model::RpcMessage::RpcNotification { method, params } => {
278-
handler.handle_notify(&method, params);
294+
m @ model::RpcMessage::RpcNotification { .. } => {
295+
io_to_handlers.send(m).unwrap();
296+
//handler.handle_notify(&method, params);
279297
}
280298
};
281-
})
299+
});
300+
(rqjoin, iojoin)
282301
}
283302
}
284303

src/rpc/handler.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use rmpv::Value;
22
use std::sync::mpsc;
33

44
pub trait RequestHandler {
5-
fn handle_request(&mut self, _name: &str, _args: Vec<Value>) -> Result<Value, Value> {
5+
fn handle_request(&mut self, _name: String, _args: Vec<Value>) -> Result<Value, Value> {
66
Err(Value::from("Not implemented"))
77
}
88
}
@@ -28,7 +28,7 @@ impl<H: RequestHandler> Handler for ChannelHandler<H> {
2828
}
2929

3030
impl<H: RequestHandler> RequestHandler for ChannelHandler<H> {
31-
fn handle_request(&mut self, name: &str, args: Vec<Value>) -> Result<Value, Value> {
31+
fn handle_request(&mut self, name: String, args: Vec<Value>) -> Result<Value, Value> {
3232
self.request_handler.handle_request(name, args)
3333
}
3434
}

src/session.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ impl Session {
197197
/// Wait dispatch thread to finish.
198198
///
199199
/// This can happens in case child process connection is lost for some reason.
200-
pub fn take_dispatch_guard(&mut self) -> JoinHandle<()> {
200+
pub fn take_dispatch_guard(&mut self) -> (JoinHandle<()>, JoinHandle<()>) {
201201
match self.client {
202202
ClientConnection::Child(ref mut client, _) => client.take_dispatch_guard(),
203203
ClientConnection::Parent(ref mut client) => client.take_dispatch_guard(),

0 commit comments

Comments
 (0)