From d574fbc0e9637a50f60deaaf03f8226d5bb22102 Mon Sep 17 00:00:00 2001 From: armand picard <33733022+armandpicard@users.noreply.github.com> Date: Sun, 13 Nov 2022 09:53:33 +0100 Subject: [PATCH] Add support for terminal size when executing command inside a container (#983) * Add support for terminal size when executing command inside a container Signed-off-by: armandpicard * remove unused import + fix example Signed-off-by: armandpicard * Use crossterm::event::EventStream to get terminal change in pod_exec_terminal size example Signed-off-by: armandpicard * Fix error when terminal_resize_tx is None Signed-off-by: armandpicard * Add some comments Signed-off-by: armandpicard * Fix typos Co-authored-by: kazk Signed-off-by: armand picard <33733022+armandpicard@users.noreply.github.com> * Fix: uniformize message send Signed-off-by: armandpicard * Remove crossterm event stream Signed-off-by: armandpicard * Rename example pod_shell_crossterm Signed-off-by: armandpicard * Make example run on window but without handling terminal size change + run fmt Signed-off-by: armandpicard Signed-off-by: armandpicard Signed-off-by: armand picard <33733022+armandpicard@users.noreply.github.com> Co-authored-by: kazk Co-authored-by: Eirik A --- examples/Cargo.toml | 6 + examples/pod_shell_crossterm.rs | 132 +++++++++++++++++++ kube-client/src/api/mod.rs | 2 +- kube-client/src/api/remote_command.rs | 182 +++++++++++++++++--------- 4 files changed, 261 insertions(+), 61 deletions(-) create mode 100644 examples/pod_shell_crossterm.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 679a5b2b4..8f7700039 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -53,6 +53,7 @@ backoff = "0.4.0" clap = { version = "4.0", default-features = false, features = ["std", "cargo", "derive"] } edit = "0.1.3" tokio-stream = { version = "0.1.9", features = ["net"] } +crossterm = {version = "0.25.0" } [[example]] name = "configmapgen_controller" @@ -206,3 +207,8 @@ path = "custom_client_trace.rs" [[example]] name = "secret_syncer" path = "secret_syncer.rs" + +[[example]] +name = "pod_shell_crossterm" +path = "pod_shell_crossterm.rs" +required-features = ["ws"] \ No newline at end of file diff --git a/examples/pod_shell_crossterm.rs b/examples/pod_shell_crossterm.rs new file mode 100644 index 000000000..3cfc27acf --- /dev/null +++ b/examples/pod_shell_crossterm.rs @@ -0,0 +1,132 @@ +use futures::{channel::mpsc::Sender, SinkExt, StreamExt}; +use k8s_openapi::api::core::v1::Pod; + +#[cfg(unix)] use crossterm::event::Event; +use kube::{ + api::{Api, AttachParams, AttachedProcess, DeleteParams, PostParams, ResourceExt, TerminalSize}, + runtime::wait::{await_condition, conditions::is_pod_running}, + Client, +}; +#[cfg(unix)] use tokio::signal; +use tokio::{io::AsyncWriteExt, select}; + +#[cfg(unix)] +// Send the new terminal size to channel when it change +async fn handle_terminal_size(mut channel: Sender) -> Result<(), anyhow::Error> { + let (width, height) = crossterm::terminal::size()?; + channel.send(TerminalSize { height, width }).await?; + + // create a stream to catch SIGWINCH signal + let mut sig = signal::unix::signal(signal::unix::SignalKind::window_change())?; + loop { + if sig.recv().await == None { + return Ok(()); + } + + let (width, height) = crossterm::terminal::size()?; + channel.send(TerminalSize { height, width }).await?; + } +} + +#[cfg(windows)] +// We don't support window for terminal size change, we only send the initial size +async fn handle_terminal_size(mut channel: Sender) -> Result<(), anyhow::Error> { + let (width, height) = crossterm::terminal::size()?; + channel.send(TerminalSize { height, width }).await?; + let mut ctrl_c = tokio::signal::windows::ctrl_c()?; + ctrl_c.recv().await; + Ok(()) +} + + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let client = Client::try_default().await?; + + let pods: Api = Api::default_namespaced(client); + let p: Pod = serde_json::from_value(serde_json::json!({ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { "name": "example" }, + "spec": { + "containers": [{ + "name": "example", + "image": "alpine", + // Do nothing + "command": ["tail", "-f", "/dev/null"], + }], + } + }))?; + // Create pod if don't exist + pods.create(&PostParams::default(), &p).await?; + + // Wait until the pod is running, otherwise we get 500 error. + let running = await_condition(pods.clone(), "example", is_pod_running()); + let _ = tokio::time::timeout(std::time::Duration::from_secs(15), running).await?; + + { + // Here we we put the terminal in 'raw' mode to directly get the input from the user and sending it to the server and getting the result from the server to display directly. + // We also watch for change in your terminal size and send it to the server so that application that use the size work properly. + crossterm::terminal::enable_raw_mode()?; + let mut attached: AttachedProcess = pods + .exec( + "example", + vec!["sh"], + &AttachParams::default().stdin(true).tty(true).stderr(false), + ) + .await?; + + let mut stdin = tokio_util::io::ReaderStream::new(tokio::io::stdin()); + let mut stdout = tokio::io::stdout(); + + let mut output = tokio_util::io::ReaderStream::new(attached.stdout().unwrap()); + let mut input = attached.stdin().unwrap(); + + let term_tx = attached.terminal_size().unwrap(); + + let mut handle_terminal_size_handle = tokio::spawn(handle_terminal_size(term_tx)); + + loop { + select! { + message = stdin.next() => { + match message { + Some(Ok(message)) => { + input.write(&message).await?; + } + _ => { + break; + }, + } + }, + message = output.next() => { + match message { + Some(Ok(message)) => { + stdout.write(&message).await?; + stdout.flush().await?; + }, + _ => { + break + }, + } + }, + result = &mut handle_terminal_size_handle => { + match result { + Ok(_) => println!("End of terminal size stream"), + Err(e) => println!("Error getting terminal size: {e:?}") + } + }, + }; + } + crossterm::terminal::disable_raw_mode()?; + } + + // Delete it + pods.delete("example", &DeleteParams::default()) + .await? + .map_left(|pdel| { + assert_eq!(pdel.name_any(), "example"); + }); + + println!(""); + Ok(()) +} diff --git a/kube-client/src/api/mod.rs b/kube-client/src/api/mod.rs index 593624ba9..b65e6855a 100644 --- a/kube-client/src/api/mod.rs +++ b/kube-client/src/api/mod.rs @@ -5,7 +5,7 @@ mod core_methods; #[cfg(feature = "ws")] mod remote_command; use std::fmt::Debug; -#[cfg(feature = "ws")] pub use remote_command::AttachedProcess; +#[cfg(feature = "ws")] pub use remote_command::{AttachedProcess, TerminalSize}; #[cfg(feature = "ws")] mod portforward; #[cfg(feature = "ws")] pub use portforward::Portforwarder; diff --git a/kube-client/src/api/remote_command.rs b/kube-client/src/api/remote_command.rs index 26b1be8c1..96b93ac47 100644 --- a/kube-client/src/api/remote_command.rs +++ b/kube-client/src/api/remote_command.rs @@ -3,22 +3,38 @@ use std::future::Future; use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status; use futures::{ - channel::oneshot, - future::{ - select, - Either::{Left, Right}, - }, + channel::{mpsc, oneshot}, FutureExt, SinkExt, StreamExt, }; +use serde::{Deserialize, Serialize}; use thiserror::Error; -use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, DuplexStream}; -use tokio_tungstenite::{tungstenite as ws, WebSocketStream}; +use tokio::{ + io::{AsyncRead, AsyncWrite, AsyncWriteExt, DuplexStream}, + select, +}; +use tokio_tungstenite::{ + tungstenite::{self as ws}, + WebSocketStream, +}; use super::AttachParams; type StatusReceiver = oneshot::Receiver; type StatusSender = oneshot::Sender; +type TerminalSizeReceiver = mpsc::Receiver; +type TerminalSizeSender = mpsc::Sender; + +/// TerminalSize define the size of a terminal +#[derive(Debug, Serialize, Deserialize)] +#[cfg_attr(docsrs, doc(cfg(feature = "ws")))] +pub struct TerminalSize { + /// width of the terminal + pub width: u16, + /// height of the terminal + pub height: u16, +} + /// Errors from attaching to a pod. #[derive(Debug, Error)] pub enum Error { @@ -57,6 +73,18 @@ pub enum Error { /// Failed to send status object #[error("failed to send status object")] SendStatus, + + /// Fail to serialize Terminalsize object + #[error("failed to serialize TerminalSize object: {0}")] + SerializeTerminalSize(#[source] serde_json::Error), + + /// Fail to send terminal size message + #[error("failed to send terminal size message")] + SendTerminalSize(#[source] ws::Error), + + /// Failed to set terminal size, tty need to be true to resize the terminal + #[error("failed to set terminal size, tty need to be true to resize the terminal")] + TtyNeedToBeTrue, } const MAX_BUF_SIZE: usize = 1024; @@ -78,6 +106,7 @@ pub struct AttachedProcess { stdout_reader: Option, stderr_reader: Option, status_rx: Option, + terminal_resize_tx: Option, task: tokio::task::JoinHandle>, } @@ -102,6 +131,12 @@ impl AttachedProcess { (None, None) }; let (status_tx, status_rx) = oneshot::channel(); + let (terminal_resize_tx, terminal_resize_rx) = if ap.tty { + let (w, r) = mpsc::channel(10); + (Some(w), Some(r)) + } else { + (None, None) + }; let task = tokio::spawn(start_message_loop( stream, @@ -109,6 +144,7 @@ impl AttachedProcess { stdout_writer, stderr_writer, status_tx, + terminal_resize_rx, )); AttachedProcess { @@ -119,6 +155,7 @@ impl AttachedProcess { stdin_writer: Some(stdin_writer), stdout_reader, stderr_reader, + terminal_resize_tx, status_rx: Some(status_rx), } } @@ -179,14 +216,29 @@ impl AttachedProcess { pub fn take_status(&mut self) -> Option>> { self.status_rx.take().map(|recv| recv.map(|res| res.ok())) } + + /// Async writer to change the terminal size + /// ```ignore + /// let mut terminal_size_writer = attached.terminal_size().unwrap(); + /// terminal_size_writer.send(TerminalSize{ + /// height: 100, + /// width: 200, + /// }).await?; + /// ``` + /// Only available if [`AttachParams`](super::AttachParams) had `tty`. + pub fn terminal_size(&mut self) -> Option { + self.terminal_resize_tx.take() + } } +// theses values come from here: https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/cri/streaming/remotecommand/websocket.go#L34 const STDIN_CHANNEL: u8 = 0; const STDOUT_CHANNEL: u8 = 1; const STDERR_CHANNEL: u8 = 2; // status channel receives `Status` object on exit. const STATUS_CHANNEL: u8 = 3; -// const RESIZE_CHANNEL: u8 = 4; +// resize channel is use to send TerminalSize object to change the size of the terminal +const RESIZE_CHANNEL: u8 = 4; async fn start_message_loop( stream: WebSocketStream, @@ -194,6 +246,7 @@ async fn start_message_loop( mut stdout: Option, mut stderr: Option, status_tx: StatusSender, + mut terminal_size_rx: Option, ) -> Result<(), Error> where S: AsyncRead + AsyncWrite + Unpin + Sized + Send + 'static, @@ -202,71 +255,80 @@ where let (mut server_send, raw_server_recv) = stream.split(); // Work with filtered messages to reduce noise. let mut server_recv = raw_server_recv.filter_map(filter_message).boxed(); - let mut server_msg = server_recv.next(); - let mut next_stdin = stdin_stream.next(); + let have_terminal_size_rx = terminal_size_rx.is_some(); loop { - match select(server_msg, next_stdin).await { - // from server - Left((Some(message), p_next_stdin)) => { - match message { - Ok(Message::Stdout(bin)) => { + let terminal_size_next = async { + match terminal_size_rx.as_mut() { + Some(tmp) => Some(tmp.next().await), + None => None, + } + }; + select! { + server_message = server_recv.next() => { + match server_message { + Some(Ok(Message::Stdout(bin))) => { if let Some(stdout) = stdout.as_mut() { stdout.write_all(&bin[1..]).await.map_err(Error::WriteStdout)?; } - } - - Ok(Message::Stderr(bin)) => { + }, + Some(Ok(Message::Stderr(bin))) => { if let Some(stderr) = stderr.as_mut() { stderr.write_all(&bin[1..]).await.map_err(Error::WriteStderr)?; } - } - - Ok(Message::Status(bin)) => { - let status = - serde_json::from_slice::(&bin[1..]).map_err(Error::DeserializeStatus)?; + }, + Some(Ok(Message::Status(bin))) => { + let status = serde_json::from_slice::(&bin[1..]).map_err(Error::DeserializeStatus)?; status_tx.send(status).map_err(|_| Error::SendStatus)?; - break; - } - - Err(err) => { + break + }, + Some(Err(err)) => { return Err(Error::ReceiveWebSocketMessage(err)); + }, + None => { + // Connection closed properly + break + }, + } + }, + stdin_message = stdin_stream.next() => { + match stdin_message { + Some(Ok(bytes)) => { + if !bytes.is_empty() { + let mut vec = Vec::with_capacity(bytes.len() + 1); + vec.push(STDIN_CHANNEL); + vec.extend_from_slice(&bytes[..]); + server_send + .send(ws::Message::binary(vec)) + .await + .map_err(Error::SendStdin)?; + } + }, + Some(Err(err)) => { + return Err(Error::ReadStdin(err)); + } + None => { + // Stdin closed (writer half dropped). + // Let the server know and disconnect. + server_send.close().await.map_err(Error::SendClose)?; + break; } } - server_msg = server_recv.next(); - next_stdin = p_next_stdin; - } - - Left((None, _)) => { - // Connection closed properly - break; - } - - // from stdin - Right((Some(Ok(bytes)), p_server_msg)) => { - if !bytes.is_empty() { - let mut vec = Vec::with_capacity(bytes.len() + 1); - vec.push(STDIN_CHANNEL); - vec.extend_from_slice(&bytes[..]); - server_send - .send(ws::Message::binary(vec)) - .await - .map_err(Error::SendStdin)?; + }, + Some(terminal_size_message) = terminal_size_next, if have_terminal_size_rx => { + match terminal_size_message { + Some(new_size) => { + let new_size = serde_json::to_vec(&new_size).map_err(Error::SerializeTerminalSize)?; + let mut vec = Vec::with_capacity(new_size.len() + 1); + vec.push(RESIZE_CHANNEL); + vec.extend_from_slice(&new_size[..]); + server_send.send(ws::Message::Binary(vec)).await.map_err(Error::SendTerminalSize)?; + }, + None => { + break + } } - server_msg = p_server_msg; - next_stdin = stdin_stream.next(); - } - - Right((Some(Err(err)), _)) => { - return Err(Error::ReadStdin(err)); - } - - Right((None, _)) => { - // Stdin closed (writer half dropped). - // Let the server know and disconnect. - server_send.close().await.map_err(Error::SendClose)?; - break; - } + }, } }