Skip to content

Commit 5621540

Browse files
committed
KTOR-8204 Fix for exception on empty chunked body response
1 parent c9712a3 commit 5621540

File tree

5 files changed

+66
-55
lines changed

5 files changed

+66
-55
lines changed

ktor-client/ktor-client-core/common/src/io/ktor/client/plugins/sse/DefaultClientSSESession.kt

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class DefaultClientSSESession(
4545
// we have an outer while to obtain new input
4646
while (this@DefaultClientSSESession.coroutineContext.isActive) {
4747
while (this@DefaultClientSSESession.coroutineContext.isActive) {
48-
val event = input.parseEvent() ?: break
48+
val event = input.tryParseEvent() ?: break
4949

5050
if (event.isCommentsEvent() && !showCommentEvents) continue
5151
if (event.isRetryEvent() && !showRetryEvents) continue
@@ -140,6 +140,14 @@ public class DefaultClientSSESession(
140140
input.cancel()
141141
}
142142

143+
private suspend fun ByteReadChannel.tryParseEvent(): ServerSentEvent? =
144+
try {
145+
parseEvent()
146+
} catch (_: ClosedByteChannelException) {
147+
// this is expected when the server disconnects
148+
null
149+
}
150+
143151
private suspend fun ByteReadChannel.parseEvent(): ServerSentEvent? {
144152
val data = StringBuilder()
145153
val comments = StringBuilder()

ktor-client/ktor-client-tests/common/test/io/ktor/client/tests/plugins/ServerSentEventsTest.kt

Lines changed: 42 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import io.ktor.sse.*
1919
import io.ktor.utils.io.*
2020
import io.ktor.utils.io.charsets.*
2121
import kotlinx.coroutines.*
22-
import kotlinx.coroutines.flow.collect
2322
import kotlinx.coroutines.flow.collectIndexed
2423
import kotlinx.coroutines.flow.single
2524
import kotlinx.coroutines.flow.take
@@ -33,8 +32,6 @@ import kotlin.coroutines.resume
3332
import kotlin.coroutines.suspendCoroutine
3433
import kotlin.test.*
3534
import kotlin.time.Duration.Companion.milliseconds
36-
import kotlin.time.Duration.Companion.seconds
37-
import kotlin.time.measureTime
3835

3936
class ServerSentEventsTest : ClientLoader() {
4037

@@ -138,19 +135,23 @@ class ServerSentEventsTest : ClientLoader() {
138135

139136
test { client ->
140137
coroutineScope {
141-
val job = launch {
142-
val session = client.serverSentEventsSession("$TEST_SERVER/sse/hello?times=20&interval=100")
138+
val started = Job()
139+
val session = client.serverSentEventsSession("$TEST_SERVER/sse/hello?times=20&interval=100")
140+
val readJob = launch {
143141
try {
144-
session.incoming.collect()
142+
session.incoming.collect {
143+
started.complete()
144+
}
145145
} finally {
146146
withContext(NonCancellable) {
147-
delay(500)
147+
started.join()
148148
assertFalse(session.isActive)
149149
}
150150
}
151151
}
152-
delay(500)
153-
job.cancelAndJoin()
152+
started.join()
153+
readJob.cancelAndJoin()
154+
assertFalse(session.isActive)
154155
}
155156
}
156157
}
@@ -582,25 +583,22 @@ class ServerSentEventsTest : ClientLoader() {
582583
config {
583584
install(SSE) {
584585
maxReconnectionAttempts = 1
585-
reconnectionTime = 2.seconds
586+
reconnectionTime = 100.milliseconds
586587
}
587588
}
588589

589590
test { client ->
590591
val events = mutableListOf<ServerSentEvent>()
591592

592-
val time = measureTime {
593-
client.sse("$TEST_SERVER/sse/reconnection?count=5") {
594-
incoming.take(10).collect {
595-
events.add(it)
596-
}
593+
client.sse("$TEST_SERVER/sse/reconnection?count=5") {
594+
incoming.take(10).collect {
595+
events.add(it)
597596
}
598597
}
599598

600599
events.forEachIndexed { index, event ->
601600
assertEquals(index + 1, event.id?.toInt())
602601
}
603-
assertTrue { time > 2.seconds }
604602
}
605603
}
606604

@@ -617,7 +615,7 @@ class ServerSentEventsTest : ClientLoader() {
617615
var count = 0
618616

619617
assertFailsWith<IllegalStateException> {
620-
client.sse("$TEST_SERVER/sse/reconnection?count=5", reconnectionTime = 1.seconds) {
618+
client.sse("$TEST_SERVER/sse/reconnection?count=5", reconnectionTime = 10.milliseconds) {
621619
incoming.collect {
622620
events.add(it)
623621
count++
@@ -669,7 +667,6 @@ class ServerSentEventsTest : ClientLoader() {
669667
fun testSeveralReconnections() = clientTests(except("OkHttp")) {
670668
config {
671669
install(SSE) {
672-
reconnectionTime = 500.milliseconds
673670
maxReconnectionAttempts = 2
674671
}
675672
}
@@ -678,14 +675,12 @@ class ServerSentEventsTest : ClientLoader() {
678675
val events = mutableListOf<ServerSentEvent>()
679676
var count = 0
680677

681-
val time = measureTime {
682-
client.sse("$TEST_SERVER/sse/reconnection?count=5") {
683-
incoming.collect {
684-
events.add(it)
685-
count++
686-
if (count == 15) {
687-
cancel()
688-
}
678+
client.sse("$TEST_SERVER/sse/reconnection?count=5", reconnectionTime = 10.milliseconds) {
679+
incoming.collect {
680+
events.add(it)
681+
count++
682+
if (count == 15) {
683+
cancel()
689684
}
690685
}
691686
}
@@ -694,15 +689,14 @@ class ServerSentEventsTest : ClientLoader() {
694689
events.forEachIndexed { index, event ->
695690
assertEquals(index + 1, event.id?.toInt())
696691
}
697-
assertTrue { 1.seconds < time && time < 2.seconds }
698692
}
699693
}
700694

701695
@Test
702696
fun testMaxRetries() = clientTests(except("OkHttp")) {
703697
config {
704698
install(SSE) {
705-
reconnectionTime = 500.milliseconds
699+
reconnectionTime = 10.milliseconds
706700
maxReconnectionAttempts = 4
707701
}
708702
}
@@ -711,14 +705,12 @@ class ServerSentEventsTest : ClientLoader() {
711705
val events = mutableListOf<ServerSentEvent>()
712706
var count = 0
713707

714-
val time = measureTime {
715-
client.sse("$TEST_SERVER/sse/exception-on-reconnection?count=5&count-of-reconnections=4") {
716-
incoming.collect {
717-
events.add(it)
718-
count++
719-
if (count == 10) {
720-
cancel()
721-
}
708+
client.sse("$TEST_SERVER/sse/exception-on-reconnection?count=5&count-of-reconnections=4") {
709+
incoming.collect {
710+
events.add(it)
711+
count++
712+
if (count == 10) {
713+
cancel()
722714
}
723715
}
724716
}
@@ -727,7 +719,6 @@ class ServerSentEventsTest : ClientLoader() {
727719
events.forEachIndexed { index, event ->
728720
assertEquals(index % 5, event.id?.toInt())
729721
}
730-
assertTrue { 2.seconds < time && time < 3.seconds }
731722
}
732723
}
733724

@@ -741,7 +732,6 @@ class ServerSentEventsTest : ClientLoader() {
741732
}
742733

743734
test { client ->
744-
745735
client.sse("$TEST_SERVER/sse/no-content") {
746736
assertEquals(HttpStatusCode.NoContent, call.response.status)
747737
assertEquals(0, incoming.toList().size)
@@ -759,4 +749,18 @@ class ServerSentEventsTest : ClientLoader() {
759749
}
760750
}
761751
}
752+
753+
@Test
754+
fun testNoContentStream() = clientTests(except("OkHttp")) {
755+
config {
756+
install(SSE)
757+
}
758+
759+
test { client ->
760+
client.sse("$TEST_SERVER/sse/no-events") {
761+
assertEquals(HttpStatusCode.OK, call.response.status)
762+
assertEquals(0, incoming.toList().size)
763+
}
764+
}
765+
}
762766
}

ktor-http/ktor-http-cio/common/src/io/ktor/http/cio/ChunkedTransferEncoding.kt

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,8 @@ public suspend fun decodeChunked(input: ByteReadChannel, out: ByteWriteChannel)
6969
var totalBytesCopied = 0L
7070

7171
try {
72-
while (true) {
73-
chunkSizeBuffer.clear()
74-
if (!input.readUTF8LineTo(chunkSizeBuffer, MAX_CHUNK_SIZE_LENGTH, httpLineEndings)) {
75-
throw EOFException("Chunked stream has ended unexpectedly: no chunk size")
76-
} else if (chunkSizeBuffer.isEmpty()) {
72+
while (input.readUTF8LineTo(chunkSizeBuffer, MAX_CHUNK_SIZE_LENGTH, httpLineEndings)) {
73+
if (chunkSizeBuffer.isEmpty()) {
7774
throw EOFException("Invalid chunk size: empty")
7875
}
7976

ktor-http/ktor-http-cio/jvm/test/io/ktor/tests/http/cio/ChunkedTest.kt

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,6 @@ import kotlin.test.assertTrue
2424

2525
class ChunkedTest {
2626

27-
@Test
28-
fun testEmptyBroken(): Unit = runBlocking {
29-
val bodyText = ""
30-
val ch = ByteReadChannel(bodyText.toByteArray())
31-
val parsed = ByteChannel()
32-
33-
assertFailsWith<EOFException> {
34-
decodeChunked(ch, parsed)
35-
}
36-
}
37-
3827
@Test
3928
fun testChunkedWithContentLength() = runBlocking {
4029
val chunkedContent = listOf(
@@ -71,6 +60,16 @@ class ChunkedTest {
7160
}
7261
}
7362

63+
@Test
64+
fun testEmptyNoCRLF(): Unit = runBlocking {
65+
val bodyText = ""
66+
val ch = ByteReadChannel(bodyText.toByteArray())
67+
val parsed = ByteChannel()
68+
69+
decodeChunked(ch, parsed)
70+
assertEquals("", parsed.readBuffer().readText())
71+
}
72+
7473
@Test
7574
fun testEmpty() = runBlocking {
7675
val bodyText = "0\r\n\r\n"

ktor-test-server/src/main/kotlin/test/server/tests/ServerSentEvents.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,9 @@ internal fun Application.serverSentEvents() {
153153
get("no-content") {
154154
call.respond(HttpStatusCode.NoContent)
155155
}
156+
get("no-events") {
157+
call.respondBytesWriter(ContentType.Text.EventStream) {}
158+
}
156159
get("no-content-after-reconnection") {
157160
val count = call.parameters["count"]?.toInt() ?: 0
158161
val lastEventId = call.request.header("Last-Event-ID")

0 commit comments

Comments
 (0)