-
Notifications
You must be signed in to change notification settings - Fork 66
Add Source and Sink extensions for Apple's NSInputStream and NSOutputStream #174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c0f042f
13bf3b1
3386ab3
6b4bb80
8ca65a2
2be70a6
54e247c
dccd22a
2ba8bed
d7b8e1d
62757c5
613e2be
e5f5c27
f5dfc1a
c9d1f44
242fda0
ead4f78
f9c9305
18ff1d0
c4eb1b9
56bb0e4
9d53c8e
4e3ff87
aa5830c
6f076d9
f789518
feb3145
3508104
9e71d4b
f40d472
caba9b4
afab5b7
365c354
a19c463
0021412
211c5f5
ae33893
5d14977
5cfafa2
e9fcaeb
50fe63f
89a4f80
e296c01
7b3ab06
c74845e
d2d040d
630423c
110e56c
6d034e9
ed5902e
07318d0
4897741
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
/* | ||
* Copyright 2017-2023 JetBrains s.r.o. and respective authors and developers. | ||
* Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. | ||
*/ | ||
|
||
package kotlinx.io | ||
|
||
import kotlinx.cinterop.UnsafeNumber | ||
import platform.Foundation.NSError | ||
import platform.Foundation.NSLocalizedDescriptionKey | ||
import platform.Foundation.NSUnderlyingErrorKey | ||
|
||
@OptIn(UnsafeNumber::class) | ||
internal fun Exception.toNSError() = NSError( | ||
domain = "Kotlin", | ||
code = 0, | ||
userInfo = mapOf( | ||
NSLocalizedDescriptionKey to message, | ||
NSUnderlyingErrorKey to this | ||
) | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
/* | ||
* Copyright 2017-2023 JetBrains s.r.o. and respective authors and developers. | ||
* Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. | ||
*/ | ||
|
||
@file:OptIn(UnsafeNumber::class) | ||
|
||
package kotlinx.io | ||
|
||
import kotlinx.cinterop.* | ||
import platform.Foundation.NSInputStream | ||
import platform.Foundation.NSOutputStream | ||
import platform.Foundation.NSStreamStatusClosed | ||
import platform.Foundation.NSStreamStatusNotOpen | ||
import platform.posix.uint8_tVar | ||
|
||
/** | ||
* Returns [RawSink] that writes to an output stream. | ||
* | ||
* Use [RawSink.buffered] to create a buffered sink from it. | ||
* | ||
* @sample kotlinx.io.samples.KotlinxIoSamplesApple.outputStreamAsSink | ||
*/ | ||
public fun NSOutputStream.asSink(): RawSink = OutputStreamSink(this) | ||
|
||
private open class OutputStreamSink( | ||
private val out: NSOutputStream, | ||
) : RawSink { | ||
|
||
init { | ||
if (out.streamStatus == NSStreamStatusNotOpen) out.open() | ||
} | ||
|
||
override fun write(source: Buffer, byteCount: Long) { | ||
if (out.streamStatus == NSStreamStatusClosed) throw IOException("Stream Closed") | ||
fzhinkin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
checkOffsetAndCount(source.size, 0, byteCount) | ||
var remaining = byteCount | ||
while (remaining > 0) { | ||
val head = source.head!! | ||
val toCopy = minOf(remaining, head.limit - head.pos).toInt() | ||
val bytesWritten = head.data.usePinned { | ||
val bytes = it.addressOf(head.pos).reinterpret<uint8_tVar>() | ||
out.write(bytes, toCopy.convert()).toLong() | ||
} | ||
|
||
if (bytesWritten < 0L) throw IOException(out.streamError?.localizedDescription ?: "Unknown error") | ||
jeffdgr8 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (bytesWritten == 0L) throw IOException("NSOutputStream reached capacity") | ||
|
||
head.pos += bytesWritten.toInt() | ||
remaining -= bytesWritten | ||
source.size -= bytesWritten | ||
|
||
if (head.pos == head.limit) { | ||
source.head = head.pop() | ||
SegmentPool.recycle(head) | ||
} | ||
} | ||
} | ||
|
||
override fun flush() { | ||
// no-op | ||
} | ||
|
||
override fun close() = out.close() | ||
|
||
override fun toString() = "RawSink($out)" | ||
} | ||
|
||
/** | ||
* Returns [RawSource] that reads from an input stream. | ||
* | ||
* Use [RawSource.buffered] to create a buffered source from it. | ||
* | ||
* @sample kotlinx.io.samples.KotlinxIoSamplesApple.inputStreamAsSource | ||
*/ | ||
public fun NSInputStream.asSource(): RawSource = NSInputStreamSource(this) | ||
|
||
private open class NSInputStreamSource( | ||
private val input: NSInputStream, | ||
) : RawSource { | ||
|
||
init { | ||
if (input.streamStatus == NSStreamStatusNotOpen) input.open() | ||
jeffdgr8 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
override fun readAtMostTo(sink: Buffer, byteCount: Long): Long { | ||
if (input.streamStatus == NSStreamStatusClosed) throw IOException("Stream Closed") | ||
|
||
if (byteCount == 0L) return 0L | ||
checkByteCount(byteCount) | ||
|
||
val tail = sink.writableSegment(1) | ||
val maxToCopy = minOf(byteCount, Segment.SIZE - tail.limit) | ||
val bytesRead = tail.data.usePinned { | ||
val bytes = it.addressOf(tail.limit).reinterpret<uint8_tVar>() | ||
input.read(bytes, maxToCopy.convert()).toLong() | ||
} | ||
|
||
if (bytesRead < 0L) throw IOException(input.streamError?.localizedDescription ?: "Unknown error") | ||
jeffdgr8 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (bytesRead == 0L) { | ||
if (tail.pos == tail.limit) { | ||
// We allocated a tail segment, but didn't end up needing it. Recycle! | ||
sink.head = tail.pop() | ||
SegmentPool.recycle(tail) | ||
} | ||
return -1 | ||
} | ||
tail.limit += bytesRead.toInt() | ||
sink.size += bytesRead | ||
return bytesRead | ||
} | ||
|
||
override fun close() = input.close() | ||
|
||
override fun toString() = "RawSource($input)" | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
/* | ||
* Copyright 2017-2023 JetBrains s.r.o. and respective authors and developers. | ||
* Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. | ||
*/ | ||
|
||
@file:OptIn(UnsafeNumber::class) | ||
jeffdgr8 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
package kotlinx.io | ||
|
||
import kotlinx.cinterop.* | ||
import platform.Foundation.* | ||
import platform.darwin.ByteVar | ||
import platform.darwin.NSUIntegerMax | ||
import platform.posix.* | ||
|
||
internal fun Buffer.write(source: CPointer<uint8_tVar>, maxLength: Int) { | ||
require(maxLength >= 0) { "maxLength ($maxLength) must not be negative" } | ||
|
||
var currentOffset = 0 | ||
while (currentOffset < maxLength) { | ||
val tail = writableSegment(1) | ||
|
||
val toCopy = minOf(maxLength - currentOffset, Segment.SIZE - tail.limit) | ||
tail.data.usePinned { | ||
memcpy(it.addressOf(tail.pos), source + currentOffset, toCopy.convert()) | ||
} | ||
|
||
currentOffset += toCopy | ||
tail.limit += toCopy | ||
} | ||
size += maxLength | ||
} | ||
|
||
internal fun Buffer.readAtMostTo(sink: CPointer<uint8_tVar>, maxLength: Int): Int { | ||
require(maxLength >= 0) { "maxLength ($maxLength) must not be negative" } | ||
|
||
val s = head ?: return 0 | ||
val toCopy = minOf(maxLength, s.limit - s.pos) | ||
s.data.usePinned { | ||
memcpy(sink, it.addressOf(s.pos), toCopy.convert()) | ||
} | ||
|
||
s.pos += toCopy | ||
size -= toCopy.toLong() | ||
|
||
if (s.pos == s.limit) { | ||
head = s.pop() | ||
SegmentPool.recycle(s) | ||
} | ||
|
||
return toCopy | ||
} | ||
|
||
internal fun Buffer.snapshotAsNSData(): NSData { | ||
if (size == 0L) return NSData.data() | ||
|
||
check(size.toULong() <= NSUIntegerMax) { "Buffer is too long ($size) to be converted into NSData." } | ||
|
||
val bytes = malloc(size.convert())?.reinterpret<uint8_tVar>() | ||
?: throw Error("malloc failed: ${strerror(errno)?.toKString()}") | ||
var curr = head | ||
var index = 0 | ||
do { | ||
check(curr != null) { "Current segment is null" } | ||
val pos = curr.pos | ||
val length = curr.limit - pos | ||
curr.data.usePinned { | ||
memcpy(bytes + index, it.addressOf(pos), length.convert()) | ||
} | ||
curr = curr.next | ||
index += length | ||
} while (curr !== head) | ||
return NSData.create(bytesNoCopy = bytes, length = size.convert()) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
/* | ||
* Copyright 2017-2023 JetBrains s.r.o. and respective authors and developers. | ||
* Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. | ||
*/ | ||
|
||
package kotlinx.io | ||
|
||
import kotlinx.cinterop.* | ||
import platform.Foundation.* | ||
import platform.darwin.NSInteger | ||
import platform.darwin.NSUInteger | ||
import platform.posix.uint8_tVar | ||
import kotlin.native.ref.WeakReference | ||
|
||
/** | ||
* Returns an output stream that writes to this sink. Closing the stream will also close this sink. | ||
* | ||
jeffdgr8 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* The stream supports both polling and run-loop scheduling, please check | ||
* [Apple's documentation](https://developer.apple.com/library/archive/documentation/Cocoa/Conceptual/Streams/Articles/PollingVersusRunloop.html) | ||
* for information about stream events handling. | ||
* | ||
* The stream does not implement initializers | ||
* ([NSOutputStream.initToBuffer](https://developer.apple.com/documentation/foundation/nsoutputstream/1410805-inittobuffer), | ||
* [NSOutputStream.initToMemory](https://developer.apple.com/documentation/foundation/nsoutputstream/1409909-inittomemory), | ||
* [NSOutputStream.initWithURL](https://developer.apple.com/documentation/foundation/nsoutputstream/1414446-initwithurl), | ||
* [NSOutputStream.initToFileAtPath](https://developer.apple.com/documentation/foundation/nsoutputstream/1416367-inittofileatpath)), | ||
* their use will result in a runtime error. | ||
* | ||
* @sample kotlinx.io.samples.KotlinxIoSamplesApple.asStream | ||
*/ | ||
public fun Sink.asNSOutputStream(): NSOutputStream = SinkNSOutputStream(this) | ||
|
||
@OptIn(UnsafeNumber::class) | ||
private class SinkNSOutputStream( | ||
private val sink: Sink | ||
) : NSOutputStream(toMemory = Unit), NSStreamDelegateProtocol { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure how things work under the hood when inheriting NSInputStream/NSOutputStream and calling a non-default initializer (I understand that no-arg initializer is not available here) as according to Apple's docs, these initializers aimed to instantiate some specific stream's subclass. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was the only way I found to appease the requirement I'm not sure there really should be an expected designated initializer in Kotlin for either of these classes. This is likely an interop error. In Objective-C this compiles and I'm able to construct
It works to use either the memory or URL constructor of both There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Related issues: https://youtrack.jetbrains.com/issue/KT-47992 Looks like there is a workaround for disabling in custom .def interop, but this won't work for the platform interop in this case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeap, seems like there's no way to inherit streams using no-args There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In fact, I was mostly concerned about the possibility of inheriting some methods from NSInput/OutputStream subclasses instantiated for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added the suggested doc comments. It would be strange for a consumer to call any of those There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for updating the doc! Yes, it should be possible to call these initializers only from Objective-C code and it seems like nobody actually does that, but it is still possible :') |
||
|
||
private val isClosed: () -> Boolean = when (sink) { | ||
is RealSink -> sink::closed | ||
is Buffer -> { | ||
{ false } | ||
} | ||
} | ||
|
||
private var status = NSStreamStatusNotOpen | ||
fzhinkin marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct me if I'm wrong, but status transition diagrams seems to look like this: graph TD
NSStreamStatusNotOpen -->|open| NSStreamStatusOpen;
NSStreamStatusNotOpen -->|close| NSStreamStatusClosed;
NSStreamStatusOpen -->|write| NSStreamStatusWriting;
NSStreamStatusWriting -->|write| NSStreamStatusOpen;
NSStreamStatusOpen -->|error| NSStreamStatusError;
NSStreamStatusNotOpen -->|write->error| NSStreamStatusError;
NSStreamStatusClosed -->|write->error| NSStreamStatusError;
NSStreamStatusWriting -->|error| NSStreamStatusError;
NSStreamStatusOpening;
NSStreamStatusReading;
NSStreamStatusAtEnd;
Do all the transitions make sense? Should we change the status to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the stream is not open, I don't think we should go from We should probably mirror the behavior of the platform streams and not allow closing a stream that has never been opened. I'll make that change. You're missing the close from the Where are you getting the I guess we just pass right through Sinks don't provide a mechanism to be limited in capacity, so we never get This is what I believe the status lifecycle should look like after these changes: graph TD
NSStreamStatusNotOpen -->|open| NSStreamStatusOpening;
NSStreamStatusOpening -->|open| NSStreamStatusOpen;
NSStreamStatusOpen -->|close| NSStreamStatusClosed;
NSStreamStatusOpen -->|write| NSStreamStatusWriting;
NSStreamStatusWriting -->|write| NSStreamStatusOpen;
NSStreamStatusWriting -->|error| NSStreamStatusError;
NSStreamStatusReading;
NSStreamStatusAtEnd;
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
That's possible when the stream is still open, but the underlying sink is closed and someone is calling
My understanding is that it's mainly intended for streams with complex opening protocols, like network connections. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Oh, ok. This should be changed now where any |
||
private var error: NSError? = null | ||
set(value) { | ||
status = NSStreamStatusError | ||
field = value | ||
postEvent(NSStreamEventErrorOccurred) | ||
sink.close() | ||
} | ||
|
||
override fun streamStatus() = if (status != NSStreamStatusError && isClosed()) NSStreamStatusClosed else status | ||
|
||
override fun streamError() = error | ||
|
||
override fun open() { | ||
if (status == NSStreamStatusNotOpen) { | ||
status = NSStreamStatusOpening | ||
status = NSStreamStatusOpen | ||
postEvent(NSStreamEventOpenCompleted) | ||
postEvent(NSStreamEventHasSpaceAvailable) | ||
} | ||
} | ||
|
||
override fun close() { | ||
if (status == NSStreamStatusError || status == NSStreamStatusNotOpen) return | ||
status = NSStreamStatusClosed | ||
fzhinkin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
runLoop = null | ||
runLoopModes = listOf() | ||
sink.close() | ||
} | ||
|
||
@OptIn(DelicateIoApi::class) | ||
override fun write(buffer: CPointer<uint8_tVar>?, maxLength: NSUInteger): NSInteger { | ||
if (streamStatus != NSStreamStatusOpen || buffer == null) return -1 | ||
status = NSStreamStatusWriting | ||
val toWrite = minOf(maxLength, Int.MAX_VALUE.convert()).toInt() | ||
return try { | ||
sink.writeToInternalBuffer { | ||
it.write(buffer, toWrite) | ||
} | ||
status = NSStreamStatusOpen | ||
toWrite.convert() | ||
} catch (e: Exception) { | ||
error = e.toNSError() | ||
-1 | ||
} | ||
} | ||
|
||
override fun hasSpaceAvailable() = !isFinished | ||
|
||
private val isFinished | ||
get() = when (streamStatus) { | ||
NSStreamStatusClosed, NSStreamStatusError -> true | ||
else -> false | ||
} | ||
|
||
@OptIn(InternalIoApi::class) | ||
override fun propertyForKey(key: NSStreamPropertyKey): Any? = when (key) { | ||
NSStreamDataWrittenToMemoryStreamKey -> sink.buffer.snapshotAsNSData() | ||
else -> null | ||
} | ||
|
||
override fun setProperty(property: Any?, forKey: NSStreamPropertyKey) = false | ||
|
||
// WeakReference as delegate should not be retained | ||
// https://developer.apple.com/documentation/foundation/nsstream/1418423-delegate | ||
private var _delegate: WeakReference<NSStreamDelegateProtocol>? = null | ||
private var runLoop: NSRunLoop? = null | ||
private var runLoopModes = listOf<NSRunLoopMode>() | ||
|
||
private fun postEvent(event: NSStreamEvent) { | ||
val runLoop = runLoop ?: return | ||
runLoop.performInModes(runLoopModes) { | ||
if (runLoop == this.runLoop) { | ||
delegateOrSelf.stream(this, event) | ||
} | ||
} | ||
} | ||
|
||
override fun delegate() = _delegate?.value | ||
|
||
private val delegateOrSelf get() = delegate ?: this | ||
|
||
override fun setDelegate(delegate: NSStreamDelegateProtocol?) { | ||
_delegate = delegate?.let { WeakReference(it) } | ||
} | ||
|
||
override fun stream(aStream: NSStream, handleEvent: NSStreamEvent) { | ||
// no-op | ||
} | ||
|
||
override fun scheduleInRunLoop(aRunLoop: NSRunLoop, forMode: NSRunLoopMode) { | ||
if (runLoop == null) { | ||
runLoop = aRunLoop | ||
} | ||
if (runLoop == aRunLoop) { | ||
runLoopModes += forMode | ||
} | ||
if (status == NSStreamStatusOpen) { | ||
postEvent(NSStreamEventHasSpaceAvailable) | ||
} | ||
} | ||
|
||
override fun removeFromRunLoop(aRunLoop: NSRunLoop, forMode: NSRunLoopMode) { | ||
if (aRunLoop == runLoop) { | ||
runLoopModes -= forMode | ||
if (runLoopModes.isEmpty()) { | ||
runLoop = null | ||
} | ||
} | ||
} | ||
|
||
override fun description() = "$sink.asNSOutputStream()" | ||
} |
Uh oh!
There was an error while loading. Please reload this page.