Skip to content

Commit fd2f0af

Browse files
committed
fixup! WIP: Serialize requests
1 parent 7f74616 commit fd2f0af

File tree

2 files changed

+77
-83
lines changed

2 files changed

+77
-83
lines changed

examples/nested_requests.rs

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,26 @@
11
use nvim_rs::{
2-
compat::tokio::Compat, create::tokio as create, neovim::Neovim, Handler,
2+
compat::tokio::Compat,
3+
create::tokio as create,
4+
neovim::Neovim,
5+
Handler,
36
};
47

58
use async_trait::async_trait;
69

710
use rmpv::Value;
811

9-
use std::sync::Arc;
12+
use std::{
13+
sync::Arc,
14+
path::Path,
15+
};
1016

1117
use tokio::{
1218
self,
1319
process::{ChildStdin, Command},
14-
spawn,
20+
sync::Mutex,
21+
spawn
1522
};
1623

17-
use futures::lock::Mutex;
18-
use futures::select;
19-
use futures::future::FutureExt;
20-
2124
const NVIMPATH: &str = "neovim/build/bin/nvim";
2225

2326
#[derive(Clone)]
@@ -106,7 +109,7 @@ impl Handler for NeovimHandler {
106109
}
107110
}
108111

109-
#[tokio::main(basic_scheduler)]
112+
#[tokio::main]
110113
async fn main() {
111114
let rs = r#"exe ":fun M(timer)
112115
call rpcnotify(1, 'set_froodle', rpcrequest(1, 'req', 'y'))
@@ -120,8 +123,13 @@ async fn main() {
120123
froodle: froodle.clone(),
121124
};
122125

123-
let (nvim, io, methods, _child) = create::new_child_cmd(
124-
Command::new(NVIMPATH).args(&[
126+
let path = if Path::new(NVIMPATH).exists() {
127+
NVIMPATH
128+
} else {
129+
"nvim"
130+
};
131+
let (nvim, io, _child) = create::new_child_cmd(
132+
Command::new(path).args(&[
125133
"-u",
126134
"NONE",
127135
"--embed",
@@ -145,14 +153,9 @@ async fn main() {
145153

146154
// The 2nd timer closes the channel, which will be returned as an error from
147155
// the io handler. We only fail the test if we got another error
148-
let (name, res) = select! {
149-
r_m = methods.fuse() => ("methods", r_m),
150-
r_io = io.fuse() => ("io", r_io),
151-
};
152-
153-
if let Err(err) = res {
156+
if let Err(err) = io.await.unwrap() {
154157
if !err.is_channel_closed() {
155-
panic!("Error in task '{}': '{:?}'", name, err);
158+
panic!("Error in io: '{:?}'", err);
156159
}
157160
}
158161

src/neovim.rs

Lines changed: 57 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,10 @@ where
108108

109109
let (sender, receiver) = unbounded();
110110
let fut = future::try_join(
111-
Self::io_loop(reader, sender, req.clone()),
112-
req.clone().handle_rec(handler, receiver)
113-
).map_ok(|_| ());
111+
req.clone().io_loop(reader, sender),
112+
req.clone().handler_loop(handler, receiver)
113+
)
114+
.map_ok(|_| ());
114115

115116
(req, fut)
116117
}
@@ -131,14 +132,11 @@ where
131132
let (sender, receiver) = oneshot::channel();
132133

133134
let mut q = self.queue.lock().await;
134-
//eprintln!("[Id {}]: Queue before push: {:#?}", msgid, q);
135135
q.push((msgid, sender));
136-
//eprintln!("[Id {}]: Queue after push: {:#?}", msgid, q);
137136

138137
let writer = self.writer.clone();
139138
model::encode(writer, req).await?;
140139

141-
//eprintln!("[Id {}]: Receiver before return: {:#?}", msgid, receiver);
142140
Ok(receiver)
143141
}
144142

@@ -152,7 +150,6 @@ where
152150
.await
153151
.map_err(|e| CallError::SendError(*e, method.to_string()))?;
154152

155-
//eprintln!("Receiver before await: {:#?}", receiver);
156153
match receiver.await {
157154
// Result<Result<Result<Value, Value>, Arc<DecodeError>>, RecvError>
158155
Ok(Ok(r)) => Ok(r), // r is Result<Value, Value>, i.e. we got an answer
@@ -194,7 +191,7 @@ where
194191
}
195192
}
196193

197-
async fn handle_rec<H>(
194+
async fn handler_loop<H>(
198195
self,
199196
handler: H,
200197
mut receiver: UnboundedReceiver<RpcMessage>,
@@ -203,54 +200,58 @@ where
203200
H: Handler<Writer = W> + Spawner,
204201
{
205202
loop {
206-
let msg = receiver.next().await.unwrap();
207-
208-
if let RpcMessage::RpcRequest {
209-
msgid,
210-
method,
211-
params,
212-
} = msg
213-
{
214-
let handler_c = handler.clone();
215-
let neovim = self.clone();
216-
let writer = self.writer.clone();
217-
218-
handler.spawn(async move {
219-
let response = match handler_c
220-
.handle_request(method, params, neovim)
221-
.await
222-
{
223-
Ok(result) => RpcMessage::RpcResponse {
224-
msgid,
225-
result,
226-
error: Value::Nil,
227-
},
228-
Err(error) => RpcMessage::RpcResponse {
229-
msgid,
230-
result: Value::Nil,
231-
error,
232-
},
233-
};
234-
235-
model::encode(writer, response)
236-
.await
237-
.unwrap_or_else(|e| {
238-
error!("Error sending response to request {}: '{}'", msgid, e);
239-
});
240-
});
241-
} else if let RpcMessage::RpcNotification { method, params } = msg {
242-
let neovim = self.clone();
243-
handler.handle_notify(method, params, neovim).await;
244-
} else {
245-
unreachable!()
203+
let msg = match receiver.next().await {
204+
Some(msg) => msg,
205+
None => break Ok(()),
206+
};
207+
208+
match msg {
209+
RpcMessage::RpcRequest {
210+
msgid,
211+
method,
212+
params,
213+
} => {
214+
let handler_c = handler.clone();
215+
let neovim = self.clone();
216+
let writer = self.writer.clone();
217+
218+
handler.spawn(async move {
219+
let response = match handler_c
220+
.handle_request(method, params, neovim)
221+
.await
222+
{
223+
Ok(result) => RpcMessage::RpcResponse {
224+
msgid,
225+
result,
226+
error: Value::Nil,
227+
},
228+
Err(error) => RpcMessage::RpcResponse {
229+
msgid,
230+
result: Value::Nil,
231+
error,
232+
},
233+
};
234+
235+
model::encode(writer, response)
236+
.await
237+
.unwrap_or_else(|e| {
238+
error!("Error sending response to request {}: '{}'", msgid, e);
239+
});
240+
});
241+
},
242+
RpcMessage::RpcNotification {
243+
method,
244+
params
245+
} => handler.handle_notify(method, params, self.clone()).await,
246+
_ => unreachable!(),
246247
}
247248
}
248249
}
249250

250251
async fn io_loop<R>(
252+
self,
251253
mut reader: R,
252254
mut sender: UnboundedSender<RpcMessage>,
253-
neovim: Neovim<W>,
254255
) -> Result<(), Box<LoopError>>
255256
where
256257
R: AsyncRead + Send + Unpin + 'static,
@@ -261,19 +262,14 @@ where
261262
let msg = match model::decode(&mut reader, &mut rest).await {
262263
Ok(msg) => msg,
263264
Err(err) => {
264-
let e = neovim.send_error_to_callers(&neovim.queue, *err).await?;
265+
let e = self.send_error_to_callers(&self.queue, *err).await?;
265266
return Err(Box::new(LoopError::DecodeError(e, None)));
266267
}
267268
};
268269

269270
debug!("Get message {:?}", msg);
270-
if let RpcMessage::RpcResponse {
271-
msgid,
272-
result,
273-
error,
274-
} = msg {
275-
let sender = find_sender(&neovim.queue, msgid).await?;
276-
//eprintln!("Sender: {:?}", sender);
271+
if let RpcMessage::RpcResponse { msgid, result, error, } = msg {
272+
let sender = find_sender(&self.queue, msgid).await?;
277273
if error == Value::Nil {
278274
sender
279275
.send(Ok(Ok(result)))
@@ -284,11 +280,10 @@ where
284280
.map_err(|r| (msgid, r.expect("This was an OK(_)")))?;
285281
}
286282
} else if let Err(e) = sender.send(msg).await {
287-
if e.is_disconnected() {
288-
break Ok(());
289-
} else {
290-
panic!("Unexpected error while sending incoming event to handler: {}", e.to_string());
291-
}
283+
panic!(
284+
"Unexpected error while sending incoming event to handler: {}",
285+
e.to_string()
286+
);
292287
}
293288
}
294289
}
@@ -327,18 +322,14 @@ async fn find_sender(
327322
queue: &Queue,
328323
msgid: u64,
329324
) -> Result<oneshot::Sender<ResponseResult>, Box<LoopError>> {
330-
//eprintln!("Looking for sender for id {}", msgid);
331325
let mut queue = queue.lock().await;
332326

333327
let pos = match queue.iter().position(|req| req.0 == msgid) {
334328
Some(p) => p,
335329
None => return Err(msgid.into()),
336330
};
337-
//eprintln!( "Found sender at pos {}, queue length was {}", pos, queue.len());
338331

339-
//eprintln!("[Id {}]: Queue before sender removal: {:#?}", msgid, queue);
340332
let q = queue.remove(pos).1;
341-
//eprintln!("[Id {}]: Queue after sender removal: {:#?}", msgid, queue);
342333
Ok(q)
343334
}
344335

0 commit comments

Comments
 (0)