Allow callers to run a subprocess and provide low and high water marks when using SequenceOutput to emit standard output and standard error as soon as it arrives. #40

Open
wants to merge 19 commits into
base: main

40 changes: 33 additions & 7 deletions Sources/Subprocess/AsyncBufferSequence.swift
@@ -35,33 +35,59 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
@_nonSendable
public struct Iterator: AsyncIteratorProtocol {
public typealias Element = Buffer
internal typealias Stream = AsyncThrowingStream<Buffer, Swift.Error>

private let diskIO: DiskIO
private var buffer: [UInt8]
private var currentPosition: Int
private var finished: Bool
private var streamIterator: Stream.AsyncIterator
private let continuation: Stream.Continuation
private var bytesRemaining: Int

internal init(diskIO: DiskIO) {
self.diskIO = diskIO
self.buffer = []
self.currentPosition = 0
self.finished = false
let (stream, continuation) = AsyncThrowingStream<Buffer, Swift.Error>.makeStream()
Member

I'm a bit concerned with the usage of the AsyncStream here for two reasons. The first is that AsyncStream has no way to propagate the internal async sequence back pressure to the external system that is producing the elements.

We had the same issue in swift-nio and created the NIOAsyncSequenceProducer, which allowed us to bridge NIO's back pressure into the back pressure of an async sequence. We learned a lot from the APIs and implementation in NIO and have an open PR in swift-async-algorithms that generalizes this concept as a MultiProducerSingleConsumerAsyncChannel.

Having said that, it might be fine here back-pressure-wise, since DispatchIO is going to call the ioHandler multiple times but only up to readBufferSize. So we do have some maximum limit and our buffer can't balloon indefinitely.

However, this brings me to my second point, which is the performance of this. AsyncStream is not super fast, so I expect this PR to have a performance impact when streaming a lot of data to/from a subprocess. It would be good to understand that impact and whether the MultiProducerSingleConsumerAsyncChannel can improve it.

Lastly, aren't we failing to close the diskIO when the iterator is dropped? We probably want to set up an onTerminationCallback on the stream or the channel to close the diskIO, right?
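
To illustrate the last point, here is a minimal sketch of tying a resource's lifetime to the stream through the continuation's `onTermination` handler. `FakeDiskIO`, `ByteStream`, and `makeByteStream(closing:)` are hypothetical names used only for this example; they are not part of the PR, and the real `DiskIO` close call differs per platform.

```swift
import Foundation

// Hypothetical stand-in for the PR's DiskIO; it only records whether close() ran.
final class FakeDiskIO: @unchecked Sendable {
    private let lock = NSLock()
    private var closed = false

    var isClosed: Bool {
        lock.lock(); defer { lock.unlock() }
        return closed
    }

    func close() {
        lock.lock(); defer { lock.unlock() }
        closed = true
    }
}

// Assumed element type for the sketch; the PR streams AsyncBufferSequence.Buffer instead.
typealias ByteStream = AsyncThrowingStream<[UInt8], Error>

// Builds the stream and registers the cleanup before any element is produced.
func makeByteStream(
    closing diskIO: FakeDiskIO
) -> (stream: ByteStream, continuation: ByteStream.Continuation) {
    let (stream, continuation) = ByteStream.makeStream()
    // Invoked when the stream terminates (finished, failed, or cancelled),
    // so the close no longer depends on next() being called to the end.
    continuation.onTermination = { _ in diskIO.close() }
    return (stream, continuation)
}
```

With something like this in place, the explicit `close()` calls inside `next()` could in principle move into the termination handler, although that would need checking against the Windows path, where `close()` throws.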

Contributor

I'll look into this in a separate PR. I think the AsyncStream based implementation is good enough for this PR.

Contributor

@iCharlesHu iCharlesHu May 23, 2025

Will investigate under #51

self.streamIterator = stream.makeAsyncIterator()
self.continuation = continuation
self.bytesRemaining = 0
}

public func next() async throws -> Buffer? {
let data = try await self.diskIO.readChunk(
upToLength: readBufferSize
)
if data == nil {
// We finished reading. Close the file descriptor now
public mutating func next() async throws -> Buffer? {
if bytesRemaining <= 0 {
bytesRemaining = readBufferSize
diskIO.stream(upToLength: readBufferSize, continuation: continuation)
Contributor

I don't think you need to manually stream only up to readBufferSize here. The idea of streaming is that we want to read until the end of file so we should use .max here to specify DispatchIO should read data until an EOF is reached. You should only need to call stream once in the initializer.

Author

@rdingman rdingman May 23, 2025

@iCharlesHu

Are we sure we want that? That exacerbates the concerns of @weissi and @FranzBusch. With the current approach, the easing of back pressure is limited to buffering up to about readBufferSize for slow consumers. This suggestion opens that up to the entire output leaving it essentially unbounded.

Contributor

I was trying to unblock this PR so I created #51 to address the AsyncStream issue.

However, after taking some time to think it over and studying @FranzBusch’s suggestion (apple/swift-async-algorithms#305), I’ve come to the conclusion that we can’t make the read function “push based” (using AsyncStream) if we want to depend on DispatchIO. The main reason is that DispatchIO doesn’t support back pressure, so any back pressure mechanism we implement would be limited by DispatchIO, since we can’t instruct it to slow down (blocking the handler block is a no-go).

The current method (AsyncStream + buffer size limit) is better because it prevents the unlimited buffer problem, but it loses the advantage of letting DispatchIO determine the optimal buffer size. Instead of calling .read once with .max, we call it multiple times with a small buffer size. Additionally, as @FranzBusch pointed out, AsyncStream has performance issues, making it not worthwhile to use any more.

Therefore, for this PR, could you please:

  • Remove the use of AsyncStream and .stream() and instead use withCheckedThrowingContinuation and .read() with a simple async function to read data up to a max length (similar to my implementation here https://github.com/swiftlang/swift-subprocess/pull/48/files)
  • This change means we can’t rely on calling .setLimit(lowWater:) / .setLimit(highWater:) since .read() will always read until the buffer size is reached. Therefore, instead of PlatformOptions.StreamOptions, we just need a let preferredStreamBufferSize: Int to set the actual buffer size we instruct .read() to fetch each time.

I believe the above approach should still allow callers to set how frequently they wish to be updated without needing to introduce AsyncStream.
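
The pull-based shape requested above can be sketched roughly as follows. This is only an illustration under stated assumptions: `FakeCallbackReader` is a hypothetical in-memory stand-in for `DispatchIO.read`, and `readChunk(from:upToLength:)` and `collectChunks(from:preferredStreamBufferSize:)` are made-up helpers, not the PR's API. The point is that each call to the async function issues exactly one read of at most `preferredStreamBufferSize` bytes, so nothing is buffered ahead of the consumer.

```swift
import Foundation

// Hypothetical callback-based source standing in for DispatchIO.read.
// Each call delivers at most `length` bytes, or an empty array at end of input.
final class FakeCallbackReader: @unchecked Sendable {
    private let lock = NSLock()
    private var remaining: [UInt8]

    init(bytes: [UInt8]) { self.remaining = bytes }

    func read(length: Int, handler: @escaping @Sendable (Result<[UInt8], Error>) -> Void) {
        lock.lock()
        let chunk = Array(remaining.prefix(length))
        remaining.removeFirst(chunk.count)
        lock.unlock()
        // Deliver on another queue, as a real I/O callback would.
        DispatchQueue.global().async { handler(.success(chunk)) }
    }
}

// One request, one resumption: the read only starts when the consumer asks for data.
func readChunk(from reader: FakeCallbackReader, upToLength maxLength: Int) async throws -> [UInt8]? {
    try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<[UInt8]?, Error>) in
        reader.read(length: maxLength) { result in
            switch result {
            case .failure(let error):
                continuation.resume(throwing: error)
            case .success(let bytes):
                // An empty chunk signals end of input in this sketch.
                continuation.resume(returning: bytes.isEmpty ? nil : bytes)
            }
        }
    }
}

// Drains the reader chunk by chunk; the buffer size controls how often the caller is updated.
func collectChunks(from reader: FakeCallbackReader, preferredStreamBufferSize: Int) async throws -> [[UInt8]] {
    var chunks: [[UInt8]] = []
    while let chunk = try await readChunk(from: reader, upToLength: preferredStreamBufferSize) {
        chunks.append(chunk)
    }
    return chunks
}
```

A smaller `preferredStreamBufferSize` means more frequent, smaller updates; a larger one means fewer round trips. Unlike this fake reader, a real `DispatchIO.read` may invoke its handler several times per request, so the actual implementation has to accumulate data until `done` is true before resuming the continuation.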

}

let buffer: Buffer?

do {
buffer = try await streamIterator.next()
} catch {
#if os(Windows)
try self.diskIO.close()
#else
self.diskIO.close()
#endif

throw error
}

if let buffer {
bytesRemaining -= buffer.count
return buffer
} else {
#if os(Windows)
try self.diskIO.close()
#else
self.diskIO.close()
#endif

return nil
}
return data
}
}

21 changes: 18 additions & 3 deletions Sources/Subprocess/Platforms/Subprocess+Darwin.swift
@@ -57,6 +57,9 @@ public struct PlatformOptions: Sendable {
/// Creates a session and sets the process group ID
/// i.e. Detach from the terminal.
public var createSession: Bool = false

public var streamOptions: StreamOptions = .init()

/// An ordered list of steps in order to tear down the child
/// process in case the parent task is cancelled before
/// the child proces terminates.
@@ -126,6 +129,18 @@ extension PlatformOptions {
#endif
}

extension PlatformOptions {
public struct StreamOptions: Sendable {
let minimumBufferSize: Int?
let maximumBufferSize: Int?

public init(minimumBufferSize: Int? = nil, maximumBufferSize: Int? = nil) {
self.minimumBufferSize = minimumBufferSize
self.maximumBufferSize = maximumBufferSize
}
}
}

extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible {
internal func description(withIndent indent: Int) -> String {
let indent = String(repeating: " ", count: indent * 4)
@@ -441,9 +456,9 @@ extension Configuration {
)
return SpawnResult(
execution: execution,
inputWriteEnd: inputWriteFileDescriptor?.createPlatformDiskIO(),
outputReadEnd: outputReadFileDescriptor?.createPlatformDiskIO(),
errorReadEnd: errorReadFileDescriptor?.createPlatformDiskIO()
inputWriteEnd: inputWriteFileDescriptor?.createPlatformDiskIO(with: platformOptions.streamOptions),
outputReadEnd: outputReadFileDescriptor?.createPlatformDiskIO(with: platformOptions.streamOptions),
errorReadEnd: errorReadFileDescriptor?.createPlatformDiskIO(with: platformOptions.streamOptions)
)
}

15 changes: 15 additions & 0 deletions Sources/Subprocess/Platforms/Subprocess+Linux.swift
@@ -214,6 +214,9 @@ public struct PlatformOptions: Sendable {
// Creates a session and sets the process group ID
// i.e. Detach from the terminal.
public var createSession: Bool = false

public var streamOptions: StreamOptions = .init()

/// An ordered list of steps in order to tear down the child
/// process in case the parent task is cancelled before
/// the child proces terminates.
@@ -235,6 +238,18 @@ public struct PlatformOptions: Sendable {
public init() {}
}

extension PlatformOptions {
public struct StreamOptions: Sendable {
let minimumBufferSize: Int?
let maximumBufferSize: Int?

public init(minimumBufferSize: Int? = nil, maximumBufferSize: Int? = nil) {
self.minimumBufferSize = minimumBufferSize
self.maximumBufferSize = maximumBufferSize
}
}
}

extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible {
internal func description(withIndent indent: Int) -> String {
let indent = String(repeating: " ", count: indent * 4)
68 changes: 36 additions & 32 deletions Sources/Subprocess/Platforms/Subprocess+Unix.swift
@@ -393,7 +393,7 @@ internal typealias PlatformFileDescriptor = CInt
internal typealias TrackedPlatformDiskIO = TrackedDispatchIO

extension TrackedFileDescriptor {
internal consuming func createPlatformDiskIO() -> TrackedPlatformDiskIO {
internal consuming func createPlatformDiskIO(with streamOptions: PlatformOptions.StreamOptions) -> TrackedPlatformDiskIO {
let dispatchIO: DispatchIO = DispatchIO(
type: .stream,
fileDescriptor: self.platformDescriptor(),
@@ -405,6 +405,15 @@ extension TrackedFileDescriptor {
}
}
)

if let minimumBufferSize = streamOptions.minimumBufferSize {
dispatchIO.setLimit(lowWater: minimumBufferSize)
}

if let maximumBufferSize = streamOptions.maximumBufferSize {
dispatchIO.setLimit(highWater: maximumBufferSize)
}

return .init(dispatchIO, closeWhenDone: self.closeWhenDone)
}
}
@@ -414,37 +423,32 @@ extension DispatchIO {
#if SubprocessSpan
@available(SubprocessSpan, *)
#endif
internal func readChunk(upToLength maxLength: Int) async throws -> AsyncBufferSequence.Buffer? {
return try await withCheckedThrowingContinuation { continuation in
var buffer: DispatchData = .empty
self.read(
offset: 0,
length: maxLength,
queue: .global()
) { done, data, error in
if error != 0 {
continuation.resume(
throwing: SubprocessError(
code: .init(.failedToReadFromSubprocess),
underlyingError: .init(rawValue: error)
)
)
return
}
if let data = data {
if buffer.isEmpty {
buffer = data
} else {
buffer.append(data)
}
}
if done {
if !buffer.isEmpty {
continuation.resume(returning: AsyncBufferSequence.Buffer(data: buffer))
} else {
continuation.resume(returning: nil)
}
}
internal func stream(upToLength maxLength: Int, continuation: AsyncBufferSequence.Iterator.Stream.Continuation) {
self.read(
offset: 0,
length: maxLength,
queue: .global()
) { done, data, error in
guard error == 0 else {
continuation.finish(throwing: SubprocessError(
code: .init(.failedToReadFromSubprocess),
underlyingError: .init(rawValue: error)
))
return
}

guard let data else {
fatalError("Unexpectedly received nil data from DispatchIO with error == 0.")
}

if !data.isEmpty {
// We have non-empty data. Yield it to the continuation
continuation.yield(AsyncBufferSequence.Buffer(data: data))

Wait, this strips backpressure. If you do that, then a slow consumer and a fast producer will get you OOM-killed. You won't be able to use AsyncStream here: it's a type that should be avoided unless you use AsyncStream(unfolding: {...}) or set it to drop elements if the buffer is full (but of course that's not possible here because it would lose data).
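
A small sketch of why neither buffering policy helps here. `droppedCount(yielding:bufferLimit:)` is a hypothetical helper written for this example; it yields into an `AsyncStream` that nobody is consuming and counts what gets thrown away once the buffer limit is reached. With the default `.unbounded` policy nothing is dropped, but the buffer grows without limit instead.

```swift
// Yields `count` integers into a stream that keeps at most `limit` unread elements
// and returns how many elements the stream discarded because no one was consuming.
func droppedCount(yielding count: Int, bufferLimit limit: Int) -> Int {
    let (stream, continuation) = AsyncStream.makeStream(
        of: Int.self,
        bufferingPolicy: .bufferingOldest(limit)
    )
    var dropped = 0
    // Keep the stream alive so yields are not rejected as `.terminated`.
    withExtendedLifetime(stream) {
        for value in 0..<count {
            // yield(_:) never suspends, so the producer cannot be slowed down;
            // the stream can only buffer the element or drop it.
            if case .dropped = continuation.yield(value) {
                dropped += 1
            }
        }
        continuation.finish()
    }
    return dropped
}
```

For subprocess output, a dropped element is lost data, which is why the thread turns toward a pull-based read rather than tuning the buffering policy.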

Contributor

I understand the concern, but I don't think this is the right PR to address this issue since it's trying to solve a different problem. I'll address this in a separate PR.

Contributor

Created #51

} else if done {
// Receiving an empty data and done == true means we've reached the end of the file.
continuation.finish()
} else {
fatalError("Unexpectedly received no data from DispatchIO with it indicating it is not done.")
}
}
}
59 changes: 49 additions & 10 deletions Sources/Subprocess/Platforms/Subprocess+Windows.swift
@@ -1071,18 +1071,57 @@ extension FileDescriptor {
}

extension FileDescriptor {
internal func readChunk(upToLength maxLength: Int) async throws -> AsyncBufferSequence.Buffer? {
return try await withCheckedThrowingContinuation { continuation in
self.readUntilEOF(
upToLength: maxLength
) { result in
switch result {
case .failure(let error):
continuation.resume(throwing: error)
case .success(let bytes):
continuation.resume(returning: AsyncBufferSequence.Buffer(data: bytes))
internal func stream(upToLength maxLength: Int, continuation: AsyncBufferSequence.Iterator.Stream.Continuation) {
do {
var totalBytesRead: Int = 0

while totalBytesRead < maxLength {
let values = try [UInt8](
unsafeUninitializedCapacity: maxLength - totalBytesRead
) { buffer, initializedCount in
guard let baseAddress = buffer.baseAddress else {
initializedCount = 0
return
}

var bytesRead: DWORD = 0
let readSucceed = ReadFile(
self.platformDescriptor,
UnsafeMutableRawPointer(mutating: baseAddress),
DWORD(maxLength - totalBytesRead),
&bytesRead,
nil
)

if !readSucceed {
// Windows throws ERROR_BROKEN_PIPE when the pipe is closed
let error = GetLastError()
if error == ERROR_BROKEN_PIPE {
// We are done reading
initializedCount = 0
} else {
initializedCount = 0
throw SubprocessError(
code: .init(.failedToReadFromSubprocess),
underlyingError: .init(rawValue: error)
)
}
} else {
// We successfully read the current round
initializedCount += Int(bytesRead)
}
}

if values.count > 0 {
totalBytesRead += values.count
continuation.yield(AsyncBufferSequence.Buffer(data: values))
} else {
continuation.finish()
return
}
}
} catch {
continuation.finish(throwing: error)
}
}

44 changes: 44 additions & 0 deletions Tests/SubprocessTests/SubprocessTests+Unix.swift
@@ -662,6 +662,50 @@ extension SubprocessUnixTests {
#expect(catResult.terminationStatus.isSuccess)
#expect(catResult.standardError == expected)
}

@Test func testSlowDripRedirectedOutputRedirectToSequence() async throws {
guard #available(SubprocessSpan , *) else {
return
}
let threshold: Double = 0.5
Contributor

Unfortunately in tests you'll have to write

guard #available(SubprocessSpan , *) else {
    return
}

at the beginning to work around the same availability issue. See the other tests for examples.

Author

@rdingman rdingman May 14, 2025

@iCharlesHu When I first started working on this, I was very confused as to why some of the tests weren't running my new code and it was because of this check. Wouldn't it be better to have them skipped and noted as such in the test output rather than falsely succeeding? I'm thinking something like this:

    @Test(
        .enabled(
            if: {
                if #available(SubprocessSpan , *) {
                    true
                } else {
                    false
                }
            }(),
            "This test requires SubprocessSpan"
        )
    )
    func testSlowDripRedirectedOutputRedirectToSequence() async throws {
    }

Of course, we can have a helper function to make this less verbose.

Thoughts?

Author

@iCharlesHu I went ahead and conditionalized this one test this way as an example. Let me know if you don't like that and would like me to revert to a guard.

Contributor

@rdingman Unfortunately this won't work: on Swift 6.2 and above, the compiler will complain that the code inside testSlowDripRedirectedOutputRedirectToSequence needs SubprocessSpan and ask you to put @available around the function. That doesn't work either, because the macro won't pick it up.

Unfortunately so far this is the only way I found that works. Very ugly... but it'll have to do for now.

Author

@rdingman rdingman May 21, 2025

@iCharlesHu Hmm. This builds just fine for me with the latest Swift 6.2 toolchain, has the intended results, and works with the currently shipping Swift 6.1 toolchain. However, I'll go ahead and revert this and match the existing tests.


let script = """
echo "DONE"
sleep \(threshold)
"""

var platformOptions = PlatformOptions()
platformOptions.streamOptions = .init(minimumBufferSize: 0)

let start = ContinuousClock().now

let catResult = try await Subprocess.run(
.path("/bin/bash"),
arguments: ["-c", script],
platformOptions: platformOptions,
error: .discarded,
body: { (execution, standardOutput) in
for try await chunk in standardOutput {
let string = chunk._withUnsafeBytes { String(decoding: $0, as: UTF8.self) }

if string.hasPrefix("DONE") {
let end = ContinuousClock().now

if (end - start) > .seconds(threshold) {
return "Failure"

} else {
return "Success"
}
}
}

return "Failure"
}
)
#expect(catResult.terminationStatus.isSuccess)
#expect(catResult.value == "Success")
}
}

// MARK: - PlatformOption Tests