Skip to content

Commit ee6655e

Browse files
committed
Server Netty. Fix rejected execution during engine stop (#8671)
1 parent 87dd92c commit ee6655e

File tree

4 files changed

+29
-25
lines changed

4 files changed

+29
-25
lines changed

ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/NettyApplicationCallHandler.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ internal class NettyApplicationCallHandler(
3535
}
3636
}
3737

38+
override fun channelUnregistered(ctx: ChannelHandlerContext?) {
39+
currentJob?.cancel()
40+
super.channelUnregistered(ctx)
41+
}
42+
3843
private fun handleRequest(context: ChannelHandlerContext, call: PipelineCall) {
3944
val callContext = CallHandlerCoroutineName + NettyDispatcher.CurrentContext(context)
4045

ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/NettyApplicationEngine.kt

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -280,29 +280,32 @@ public class NettyApplicationEngine(
280280
workerEventGroup.shutdownGracefully().sync()
281281
}
282282

283+
private inline fun <R> withStopException(crossinline block: () -> R) {
284+
runCatching(block).onFailure {
285+
environment.log.error("Exception thrown during engine stop", it)
286+
}
287+
}
288+
283289
override fun stop(gracePeriodMillis: Long, timeoutMillis: Long) {
284-
cancellationJob?.complete()
290+
if (cancellationJob?.complete() != true) {
291+
// Engine was already stopped or started stop process
292+
return
293+
}
285294
monitor.raise(ApplicationStopPreparing, environment)
286295
val channelFutures = channels?.mapNotNull { if (it.isOpen) it.close() else null }.orEmpty()
296+
channelFutures.forEach { future ->
297+
withStopException { future.sync() }
298+
}
287299

288-
try {
289-
val shutdownConnections =
290-
connectionEventGroup.shutdownGracefully(gracePeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)
291-
292-
val shutdownWorkers =
293-
workerEventGroup.shutdownGracefully(gracePeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)
294-
if (configuration.shareWorkGroup) {
295-
shutdownWorkers.await()
296-
} else {
297-
val shutdownCall =
298-
callEventGroup.shutdownGracefully(gracePeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)
299-
shutdownWorkers.await()
300-
shutdownCall.await()
300+
val shutdownConnections = connectionEventGroup.shutdownGracefully(gracePeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)
301+
val shutdownWorkers = workerEventGroup.shutdownGracefully(gracePeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS)
302+
if (!configuration.shareWorkGroup) {
303+
withStopException {
304+
callEventGroup.shutdownGracefully(gracePeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).sync()
301305
}
302-
shutdownConnections.await()
303-
} finally {
304-
channelFutures.forEach { it.sync() }
305306
}
307+
withStopException { shutdownConnections.sync() }
308+
withStopException { shutdownWorkers.sync() }
306309
}
307310

308311
override fun toString(): String {

ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/NettyApplicationResponse.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public abstract class NettyApplicationResponse(
5151
// because it should've been set by commitHeaders earlier
5252
val chunked = headers[HttpHeaders.TransferEncoding] == "chunked"
5353

54-
if (responseMessageSent) return
54+
if (responseMessageSent || !context.channel().isActive) return
5555

5656
val message = responseMessage(chunked, bytes)
5757
responseChannel = when (message) {
@@ -111,7 +111,7 @@ public abstract class NettyApplicationResponse(
111111
}
112112

113113
internal fun sendResponse(chunked: Boolean = true, content: ByteReadChannel) {
114-
if (responseMessageSent) return
114+
if (responseMessageSent || !context.channel().isActive) return
115115

116116
responseChannel = content
117117
responseMessage = when {

ktor-server/ktor-server-netty/jvm/test/io/ktor/tests/server/netty/NettyEngineTest.kt

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,8 @@ import io.ktor.websocket.*
2323
import io.netty.buffer.ByteBuf
2424
import io.netty.buffer.Unpooled
2525
import io.netty.handler.codec.http.HttpResponseStatus
26-
import io.netty.handler.codec.http2.DefaultHttp2Headers
27-
import io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder
28-
import io.netty.handler.codec.http2.DefaultHttp2HeadersEncoder
26+
import io.netty.handler.codec.http2.*
2927
import io.netty.handler.codec.http2.Http2CodecUtil.readUnsignedInt
30-
import io.netty.handler.codec.http2.Http2Flags
31-
import io.netty.handler.codec.http2.Http2FrameTypes
3228
import kotlinx.coroutines.flow.consumeAsFlow
3329
import kotlin.test.Test
3430
import kotlin.test.assertEquals
@@ -381,7 +377,7 @@ class NettyH2cEnabledTest :
381377
total += n
382378
val s = buf.decodeToString(0, total)
383379
val end = s.indexOf("\r\n\r\n")
384-
if (end >= 0) return s.substring(0, end + 4)
380+
if (end >= 0) return s.take(end + 4)
385381
require(total < maxBytes) { "HTTP/1.1 headers exceed $maxBytes bytes" }
386382
}
387383
}

0 commit comments

Comments
 (0)