Skip to content

run cleanupRelay only once in TerminalIO and StandardIO #153

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Sources/Integration/ProcessTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ extension IntegrationSuite {

let status = try await exec.wait()
if status != 0 {
throw IntegrationError.assert(msg: "process \(idx) status for \(status) != 0")
throw IntegrationError.assert(msg: "process \(idx) status \(status) != 0")
}
var hasher = SHA256()
hasher.update(data: buffer.data)
Expand Down
9 changes: 7 additions & 2 deletions vminitd/Sources/vminitd/ManagedProcess.swift
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,11 @@ final class ManagedProcess: Sendable {
extension ManagedProcess {
func start() throws -> Int32 {
try self.lock.withLock {
log.debug("starting managed process")
log.debug(
"starting managed process",
metadata: [
"id": "\(id)"
])

// Start the underlying process.
try process.start()
Expand All @@ -155,7 +159,8 @@ extension ManagedProcess {
log.debug(
"started managed process",
metadata: [
"pid": "\(i)"
"pid": "\(i)",
"id": "\(id)",
])

return i
Expand Down
13 changes: 8 additions & 5 deletions vminitd/Sources/vminitd/ProcessSupervisor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,15 @@ actor ProcessSupervisor {
let exited = Reaper.reap()
self.log?.debug("finished wait4 of \(exited.count) processes")

for proc in processes {
let pid = proc.pid
self.log?.debug("checking for exit of managed process", metadata: ["pid": "\(pid)", "exits": "\(exited)"])
self.log?.debug("checking for exit of managed process", metadata: ["exits": "\(exited)", "processes": "\(processes.count)"])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch 😆

let exitedProcesses = self.processes.filter { proc in
exited.contains { pid, _ in
proc.pid == pid
}
}

for proc in exitedProcesses {
let pid = proc.pid
if pid <= 0 {
continue
}
Expand All @@ -80,7 +85,6 @@ actor ProcessSupervisor {
"status": "\(status)",
"count": "\(processes.count - 1)",
])

proc.setExit(status)
self.processes.removeAll(where: { $0.pid == pid })
}
Expand All @@ -95,7 +99,6 @@ actor ProcessSupervisor {

do {
self.processes.append(process)

return try process.start()
} catch {
self.log?.error("process start failed \(error)", metadata: ["process-id": "\(process.id)"])
Expand Down
24 changes: 16 additions & 8 deletions vminitd/Sources/vminitd/StandardIO.swift
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ final class StandardIO: ManagedProcess.IO & Sendable {
port: stdinPort,
cid: VsockType.hostCID
)
let stdinSocket = try Socket(type: type)
let stdinSocket = try Socket(type: type, closeOnDeinit: false)
try stdinSocket.connect()
self.stdinSocket = stdinSocket

Expand All @@ -93,7 +93,8 @@ final class StandardIO: ManagedProcess.IO & Sendable {
port: stdoutPort,
cid: VsockType.hostCID
)
let stdoutSocket = try Socket(type: type)
// These fd's get closed when cleanupRelay is called
let stdoutSocket = try Socket(type: type, closeOnDeinit: false)
try stdoutSocket.connect()
self.stdoutSocket = stdoutSocket

Expand All @@ -108,7 +109,7 @@ final class StandardIO: ManagedProcess.IO & Sendable {
port: stderrPort,
cid: VsockType.hostCID
)
let stderrSocket = try Socket(type: type)
let stderrSocket = try Socket(type: type, closeOnDeinit: false)
try stderrSocket.connect()
self.stderrSocket = stderrSocket

Expand All @@ -125,12 +126,19 @@ final class StandardIO: ManagedProcess.IO & Sendable {
func relay(readFromFd: Int32, writeToFd: Int32) throws {
let readFrom = OSFile(fd: readFromFd)
let writeTo = OSFile(fd: writeToFd)
// `buf` isn't used concurrently.
// `buf` and `didCleanup` aren't used concurrently.
nonisolated(unsafe) let buf = UnsafeMutableBufferPointer<UInt8>.allocate(capacity: Int(getpagesize()))
nonisolated(unsafe) var didCleanup = false

let cleanupRelay: @Sendable () -> Void = {
if didCleanup { return }
didCleanup = true
self.cleanupRelay(readFd: readFromFd, writeFd: writeToFd, buffer: buf, log: self.log)
}

try ProcessSupervisor.default.poller.add(readFromFd, mask: EPOLLIN) { mask in
if mask.isHangup && !mask.readyToRead {
self.cleanupRelay(readFd: readFromFd, writeFd: writeToFd, buffer: buf, log: self.log)
cleanupRelay()
return
}
// Loop so that in the case that someone wrote > buf.count down the pipe
Expand All @@ -146,7 +154,7 @@ final class StandardIO: ManagedProcess.IO & Sendable {
let w = writeTo.write(view)
if w.wrote != r.read {
self.log?.error("stopping relay: short write for stdio")
self.cleanupRelay(readFd: readFromFd, writeFd: writeToFd, buffer: buf, log: self.log)
cleanupRelay()
return
}
}
Expand All @@ -156,13 +164,13 @@ final class StandardIO: ManagedProcess.IO & Sendable {
self.log?.error("failed with errno \(errno) while reading for fd \(readFromFd)")
fallthrough
case .eof:
self.cleanupRelay(readFd: readFromFd, writeFd: writeToFd, buffer: buf, log: self.log)
cleanupRelay()
self.log?.debug("closing relay for \(readFromFd)")
return
case .again:
// We read all we could, exit.
if mask.isHangup {
self.cleanupRelay(readFd: readFromFd, writeFd: writeToFd, buffer: buf, log: self.log)
cleanupRelay()
}
return
default:
Expand Down
21 changes: 14 additions & 7 deletions vminitd/Sources/vminitd/TerminalIO.swift
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ final class TerminalIO: ManagedProcess.IO & Sendable {
port: stdinPort,
cid: VsockType.hostCID
)
let stdinSocket = try Socket(type: type)
let stdinSocket = try Socket(type: type, closeOnDeinit: false)
try stdinSocket.connect()
self.stdinSocket = stdinSocket

Expand All @@ -77,7 +77,7 @@ final class TerminalIO: ManagedProcess.IO & Sendable {
port: stdoutPort,
cid: VsockType.hostCID
)
let stdoutSocket = try Socket(type: type)
let stdoutSocket = try Socket(type: type, closeOnDeinit: false)
try stdoutSocket.connect()
self.stdoutSocket = stdoutSocket

Expand All @@ -91,12 +91,19 @@ final class TerminalIO: ManagedProcess.IO & Sendable {
func relay(readFromFd: Int32, writeToFd: Int32) throws {
let readFrom = OSFile(fd: readFromFd)
let writeTo = OSFile(fd: writeToFd)
// `buf` isn't used concurrently.
// `buf` and `didCleanup` aren't used concurrently.
nonisolated(unsafe) let buf = UnsafeMutableBufferPointer<UInt8>.allocate(capacity: Int(getpagesize()))
nonisolated(unsafe) var didCleanup = false

let cleanupRelay: @Sendable () -> Void = {
if didCleanup { return }
didCleanup = true
self.cleanupRelay(readFd: readFromFd, writeFd: writeToFd, buffer: buf, log: self.log)
}

try ProcessSupervisor.default.poller.add(readFromFd, mask: EPOLLIN) { mask in
if mask.isHangup && !mask.readyToRead {
self.cleanupRelay(readFd: readFromFd, writeFd: writeToFd, buffer: buf, log: self.log)
cleanupRelay()
return
}
// Loop so that in the case that someone wrote > buf.count down the pipe
Expand All @@ -112,7 +119,7 @@ final class TerminalIO: ManagedProcess.IO & Sendable {
let w = writeTo.write(view)
if w.wrote != r.read {
self.log?.error("stopping relay: short write for stdio")
self.cleanupRelay(readFd: readFromFd, writeFd: writeToFd, buffer: buf, log: self.log)
cleanupRelay()
return
}
}
Expand All @@ -122,13 +129,13 @@ final class TerminalIO: ManagedProcess.IO & Sendable {
self.log?.error("failed with errno \(errno) while reading for fd \(readFromFd)")
fallthrough
case .eof:
self.cleanupRelay(readFd: readFromFd, writeFd: writeToFd, buffer: buf, log: self.log)
cleanupRelay()
self.log?.debug("closing relay for \(readFromFd)")
return
case .again:
// We read all we could, exit.
if mask.isHangup {
self.cleanupRelay(readFd: readFromFd, writeFd: writeToFd, buffer: buf, log: self.log)
cleanupRelay()
}
return
default:
Expand Down