From ec34adc66cb1009fb7ce7bbce4d9f19853c0380c Mon Sep 17 00:00:00 2001 From: Igor Chevdar Date: Fri, 5 May 2017 17:42:42 +0500 Subject: [PATCH] Non-blocking echo server --- samples/nonBlockingEchoServer/EchoServer.kt | 237 ++++++++++++++++++ samples/nonBlockingEchoServer/README.md | 29 +++ samples/nonBlockingEchoServer/build.gradle | 36 +++ samples/nonBlockingEchoServer/build.sh | 22 ++ .../nonBlockingEchoServer/gradle.properties | 1 + .../gradle.properties.for_bundle | 1 + samples/nonBlockingEchoServer/sockets.def | 2 + 7 files changed, 328 insertions(+) create mode 100644 samples/nonBlockingEchoServer/EchoServer.kt create mode 100644 samples/nonBlockingEchoServer/README.md create mode 100644 samples/nonBlockingEchoServer/build.gradle create mode 100755 samples/nonBlockingEchoServer/build.sh create mode 100644 samples/nonBlockingEchoServer/gradle.properties create mode 100644 samples/nonBlockingEchoServer/gradle.properties.for_bundle create mode 100644 samples/nonBlockingEchoServer/sockets.def diff --git a/samples/nonBlockingEchoServer/EchoServer.kt b/samples/nonBlockingEchoServer/EchoServer.kt new file mode 100644 index 00000000000..5f298987dd4 --- /dev/null +++ b/samples/nonBlockingEchoServer/EchoServer.kt @@ -0,0 +1,237 @@ +/* + * Copyright 2010-2017 JetBrains s.r.o. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import kotlinx.cinterop.* +import sockets.* +import kotlin.coroutines.experimental.* +import kotlin.coroutines.experimental.intrinsics.* + +fun main(args: Array) { + if (args.size < 1) { + println("Usage: ./echo_server ") + return + } + + val port = args[0].toShort() + + memScoped { + + val serverAddr = alloc() + + val listenFd = socket(AF_INET, SOCK_STREAM, 0) + .ensureUnixCallResult { it >= 0 } + + with(serverAddr) { + memset(this.ptr, 0, sockaddr_in.size) + sin_family = AF_INET.narrow() + sin_addr.s_addr = htons(0).toInt() + sin_port = htons(port) + } + + bind(listenFd, serverAddr.ptr.reinterpret(), sockaddr_in.size.toInt()) + .ensureUnixCallResult { it == 0 } + + fcntl(listenFd, F_SETFL, O_NONBLOCK) + .ensureUnixCallResult { it == 0 } + + listen(listenFd, 10) + .ensureUnixCallResult { it == 0 } + + var connectionId = 0 + acceptClientsAndRun(listenFd) { + memScoped { + val bufferLength = 100L + val buffer = allocArray(bufferLength) + val connectionIdString = "#${++connectionId}: ".cstr + val connectionIdBytes = connectionIdString.getPointer(this) + + try { + while (true) { + val length = read(buffer, bufferLength) + + if (length == 0L) + break + + write(connectionIdBytes, connectionIdString.size.toLong()) + write(buffer, length) + } + } catch (e: IOException) { + println("I/O error occured: ${e.message}") + } + } + } + } +} + +sealed class WaitingFor { + class Accept : WaitingFor() + + class Read(val data: CArrayPointer, + val length: Long, + val continuation: Continuation) : WaitingFor() + + class Write(val data: CArrayPointer, + val length: Long, + val continuation: Continuation) : WaitingFor() +} + +class Client(val clientFd: Int, val waitingList: MutableMap) { + suspend fun read(data: CArrayPointer, dataLength: Long): Long { + val length = read(clientFd, data, dataLength) + if (length >= 0) + return length + if (errno != EWOULDBLOCK) + throw IOException(getUnixError()) + // Save continuation and suspend. + return suspendCoroutineOrReturn { continuation -> + waitingList.put(clientFd, WaitingFor.Read(data, dataLength, continuation)) + COROUTINE_SUSPENDED + } + } + + suspend fun write(data: CArrayPointer, length: Long) { + val written = write(clientFd, data, length) + if (written >= 0) + return + if (errno != EWOULDBLOCK) + throw IOException(getUnixError()) + // Save continuation and suspend. + return suspendCoroutineOrReturn { continuation -> + waitingList.put(clientFd, WaitingFor.Write(data, length, continuation)) + COROUTINE_SUSPENDED + } + } +} + +open class EmptyContinuation(override val context: CoroutineContext = EmptyCoroutineContext) : Continuation { + companion object : EmptyContinuation() + override fun resume(value: Any?) {} + override fun resumeWithException(exception: Throwable) { throw exception } +} + +fun acceptClientsAndRun(serverFd: Int, block: suspend Client.() -> Unit) { + memScoped { + val waitingList = mutableMapOf(serverFd to WaitingFor.Accept()) + val readfds = alloc() + val writefds = alloc() + val errorfds = alloc() + var maxfd = serverFd + while (true) { + FD_ZERO(readfds) + FD_ZERO(writefds) + FD_ZERO(errorfds) + for ((socketFd, watingFor) in waitingList) { + when (watingFor) { + is WaitingFor.Accept -> FD_SET(socketFd, readfds) + is WaitingFor.Read -> FD_SET(socketFd, readfds) + is WaitingFor.Write -> FD_SET(socketFd, writefds) + } + FD_SET(socketFd, errorfds) + } + pselect(maxfd + 1, readfds.ptr, writefds.ptr, errorfds.ptr, null, null) + .ensureUnixCallResult { it >= 0 } + loop@for (socketFd in 0..maxfd) { + val waitingFor = waitingList[socketFd] + val errorOccured = FD_ISSET(socketFd, errorfds) + if (FD_ISSET(socketFd, readfds) || FD_ISSET(socketFd, writefds) || errorOccured) { + when (waitingFor) { + is WaitingFor.Accept -> { + if (errorOccured) + throw Error("Socket has been closed externally") + + // Accept new client. + val clientFd = accept(serverFd, null, null) + if (clientFd < 0) { + if (errno != EWOULDBLOCK) + throw Error(getUnixError()) + break@loop + } + fcntl(clientFd, F_SETFL, O_NONBLOCK) + .ensureUnixCallResult { it == 0 } + if (maxfd < clientFd) + maxfd = clientFd + block.startCoroutine(Client(clientFd, waitingList), EmptyContinuation) + } + is WaitingFor.Read -> { + if (errorOccured) + waitingFor.continuation.resumeWithException(IOException("Connection was closed by peer")) + + // Resume reading operation. + waitingList.remove(socketFd) + val length = read(socketFd, waitingFor.data, waitingFor.length) + if (length < 0) // Read error. + waitingFor.continuation.resumeWithException(IOException(getUnixError())) + waitingFor.continuation.resume(length) + } + is WaitingFor.Write -> { + if (errorOccured) + waitingFor.continuation.resumeWithException(IOException("Connection was closed by peer")) + + // Resume writing operation. + waitingList.remove(socketFd) + val written = write(socketFd, waitingFor.data, waitingFor.length) + if (written < 0) // Write error. + waitingFor.continuation.resumeWithException(IOException(getUnixError())) + waitingFor.continuation.resume(Unit) + } + } + } + } + } + } +} + +class IOException: RuntimeException { + constructor() : super() { + } + + constructor(message: String): super(message) { + } +} + +val errno: Int + get() = __error()!!.pointed.value + +fun FD_ZERO(set: fd_set) { + memset(set.fds_bits, 0, sizeOf()) +} + +fun FD_SET(bit: Int, set: fd_set) { + set.fds_bits[bit / 32] = set.fds_bits[bit / 32] or (1 shl (bit % 32)) +} + +fun FD_ISSET(bit: Int, set: fd_set): Boolean { + return set.fds_bits[bit / 32] and (1 shl (bit % 32)) != 0 +} + +// Not available through interop because declared as macro: +fun htons(value: Short) = ((value.toInt() ushr 8) or (value.toInt() shl 8)).toShort() + +fun getUnixError() = strerror(errno)!!.toKString() + +inline fun Int.ensureUnixCallResult(predicate: (Int) -> Boolean): Int { + if (!predicate(this)) { + throw Error(getUnixError()) + } + return this +} + +inline fun Long.ensureUnixCallResult(predicate: (Long) -> Boolean): Long { + if (!predicate(this)) { + throw Error(getUnixError()) + } + return this +} \ No newline at end of file diff --git a/samples/nonBlockingEchoServer/README.md b/samples/nonBlockingEchoServer/README.md new file mode 100644 index 00000000000..54f19d95ce3 --- /dev/null +++ b/samples/nonBlockingEchoServer/README.md @@ -0,0 +1,29 @@ +# Non-blocking echo server demo + +This sample shows how to implement multi-client server using coroutines. +IO operations are implemented using non-blocking OS calls, and instead coroutines +are being suspended and resumed whenever relevant. + +Thus, while server can process multiple connections concurrently, +each individual connection handler is written in simple linear manner. + +Compile the echo server (in EAP only supported on Mac host): + + ./build.sh + +You also may use Gradle to build the server: + + ../gradlew build + +Run the server: + + ./EchoServer.kexe 3000 & + +Test the server by conecting to it, for example with telnet: + + telnet localhost 3000 + +Write something to console and watch server echoing it back. + +~~Quit telnet by pressing ctrl+] ctrl+D~~ + diff --git a/samples/nonBlockingEchoServer/build.gradle b/samples/nonBlockingEchoServer/build.gradle new file mode 100644 index 00000000000..8982b2d20c0 --- /dev/null +++ b/samples/nonBlockingEchoServer/build.gradle @@ -0,0 +1,36 @@ +buildscript { + repositories { + mavenCentral() + maven { + url "https://dl.bintray.com/jetbrains/kotlin-native-dependencies" + } + } + + dependencies { + classpath "org.jetbrains.kotlin:kotlin-native-gradle-plugin:0.1" + } +} + +apply plugin: 'konan' + +konanInterop { + sockets { + defFile "sockets.def" + } +} + +konanArtifacts { + EchoServer { + inputFiles project.file("EchoServer.kt") + useInterop "sockets" + } +} + +build { + doLast { + copy { + from compileKonanEchoServer.artifactPath + into projectDir.canonicalPath + } + } +} diff --git a/samples/nonBlockingEchoServer/build.sh b/samples/nonBlockingEchoServer/build.sh new file mode 100755 index 00000000000..4962195956d --- /dev/null +++ b/samples/nonBlockingEchoServer/build.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash + +PATH=../../dist/bin:../../bin:$PATH +DIR=. + +if [ x$TARGET == x ]; then +case "$OSTYPE" in + darwin*) TARGET=macbook ;; + linux*) TARGET=linux ;; + *) echo "unknown: $OSTYPE" && exit 1;; +esac +fi + +var=CFLAGS_${TARGET} +CFLAGS=${!var} +var=LINKER_ARGS_${TARGET} +LINKER_ARGS=${!var} +var=COMPILER_ARGS_${TARGET} +COMPILER_ARGS=${!var} # add -opt for an optimized build. + +cinterop -def $DIR/sockets.def -copt "$CFLAGS" -target $TARGET -o sockets.kt.bc || exit 1 +konanc $COMPILER_ARGS -target $TARGET $DIR/EchoServer.kt -library sockets.kt.bc -o EchoServer.kexe || exit 1 diff --git a/samples/nonBlockingEchoServer/gradle.properties b/samples/nonBlockingEchoServer/gradle.properties new file mode 100644 index 00000000000..196f4b3055e --- /dev/null +++ b/samples/nonBlockingEchoServer/gradle.properties @@ -0,0 +1 @@ +konan.home=../../dist diff --git a/samples/nonBlockingEchoServer/gradle.properties.for_bundle b/samples/nonBlockingEchoServer/gradle.properties.for_bundle new file mode 100644 index 00000000000..5e41860b04f --- /dev/null +++ b/samples/nonBlockingEchoServer/gradle.properties.for_bundle @@ -0,0 +1 @@ +konan.home=../.. diff --git a/samples/nonBlockingEchoServer/sockets.def b/samples/nonBlockingEchoServer/sockets.def new file mode 100644 index 00000000000..3f8f2666168 --- /dev/null +++ b/samples/nonBlockingEchoServer/sockets.def @@ -0,0 +1,2 @@ +headers = sys/socket.h sys/errno.h sys/select.h fcntl.h netdb.h stdio.h string.h unistd.h stdlib.h netinet/in.h +excludeDependentModules.osx = true