diff --git a/ext/io/lib.rs b/ext/io/lib.rs index 04ed58ef2708bf..43bfa929502bd2 100644 --- a/ext/io/lib.rs +++ b/ext/io/lib.rs @@ -48,6 +48,13 @@ use winapi::um::processenv::GetStdHandle; #[cfg(windows)] use winapi::um::winbase; +#[cfg(windows)] +use std::sync::Arc; +#[cfg(windows)] +use std::sync::Condvar; +#[cfg(windows)] +use std::sync::Mutex; + pub mod fs; mod pipe; #[cfg(windows)] @@ -106,12 +113,21 @@ deno_core::extension!(deno_io, }, state = |state, options| { if let Some(stdio) = options.stdio { + #[cfg(windows)] + let stdin_state = { + let st = Arc::new(Mutex::new(WinTtyState::default())); + state.put(st.clone()); + st + }; + #[cfg(unix)] + let stdin_state = (); + let t = &mut state.resource_table; let rid = t.add(fs::FileResource::new( Rc::new(match stdio.stdin.pipe { StdioPipeInner::Inherit => StdFileResourceInner::new( - StdFileResourceKind::Stdin, + StdFileResourceKind::Stdin(stdin_state), STDIN_HANDLE.try_clone().unwrap(), ), StdioPipeInner::File(pipe) => StdFileResourceInner::file(pipe), @@ -317,14 +333,29 @@ impl Resource for ChildStderrResource { } } -#[derive(Clone, Copy)] +#[cfg(windows)] +use winapi::um::wincon::CONSOLE_SCREEN_BUFFER_INFO; + +#[cfg(windows)] +#[derive(Default)] +pub struct WinTtyState { + pub cancelled: bool, + pub reading: bool, + pub screen_buffer_info: Option, + pub cvar: Arc, +} + +#[derive(Clone)] enum StdFileResourceKind { File, // For stdout and stderr, we sometimes instead use std::io::stdout() directly, // because we get some Windows specific functionality for free by using Rust // std's wrappers. So we take a bit of a complexity hit in order to not // have to duplicate the functionality in Rust's std/src/sys/windows/stdio.rs - Stdin, + #[cfg(windows)] + Stdin(Arc>), + #[cfg(not(windows))] + Stdin(()), Stdout, Stderr, } @@ -435,7 +466,7 @@ impl crate::fs::File for StdFileResourceInner { // std/src/sys/windows/stdio.rs in Rust's source code). match self.kind { StdFileResourceKind::File => self.with_sync(|file| Ok(file.write(buf)?)), - StdFileResourceKind::Stdin => { + StdFileResourceKind::Stdin(_) => { Err(Into::::into(ErrorKind::Unsupported).into()) } StdFileResourceKind::Stdout => { @@ -457,7 +488,7 @@ impl crate::fs::File for StdFileResourceInner { fn read_sync(self: Rc, buf: &mut [u8]) -> FsResult { match self.kind { - StdFileResourceKind::File | StdFileResourceKind::Stdin => { + StdFileResourceKind::File | StdFileResourceKind::Stdin(_) => { self.with_sync(|file| Ok(file.read(buf)?)) } StdFileResourceKind::Stdout | StdFileResourceKind::Stderr => { @@ -471,7 +502,7 @@ impl crate::fs::File for StdFileResourceInner { StdFileResourceKind::File => { self.with_sync(|file| Ok(file.write_all(buf)?)) } - StdFileResourceKind::Stdin => { + StdFileResourceKind::Stdin(_) => { Err(Into::::into(ErrorKind::Unsupported).into()) } StdFileResourceKind::Stdout => { @@ -497,7 +528,7 @@ impl crate::fs::File for StdFileResourceInner { .with_inner_blocking_task(move |file| Ok(file.write_all(&buf)?)) .await } - StdFileResourceKind::Stdin => { + StdFileResourceKind::Stdin(_) => { Err(Into::::into(ErrorKind::Unsupported).into()) } StdFileResourceKind::Stdout => { @@ -538,7 +569,7 @@ impl crate::fs::File for StdFileResourceInner { }) .await } - StdFileResourceKind::Stdin => { + StdFileResourceKind::Stdin(_) => { Err(Into::::into(ErrorKind::Unsupported).into()) } StdFileResourceKind::Stdout => { @@ -568,7 +599,7 @@ impl crate::fs::File for StdFileResourceInner { fn read_all_sync(self: Rc) -> FsResult> { match self.kind { - StdFileResourceKind::File | StdFileResourceKind::Stdin => { + StdFileResourceKind::File | StdFileResourceKind::Stdin(_) => { let mut buf = Vec::new(); self.with_sync(|file| Ok(file.read_to_end(&mut buf)?))?; Ok(buf) @@ -580,7 +611,7 @@ impl crate::fs::File for StdFileResourceInner { } async fn read_all_async(self: Rc) -> FsResult> { match self.kind { - StdFileResourceKind::File | StdFileResourceKind::Stdin => { + StdFileResourceKind::File | StdFileResourceKind::Stdin(_) => { self .with_inner_blocking_task(|file| { let mut buf = Vec::new(); @@ -736,19 +767,97 @@ impl crate::fs::File for StdFileResourceInner { self: Rc, mut buf: BufMutView, ) -> FsResult<(usize, BufMutView)> { - self - .with_inner_blocking_task(|file| { - let nread = file.read(&mut buf)?; - Ok((nread, buf)) - }) - .await + match &self.kind { + /* On Windows, we need to handle special read cancellation logic for stdin */ + #[cfg(windows)] + StdFileResourceKind::Stdin(state) => { + loop { + let state = state.clone(); + + let fut = self.with_inner_blocking_task(move |file| { + /* Start reading, and set the reading flag to true */ + state.lock().unwrap().reading = true; + let nread = match file.read(&mut buf) { + Ok(nread) => nread, + Err(e) => return Err((e.into(), buf)), + }; + + let mut state = state.lock().unwrap(); + state.reading = false; + + /* If we canceled the read by sending a VK_RETURN event, restore + the screen state to undo the visual effect of the VK_RETURN event */ + if state.cancelled { + if let Some(screen_buffer_info) = state.screen_buffer_info { + // SAFETY: WinAPI calls to open conout$ and restore visual state. + unsafe { + let handle = winapi::um::fileapi::CreateFileW( + "conout$" + .encode_utf16() + .chain(Some(0)) + .collect::>() + .as_ptr(), + winapi::um::winnt::GENERIC_READ + | winapi::um::winnt::GENERIC_WRITE, + winapi::um::winnt::FILE_SHARE_READ + | winapi::um::winnt::FILE_SHARE_WRITE, + std::ptr::null_mut(), + winapi::um::fileapi::OPEN_EXISTING, + 0, + std::ptr::null_mut(), + ); + + let mut pos = screen_buffer_info.dwCursorPosition; + /* If the cursor was at the bottom line of the screen buffer, the + VK_RETURN would have caused the buffer contents to scroll up by + one line. The right position to reset the cursor to is therefore one + line higher */ + if pos.Y == screen_buffer_info.dwSize.Y - 1 { + pos.Y -= 1; + } + + winapi::um::wincon::SetConsoleCursorPosition(handle, pos); + winapi::um::handleapi::CloseHandle(handle); + } + } + + /* Reset the cancelled flag */ + state.cancelled = false; + + /* Unblock the main thread */ + state.cvar.notify_one(); + + return Err((FsError::FileBusy, buf)); + } + + Ok((nread, buf)) + }); + + match fut.await { + Err((FsError::FileBusy, b)) => { + buf = b; + continue; + } + other => return other.map_err(|(e, _)| e), + } + } + } + _ => { + self + .with_inner_blocking_task(|file| { + let nread = file.read(&mut buf)?; + Ok((nread, buf)) + }) + .await + } + } } fn try_clone_inner(self: Rc) -> FsResult> { let inner: &Option<_> = &self.cell.borrow(); match inner { Some(inner) => Ok(Rc::new(StdFileResourceInner { - kind: self.kind, + kind: self.kind.clone(), cell: RefCell::new(Some(inner.try_clone()?)), cell_async_task_queue: Default::default(), handle: self.handle, diff --git a/runtime/ops/tty.rs b/runtime/ops/tty.rs index 3d721734cded0b..f78dcc9c784af7 100644 --- a/runtime/ops/tty.rs +++ b/runtime/ops/tty.rs @@ -13,6 +13,13 @@ use rustyline::KeyCode; use rustyline::KeyEvent; use rustyline::Modifiers; +#[cfg(windows)] +use deno_io::WinTtyState; +#[cfg(windows)] +use std::sync::Arc; +#[cfg(windows)] +use std::sync::Mutex; + #[cfg(unix)] use deno_core::ResourceId; #[cfg(unix)] @@ -94,6 +101,7 @@ fn op_set_raw( #[cfg(windows)] { use winapi::shared::minwindef::FALSE; + use winapi::um::consoleapi; let handle = handle_or_fd; @@ -116,6 +124,79 @@ fn op_set_raw( mode_raw_input_off(original_mode) }; + let stdin_state = state.borrow::>>(); + let mut stdin_state = stdin_state.lock().unwrap(); + + if stdin_state.reading { + let cvar = stdin_state.cvar.clone(); + + /* Trick to unblock an ongoing line-buffered read operation if not already pending. + See https://github.com/libuv/libuv/pull/866 for prior art */ + if original_mode & COOKED_MODE != 0 && !stdin_state.cancelled { + // SAFETY: Write enter key event to force the console wait to return. + let record = unsafe { + let mut record: wincon::INPUT_RECORD = std::mem::zeroed(); + record.EventType = wincon::KEY_EVENT; + record.Event.KeyEvent_mut().wVirtualKeyCode = + winapi::um::winuser::VK_RETURN as u16; + record.Event.KeyEvent_mut().bKeyDown = 1; + record.Event.KeyEvent_mut().wRepeatCount = 1; + *record.Event.KeyEvent_mut().uChar.UnicodeChar_mut() = '\r' as u16; + record.Event.KeyEvent_mut().dwControlKeyState = 0; + record.Event.KeyEvent_mut().wVirtualScanCode = + winapi::um::winuser::MapVirtualKeyW( + winapi::um::winuser::VK_RETURN as u32, + winapi::um::winuser::MAPVK_VK_TO_VSC, + ) as u16; + record + }; + stdin_state.cancelled = true; + + // SAFETY: winapi call to open conout$ and save screen state. + let active_screen_buffer = unsafe { + /* Save screen state before sending the VK_RETURN event */ + let handle = winapi::um::fileapi::CreateFileW( + "conout$" + .encode_utf16() + .chain(Some(0)) + .collect::>() + .as_ptr(), + winapi::um::winnt::GENERIC_READ | winapi::um::winnt::GENERIC_WRITE, + winapi::um::winnt::FILE_SHARE_READ + | winapi::um::winnt::FILE_SHARE_WRITE, + std::ptr::null_mut(), + winapi::um::fileapi::OPEN_EXISTING, + 0, + std::ptr::null_mut(), + ); + + let mut active_screen_buffer = std::mem::zeroed(); + winapi::um::wincon::GetConsoleScreenBufferInfo( + handle, + &mut active_screen_buffer, + ); + winapi::um::handleapi::CloseHandle(handle); + active_screen_buffer + }; + stdin_state.screen_buffer_info = Some(active_screen_buffer); + + // SAFETY: winapi call to write the VK_RETURN event. + if unsafe { + winapi::um::wincon::WriteConsoleInputW(handle, &record, 1, &mut 0) + } == FALSE + { + return Err(Error::last_os_error().into()); + } + + /* Wait for read thread to acknowledge the cancellation to ensure that nothing + interferes with the screen state. + NOTE: `wait_while` automatically unlocks stdin_state */ + let _unused = cvar + .wait_while(stdin_state, |state| state.cancelled) + .unwrap(); + } + } + // SAFETY: winapi call if unsafe { consoleapi::SetConsoleMode(handle, new_mode) } == FALSE { return Err(Error::last_os_error().into()); diff --git a/tests/integration/run_tests.rs b/tests/integration/run_tests.rs index f3fc18fee25d9d..e52aff232e393e 100644 --- a/tests/integration/run_tests.rs +++ b/tests/integration/run_tests.rs @@ -3202,6 +3202,19 @@ itest!(byte_order_mark { output: "run/byte_order_mark.out", }); +#[test] +fn process_stdin_read_unblock() { + TestContext::default() + .new_command() + .args_vec(["run", "run/process_stdin_unblock.mjs"]) + .with_pty(|mut console| { + console.write_raw("b"); + console.human_delay(); + console.write_line_raw("s"); + console.expect_all(&["1", "1"]); + }); +} + #[test] fn issue9750() { TestContext::default() diff --git a/tests/testdata/run/process_stdin_unblock.mjs b/tests/testdata/run/process_stdin_unblock.mjs new file mode 100644 index 00000000000000..bbeea2afb3111f --- /dev/null +++ b/tests/testdata/run/process_stdin_unblock.mjs @@ -0,0 +1,21 @@ +import process from "node:process"; + +function prompt() { + process.stdin.setRawMode(true); + + const { promise, resolve } = Promise.withResolvers(); + + const onData = (buf) => { + process.stdin.setRawMode(false); + process.stdin.removeListener("data", onData); + console.log(buf.length); + resolve(); + }; + + process.stdin.on("data", onData); + return promise; +} + +await prompt(); +await prompt(); +Deno.exit(0);