Skip to content

Process termination monitoring implementation on Linux conflicts with processes spawned by other means #83

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 99 additions & 69 deletions Sources/Subprocess/Platforms/Subprocess+Linux.swift
Original file line number Diff line number Diff line change
Expand Up @@ -266,32 +266,56 @@ extension String {
internal func monitorProcessTermination(
forProcessWithIdentifier pid: ProcessIdentifier
) async throws -> TerminationStatus {
// Ensure the waiter thread is running.
_setupMonitorSignalHandler()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is already called in `Configuration.spawn`, so we don't need to call it again here.


return try await withCheckedThrowingContinuation { continuation in
_childProcessContinuations.withLock { continuations in
if let existing = continuations.removeValue(forKey: pid.value),
case .status(let existingStatus) = existing
{
// We already have existing status to report
continuation.resume(returning: existingStatus)
} else {
// Save the continuation for handler
continuations[pid.value] = .continuation(continuation)
}
// We don't need to worry about a race condition here because waitid()
// does not clear the wait/zombie state of the child process. If it sees
// the child process has terminated and manages to acquire the lock before
// we add this continuation to the dictionary, then it will simply loop
// and report the status again.
let oldContinuation = continuations.updateValue(continuation, forKey: pid.value)
precondition(oldContinuation == nil)

// Wake up the waiter thread if it is waiting for more child processes.
_ = pthread_cond_signal(_waitThreadNoChildrenCondition)
}
}
}

private enum ContinuationOrStatus {
case continuation(CheckedContinuation<TerminationStatus, any Error>)
case status(TerminationStatus)
/// Thread-safe storage for the map from child process IDs to the continuations
/// awaiting their termination status.
///
/// The map is guarded by a raw `pthread_mutex_t` instead of `Mutex`: the waiter
/// thread needs the semantics of `pthread_cond_wait`, which requires passing the
/// `pthread_mutex_t` instance as a parameter, and the `Mutex` API does not
/// provide access to it.
private final class ChildProcessContinuations: Sendable {
    /// Pending continuations keyed by child PID; only touched while `mutex` is held.
    private nonisolated(unsafe) var continuations: [pid_t: CheckedContinuation<TerminationStatus, any Error>] = [:]
    /// Heap-allocated mutex guarding `continuations`.
    private nonisolated(unsafe) let mutex = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)

    init() {
        // Default attributes (nil) are used for the mutex.
        pthread_mutex_init(mutex, nil)
    }

    /// Runs `body` with exclusive, mutable access to the continuations map.
    ///
    /// - Parameter body: Closure invoked while the mutex is held.
    /// - Returns: Whatever `body` returns.
    /// - Throws: Rethrows any error thrown by `body`; the mutex is released either way.
    func withLock<R>(
        _ body: (inout [pid_t: CheckedContinuation<TerminationStatus, any Error>]) throws -> R
    ) rethrows -> R {
        pthread_mutex_lock(mutex)
        defer { pthread_mutex_unlock(mutex) }
        return try body(&continuations)
    }

    /// Runs `body` with exclusive access to the continuations map and also hands it
    /// the underlying mutex pointer, so the caller can pass it to `pthread_cond_wait`.
    ///
    /// - Parameter body: Closure invoked while the mutex is held; receives the mutex
    ///   pointer and the map.
    /// - Returns: Whatever `body` returns.
    /// - Throws: Rethrows any error thrown by `body`; the mutex is released either way.
    func withUnsafeUnderlyingLock<R>(
        _ body: (UnsafeMutablePointer<pthread_mutex_t>, inout [pid_t: CheckedContinuation<TerminationStatus, any Error>]) throws -> R
    ) rethrows -> R {
        let lock = mutex
        pthread_mutex_lock(lock)
        defer { pthread_mutex_unlock(lock) }
        return try body(lock, &continuations)
    }
}

private let _childProcessContinuations:
Mutex<
[pid_t: ContinuationOrStatus]
> = Mutex([:])
private let _childProcessContinuations = ChildProcessContinuations()

private let signalSource: SendableSourceSignal = SendableSourceSignal()
/// Condition variable the waiter thread blocks on when `waitid` reports `ECHILD`
/// and no continuations are registered; it is signalled when a new continuation
/// is stored for a child process.
private nonisolated(unsafe) let _waitThreadNoChildrenCondition: UnsafeMutablePointer<pthread_cond_t> = {
    let condition = UnsafeMutablePointer<pthread_cond_t>.allocate(capacity: 1)
    // Default attributes (nil); the return code is intentionally discarded.
    _ = pthread_cond_init(condition, nil)
    return condition
}()

private extension siginfo_t {
var si_status: Int32 {
Expand All @@ -316,64 +340,70 @@ private extension siginfo_t {
}

private let setup: () = {
signalSource.setEventHandler {
while true {
var siginfo = siginfo_t()
guard waitid(P_ALL, id_t(0), &siginfo, WEXITED) == 0 || errno == EINTR else {
return
}
var status: TerminationStatus? = nil
switch siginfo.si_code {
case .init(CLD_EXITED):
status = .exited(siginfo.si_status)
case .init(CLD_KILLED), .init(CLD_DUMPED):
status = .unhandledException(siginfo.si_status)
case .init(CLD_TRAPPED), .init(CLD_STOPPED), .init(CLD_CONTINUED):
// Ignore these signals because they are not related to
// process exiting
break
default:
fatalError("Unexpected exit status: \(siginfo.si_code)")
}
if let status = status {
_childProcessContinuations.withLock { continuations in
// Create the thread. It will run immediately; because it runs in an infinite
// loop, we aren't worried about detaching or joining it.
var thread = pthread_t()
_ = pthread_create(
&thread,
nil,
{ _ -> UnsafeMutableRawPointer? in
// Run an infinite loop that waits for child processes to terminate and
// captures their exit statuses.
while true {
// Listen for child process exit events. WNOWAIT means we don't perturb the
// state of a terminated (zombie) child process, allowing us to fetch the
// continuation (if available) before reaping.
var siginfo = siginfo_t()
errno = 0
if waitid(P_ALL, id_t(0), &siginfo, WEXITED | WNOWAIT) == 0 {
let pid = siginfo.si_pid
if let existing = continuations.removeValue(forKey: pid),
case .continuation(let c) = existing
{
c.resume(returning: status)
} else {
// We don't have continuation yet, just state status
continuations[pid] = .status(status)

// If we had a continuation for this PID, allow the process to be reaped
// and pass the resulting exit condition back to the calling task. If
// there is no continuation, then either it hasn't been stored yet or
// this child process is not tracked by the waiter thread.
guard pid != 0, let c = _childProcessContinuations.withLock({ $0.removeValue(forKey: pid) }) else {
continue
}

c.resume(with: Result {
// Here waitid should not block because `pid` has already terminated at this point.
while true {
var siginfo = siginfo_t()
errno = 0
if waitid(P_PID, numericCast(pid), &siginfo, WEXITED) == 0 {
var status: TerminationStatus? = nil
switch siginfo.si_code {
case .init(CLD_EXITED):
return .exited(siginfo.si_status)
case .init(CLD_KILLED), .init(CLD_DUMPED):
return .unhandledException(siginfo.si_status)
default:
fatalError("Unexpected exit status: \(siginfo.si_code)")
}
} else if errno != EINTR {
throw SubprocessError.UnderlyingError(rawValue: errno)
}
}
})
} else if errno == ECHILD {
// We got ECHILD. If there are no continuations added right now, we should
// suspend this thread on the no-children condition until it's awoken by a
// newly-scheduled waiter process. (If this condition is spuriously
// woken, we'll just loop again, which is fine.) Note that we read errno
// outside the lock in case acquiring the lock perturbs it.
_childProcessContinuations.withUnsafeUnderlyingLock { lock, childProcessContinuations in
if childProcessContinuations.isEmpty {
_ = pthread_cond_wait(_waitThreadNoChildrenCondition, lock)
}
}
}
}
}
}
signalSource.resume()
},
nil
)
}()

/// Thin wrapper around a `SIGCHLD` dispatch signal source delivered on the
/// global queue.
///
/// Declared `@unchecked Sendable` because this class is only explicitly
/// initialized once during the lifetime of the process.
final class SendableSourceSignal: @unchecked Sendable {
    private let signalSource: DispatchSourceSignal

    /// Creates the underlying signal source for `SIGCHLD` on the global queue.
    init() {
        signalSource = DispatchSource.makeSignalSource(
            signal: SIGCHLD,
            queue: .global()
        )
    }

    /// Installs `handler` as the event handler of the wrapped signal source.
    func setEventHandler(handler: @escaping DispatchSourceHandler) {
        signalSource.setEventHandler(handler: handler)
    }

    /// Resumes the wrapped signal source.
    func resume() {
        signalSource.resume()
    }
}

private func _setupMonitorSignalHandler() {
// Only executed once
setup
Expand Down
Loading