Skip to content

Commit

Permalink
Merge pull request #31 from qualified/ws-keep-alive
Browse files Browse the repository at this point in the history
  • Loading branch information
kazk authored Mar 16, 2022
2 parents 65bdcf8 + 5ab3de0 commit d353b05
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 10 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ serde = { version = "1.0.126", features = ["derive"] }
serde_json = "1.0.64"
url = "2.2.2"

tokio = { version = "1.6.1", features = ["fs", "process", "macros", "rt", "rt-multi-thread"] }
tokio = { version = "1.6.1", features = ["fs", "process", "macros", "rt", "rt-multi-thread", "time"] }
tokio-util = { version = "0.6.7", features = ["codec"] }
warp = { git = "https://github.com/kazk/warp", branch = "permessage-deflate", default-features = false, features = ["websocket"] }

Expand Down
40 changes: 35 additions & 5 deletions src/api/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{convert::Infallible, process::Stdio, str::FromStr};

use futures_util::{
future::{select, Either},
SinkExt, StreamExt,
stream, SinkExt, StreamExt,
};
use tokio::{fs, process::Command};
use url::Url;
Expand Down Expand Up @@ -108,7 +108,19 @@ async fn connected(
let mut server_send = lsp::framed::writer(server.stdin.take().unwrap());
let mut server_recv = lsp::framed::reader(server.stdout.take().unwrap());
let (mut client_send, client_recv) = ws.split();
let mut client_recv = client_recv.filter_map(filter_map_warp_ws_message).boxed();
let client_recv = client_recv
.filter_map(filter_map_warp_ws_message)
// Chain this with `Done` so we know when the client disconnects
.chain(stream::once(async { Ok(Message::Done) }));
// Tick every 30s so we can ping the client to keep the connection alive
let ticks = stream::unfold(
tokio::time::interval(std::time::Duration::from_secs(30)),
|mut interval| async move {
interval.tick().await;
Some((Ok(Message::Tick), interval))
},
);
let mut client_recv = stream::select(client_recv, ticks).boxed();

let mut client_msg = client_recv.next();
let mut server_msg = server_recv.next();
Expand Down Expand Up @@ -145,15 +157,26 @@ async fn connected(
tracing::info!("received Close message");
}

// Ping the client to keep the connection alive
Some(Ok(Message::Tick)) => {
tracing::debug!("pinging the client");
client_send.send(warp::ws::Message::ping(vec![])).await?;
}

// Connection closed
Some(Ok(Message::Done)) => {
tracing::info!("connection closed");
break;
}

// WebSocket Error
Some(Err(err)) => {
tracing::error!("websocket error: {}", err);
}

// Connection closed
None => {
tracing::info!("connection closed");
break;
// Unreachable because of the interval stream
unreachable!("should never yield None");
}
}

Expand Down Expand Up @@ -206,13 +229,20 @@ async fn connected(
}

// Type to describe a message from the client conveniently.
#[allow(clippy::large_enum_variant)]
#[allow(clippy::enum_variant_names)]
enum Message {
// Valid LSP message
Message(lsp::Message),
// Invalid JSON
Invalid(String),
// Close message
Close,
// Ping the client to keep the connection alive.
// Note that this is from the interval stream and not actually from client.
Tick,
// Client disconnected. Necessary because the combined stream is infinite.
Done,
}

// Parse the message and ignore anything we don't care.
Expand Down
2 changes: 1 addition & 1 deletion src/lsp/framed/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub fn parse_message(input: &[u8]) -> IResult<&[u8], &[u8]> {

let header = terminated(terminated(content_len, opt(content_type)), crlf);

let header = map_res(header, |s: &[u8]| str::from_utf8(s));
let header = map_res(header, str::from_utf8);
let length = map_res(header, |s: &str| s.parse::<usize>());
let mut message = length_data(length);

Expand Down
1 change: 1 addition & 0 deletions src/lsp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use types::Unknown;

#[derive(Clone, Debug, PartialEq, Deserialize)]
#[serde(untagged)]
#[allow(clippy::large_enum_variant)]
pub enum Message {
Request(Request),

Expand Down
1 change: 1 addition & 0 deletions src/lsp/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use super::types::Id;
/// [Request message]: https://microsoft.github.io/language-server-protocol/specifications/specification-current/#requestMessage
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "method")]
#[allow(clippy::large_enum_variant)]
pub enum Request {
// To Server
/// > The [initialize] request is sent as the first request from the client
Expand Down

0 comments on commit d353b05

Please sign in to comment.