Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 106 additions & 1 deletion doc/nio.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ nvim-nio *nvim-nio*
nio....................................................................|nio|
nio.control....................................................|nio.control|
nio.lsp............................................................|nio.lsp|
nio.process....................................................|nio.process|
nio.streams....................................................|nio.streams|
nio.uv..............................................................|nio.uv|
nio.ui..............................................................|nio.ui|
nio.tests........................................................|nio.tests|
Expand Down Expand Up @@ -349,6 +351,108 @@ Return~
`(nio.lsp.Client)`


==============================================================================
nio.process *nio.process*


*nio.process.Process*
Wrapper for a running process, providing access to its stdio streams and
methods to interact with it.

Fields~
{pid} `(integer)` ID of the invoked process
{signal} `(fun(signal: integer|uv.aliases.signals))` Send a signal to
the process
{result} `(async fun(): number)` Wait for the process to exit and return the
exit code
{stdin} `(nio.streams.OSStreamWriter)` Stream to write to the process stdin.
{stdout} `(nio.streams.OSStreamReader)` Stream to read from the process
stdout.
{stderr} `(nio.streams.OSStreamReader)` Stream to read from the process
stderr.

*nio.process.run()*
`run`({opts})

Run a process asynchronously.
>lua
local first = nio.process.run({
cmd = "printf", args = { "hello" }
})

local second = nio.process.run({
cmd = "cat", stdin = first.stdout
})

local output = second.stdout.read()
print(output)
<
Parameters~
{opts} `(nio.process.RunOpts)`
Return~
`(nio.process.Process)`

*nio.process.RunOpts*
Fields~
{cmd} `(string)` Command to run
{args?} `(string[])` Arguments to pass to the command
{stdin?} `(integer|nio.streams.OSStreamReader|uv.uv_pipe_t|uv_pipe_t)` Stream,
pipe or file descriptor to use as stdin.
{stdout?} `(integer|nio.streams.OSStreamWriter|uv.uv_pipe_t|uv_pipe_t)`
Stream,
pipe or file descriptor to use as stdout.
{stderr?} `(integer|nio.streams.OSStreamWriter|uv.uv_pipe_t|uv_pipe_t)`
Stream,
pipe or file descriptor to use as stderr.
{env?} `(table<string, string>)` Environment variables to pass to the
process
{cwd?} `(string)` Current working directory of the process
{uid?} `(integer)` User ID of the process
{gid?} `(integer)` Group ID of the process


==============================================================================
nio.streams *nio.streams*


*nio.streams.Stream*
Fields~
{close} `(async fun(): nil)` Close the stream

*nio.streams.Reader*
Inherits: `nio.streams.Stream`

Fields~
{read} `(async fun(n?: integer): string)` Read data from the stream,
optionally up to n bytes otherwise until EOF is reached

*nio.streams.Writer*
Inherits: `nio.streams.Stream`

Fields~
{write} `(async fun(data: string): nil)` Write data to the stream

*nio.streams.OSStream*
Inherits: `nio.streams.Stream`

Fields~
{fd} `(integer)` The file descriptor of the stream

*nio.streams.StreamReader*
Inherits: `nio.streams.Reader, nio.streams.Stream`

Inherits: `nio.streams.Writer, nio.streams.Stream`
*nio.streams.StreamWriter*


*nio.streams.OSStreamReader*
Inherits: `nio.streams.StreamReader, nio.streams.OSStream`

Inherits: `nio.streams.StreamWriter, nio.streams.OSStream`
*nio.streams.OSStreamWriter*



==============================================================================
nio.uv *nio.uv*

