Skip to content

Commit

Permalink
fixup! KTOR-1159 Add support for UDP sockets on native
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas-Vos committed Nov 15, 2020
1 parent 0a8a211 commit a09e961
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 39 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2014-2020 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.sockets

import io.ktor.network.selector.*
import io.ktor.network.util.*
import io.ktor.utils.io.core.*
import io.ktor.utils.io.errors.*
import kotlinx.cinterop.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import platform.posix.*

internal class DatagramSendChannel(
val descriptor: Int,
val socket: DatagramSocketImpl
) : SendChannel<Datagram> {
@ExperimentalCoroutinesApi
override val isClosedForSend: Boolean
get() = socket.isClosed

@ExperimentalCoroutinesApi
override val isFull: Boolean
get() = if (isClosedForSend) false else lock.isLocked

private val lock = Mutex()

override fun close(cause: Throwable?): Boolean {
// if (socket.isClosed) {
// return false
// }
//
// socket.close()
return true
}


override fun offer(element: Datagram): Boolean {
TODO("not implemented")
}

override suspend fun send(element: Datagram) {
lock.withLock {
sendImpl(element)
}
}

private tailrec suspend fun sendImpl(
datagram: Datagram,
bytes: ByteArray = datagram.packet.readBytes()
) {
var bytesWritten: Int? = null
bytes.usePinned { pinned ->
datagram.address.address.nativeAddress { address, addressSize ->
bytesWritten = sendto(
descriptor,
pinned.addressOf(0),
bytes.size.convert(),
0,
address,
addressSize
).toInt()
}
}
when (bytesWritten ?: error("bytesWritten cannot be null")) {
0 -> throw IOException("Failed writing to closed socket")
-1 -> {
if (errno == EAGAIN) {
socket.selector.select(socket.selectable, SelectInterest.WRITE)
sendImpl(datagram, bytes)
} else {
throw PosixException.forErrno()
}
}
}
}

override val onSend: SelectClause2<Datagram, SendChannel<Datagram>>
get() = TODO("[DatagramSendChannel] doesn't support [onSend] select clause")

@ExperimentalCoroutinesApi
override fun invokeOnClose(handler: (cause: Throwable?) -> Unit) {
TODO("[DatagramSendChannel] doesn't support [invokeOnClose] operation.")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ import kotlin.coroutines.*

internal class DatagramSocketImpl(
private val descriptor: Int,
private val selector: SelectorManager,
val selector: SelectorManager,
private val _localAddress: NetworkAddress,
private val _remoteAddress: NetworkAddress?,
parent: CoroutineContext = EmptyCoroutineContext
) : BoundDatagramSocket, ConnectedDatagramSocket, Socket, CoroutineScope {
private val _context: CompletableJob = Job(parent[Job])
private val selectable: SelectableNative = SelectableNative(descriptor)
val selectable: SelectableNative = SelectableNative(descriptor)

override val coroutineContext: CoroutineContext = parent + Dispatchers.Unconfined + _context

Expand All @@ -36,7 +36,7 @@ internal class DatagramSocketImpl(
override val remoteAddress: NetworkAddress
get() = _remoteAddress!! // TODO: What should happen here?

private val sender = Channel<Datagram>()
private val sender: SendChannel<Datagram> = DatagramSendChannel(descriptor, this)

private val receiver = produce<Datagram>(coroutineContext) {
while (true) {
Expand All @@ -47,12 +47,6 @@ internal class DatagramSocketImpl(

init {
makeShared()

launch {
sender.consumeEach { datagram ->
sendImpl(datagram)
}
}
}

override val outgoing: SendChannel<Datagram>
Expand All @@ -66,36 +60,6 @@ internal class DatagramSocketImpl(
override fun attachForWriting(channel: ByteChannel): ReaderJob =
attachForWritingImpl(channel, descriptor, selectable, selector)

private tailrec suspend fun sendImpl(
datagram: Datagram,
bytes: ByteArray = datagram.packet.readBytes()
) {
var bytesWritten: Int? = null
bytes.usePinned { pinned ->
datagram.address.address.nativeAddress { address, addressSize ->
bytesWritten = sendto(
descriptor,
pinned.addressOf(0),
bytes.size.convert(),
0,
address,
addressSize
).toInt()
}
}
when (bytesWritten ?: error("bytesWritten cannot be null")) {
0 -> throw IOException("Failed writing to closed socket")
-1 -> {
if (errno == EAGAIN) {
selector.select(selectable, SelectInterest.WRITE)
sendImpl(datagram, bytes)
} else {
throw PosixException.forErrno()
}
}
}
}

private tailrec suspend fun receiveImpl(
buffer: IoBuffer = DefaultDatagramByteBufferPool.borrow()
): Datagram {
Expand Down

0 comments on commit a09e961

Please sign in to comment.