From 9bafb00de2dc8788f1dd9fef95caaec4f002f502 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Sikora?= Date: Fri, 27 Mar 2020 18:08:02 +0100 Subject: [PATCH] Properly handle unexhausted network responses (#288) * Handle properly not consumed upstream body. * Handle IO issues while reading from file. --- .../chucker/api/ChuckerInterceptor.kt | 10 ++++--- .../chucker/internal/support/TeeSource.kt | 28 ++++++++++------- .../chucker/internal/support/TeeSourceTest.kt | 30 +++++++++++++++++++ 3 files changed, 54 insertions(+), 14 deletions(-) diff --git a/library/src/main/java/com/chuckerteam/chucker/api/ChuckerInterceptor.kt b/library/src/main/java/com/chuckerteam/chucker/api/ChuckerInterceptor.kt index 14e5078a7..8ddb9d550 100755 --- a/library/src/main/java/com/chuckerteam/chucker/api/ChuckerInterceptor.kt +++ b/library/src/main/java/com/chuckerteam/chucker/api/ChuckerInterceptor.kt @@ -244,7 +244,7 @@ class ChuckerInterceptor internal constructor( override fun onSuccess(file: File) { val buffer = readResponseBuffer(file, response.isGzipped) file.delete() - processResponseBody(response, buffer, transaction) + if (buffer != null) processResponseBody(response, buffer, transaction) collector.onResponseReceived(transaction) } @@ -253,15 +253,17 @@ class ChuckerInterceptor internal constructor( collector.onResponseReceived(transaction) } - private fun readResponseBuffer(responseBody: File, isGzipped: Boolean): Buffer { + private fun readResponseBuffer(responseBody: File, isGzipped: Boolean): Buffer? { val bufferedSource = Okio.buffer(Okio.source(responseBody)) val source = if (isGzipped) { GzipSource(bufferedSource) } else { bufferedSource } - return Buffer().apply { - writeAll(source) + return try { + Buffer().apply { writeAll(source) } + } catch (_: IOException) { + null } } } diff --git a/library/src/main/java/com/chuckerteam/chucker/internal/support/TeeSource.kt b/library/src/main/java/com/chuckerteam/chucker/internal/support/TeeSource.kt index b56a6c302..692e178e0 100644 --- a/library/src/main/java/com/chuckerteam/chucker/internal/support/TeeSource.kt +++ b/library/src/main/java/com/chuckerteam/chucker/internal/support/TeeSource.kt @@ -15,7 +15,8 @@ import okio.Timeout * to a [sideChannel] file. After the [upstream] is depleted or when a failure occurs * an appropriate [callback] method is called. * - * Failure is considered any [IOException] during reading the bytes or exceeding [readBytesLimit] length. + * Failure is considered any [IOException] during reading the bytes, + * exceeding [readBytesLimit] length or not reading the whole upstream. */ internal class TeeSource( private val upstream: Source, @@ -25,8 +26,9 @@ internal class TeeSource( ) : Source { private val sideStream = Okio.buffer(Okio.sink(sideChannel)) private var totalBytesRead = 0L - private var reachedLimit = false - private var upstreamFailed = false + private var isReadLimitExceeded = false + private var isUpstreamExhausted = false + private var isFailure = false override fun read(sink: Buffer, byteCount: Long): Long { val bytesRead = try { @@ -37,19 +39,20 @@ internal class TeeSource( } if (bytesRead == -1L) { + isUpstreamExhausted = true sideStream.close() return -1L } totalBytesRead += bytesRead - if (!reachedLimit && (totalBytesRead <= readBytesLimit)) { + if (!isReadLimitExceeded && (totalBytesRead <= readBytesLimit)) { val offset = sink.size() - bytesRead sink.copyTo(sideStream.buffer(), offset, bytesRead) sideStream.emitCompleteSegments() return bytesRead } - if (!reachedLimit) { - reachedLimit = true + if (!isReadLimitExceeded) { + isReadLimitExceeded = true sideStream.close() callSideChannelFailure(IOException("Capacity of $readBytesLimit bytes exceeded")) } @@ -60,16 +63,18 @@ internal class TeeSource( override fun close() { sideStream.close() upstream.close() - if (!upstreamFailed) { + if (isUpstreamExhausted) { callback.onSuccess(sideChannel) + } else { + callSideChannelFailure(IOException("Upstream was not fully consumed")) } } override fun timeout(): Timeout = upstream.timeout() private fun callSideChannelFailure(exception: IOException) { - if (!upstreamFailed) { - upstreamFailed = true + if (!isFailure) { + isFailure = true callback.onFailure(exception, sideChannel) } } @@ -83,7 +88,10 @@ internal class TeeSource( /** * Called when there was an issue while copying bytes to the [file]. * - * It might occur due to an exception thrown while reading bytes or due to exceeding capacity limit. + * It might occur due to one of the following reasons: + * - an exception was thrown while reading bytes + * - capacity limit was exceeded + * - upstream was not fully consumed */ fun onFailure(exception: IOException, file: File) } diff --git a/library/src/test/java/com/chuckerteam/chucker/internal/support/TeeSourceTest.kt b/library/src/test/java/com/chuckerteam/chucker/internal/support/TeeSourceTest.kt index 165372d7e..95b8c55b2 100644 --- a/library/src/test/java/com/chuckerteam/chucker/internal/support/TeeSourceTest.kt +++ b/library/src/test/java/com/chuckerteam/chucker/internal/support/TeeSourceTest.kt @@ -102,6 +102,36 @@ class TeeSourceTest { .isEqualTo("Hello there!") } + @Test + fun notConsumedUpstream_isNotConsideredSuccess(@TempDir tempDir: File) { + val testFile = File(tempDir, "testFile") + // Okio uses 8KiB as a single size read. + val testSource = TestSource(8_192 * 2) + + val teeSource = TeeSource(testSource, testFile, teeCallback) + Okio.buffer(teeSource).use { source -> + source.readByteString(8_192) + } + + assertThat(teeCallback.exception) + .hasMessageThat() + .isEqualTo("Upstream was not fully consumed") + } + + @Test + fun partiallyReadBytesFromUpstream_areAvailableToSideChannel(@TempDir tempDir: File) { + val testFile = File(tempDir, "testFile") + // Okio uses 8KiB as a single size read. + val testSource = TestSource(8_192 * 2) + + val teeSource = TeeSource(testSource, testFile, teeCallback) + Okio.buffer(teeSource).use { source -> + source.readByteString(8_192) + } + + assertThat(teeCallback.fileContent).isEqualTo(testSource.content.substring(0, 8_192)) + } + private class TestSource(contentLength: Int = 1_000) : Source { val content: ByteString = ByteString.of(*Random.nextBytes(contentLength)) private val buffer = Buffer().apply { write(content) }