Skip to content

Commit

Permalink
Properly handle unexhausted network responses (ChuckerTeam#288)
Browse files Browse the repository at this point in the history
* Handle properly not consumed upstream body.
* Handle IO issues while reading from file.
  • Loading branch information
MiSikora authored Mar 27, 2020
1 parent 8d2e67c commit 9bafb00
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ class ChuckerInterceptor internal constructor(
override fun onSuccess(file: File) {
val buffer = readResponseBuffer(file, response.isGzipped)
file.delete()
processResponseBody(response, buffer, transaction)
if (buffer != null) processResponseBody(response, buffer, transaction)
collector.onResponseReceived(transaction)
}

Expand All @@ -253,15 +253,17 @@ class ChuckerInterceptor internal constructor(
collector.onResponseReceived(transaction)
}

private fun readResponseBuffer(responseBody: File, isGzipped: Boolean): Buffer {
private fun readResponseBuffer(responseBody: File, isGzipped: Boolean): Buffer? {
val bufferedSource = Okio.buffer(Okio.source(responseBody))
val source = if (isGzipped) {
GzipSource(bufferedSource)
} else {
bufferedSource
}
return Buffer().apply {
writeAll(source)
return try {
Buffer().apply { writeAll(source) }
} catch (_: IOException) {
null
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import okio.Timeout
* to a [sideChannel] file. After the [upstream] is depleted or when a failure occurs
* an appropriate [callback] method is called.
*
* Failure is considered any [IOException] during reading the bytes or exceeding [readBytesLimit] length.
* Failure is considered any [IOException] during reading the bytes,
* exceeding [readBytesLimit] length or not reading the whole upstream.
*/
internal class TeeSource(
private val upstream: Source,
Expand All @@ -25,8 +26,9 @@ internal class TeeSource(
) : Source {
private val sideStream = Okio.buffer(Okio.sink(sideChannel))
private var totalBytesRead = 0L
private var reachedLimit = false
private var upstreamFailed = false
private var isReadLimitExceeded = false
private var isUpstreamExhausted = false
private var isFailure = false

override fun read(sink: Buffer, byteCount: Long): Long {
val bytesRead = try {
Expand All @@ -37,19 +39,20 @@ internal class TeeSource(
}

if (bytesRead == -1L) {
isUpstreamExhausted = true
sideStream.close()
return -1L
}

totalBytesRead += bytesRead
if (!reachedLimit && (totalBytesRead <= readBytesLimit)) {
if (!isReadLimitExceeded && (totalBytesRead <= readBytesLimit)) {
val offset = sink.size() - bytesRead
sink.copyTo(sideStream.buffer(), offset, bytesRead)
sideStream.emitCompleteSegments()
return bytesRead
}
if (!reachedLimit) {
reachedLimit = true
if (!isReadLimitExceeded) {
isReadLimitExceeded = true
sideStream.close()
callSideChannelFailure(IOException("Capacity of $readBytesLimit bytes exceeded"))
}
Expand All @@ -60,16 +63,18 @@ internal class TeeSource(
override fun close() {
sideStream.close()
upstream.close()
if (!upstreamFailed) {
if (isUpstreamExhausted) {
callback.onSuccess(sideChannel)
} else {
callSideChannelFailure(IOException("Upstream was not fully consumed"))
}
}

override fun timeout(): Timeout = upstream.timeout()

private fun callSideChannelFailure(exception: IOException) {
if (!upstreamFailed) {
upstreamFailed = true
if (!isFailure) {
isFailure = true
callback.onFailure(exception, sideChannel)
}
}
Expand All @@ -83,7 +88,10 @@ internal class TeeSource(
/**
* Called when there was an issue while copying bytes to the [file].
*
* It might occur due to an exception thrown while reading bytes or due to exceeding capacity limit.
* It might occur due to one of the following reasons:
* - an exception was thrown while reading bytes
* - capacity limit was exceeded
* - upstream was not fully consumed
*/
fun onFailure(exception: IOException, file: File)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,36 @@ class TeeSourceTest {
.isEqualTo("Hello there!")
}

@Test
fun notConsumedUpstream_isNotConsideredSuccess(@TempDir tempDir: File) {
val testFile = File(tempDir, "testFile")
// Okio uses 8KiB as a single size read.
val testSource = TestSource(8_192 * 2)

val teeSource = TeeSource(testSource, testFile, teeCallback)
Okio.buffer(teeSource).use { source ->
source.readByteString(8_192)
}

assertThat(teeCallback.exception)
.hasMessageThat()
.isEqualTo("Upstream was not fully consumed")
}

@Test
fun partiallyReadBytesFromUpstream_areAvailableToSideChannel(@TempDir tempDir: File) {
val testFile = File(tempDir, "testFile")
// Okio uses 8KiB as a single size read.
val testSource = TestSource(8_192 * 2)

val teeSource = TeeSource(testSource, testFile, teeCallback)
Okio.buffer(teeSource).use { source ->
source.readByteString(8_192)
}

assertThat(teeCallback.fileContent).isEqualTo(testSource.content.substring(0, 8_192))
}

private class TestSource(contentLength: Int = 1_000) : Source {
val content: ByteString = ByteString.of(*Random.nextBytes(contentLength))
private val buffer = Buffer().apply { write(content) }
Expand Down

0 comments on commit 9bafb00

Please sign in to comment.