Skip to content

Create platform specific AsyncIO #64

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions Sources/Subprocess/AsyncBufferSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
public typealias Failure = any Swift.Error
public typealias Element = Buffer

#if os(Windows)
internal typealias DiskIO = FileDescriptor
#else
#if canImport(Darwin)
internal typealias DiskIO = DispatchIO
#else
internal typealias DiskIO = FileDescriptor
#endif

@_nonSendable
Expand All @@ -47,15 +47,16 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
return self.buffer.removeFirst()
}
// Read more data
let data = try await self.diskIO.read(
upToLength: readBufferSize
let data = try await AsyncIO.shared.read(
from: self.diskIO,
upTo: readBufferSize
)
guard let data else {
// We finished reading. Close the file descriptor now
#if os(Windows)
try self.diskIO.close()
#else
#if canImport(Darwin)
self.diskIO.close()
#else
try self.diskIO.close()
#endif
return nil
}
Expand Down Expand Up @@ -130,17 +131,7 @@ extension AsyncBufferSequence {
self.eofReached = true
return nil
}
#if os(Windows)
// Cast data to CodeUnit type
let result = buffer.withUnsafeBytes { ptr in
return Array(
UnsafeBufferPointer<Encoding.CodeUnit>(
start: ptr.bindMemory(to: Encoding.CodeUnit.self).baseAddress!,
count: ptr.count / MemoryLayout<Encoding.CodeUnit>.size
)
)
}
#else
#if canImport(Darwin)
// Unfortunately here we _have to_ copy the bytes out because
// DispatchIO (rightfully) reuses buffer, which means `buffer.data`
// has the same address on all iterations, therefore we can't directly
Expand All @@ -155,7 +146,16 @@ extension AsyncBufferSequence {
UnsafeBufferPointer(start: ptr.baseAddress?.assumingMemoryBound(to: Encoding.CodeUnit.self), count: elementCount)
)
}

#else
// Cast data to CodeUnit type
let result = buffer.withUnsafeBytes { ptr in
return Array(
UnsafeBufferPointer<Encoding.CodeUnit>(
start: ptr.bindMemory(to: Encoding.CodeUnit.self).baseAddress!,
count: ptr.count / MemoryLayout<Encoding.CodeUnit>.size
)
)
}
#endif
return result.isEmpty ? nil : result
}
Expand Down
47 changes: 26 additions & 21 deletions Sources/Subprocess/Buffer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,8 @@
extension AsyncBufferSequence {
/// A immutable collection of bytes
public struct Buffer: Sendable {
#if os(Windows)
internal let data: [UInt8]

internal init(data: [UInt8]) {
self.data = data
}

internal static func createFrom(_ data: [UInt8]) -> [Buffer] {
return [.init(data: data)]
}
#else
// We need to keep the backingData alive while _ContiguousBufferView is alive
#if canImport(Darwin)
// We need to keep the backingData alive while Slice is alive
internal let backingData: DispatchData
internal let data: DispatchData._ContiguousBufferView

Expand All @@ -45,7 +35,17 @@ extension AsyncBufferSequence {
}
return slices.map{ .init(data: $0, backingData: data) }
}
#endif
#else
internal let data: [UInt8]

internal init(data: [UInt8]) {
self.data = data
}

internal static func createFrom(_ data: [UInt8]) -> [Buffer] {
return [.init(data: data)]
}
#endif // canImport(Darwin)
}
}

Expand Down Expand Up @@ -92,26 +92,23 @@ extension AsyncBufferSequence.Buffer {

// MARK: - Hashable, Equatable
extension AsyncBufferSequence.Buffer: Equatable, Hashable {
#if os(Windows)
// Compiler generated conformances
#else
#if canImport(Darwin)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know these aren't not new changes, but why do we have custom implementation only for Darwin?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DispatchData is unfortunately not Hashable

public static func == (lhs: AsyncBufferSequence.Buffer, rhs: AsyncBufferSequence.Buffer) -> Bool {
return lhs.data.elementsEqual(rhs.data)
return lhs.data == rhs.data
}

public func hash(into hasher: inout Hasher) {
self.data.withUnsafeBytes { ptr in
hasher.combine(bytes: ptr)
}
hasher.combine(self.data)
}
#endif
// else Compiler generated conformances
}

// MARK: - DispatchData.Block
#if canImport(Darwin) || canImport(Glibc) || canImport(Android) || canImport(Musl)
extension DispatchData {
/// Unfortunately `DispatchData.Region` is not available on Linux, hence our own wrapper
internal struct _ContiguousBufferView: @unchecked Sendable, RandomAccessCollection {
internal struct _ContiguousBufferView: @unchecked Sendable, RandomAccessCollection, Hashable {
typealias Element = UInt8

internal let bytes: UnsafeBufferPointer<UInt8>
Expand All @@ -127,6 +124,14 @@ extension DispatchData {
return try body(UnsafeRawBufferPointer(self.bytes))
}

internal func hash(into hasher: inout Hasher) {
hasher.combine(bytes: UnsafeRawBufferPointer(self.bytes))
}

internal static func == (lhs: DispatchData._ContiguousBufferView, rhs: DispatchData._ContiguousBufferView) -> Bool {
return lhs.bytes.elementsEqual(rhs.bytes)
}

subscript(position: Int) -> UInt8 {
_read {
yield self.bytes[position]
Expand Down
2 changes: 0 additions & 2 deletions Sources/Subprocess/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -589,15 +589,13 @@ internal struct TrackedFileDescriptor: ~Copyable {
self.closeWhenDone = closeWhenDone
}

#if os(Windows)
consuming func consumeDiskIO() -> FileDescriptor {
let result = self.fileDescriptor
// Transfer the ownership out and therefor
// don't perform close on deinit
self.closeWhenDone = false
return result
}
#endif

internal mutating func safelyClose() throws {
guard self.closeWhenDone else {
Expand Down
17 changes: 11 additions & 6 deletions Sources/Subprocess/Error.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ extension SubprocessError {
case failedToWriteToSubprocess
case failedToMonitorProcess
case streamOutputExceedsLimit(Int)
case asyncIOFailed(String)
// Signal
case failedToSendSignal(Int32)
// Windows Only
Expand All @@ -67,18 +68,20 @@ extension SubprocessError {
return 5
case .streamOutputExceedsLimit(_):
return 6
case .failedToSendSignal(_):
case .asyncIOFailed(_):
return 7
case .failedToTerminate:
case .failedToSendSignal(_):
return 8
case .failedToSuspend:
case .failedToTerminate:
return 9
case .failedToResume:
case .failedToSuspend:
return 10
case .failedToCreatePipe:
case .failedToResume:
return 11
case .invalidWindowsPath(_):
case .failedToCreatePipe:
return 12
case .invalidWindowsPath(_):
return 13
}
}

Expand Down Expand Up @@ -108,6 +111,8 @@ extension SubprocessError: CustomStringConvertible, CustomDebugStringConvertible
return "Failed to monitor the state of child process with underlying error: \(self.underlyingError!)"
case .streamOutputExceedsLimit(let limit):
return "Failed to create output from current buffer because the output limit (\(limit)) was reached."
case .asyncIOFailed(let reason):
return "An error occurred within the AsyncIO subsystem: \(reason). Underlying error: \(self.underlyingError!)"
case .failedToSendSignal(let signal):
return "Failed to send signal \(signal) to the child process."
case .failedToTerminate:
Expand Down
Loading
Loading