Expand Down Expand Up @@ -436,7 +540,8 @@ length: integer): (string|nil,integer|nil))`
{fs_scandir} `(async fun(path: string): (string|nil,uv_fs_t|nil))`
{shutdown} `(async fun(stream: uv_stream_t): string|nil)`
{listen} `(async fun(stream: uv_stream_t): string|nil)`
{write} `(async fun(stream: uv_stream_t): string|nil)`
{write} `(async fun(stream: uv_stream_t, data: string|string[]):
uv.uv_write_t|nil)`
{write2} `(async fun(stream: uv_stream_t, data: string|string[], send_handle:
uv_stream_t): string|nil)`

Expand Down
2 changes: 2 additions & 0 deletions lua/nio/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ local uv = require("nio.uv")
local tests = require("nio.tests")
local ui = require("nio.ui")
local lsp = require("nio.lsp")
local process = require("nio.process")

---@tag nvim-nio

Expand All @@ -26,6 +27,7 @@ nio.ui = ui
nio.tests = tests
nio.tasks = tasks
nio.lsp = lsp
nio.process = process

--- Run a function in an async context. This is the entrypoint to all async
--- functionality.
Expand Down
109 changes: 109 additions & 0 deletions lua/nio/process.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
local streams = require("nio.streams")
local control = require("nio.control")

local nio = {}
---@toc_entry nio.process
---@class nio.process
nio.process = {}

---@class nio.process.Process
--- Wrapper for a running process, providing access to its stdio streams and
--- methods to interact with it.
---
---@field pid integer ID of the invoked process
---@field signal fun(signal: integer|uv.aliases.signals) Send a signal to
--- the process
---@field result async fun(): number Wait for the process to exit and return the
--- exit code
---@field stdin nio.streams.OSStreamWriter Stream to write to the process stdin.
---@field stdout nio.streams.OSStreamReader Stream to read from the process
--- stdout.
---@field stderr nio.streams.OSStreamReader Stream to read from the process
--- stderr.

--- Run a process asynchronously.
--- ```lua
--- local first = nio.process.run({
--- cmd = "printf", args = { "hello" }
--- })
---
--- local second = nio.process.run({
--- cmd = "cat", stdin = first.stdout
--- })
---
--- local output = second.stdout.read()
--- print(output)
--- ```
---@param opts nio.process.RunOpts
---@return nio.process.Process
function nio.process.run(opts)
opts = vim.tbl_extend("force", { hide = true }, opts)

local cmd = opts.cmd
local args = opts.args

local exit_code_future = control.future()

local stdout = streams.reader(opts.stdout)
local stderr = streams.reader(opts.stderr)
local stdin = streams.writer(opts.stdin)

local stdio = { stdin.pipe, stdout.pipe, stderr.pipe }

local handle, pid, spawn_err = vim.loop.spawn(cmd, {
args = args,
stdio = stdio,
env = opts.env,
cwd = opts.cwd,
uid = opts.uid,
gid = opts.gid,
verbatim = opts.verbatim,
detached = opts.detached,
hide = opts.hide,
}, function(_, code)
exit_code_future.set(code)
end)

assert(not spawn_err, spawn_err)

local process = {
pid = pid,
signal = function(signal)
vim.loop.process_kill(handle, signal)
end,
stdin = {
write = stdin.write,
fd = stdin.pipe:fileno(),
close = stdin.close,
},
stdout = {
read = stdout.read,
fd = stdout.pipe:fileno(),
close = stdout.close,
},
stderr = {
read = stderr.read,
fd = stderr.pipe:fileno(),
close = stderr.close,
},
result = exit_code_future.wait,
}
return process
end

---@class nio.process.RunOpts
---@field cmd string Command to run
---@field args? string[] Arguments to pass to the command
---@field stdin? integer|nio.streams.OSStreamReader|uv.uv_pipe_t|uv_pipe_t Stream,
--- pipe or file descriptor to use as stdin.
---@field stdout? integer|nio.streams.OSStreamWriter|uv.uv_pipe_t|uv_pipe_t Stream,
--- pipe or file descriptor to use as stdout.
---@field stderr? integer|nio.streams.OSStreamWriter|uv.uv_pipe_t|uv_pipe_t Stream,
--- pipe or file descriptor to use as stderr.
---@field env? table<string, string> Environment variables to pass to the
--- process
---@field cwd? string Current working directory of the process
---@field uid? integer User ID of the process
---@field gid? integer Group ID of the process

return nio.process
125 changes: 125 additions & 0 deletions lua/nio/streams.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
local tasks = require("nio.tasks")
local control = require("nio.control")
local uv = require("nio.uv")

local nio = {}

---@toc_entry nio.streams
---@class nio.streams
nio.streams = {}

---@class nio.streams.Stream
---@field close async fun(): nil Close the stream

---@class nio.streams.Reader : nio.streams.Stream
---@field read async fun(n?: integer): string Read data from the stream,
--- optionally up to n bytes otherwise until EOF is reached

---@class nio.streams.Writer : nio.streams.Stream
---@field write async fun(data: string): nil Write data to the stream

---@class nio.streams.OSStream : nio.streams.Stream
---@field fd integer The file descriptor of the stream

---@class nio.streams.StreamReader : nio.streams.Reader, nio.streams.Stream
---@class nio.streams.StreamWriter : nio.streams.Writer, nio.streams.Stream

---@class nio.streams.OSStreamReader : nio.streams.StreamReader, nio.streams.OSStream
---@class nio.streams.OSStreamWriter : nio.streams.StreamWriter, nio.streams.OSStream

---@param input integer|uv.uv_pipe_t|uv_pipe_t|nio.streams.OSStream
---@return uv_pipe_t
---@nodoc
local function create_pipe(input)
if type(input) == "userdata" then
-- Existing pipe
return input
end

local pipe, err = vim.loop.new_pipe()
assert(not err and pipe, err)

local fd = type(input) == "number" and input or input and input.fd
if fd then
-- File descriptor
pipe:open(fd)
end

return pipe
end

---@param input integer|nio.streams.OSStreamReader|uv.uv_pipe_t|uv_pipe_t
---@private
function nio.streams.reader(input)
local pipe = create_pipe(input)

local buffer = ""
local ready = control.event()
local complete = control.event()
local started = false

local stop_reading = function()
if not started or complete.is_set() then
return
end
vim.loop.read_stop(pipe)
complete.set()
ready.set()
end

local start = function()
started = true
pipe:read_start(function(err, data)
assert(not err, err)
if not data then
tasks.run(stop_reading)
return
end
buffer = buffer .. data
ready.set()
end)
end

return {
pipe = pipe,
close = function()
stop_reading()
uv.close(pipe)
end,
read = function(n)
if not started then
start()
end
if n == 0 then
return ""
end

while not complete.is_set() and (not n or #buffer < n) do
ready.wait()
ready.clear()
end

local data = n and buffer:sub(1, n) or buffer
buffer = buffer:sub(#data + 1)
return data
end,
}
end

---@param input integer|nio.streams.OSStreamWriter|uv.uv_pipe_t|uv_pipe_t
---@private
function nio.streams.writer(input)
local pipe = create_pipe(input)

return {
pipe = pipe,
write = function(data)
uv.write(pipe, data)
end,
close = function()
uv.shutdown(pipe)
end,
}
end

return nio.streams
4 changes: 2 additions & 2 deletions lua/nio/uv.lua
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ local nio = {}
---@field fs_scandir async fun(path: string): (string|nil,uv_fs_t|nil)
---@field shutdown async fun(stream: uv_stream_t): string|nil
---@field listen async fun(stream: uv_stream_t): string|nil
---@field write async fun(stream: uv_stream_t): string|nil
---@field write async fun(stream: uv_stream_t, data: string|string[]): uv.uv_write_t|nil
---@field write2 async fun(stream: uv_stream_t, data: string|string[], send_handle: uv_stream_t): string|nil
nio.uv = {}

Expand All @@ -78,7 +78,7 @@ local function add(name, argc)
nio.uv[name] = ret
end

add("close", 4) -- close a handle
add("close", 2) -- close a handle
-- filesystem operations
add("fs_open", 4)
add("fs_read", 4)
Expand Down
Loading