Skip to content

Commit a011bfa

Browse files
committed
Use PrivateMethodTester on check that delegate stream is closed
1 parent 4ea1712 commit a011bfa

File tree

2 files changed

+10
-7
lines changed

2 files changed

+10
-7
lines changed

core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -319,10 +319,10 @@ final class ShuffleBlockFetcherIterator(
319319
* Note: the delegate parameter is private[storage] to make it available to tests.
320320
*/
321321
private class BufferReleasingInputStream(
322-
private[storage] val delegate: InputStream,
323-
iterator: ShuffleBlockFetcherIterator)
322+
private val delegate: InputStream,
323+
private val iterator: ShuffleBlockFetcherIterator)
324324
extends InputStream {
325-
private var closed = false
325+
private[this] var closed = false
326326

327327
override def read(): Int = delegate.read()
328328

core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,15 @@ import org.mockito.Matchers.{any, eq => meq}
2727
import org.mockito.Mockito._
2828
import org.mockito.invocation.InvocationOnMock
2929
import org.mockito.stubbing.Answer
30+
import org.scalatest.PrivateMethodTester
3031

3132
import org.apache.spark.{SparkFunSuite, TaskContextImpl}
3233
import org.apache.spark.network._
3334
import org.apache.spark.network.buffer.ManagedBuffer
3435
import org.apache.spark.network.shuffle.BlockFetchingListener
3536

3637

37-
class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite {
38+
class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodTester {
3839
// Some of the tests are quite tricky because we are testing the cleanup behavior
3940
// in the presence of faults.
4041

@@ -113,13 +114,15 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite {
113114
// Note: ShuffleBlockFetcherIterator wraps input streams in a BufferReleasingInputStream
114115
val wrappedInputStream = inputStream.get.asInstanceOf[BufferReleasingInputStream]
115116
verify(mockBuf, times(0)).release()
116-
verify(wrappedInputStream.delegate, times(0)).close()
117+
val delegateAccess = PrivateMethod[InputStream]('delegate)
118+
119+
verify(wrappedInputStream.invokePrivate(delegateAccess()), times(0)).close()
117120
wrappedInputStream.close()
118121
verify(mockBuf, times(1)).release()
119-
verify(wrappedInputStream.delegate, times(1)).close()
122+
verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
120123
wrappedInputStream.close() // close should be idempotent
121124
verify(mockBuf, times(1)).release()
122-
verify(wrappedInputStream.delegate, times(1)).close()
125+
verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
123126
}
124127

125128
// 3 local blocks, and 2 remote blocks

0 commit comments

Comments
 (0)