@@ -514,15 +514,16 @@ package final class AsyncProcess {
514514 if self . outputRedirection. redirectsOutput {
515515 let stdoutPipe = Pipe ( )
516516 let stderrPipe = Pipe ( )
517+ let stdoutStream = DispatchFD ( fileHandle: stdoutPipe. fileHandleForReading) . dataStream ( )
518+ let stderrStream = DispatchFD ( fileHandle: stderrPipe. fileHandleForReading) . dataStream ( )
517519
518520 group. enter ( )
519- stdoutPipe. fileHandleForReading. readabilityHandler = { ( fh: FileHandle ) in
520- let data = ( try ? fh. read ( upToCount: Int . max) ) ?? Data ( )
521- if data. count == 0 {
522- stdoutPipe. fileHandleForReading. readabilityHandler = nil
521+ Task {
522+ defer {
523523 group. leave ( )
524- } else {
525- let contents = data. withUnsafeBytes { [ UInt8] ( $0) }
524+ }
525+ for try await data in stdoutStream {
526+ let contents = [ UInt8] ( data)
526527 self . outputRedirection. outputClosures? . stdoutClosure ( contents)
527528 stdoutLock. withLock {
528529 stdout += contents
@@ -531,13 +532,12 @@ package final class AsyncProcess {
531532 }
532533
533534 group. enter ( )
534- stderrPipe. fileHandleForReading. readabilityHandler = { ( fh: FileHandle ) in
535- let data = ( try ? fh. read ( upToCount: Int . max) ) ?? Data ( )
536- if data. count == 0 {
537- stderrPipe. fileHandleForReading. readabilityHandler = nil
535+ Task {
536+ defer {
538537 group. leave ( )
539- } else {
540- let contents = data. withUnsafeBytes { [ UInt8] ( $0) }
538+ }
539+ for try await data in stderrStream {
540+ let contents = [ UInt8] ( data)
541541 self . outputRedirection. outputClosures? . stderrClosure ( contents)
542542 stderrLock. withLock {
543543 stderr += contents
@@ -1354,3 +1354,54 @@ extension FileHandle: WritableByteStream {
13541354 }
13551355}
13561356#endif
1357+
1358+ extension DispatchFD {
1359+ public func readChunk( upToLength maxLength: Int ) async throws -> DispatchData {
1360+ return try await withCheckedThrowingContinuation { continuation in
1361+ DispatchIO . read ( fromFileDescriptor: numericCast ( self . rawValue) , maxLength: maxLength, runningHandlerOn: DispatchQueue . global ( ) )
1362+ { data, error in
1363+ if error != 0 {
1364+ continuation. resume ( throwing: StringError ( " POSIX error: \( error) " ) )
1365+ return
1366+ }
1367+ if data. count == 0 {
1368+ continuation. resume ( throwing: StringError ( " No more data " ) )
1369+ }
1370+ continuation. resume ( returning: data)
1371+ }
1372+ }
1373+
1374+ }
1375+
1376+ /// Returns an async stream which reads bytes from the specified file descriptor. Unlike `FileHandle.bytes`, it does not block the caller.
1377+ @available ( macOS 15 . 0 , iOS 18 . 0 , tvOS 18 . 0 , watchOS 11 . 0 , visionOS 2 . 0 , * )
1378+ public func dataStream( ) -> some AsyncSequence < DispatchData , any Error > {
1379+ AsyncThrowingStream < DispatchData , any Error > {
1380+ while !Task. isCancelled {
1381+ let chunk = try await readChunk ( upToLength: 4096 )
1382+ if chunk. isEmpty {
1383+ return nil
1384+ }
1385+ return chunk
1386+ }
1387+ throw CancellationError ( )
1388+ }
1389+ }
1390+ }
1391+
1392+ public struct DispatchFD {
1393+ #if os(Windows)
1394+ fileprivate let rawValue : Int
1395+ #else
1396+ fileprivate let rawValue : Int32
1397+ #endif
1398+
1399+ init ( fileHandle: FileHandle ) {
1400+ #if os(Windows)
1401+ // This may look unsafe, but is how swift-corelibs-dispatch works. Basically, dispatch_fd_t directly represents either a POSIX file descriptor OR a Windows HANDLE pointer address, meaning that the fileDescriptor parameter of various Dispatch APIs is actually NOT a file descriptor on Windows but rather a HANDLE. This means that the handle should NOT be converted using _open_osfhandle, and the return value of this function should ONLY be passed to Dispatch functions where the fileDescriptor parameter is masquerading as a HANDLE in this manner. Use with extreme caution.
1402+ rawValue = . init( bitPattern: fileHandle. _handle)
1403+ #else
1404+ rawValue = fileHandle. fileDescriptor
1405+ #endif
1406+ }
1407+ }
0 commit comments