-
Notifications
You must be signed in to change notification settings - Fork 23
Allow callers to run a subprocess and provide low and high water marks when using SequenceOutput to emit standard output and standard error as soon as it arrives. #40
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
7324ac4
7b6899c
9b173ab
ab43ae4
10aa523
aa903ad
bb57bf2
955e9c9
5f2df5f
4bac06a
ae0de61
d65f2ce
c8fe778
4de3d79
30876f5
ff7ae12
48b97f2
d94a9c6
888666b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,33 +35,59 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { | |
@_nonSendable | ||
public struct Iterator: AsyncIteratorProtocol { | ||
public typealias Element = Buffer | ||
internal typealias Stream = AsyncThrowingStream<Buffer, Swift.Error> | ||
|
||
private let diskIO: DiskIO | ||
private var buffer: [UInt8] | ||
private var currentPosition: Int | ||
private var finished: Bool | ||
private var streamIterator: Stream.AsyncIterator | ||
private let continuation: Stream.Continuation | ||
private var bytesRemaining: Int | ||
|
||
internal init(diskIO: DiskIO) { | ||
self.diskIO = diskIO | ||
self.buffer = [] | ||
self.currentPosition = 0 | ||
self.finished = false | ||
let (stream, continuation) = AsyncThrowingStream<Buffer, Swift.Error>.makeStream() | ||
self.streamIterator = stream.makeAsyncIterator() | ||
self.continuation = continuation | ||
self.bytesRemaining = 0 | ||
} | ||
|
||
public func next() async throws -> Buffer? { | ||
let data = try await self.diskIO.readChunk( | ||
upToLength: readBufferSize | ||
) | ||
if data == nil { | ||
// We finished reading. Close the file descriptor now | ||
public mutating func next() async throws -> Buffer? { | ||
if bytesRemaining <= 0 { | ||
bytesRemaining = readBufferSize | ||
diskIO.stream(upToLength: readBufferSize, continuation: continuation) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think you need to manually stream only up to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we sure we want that? That exacerbates the concerns of @weissi and @FranzBusch. With the current approach, the easing of back pressure is limited to buffering up to about There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was trying to unblock this PR so I created #51 to address the However, after some time to think over and studying @FranzBusch’s suggestion (apple/swift-async-algorithms#305.), I’ve come to the conclusion that we can’t make the The current method (AsyncStream + buffer size limit) is better because it prevents the unlimited buffer problem, but it loses the advantage of letting Therefore, for this PR, could you please:
I believe the above approach should still allow callers to set how frequent they wish to be updated without needing to introduce |
||
} | ||
|
||
let buffer: Buffer? | ||
|
||
do { | ||
buffer = try await streamIterator.next() | ||
} catch { | ||
#if os(Windows) | ||
try self.diskIO.close() | ||
#else | ||
self.diskIO.close() | ||
#endif | ||
|
||
throw error | ||
} | ||
|
||
if let buffer { | ||
bytesRemaining -= buffer.count | ||
return buffer | ||
} else { | ||
#if os(Windows) | ||
try self.diskIO.close() | ||
#else | ||
self.diskIO.close() | ||
#endif | ||
|
||
return nil | ||
} | ||
return data | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -393,7 +393,7 @@ internal typealias PlatformFileDescriptor = CInt | |
internal typealias TrackedPlatformDiskIO = TrackedDispatchIO | ||
|
||
extension TrackedFileDescriptor { | ||
internal consuming func createPlatformDiskIO() -> TrackedPlatformDiskIO { | ||
internal consuming func createPlatformDiskIO(with streamOptions: PlatformOptions.StreamOptions) -> TrackedPlatformDiskIO { | ||
let dispatchIO: DispatchIO = DispatchIO( | ||
type: .stream, | ||
fileDescriptor: self.platformDescriptor(), | ||
|
@@ -405,6 +405,15 @@ extension TrackedFileDescriptor { | |
} | ||
} | ||
) | ||
|
||
if let minimumBufferSize = streamOptions.minimumBufferSize { | ||
dispatchIO.setLimit(lowWater: minimumBufferSize) | ||
} | ||
|
||
if let maximumBufferSize = streamOptions.maximumBufferSize { | ||
dispatchIO.setLimit(highWater: maximumBufferSize) | ||
} | ||
|
||
return .init(dispatchIO, closeWhenDone: self.closeWhenDone) | ||
} | ||
} | ||
|
@@ -414,37 +423,32 @@ extension DispatchIO { | |
#if SubprocessSpan | ||
@available(SubprocessSpan, *) | ||
#endif | ||
internal func readChunk(upToLength maxLength: Int) async throws -> AsyncBufferSequence.Buffer? { | ||
return try await withCheckedThrowingContinuation { continuation in | ||
var buffer: DispatchData = .empty | ||
self.read( | ||
offset: 0, | ||
length: maxLength, | ||
queue: .global() | ||
) { done, data, error in | ||
if error != 0 { | ||
continuation.resume( | ||
throwing: SubprocessError( | ||
code: .init(.failedToReadFromSubprocess), | ||
underlyingError: .init(rawValue: error) | ||
) | ||
) | ||
return | ||
} | ||
if let data = data { | ||
if buffer.isEmpty { | ||
buffer = data | ||
} else { | ||
buffer.append(data) | ||
} | ||
} | ||
if done { | ||
if !buffer.isEmpty { | ||
continuation.resume(returning: AsyncBufferSequence.Buffer(data: buffer)) | ||
} else { | ||
continuation.resume(returning: nil) | ||
} | ||
} | ||
internal func stream(upToLength maxLength: Int, continuation: AsyncBufferSequence.Iterator.Stream.Continuation) { | ||
self.read( | ||
offset: 0, | ||
length: maxLength, | ||
queue: .global() | ||
) { done, data, error in | ||
guard error == 0 else { | ||
continuation.finish(throwing: SubprocessError( | ||
code: .init(.failedToReadFromSubprocess), | ||
underlyingError: .init(rawValue: error) | ||
)) | ||
return | ||
} | ||
|
||
guard let data else { | ||
fatalError("Unexpectedly received nil data from DispatchIO with error == 0.") | ||
} | ||
|
||
if !data.isEmpty { | ||
// We have non-empty data. Yield it to the continuation | ||
continuation.yield(AsyncBufferSequence.Buffer(data: data)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wait, this strips backpressure. If you do that then a slow consumer and fast producer will OOM kill you. You won't be able to use AsyncStream here (it's a type that should be avoided unless you use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I understand the concern, but I don't think this is the right PR to address this issue since it's trying to solve a different problem. I'll address this in a separate PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Created #51 |
||
} else if done { | ||
// Receiving an empty data and done == true means we've reached the end of the file. | ||
continuation.finish() | ||
} else { | ||
fatalError("Unexpectedly received no data from DispatchIO with it indicating it is not done.") | ||
} | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -662,6 +662,50 @@ extension SubprocessUnixTests { | |
#expect(catResult.terminationStatus.isSuccess) | ||
#expect(catResult.standardError == expected) | ||
} | ||
|
||
@Test func testSlowDripRedirectedOutputRedirectToSequence() async throws { | ||
guard #available(SubprocessSpan , *) else { | ||
return | ||
} | ||
let threshold: Double = 0.5 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unfortunately in tests you'll have to write
In the beginning to work around the same availability issue. See other tests for examples. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @iCharlesHu When I first started working on this, I was very confused as to why some of the tests weren't running my new code and it was because of this check. Wouldn't it be better to have them skipped and noted as such in the test output rather than falsely succeeding? I'm thinking something like this:
Of course, we can have a helper function to make this less verbose. Thoughts? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @iCharlesHu I went ahead and conditionalized this one test this way as an example. Let me know if you don't like that and would like me to revert to a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @rdingman unfortunately this won't work because on Swift 6.2 and above the compiler will complain the code inside of Unfortunately so far this is the only way I found that works. Very ugly... but it'll have to do for now. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @iCharlesHu Hmm. This builds just fine for me with the latest Swift 6.2 toolchain, has the intended results, and works with the currently shipping Swift 6.1 toolchain. However, I'll go ahead and revert this and match the existing tests. |
||
|
||
let script = """ | ||
echo "DONE" | ||
sleep \(threshold) | ||
""" | ||
|
||
var platformOptions = PlatformOptions() | ||
platformOptions.streamOptions = .init(minimumBufferSize: 0) | ||
|
||
let start = ContinuousClock().now | ||
|
||
let catResult = try await Subprocess.run( | ||
.path("/bin/bash"), | ||
arguments: ["-c", script], | ||
platformOptions: platformOptions, | ||
error: .discarded, | ||
body: { (execution, standardOutput) in | ||
for try await chunk in standardOutput { | ||
let string = chunk._withUnsafeBytes { String(decoding: $0, as: UTF8.self) } | ||
|
||
if string.hasPrefix("DONE") { | ||
let end = ContinuousClock().now | ||
|
||
if (end - start) > .seconds(threshold) { | ||
return "Failure" | ||
|
||
} else { | ||
return "Success" | ||
} | ||
} | ||
} | ||
|
||
return "Failure" | ||
} | ||
) | ||
#expect(catResult.terminationStatus.isSuccess) | ||
#expect(catResult.value == "Success") | ||
} | ||
} | ||
|
||
// MARK: - PlatformOption Tests | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a bit concerned with the usage of the
AsyncStream
here for two reasons. The first is thatAsyncStream
has no way to propagate the internal async sequence back pressure to the external system that is producing the elements.We had the same issue in
swift-nio
and created the NIOAsyncSequenceProducer that allowed us to bridge NIO's back pressure into the back pressure of an async sequence. The learned a lot from the APIs and implementation in NIO and have an open PR inswift-async-algorithms
that generalizes this concept as a MultiProducerSingleConsumerAsyncChannel.Now having said that it might be fine here back pressure wise since the
DispatchIO
is going to call theioHandler
multiple times but only up toreadBufferSize
. So we do have some maximum limit and our buffer can't balloon indefinitely.However, this brings me to my second point which is the performance of this.
AsyncStream
is not super fast so I expect this PR to have performance impact when streaming a lot of data to/from a subprocess. It would be good to understand that impact and if theMultiProducerSingleConsumerAsyncChannel
can improve this.Lastly, aren't we missing to close the
diskIO
when the iterator is dropped. We probably want to setup aonTerminationCallback
on the stream or the channel to close the diskIO right?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll look into this in a separate PR. I think the
AsyncStream
based implementation is good enough for this PR.Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will investigate under #51