Skip to content

Commit

Permalink
Non-blocking echo server
Browse files Browse the repository at this point in the history
  • Loading branch information
homuroll committed May 10, 2017
1 parent 39d1b89 commit ec34adc
Show file tree
Hide file tree
Showing 7 changed files with 328 additions and 0 deletions.
237 changes: 237 additions & 0 deletions samples/nonBlockingEchoServer/EchoServer.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
/*
* Copyright 2010-2017 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import kotlinx.cinterop.*
import sockets.*
import kotlin.coroutines.experimental.*
import kotlin.coroutines.experimental.intrinsics.*

fun main(args: Array<String>) {
if (args.size < 1) {
println("Usage: ./echo_server <port>")
return
}

val port = args[0].toShort()

memScoped {

val serverAddr = alloc<sockaddr_in>()

val listenFd = socket(AF_INET, SOCK_STREAM, 0)
.ensureUnixCallResult { it >= 0 }

with(serverAddr) {
memset(this.ptr, 0, sockaddr_in.size)
sin_family = AF_INET.narrow()
sin_addr.s_addr = htons(0).toInt()
sin_port = htons(port)
}

bind(listenFd, serverAddr.ptr.reinterpret(), sockaddr_in.size.toInt())
.ensureUnixCallResult { it == 0 }

fcntl(listenFd, F_SETFL, O_NONBLOCK)
.ensureUnixCallResult { it == 0 }

listen(listenFd, 10)
.ensureUnixCallResult { it == 0 }

var connectionId = 0
acceptClientsAndRun(listenFd) {
memScoped {
val bufferLength = 100L
val buffer = allocArray<ByteVar>(bufferLength)
val connectionIdString = "#${++connectionId}: ".cstr
val connectionIdBytes = connectionIdString.getPointer(this)

try {
while (true) {
val length = read(buffer, bufferLength)

if (length == 0L)
break

write(connectionIdBytes, connectionIdString.size.toLong())
write(buffer, length)
}
} catch (e: IOException) {
println("I/O error occured: ${e.message}")
}
}
}
}
}

sealed class WaitingFor {
class Accept : WaitingFor()

class Read(val data: CArrayPointer<ByteVar>,
val length: Long,
val continuation: Continuation<Long>) : WaitingFor()

class Write(val data: CArrayPointer<ByteVar>,
val length: Long,
val continuation: Continuation<Unit>) : WaitingFor()
}

class Client(val clientFd: Int, val waitingList: MutableMap<Int, WaitingFor>) {
suspend fun read(data: CArrayPointer<ByteVar>, dataLength: Long): Long {
val length = read(clientFd, data, dataLength)
if (length >= 0)
return length
if (errno != EWOULDBLOCK)
throw IOException(getUnixError())
// Save continuation and suspend.
return suspendCoroutineOrReturn { continuation ->
waitingList.put(clientFd, WaitingFor.Read(data, dataLength, continuation))
COROUTINE_SUSPENDED
}
}

suspend fun write(data: CArrayPointer<ByteVar>, length: Long) {
val written = write(clientFd, data, length)
if (written >= 0)
return
if (errno != EWOULDBLOCK)
throw IOException(getUnixError())
// Save continuation and suspend.
return suspendCoroutineOrReturn { continuation ->
waitingList.put(clientFd, WaitingFor.Write(data, length, continuation))
COROUTINE_SUSPENDED
}
}
}

open class EmptyContinuation(override val context: CoroutineContext = EmptyCoroutineContext) : Continuation<Any?> {
companion object : EmptyContinuation()
override fun resume(value: Any?) {}
override fun resumeWithException(exception: Throwable) { throw exception }
}

fun acceptClientsAndRun(serverFd: Int, block: suspend Client.() -> Unit) {
memScoped {
val waitingList = mutableMapOf<Int, WaitingFor>(serverFd to WaitingFor.Accept())
val readfds = alloc<fd_set>()
val writefds = alloc<fd_set>()
val errorfds = alloc<fd_set>()
var maxfd = serverFd
while (true) {
FD_ZERO(readfds)
FD_ZERO(writefds)
FD_ZERO(errorfds)
for ((socketFd, watingFor) in waitingList) {
when (watingFor) {
is WaitingFor.Accept -> FD_SET(socketFd, readfds)
is WaitingFor.Read -> FD_SET(socketFd, readfds)
is WaitingFor.Write -> FD_SET(socketFd, writefds)
}
FD_SET(socketFd, errorfds)
}
pselect(maxfd + 1, readfds.ptr, writefds.ptr, errorfds.ptr, null, null)
.ensureUnixCallResult { it >= 0 }
loop@for (socketFd in 0..maxfd) {
val waitingFor = waitingList[socketFd]
val errorOccured = FD_ISSET(socketFd, errorfds)
if (FD_ISSET(socketFd, readfds) || FD_ISSET(socketFd, writefds) || errorOccured) {
when (waitingFor) {
is WaitingFor.Accept -> {
if (errorOccured)
throw Error("Socket has been closed externally")

// Accept new client.
val clientFd = accept(serverFd, null, null)
if (clientFd < 0) {
if (errno != EWOULDBLOCK)
throw Error(getUnixError())
break@loop
}
fcntl(clientFd, F_SETFL, O_NONBLOCK)
.ensureUnixCallResult { it == 0 }
if (maxfd < clientFd)
maxfd = clientFd
block.startCoroutine(Client(clientFd, waitingList), EmptyContinuation)
}
is WaitingFor.Read -> {
if (errorOccured)
waitingFor.continuation.resumeWithException(IOException("Connection was closed by peer"))

// Resume reading operation.
waitingList.remove(socketFd)
val length = read(socketFd, waitingFor.data, waitingFor.length)
if (length < 0) // Read error.
waitingFor.continuation.resumeWithException(IOException(getUnixError()))
waitingFor.continuation.resume(length)
}
is WaitingFor.Write -> {
if (errorOccured)
waitingFor.continuation.resumeWithException(IOException("Connection was closed by peer"))

// Resume writing operation.
waitingList.remove(socketFd)
val written = write(socketFd, waitingFor.data, waitingFor.length)
if (written < 0) // Write error.
waitingFor.continuation.resumeWithException(IOException(getUnixError()))
waitingFor.continuation.resume(Unit)
}
}
}
}
}
}
}

class IOException: RuntimeException {
constructor() : super() {
}

constructor(message: String): super(message) {
}
}

val errno: Int
get() = __error()!!.pointed.value

fun FD_ZERO(set: fd_set) {
memset(set.fds_bits, 0, sizeOf<fd_set>())
}

fun FD_SET(bit: Int, set: fd_set) {
set.fds_bits[bit / 32] = set.fds_bits[bit / 32] or (1 shl (bit % 32))
}

fun FD_ISSET(bit: Int, set: fd_set): Boolean {
return set.fds_bits[bit / 32] and (1 shl (bit % 32)) != 0
}

// Not available through interop because declared as macro:
fun htons(value: Short) = ((value.toInt() ushr 8) or (value.toInt() shl 8)).toShort()

fun getUnixError() = strerror(errno)!!.toKString()

inline fun Int.ensureUnixCallResult(predicate: (Int) -> Boolean): Int {
if (!predicate(this)) {
throw Error(getUnixError())
}
return this
}

inline fun Long.ensureUnixCallResult(predicate: (Long) -> Boolean): Long {
if (!predicate(this)) {
throw Error(getUnixError())
}
return this
}
29 changes: 29 additions & 0 deletions samples/nonBlockingEchoServer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Non-blocking echo server demo

This sample shows how to implement multi-client server using coroutines.
IO operations are implemented using non-blocking OS calls, and instead coroutines
are being suspended and resumed whenever relevant.

Thus, while server can process multiple connections concurrently,
each individual connection handler is written in simple linear manner.

Compile the echo server (in EAP only supported on Mac host):

./build.sh

You also may use Gradle to build the server:

../gradlew build

Run the server:

./EchoServer.kexe 3000 &

Test the server by conecting to it, for example with telnet:

telnet localhost 3000

Write something to console and watch server echoing it back.

~~Quit telnet by pressing ctrl+] ctrl+D~~

36 changes: 36 additions & 0 deletions samples/nonBlockingEchoServer/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
buildscript {
repositories {
mavenCentral()
maven {
url "https://dl.bintray.com/jetbrains/kotlin-native-dependencies"
}
}

dependencies {
classpath "org.jetbrains.kotlin:kotlin-native-gradle-plugin:0.1"
}
}

apply plugin: 'konan'

konanInterop {
sockets {
defFile "sockets.def"
}
}

konanArtifacts {
EchoServer {
inputFiles project.file("EchoServer.kt")
useInterop "sockets"
}
}

build {
doLast {
copy {
from compileKonanEchoServer.artifactPath
into projectDir.canonicalPath
}
}
}
22 changes: 22 additions & 0 deletions samples/nonBlockingEchoServer/build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/usr/bin/env bash

PATH=../../dist/bin:../../bin:$PATH
DIR=.

if [ x$TARGET == x ]; then
case "$OSTYPE" in
darwin*) TARGET=macbook ;;
linux*) TARGET=linux ;;
*) echo "unknown: $OSTYPE" && exit 1;;
esac
fi

var=CFLAGS_${TARGET}
CFLAGS=${!var}
var=LINKER_ARGS_${TARGET}
LINKER_ARGS=${!var}
var=COMPILER_ARGS_${TARGET}
COMPILER_ARGS=${!var} # add -opt for an optimized build.

cinterop -def $DIR/sockets.def -copt "$CFLAGS" -target $TARGET -o sockets.kt.bc || exit 1
konanc $COMPILER_ARGS -target $TARGET $DIR/EchoServer.kt -library sockets.kt.bc -o EchoServer.kexe || exit 1
1 change: 1 addition & 0 deletions samples/nonBlockingEchoServer/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
konan.home=../../dist
1 change: 1 addition & 0 deletions samples/nonBlockingEchoServer/gradle.properties.for_bundle
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
konan.home=../..
2 changes: 2 additions & 0 deletions samples/nonBlockingEchoServer/sockets.def
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
headers = sys/socket.h sys/errno.h sys/select.h fcntl.h netdb.h stdio.h string.h unistd.h stdlib.h netinet/in.h
excludeDependentModules.osx = true

0 comments on commit ec34adc

Please sign in to comment.