Skip to content

Commit 18a6e97

Browse files
authored
Server Netty. Fix rejected execution during engine stop (#8671)
* Server Netty. Fix rejected execution during engine stop (#8671) * Server Netty Tests. Fix infinite timeout in tests.
1 parent c1103e7 commit 18a6e97

File tree

5 files changed

+58
-59
lines changed

5 files changed

+58
-59
lines changed

ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/NettyApplicationEngine.kt

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -280,29 +280,37 @@ public class NettyApplicationEngine(
280280
workerEventGroup.shutdownGracefully().sync()
281281
}
282282

283+
private inline fun <R> withStopException(crossinline block: () -> R) {
284+
runCatching(block).onFailure {
285+
environment.log.error("Exception thrown during engine stop", it)
286+
}
287+
}
288+
283289
override fun stop(gracePeriodMillis: Long, timeoutMillis: Long) {
284290
cancellationJob?.complete()
285291
monitor.raise(ApplicationStopPreparing, environment)
286292
val channelFutures = channels?.mapNotNull { if (it.isOpen) it.close() else null }.orEmpty()
293+
channelFutures.forEach { future ->
294+
withStopException { future.sync() }
295+
}
287296

288-
try {
289-
val shutdownConnections =
290-
connectionEventGroup.shutdownGracefully(gracePeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)
291-
292-
val shutdownWorkers =
293-
workerEventGroup.shutdownGracefully(gracePeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)
294-
if (configuration.shareWorkGroup) {
295-
shutdownWorkers.await()
296-
} else {
297-
val shutdownCall =
298-
callEventGroup.shutdownGracefully(gracePeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)
299-
shutdownWorkers.await()
300-
shutdownCall.await()
297+
val shutdownConnections = connectionEventGroup.shutdownGracefully(
298+
gracePeriodMillis,
299+
timeoutMillis,
300+
TimeUnit.MILLISECONDS
301+
)
302+
val shutdownWorkers = workerEventGroup.shutdownGracefully(
303+
gracePeriodMillis,
304+
timeoutMillis,
305+
TimeUnit.MILLISECONDS
306+
)
307+
if (!configuration.shareWorkGroup) {
308+
withStopException {
309+
callEventGroup.shutdownGracefully(gracePeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).sync()
301310
}
302-
shutdownConnections.await()
303-
} finally {
304-
channelFutures.forEach { it.sync() }
305311
}
312+
withStopException { shutdownConnections.sync() }
313+
withStopException { shutdownWorkers.sync() }
306314
}
307315

308316
override fun toString(): String {

ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/NettyApplicationResponse.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2021 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package io.ktor.server.netty

ktor-server/ktor-server-netty/jvm/test/io/ktor/tests/server/netty/NettyConfigurationTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ class NettyConfigurationTest {
106106
return mockk {
107107
every { register(channel) } returns stubResolvedFuture(channel)
108108
every { shutdownGracefully(any(), any(), any()) } returns mockk {
109-
every { await() } returns mockk()
109+
every { sync() } returns mockk()
110110
}
111111
}
112112
}

ktor-server/ktor-server-netty/jvm/test/io/ktor/tests/server/netty/NettyEngineTest.kt

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,8 @@ import io.ktor.websocket.*
2323
import io.netty.buffer.ByteBuf
2424
import io.netty.buffer.Unpooled
2525
import io.netty.handler.codec.http.HttpResponseStatus
26-
import io.netty.handler.codec.http2.DefaultHttp2Headers
27-
import io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder
28-
import io.netty.handler.codec.http2.DefaultHttp2HeadersEncoder
26+
import io.netty.handler.codec.http2.*
2927
import io.netty.handler.codec.http2.Http2CodecUtil.readUnsignedInt
30-
import io.netty.handler.codec.http2.Http2Flags
31-
import io.netty.handler.codec.http2.Http2FrameTypes
3228
import kotlinx.coroutines.flow.consumeAsFlow
3329
import kotlin.test.Test
3430
import kotlin.test.assertEquals
@@ -381,7 +377,7 @@ class NettyH2cEnabledTest :
381377
total += n
382378
val s = buf.decodeToString(0, total)
383379
val end = s.indexOf("\r\n\r\n")
384-
if (end >= 0) return s.substring(0, end + 4)
380+
if (end >= 0) return s.take(end + 4)
385381
require(total < maxBytes) { "HTTP/1.1 headers exceed $maxBytes bytes" }
386382
}
387383
}

ktor-server/ktor-server-netty/jvm/test/io/ktor/tests/server/netty/NettySpecificTest.kt

Lines changed: 30 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,42 @@
11
/*
2-
* Copyright 2014-2021 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package io.ktor.tests.server.netty
66

7-
import io.ktor.client.HttpClient
8-
import io.ktor.client.engine.cio.CIO
9-
import io.ktor.client.plugins.DefaultRequest
10-
import io.ktor.client.request.get
11-
import io.ktor.http.HttpHeaders
12-
import io.ktor.http.HttpStatusCode
13-
import io.ktor.http.content.OutgoingContent
14-
import io.ktor.network.selector.SelectorManager
15-
import io.ktor.network.sockets.aSocket
16-
import io.ktor.network.sockets.openReadChannel
17-
import io.ktor.network.sockets.openWriteChannel
7+
import io.ktor.client.*
8+
import io.ktor.client.engine.cio.*
9+
import io.ktor.client.plugins.*
10+
import io.ktor.client.request.*
11+
import io.ktor.http.*
12+
import io.ktor.http.content.*
13+
import io.ktor.network.selector.*
14+
import io.ktor.network.sockets.*
1815
import io.ktor.server.application.*
19-
import io.ktor.server.application.hooks.ResponseSent
16+
import io.ktor.server.application.hooks.*
2017
import io.ktor.server.engine.*
2118
import io.ktor.server.netty.*
22-
import io.ktor.server.response.respond
23-
import io.ktor.server.response.respondBytesWriter
24-
import io.ktor.server.routing.get
25-
import io.ktor.server.routing.routing
26-
import io.ktor.utils.io.ByteReadChannel
27-
import io.ktor.utils.io.readUTF8Line
28-
import io.ktor.utils.io.writeStringUtf8
29-
import kotlinx.coroutines.CompletableDeferred
30-
import kotlinx.coroutines.CoroutineScope
31-
import kotlinx.coroutines.Dispatchers
32-
import kotlinx.coroutines.launch
33-
import kotlinx.coroutines.runBlocking
34-
import kotlinx.coroutines.test.runTest
35-
import kotlinx.coroutines.withTimeout
36-
import java.net.*
37-
import java.util.concurrent.*
19+
import io.ktor.server.response.*
20+
import io.ktor.server.routing.*
21+
import io.ktor.test.dispatcher.*
22+
import io.ktor.utils.io.*
23+
import kotlinx.coroutines.*
24+
import java.net.BindException
25+
import java.net.ServerSocket
26+
import java.util.concurrent.ExecutorService
3827
import kotlin.test.*
28+
import kotlin.time.Duration.Companion.seconds
3929

4030
class NettySpecificTest {
4131

4232
@Test
43-
fun testNoLeakWithoutStartAndStop() {
33+
fun testNoLeakWithoutStartAndStop() = runTestWithRealTime {
4434
repeat(100000) {
4535
embeddedServer(Netty, serverConfig { })
4636
}
4737
}
4838

39+
// Doesn't work with real time
4940
@Test
5041
fun testStartOnUsedPort() {
5142
val socket = ServerSocket(0)
@@ -62,7 +53,7 @@ class NettySpecificTest {
6253
}
6354

6455
@Test
65-
fun testStartMultipleConnectorsOnUsedPort() {
56+
fun testStartMultipleConnectorsOnUsedPort() = runTestWithRealTime {
6657
val socket = ServerSocket(0)
6758
val port = socket.localPort
6859

@@ -96,7 +87,7 @@ class NettySpecificTest {
9687
}
9788

9889
@Test
99-
fun contentLengthAndTransferEncodingAreSafelyRemoved() = runTest {
90+
fun contentLengthAndTransferEncodingAreSafelyRemoved() = runTestWithRealTime {
10091
val appStarted = CompletableDeferred<Application>()
10192
val testScope = CoroutineScope(coroutineContext)
10293
val earlyHints = HttpStatusCode(103, "Early Hints")
@@ -156,7 +147,9 @@ class NettySpecificTest {
156147
}
157148

158149
try {
159-
val serverApp = appStarted.await()
150+
val serverApp = withTimeout(10.seconds) {
151+
appStarted.await()
152+
}
160153
val connector = serverApp.engine.resolvedConnectors()[0]
161154
val host = connector.host
162155
val port = connector.port
@@ -179,7 +172,7 @@ class NettySpecificTest {
179172
}
180173

181174
@Test
182-
fun badRequestOnInvalidQueryString() = runBlocking {
175+
fun badRequestOnInvalidQueryString() = runTestWithRealTime {
183176
val appStarted = CompletableDeferred<Application>()
184177

185178
val serverJob = launch(Dispatchers.IO) {
@@ -198,7 +191,9 @@ class NettySpecificTest {
198191
server.start(wait = true)
199192
}
200193

201-
val serverApp = appStarted.await()
194+
val serverApp = withTimeout(10.seconds) {
195+
appStarted.await()
196+
}
202197
val connector = serverApp.engine.resolvedConnectors()[0]
203198

204199
try {

0 commit comments

Comments
 (0)