Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -280,29 +280,37 @@ public class NettyApplicationEngine(
workerEventGroup.shutdownGracefully().sync()
}

private inline fun <R> withStopException(crossinline block: () -> R) {
runCatching(block).onFailure {
environment.log.error("Exception thrown during engine stop", it)
}
}

override fun stop(gracePeriodMillis: Long, timeoutMillis: Long) {
cancellationJob?.complete()
monitor.raise(ApplicationStopPreparing, environment)
val channelFutures = channels?.mapNotNull { if (it.isOpen) it.close() else null }.orEmpty()
channelFutures.forEach { future ->
withStopException { future.sync() }
}

try {
val shutdownConnections =
connectionEventGroup.shutdownGracefully(gracePeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)

val shutdownWorkers =
workerEventGroup.shutdownGracefully(gracePeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)
if (configuration.shareWorkGroup) {
shutdownWorkers.await()
} else {
val shutdownCall =
callEventGroup.shutdownGracefully(gracePeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)
shutdownWorkers.await()
shutdownCall.await()
val shutdownConnections = connectionEventGroup.shutdownGracefully(
gracePeriodMillis,
timeoutMillis,
TimeUnit.MILLISECONDS
)
val shutdownWorkers = workerEventGroup.shutdownGracefully(
gracePeriodMillis,
timeoutMillis,
TimeUnit.MILLISECONDS
)
if (!configuration.shareWorkGroup) {
withStopException {
callEventGroup.shutdownGracefully(gracePeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).sync()
}
shutdownConnections.await()
} finally {
channelFutures.forEach { it.sync() }
}
withStopException { shutdownConnections.sync() }
withStopException { shutdownWorkers.sync() }
}

override fun toString(): String {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2021 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.server.netty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class NettyConfigurationTest {
return mockk {
every { register(channel) } returns stubResolvedFuture(channel)
every { shutdownGracefully(any(), any(), any()) } returns mockk {
every { await() } returns mockk()
every { sync() } returns mockk()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,8 @@ import io.ktor.websocket.*
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http2.DefaultHttp2Headers
import io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder
import io.netty.handler.codec.http2.DefaultHttp2HeadersEncoder
import io.netty.handler.codec.http2.*
import io.netty.handler.codec.http2.Http2CodecUtil.readUnsignedInt
import io.netty.handler.codec.http2.Http2Flags
import io.netty.handler.codec.http2.Http2FrameTypes
import kotlinx.coroutines.flow.consumeAsFlow
import kotlin.test.Test
import kotlin.test.assertEquals
Expand Down Expand Up @@ -381,7 +377,7 @@ class NettyH2cEnabledTest :
total += n
val s = buf.decodeToString(0, total)
val end = s.indexOf("\r\n\r\n")
if (end >= 0) return s.substring(0, end + 4)
if (end >= 0) return s.take(end + 4)
require(total < maxBytes) { "HTTP/1.1 headers exceed $maxBytes bytes" }
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,51 +1,42 @@
/*
* Copyright 2014-2021 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.tests.server.netty

import io.ktor.client.HttpClient
import io.ktor.client.engine.cio.CIO
import io.ktor.client.plugins.DefaultRequest
import io.ktor.client.request.get
import io.ktor.http.HttpHeaders
import io.ktor.http.HttpStatusCode
import io.ktor.http.content.OutgoingContent
import io.ktor.network.selector.SelectorManager
import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.openWriteChannel
import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.plugins.*
import io.ktor.client.request.*
import io.ktor.http.*
import io.ktor.http.content.*
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import io.ktor.server.application.*
import io.ktor.server.application.hooks.ResponseSent
import io.ktor.server.application.hooks.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.server.response.respond
import io.ktor.server.response.respondBytesWriter
import io.ktor.server.routing.get
import io.ktor.server.routing.routing
import io.ktor.utils.io.ByteReadChannel
import io.ktor.utils.io.readUTF8Line
import io.ktor.utils.io.writeStringUtf8
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withTimeout
import java.net.*
import java.util.concurrent.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.test.dispatcher.*
import io.ktor.utils.io.*
import kotlinx.coroutines.*
import java.net.BindException
import java.net.ServerSocket
import java.util.concurrent.ExecutorService
import kotlin.test.*
import kotlin.time.Duration.Companion.seconds

class NettySpecificTest {

@Test
fun testNoLeakWithoutStartAndStop() {
fun testNoLeakWithoutStartAndStop() = runTestWithRealTime {
repeat(100000) {
embeddedServer(Netty, serverConfig { })
}
}

// Doesn't work with real time
@Test
fun testStartOnUsedPort() {
val socket = ServerSocket(0)
Expand All @@ -62,7 +53,7 @@ class NettySpecificTest {
}

@Test
fun testStartMultipleConnectorsOnUsedPort() {
fun testStartMultipleConnectorsOnUsedPort() = runTestWithRealTime {
val socket = ServerSocket(0)
val port = socket.localPort

Expand Down Expand Up @@ -96,7 +87,7 @@ class NettySpecificTest {
}

@Test
fun contentLengthAndTransferEncodingAreSafelyRemoved() = runTest {
fun contentLengthAndTransferEncodingAreSafelyRemoved() = runTestWithRealTime {
val appStarted = CompletableDeferred<Application>()
val testScope = CoroutineScope(coroutineContext)
val earlyHints = HttpStatusCode(103, "Early Hints")
Expand Down Expand Up @@ -156,7 +147,9 @@ class NettySpecificTest {
}

try {
val serverApp = appStarted.await()
val serverApp = withTimeout(10.seconds) {
appStarted.await()
}
val connector = serverApp.engine.resolvedConnectors()[0]
val host = connector.host
val port = connector.port
Expand All @@ -179,7 +172,7 @@ class NettySpecificTest {
}

@Test
fun badRequestOnInvalidQueryString() = runBlocking {
fun badRequestOnInvalidQueryString() = runTestWithRealTime {
val appStarted = CompletableDeferred<Application>()

val serverJob = launch(Dispatchers.IO) {
Expand All @@ -198,7 +191,9 @@ class NettySpecificTest {
server.start(wait = true)
}

val serverApp = appStarted.await()
val serverApp = withTimeout(10.seconds) {
appStarted.await()
}
val connector = serverApp.engine.resolvedConnectors()[0]

try {
Expand Down