From 2d2bbbd6c3233b3a9b602292889ed5e93f70525c Mon Sep 17 00:00:00 2001 From: Aram Drevekenin Date: Wed, 13 Jul 2022 17:04:15 +0200 Subject: [PATCH] perf(terminal): better responsiveness (#1585) * performance(pty): only buffer terminal bytes when screen thread is backed up * style(fmt): rustfmt --- zellij-server/src/lib.rs | 1 + zellij-server/src/pty.rs | 154 +++++--------------------- zellij-server/src/terminal_bytes.rs | 162 ++++++++++++++++++++++++++++ 3 files changed, 191 insertions(+), 126 deletions(-) create mode 100644 zellij-server/src/terminal_bytes.rs diff --git a/zellij-server/src/lib.rs b/zellij-server/src/lib.rs index 490837d52f..7b9bf798bc 100644 --- a/zellij-server/src/lib.rs +++ b/zellij-server/src/lib.rs @@ -8,6 +8,7 @@ mod pty; mod pty_writer; mod route; mod screen; +mod terminal_bytes; mod thread_bus; mod ui; mod wasm_vm; diff --git a/zellij-server/src/pty.rs b/zellij-server/src/pty.rs index 717973a948..f1489ea639 100644 --- a/zellij-server/src/pty.rs +++ b/zellij-server/src/pty.rs @@ -1,31 +1,18 @@ +use crate::terminal_bytes::TerminalBytes; use crate::{ - os_input_output::{AsyncReader, ServerOsApi}, - panes::PaneId, - screen::ScreenInstruction, - thread_bus::{Bus, ThreadSenders}, - wasm_vm::PluginInstruction, + panes::PaneId, screen::ScreenInstruction, thread_bus::Bus, wasm_vm::PluginInstruction, ClientId, ServerInstruction, }; -use async_std::{ - future::timeout as async_timeout, - task::{self, JoinHandle}, -}; -use std::{ - collections::HashMap, - env, - os::unix::io::RawFd, - path::PathBuf, - time::{Duration, Instant}, -}; +use async_std::task::{self, JoinHandle}; +use std::{collections::HashMap, env, os::unix::io::RawFd, path::PathBuf}; use zellij_utils::nix::unistd::Pid; use zellij_utils::{ async_std, - errors::{get_current_ctx, ContextType, PtyContext}, + errors::{ContextType, PtyContext}, input::{ command::{RunCommand, TerminalAction}, layout::{Layout, LayoutFromYaml, Run, TabLayout}, }, - logging::debug_to_file, }; pub type VteBytes = Vec; @@ -198,100 +185,6 @@ pub(crate) fn pty_thread_main(mut pty: Pty, layout: Box) { } } -enum ReadResult { - Ok(usize), - Timeout, - Err(std::io::Error), -} - -impl From> for ReadResult { - fn from(e: std::io::Result) -> ReadResult { - match e { - Err(e) => ReadResult::Err(e), - Ok(n) => ReadResult::Ok(n), - } - } -} - -async fn deadline_read( - reader: &mut dyn AsyncReader, - deadline: Option, - buf: &mut [u8], -) -> ReadResult { - if let Some(deadline) = deadline { - let timeout = deadline.checked_duration_since(Instant::now()); - if let Some(timeout) = timeout { - match async_timeout(timeout, reader.read(buf)).await { - Ok(res) => res.into(), - _ => ReadResult::Timeout, - } - } else { - // deadline has already elapsed - ReadResult::Timeout - } - } else { - reader.read(buf).await.into() - } -} - -async fn async_send_to_screen(senders: ThreadSenders, screen_instruction: ScreenInstruction) { - task::spawn_blocking(move || senders.send_to_screen(screen_instruction)) - .await - .unwrap() -} - -fn stream_terminal_bytes( - pid: RawFd, - senders: ThreadSenders, - os_input: Box, - debug: bool, -) -> JoinHandle<()> { - let mut err_ctx = get_current_ctx(); - task::spawn({ - async move { - err_ctx.add_call(ContextType::AsyncTask); - - // After a successful read, we keep on reading additional data up to a duration of - // `RENDER_PAUSE`. This is in order to batch up PtyBytes before rendering them. - // Once `render_deadline` has elapsed, we send Render. - const RENDER_PAUSE: Duration = Duration::from_millis(30); - let mut render_deadline = None; - // Keep track of the last render time so we can render immediately if something shows - // up after a period of inactivity. This reduces input latency perception. - let mut last_render = Instant::now(); - - let mut buf = [0u8; 65536]; - let mut async_reader = os_input.async_file_reader(pid); - loop { - match deadline_read(async_reader.as_mut(), render_deadline, &mut buf).await { - ReadResult::Ok(0) | ReadResult::Err(_) => break, // EOF or error - ReadResult::Timeout => { - async_send_to_screen(senders.clone(), ScreenInstruction::Render).await; - // next read does not need a deadline as we just rendered everything - render_deadline = None; - last_render = Instant::now(); - }, - ReadResult::Ok(n_bytes) => { - let bytes = &buf[..n_bytes]; - if debug { - let _ = debug_to_file(bytes, pid); - } - async_send_to_screen( - senders.clone(), - ScreenInstruction::PtyBytes(pid, bytes.to_vec()), - ) - .await; - // if we already have a render_deadline we keep it, otherwise we set it - // to RENDER_PAUSE since the last time we rendered. - render_deadline.get_or_insert(last_render + RENDER_PAUSE); - }, - } - } - async_send_to_screen(senders.clone(), ScreenInstruction::Render).await; - } - }) -} - impl Pty { pub fn new( bus: Bus, @@ -361,13 +254,18 @@ impl Pty { .as_mut() .unwrap() .spawn_terminal(terminal_action, quit_cb, self.default_editor.clone())?; - let task_handle = stream_terminal_bytes( - pid_primary, - self.bus.senders.clone(), - self.bus.os_input.as_ref().unwrap().clone(), - self.debug_to_file, - ); - self.task_handles.insert(pid_primary, task_handle); + let terminal_bytes = task::spawn({ + let senders = self.bus.senders.clone(); + let os_input = self.bus.os_input.as_ref().unwrap().clone(); + let debug_to_file = self.debug_to_file; + async move { + TerminalBytes::new(pid_primary, senders, os_input, debug_to_file) + .listen() + .await; + } + }); + + self.task_handles.insert(pid_primary, terminal_bytes); self.id_to_child_pid.insert(pid_primary, child_fd); Ok(pid_primary) } @@ -425,13 +323,17 @@ impl Pty { )) .unwrap(); for id in new_pane_pids { - let task_handle = stream_terminal_bytes( - id, - self.bus.senders.clone(), - self.bus.os_input.as_ref().unwrap().clone(), - self.debug_to_file, - ); - self.task_handles.insert(id, task_handle); + let terminal_bytes = task::spawn({ + let senders = self.bus.senders.clone(); + let os_input = self.bus.os_input.as_ref().unwrap().clone(); + let debug_to_file = self.debug_to_file; + async move { + TerminalBytes::new(id, senders, os_input, debug_to_file) + .listen() + .await; + } + }); + self.task_handles.insert(id, terminal_bytes); } } pub fn close_pane(&mut self, id: PaneId) { diff --git a/zellij-server/src/terminal_bytes.rs b/zellij-server/src/terminal_bytes.rs new file mode 100644 index 0000000000..0a6c5a7bd0 --- /dev/null +++ b/zellij-server/src/terminal_bytes.rs @@ -0,0 +1,162 @@ +use crate::{ + os_input_output::{AsyncReader, ServerOsApi}, + screen::ScreenInstruction, + thread_bus::ThreadSenders, +}; +use async_std::{future::timeout as async_timeout, task}; +use std::{ + os::unix::io::RawFd, + time::{Duration, Instant}, +}; +use zellij_utils::{ + async_std, + errors::{get_current_ctx, ContextType}, + logging::debug_to_file, +}; + +enum ReadResult { + Ok(usize), + Timeout, + Err(std::io::Error), +} + +impl From> for ReadResult { + fn from(e: std::io::Result) -> ReadResult { + match e { + Err(e) => ReadResult::Err(e), + Ok(n) => ReadResult::Ok(n), + } + } +} + +pub(crate) struct TerminalBytes { + pid: RawFd, + senders: ThreadSenders, + async_reader: Box, + debug: bool, + render_deadline: Option, + backed_up: bool, + minimum_render_send_time: Option, + buffering_pause: Duration, + last_render: Instant, +} + +impl TerminalBytes { + pub fn new( + pid: RawFd, + senders: ThreadSenders, + os_input: Box, + debug: bool, + ) -> Self { + TerminalBytes { + pid, + senders, + debug, + async_reader: os_input.async_file_reader(pid), + render_deadline: None, + backed_up: false, + minimum_render_send_time: None, + buffering_pause: Duration::from_millis(30), + last_render: Instant::now(), + } + } + pub async fn listen(&mut self) { + // This function reads bytes from the pty and then sends them as + // ScreenInstruction::PtyBytes to screen to be parsed there + // We also send a separate instruction to Screen to render as ScreenInstruction::Render + // + // We endeavour to send a Render instruction to screen immediately after having send bytes + // to parse - this is so that the rendering is quick and smooth. However, this can cause + // latency if the screen is backed up. For this reason, if we detect a peak in the time it + // takes to send the render instruction, we assume the screen thread is backed up and so + // only send a render instruction sparingly, giving screen time to process bytes and render + // while still allowing the user to see an indication that things are happening (the + // sparing render instructions) + let mut err_ctx = get_current_ctx(); + err_ctx.add_call(ContextType::AsyncTask); + let mut buf = [0u8; 65536]; + loop { + match self.deadline_read(&mut buf).await { + // match deadline_read(async_reader.as_mut(), self.render_deadline, &mut buf).await { + ReadResult::Ok(0) | ReadResult::Err(_) => break, // EOF or error + ReadResult::Timeout => { + let time_to_send_render = + self.async_send_to_screen(ScreenInstruction::Render).await; + self.update_render_send_time(time_to_send_render); + // next read does not need a deadline as we just rendered everything + self.render_deadline = None; + self.last_render = Instant::now(); + }, + ReadResult::Ok(n_bytes) => { + let bytes = &buf[..n_bytes]; + if self.debug { + let _ = debug_to_file(bytes, self.pid); + } + self.async_send_to_screen(ScreenInstruction::PtyBytes( + self.pid, + bytes.to_vec(), + )) + .await; + if !self.backed_up { + // we're not backed up, let's send an immediate render instruction + let time_to_send_render = + self.async_send_to_screen(ScreenInstruction::Render).await; + self.update_render_send_time(time_to_send_render); + } + // if we already have a render_deadline we keep it, otherwise we set it + // to buffering_pause since the last time we rendered. + self.render_deadline + .get_or_insert(self.last_render + self.buffering_pause); + }, + } + } + self.async_send_to_screen(ScreenInstruction::Render).await; + } + async fn async_send_to_screen(&self, screen_instruction: ScreenInstruction) -> Duration { + // returns the time it blocked the thread for + let sent_at = Instant::now(); + let senders = self.senders.clone(); + task::spawn_blocking(move || senders.send_to_screen(screen_instruction)) + .await + .unwrap(); + sent_at.elapsed() + } + fn update_render_send_time(&mut self, time_to_send_render: Duration) { + match self.minimum_render_send_time.as_mut() { + Some(minimum_render_time) => { + if time_to_send_render < *minimum_render_time { + *minimum_render_time = time_to_send_render; + } + if time_to_send_render > *minimum_render_time * 10 { + // sending the render instruction took an especially long time, we can safely + // assume the screen thread is backed up and we should only send render + // instructions sparingly + self.backed_up = true; + } else if time_to_send_render < *minimum_render_time * 5 { + // the screen thread is not backed up, we atomically unset the backed_up + // indication + self.backed_up = false; + } + }, + None => { + self.minimum_render_send_time = Some(time_to_send_render); + }, + } + } + async fn deadline_read(&mut self, buf: &mut [u8]) -> ReadResult { + if let Some(deadline) = self.render_deadline { + let timeout = deadline.checked_duration_since(Instant::now()); + if let Some(timeout) = timeout { + match async_timeout(timeout, self.async_reader.read(buf)).await { + Ok(res) => res.into(), + _ => ReadResult::Timeout, + } + } else { + // deadline has already elapsed + ReadResult::Timeout + } + } else { + self.async_reader.read(buf).await.into() + } + } +}