Skip to content

Commit

Permalink
fix(router): handle client buffer overflow (#1955)
Browse files Browse the repository at this point in the history
* fix(router): handle client buffer overflow

* style(fmt): rustfmt
  • Loading branch information
imsnif authored Nov 18, 2022
1 parent 84a931a commit 2afb355
Showing 1 changed file with 48 additions and 5 deletions.
53 changes: 48 additions & 5 deletions zellij-server/src/os_input_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@ use nix::{
use signal_hook::consts::*;
use sysinfo::{ProcessExt, ProcessRefreshKind, System, SystemExt};
use zellij_utils::{
async_std,
async_std, channels,
data::Palette,
errors::prelude::*,
input::command::{RunCommand, TerminalAction},
interprocess,
ipc::{ClientToServerMsg, IpcReceiverWithContext, IpcSenderWithContext, ServerToClientMsg},
ipc::{
ClientToServerMsg, ExitReason, IpcReceiverWithContext, IpcSenderWithContext,
ServerToClientMsg,
},
libc, nix,
shared::default_palette,
signal_hook,
Expand Down Expand Up @@ -323,10 +326,50 @@ fn spawn_terminal(
handle_terminal(cmd, failover_cmd, orig_termios, quit_cb, terminal_id)
}

// The ClientSender is in charge of sending messages to the client on a special thread
// This is done so that when the unix socket buffer is full, we won't block the entire router
// thread
// When the above happens, the ClientSender buffers messages in hopes that the congestion will be
// freed until we runs out of buffer space.
// If we run out of buffer space, we bubble up an error sot hat the router thread will give up on
// this client and we'll stop sending messages to it.
// If the client ever becomes responsive again, we'll send one final "Buffer full" message so it
// knows what happened.
#[derive(Clone)]
struct ClientSender {
client_id: ClientId,
client_buffer_sender: channels::Sender<ServerToClientMsg>,
}

impl ClientSender {
pub fn new(client_id: ClientId, mut sender: IpcSenderWithContext<ServerToClientMsg>) -> Self {
let (client_buffer_sender, client_buffer_receiver) = channels::bounded(50);
std::thread::spawn(move || {
let err_context = || format!("failed to send message to client {client_id}");
for msg in client_buffer_receiver.iter() {
let _ = sender.send(msg).with_context(err_context);
}
let _ = sender.send(ServerToClientMsg::Exit(ExitReason::Error(
"Buffer full".to_string(),
)));
});
ClientSender {
client_id,
client_buffer_sender,
}
}
pub fn send_or_buffer(&self, msg: ServerToClientMsg) -> Result<()> {
let err_context = || format!("Client {} send buffer full", self.client_id);
self.client_buffer_sender
.try_send(msg)
.with_context(err_context)
}
}

#[derive(Clone)]
pub struct ServerOsInputOutput {
orig_termios: Arc<Mutex<termios::Termios>>,
client_senders: Arc<Mutex<HashMap<ClientId, IpcSenderWithContext<ServerToClientMsg>>>>,
client_senders: Arc<Mutex<HashMap<ClientId, ClientSender>>>,
terminal_id_to_raw_fd: Arc<Mutex<BTreeMap<u32, Option<RawFd>>>>, // A value of None means the
// terminal_id exists but is
// not connected to an fd (eg.
Expand Down Expand Up @@ -589,7 +632,7 @@ impl ServerOsApi for ServerOsInputOutput {
.with_context(err_context)?
.get_mut(&client_id)
{
sender.send(msg).with_context(err_context)
sender.send_or_buffer(msg).with_context(err_context)
} else {
Ok(())
}
Expand All @@ -601,7 +644,7 @@ impl ServerOsApi for ServerOsInputOutput {
stream: LocalSocketStream,
) -> Result<IpcReceiverWithContext<ClientToServerMsg>> {
let receiver = IpcReceiverWithContext::new(stream);
let sender = receiver.get_sender();
let sender = ClientSender::new(client_id, receiver.get_sender());
self.client_senders
.lock()
.to_anyhow()
Expand Down

0 comments on commit 2afb355

Please sign in to comment.