Skip to content

Commit 2960c93

Browse files
committed
Added ShuffleBlockFetcherIteratorSuite.
1 parent e29c721 commit 2960c93

File tree

3 files changed

+184
-121
lines changed

3 files changed

+184
-121
lines changed

core/src/main/scala/org/apache/spark/network/BlockTransferService.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ abstract class BlockTransferService {
7474
level: StorageLevel): Future[Unit]
7575

7676
/**
77-
* A special case of [[fetchBlocks]], since it only fetches on block and is blocking.
77+
* A special case of [[fetchBlocks]], as it fetches only one block and is blocking.
7878
*
7979
* It is also only available after [[init]] is invoked.
8080
*/

core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala

Lines changed: 0 additions & 120 deletions
This file was deleted.
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.storage
19+
20+
import org.apache.spark.TaskContext
21+
import org.apache.spark.network.{BlockFetchingListener, BlockTransferService}
22+
23+
import org.mockito.Mockito._
24+
import org.mockito.Matchers.{any, eq => meq}
25+
import org.mockito.invocation.InvocationOnMock
26+
import org.mockito.stubbing.Answer
27+
28+
import org.scalatest.FunSuite
29+
30+
31+
class ShuffleBlockFetcherIteratorSuite extends FunSuite {
32+
33+
test("handle local read failures in BlockManager") {
34+
val transfer = mock(classOf[BlockTransferService])
35+
val blockManager = mock(classOf[BlockManager])
36+
doReturn(BlockManagerId("test-client", "test-client", 1)).when(blockManager).blockManagerId
37+
38+
val blIds = Array[BlockId](
39+
ShuffleBlockId(0,0,0),
40+
ShuffleBlockId(0,1,0),
41+
ShuffleBlockId(0,2,0),
42+
ShuffleBlockId(0,3,0),
43+
ShuffleBlockId(0,4,0))
44+
45+
val optItr = mock(classOf[Option[Iterator[Any]]])
46+
val answer = new Answer[Option[Iterator[Any]]] {
47+
override def answer(invocation: InvocationOnMock) = Option[Iterator[Any]] {
48+
throw new Exception
49+
}
50+
}
51+
52+
// 3rd block is going to fail
53+
doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(0)), any())
54+
doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(1)), any())
55+
doAnswer(answer).when(blockManager).getLocalShuffleFromDisk(meq(blIds(2)), any())
56+
doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(3)), any())
57+
doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(4)), any())
58+
59+
val bmId = BlockManagerId("test-client", "test-client", 1)
60+
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
61+
(bmId, blIds.map(blId => (blId, 1.asInstanceOf[Long])).toSeq)
62+
)
63+
64+
val iterator = new ShuffleBlockFetcherIterator(
65+
new TaskContext(0, 0, 0),
66+
transfer,
67+
blockManager,
68+
blocksByAddress,
69+
null,
70+
48 * 1024 * 1024)
71+
72+
// Without exhausting the iterator, the iterator should be lazy and not call
73+
// getLocalShuffleFromDisk.
74+
verify(blockManager, times(0)).getLocalShuffleFromDisk(any(), any())
75+
76+
assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements")
77+
// the 2nd element of the tuple returned by iterator.next should be defined when
78+
// fetching successfully
79+
assert(iterator.next()._2.isDefined,
80+
"1st element should be defined but is not actually defined")
81+
verify(blockManager, times(1)).getLocalShuffleFromDisk(any(), any())
82+
83+
assert(iterator.hasNext, "iterator should have 5 elements but actually has 1 element")
84+
assert(iterator.next()._2.isDefined,
85+
"2nd element should be defined but is not actually defined")
86+
verify(blockManager, times(2)).getLocalShuffleFromDisk(any(), any())
87+
88+
assert(iterator.hasNext, "iterator should have 5 elements but actually has 2 elements")
89+
// 3rd fetch should be failed
90+
intercept[Exception] {
91+
iterator.next()
92+
}
93+
verify(blockManager, times(3)).getLocalShuffleFromDisk(any(), any())
94+
}
95+
96+
test("handle local read successes") {
97+
val transfer = mock(classOf[BlockTransferService])
98+
val blockManager = mock(classOf[BlockManager])
99+
doReturn(BlockManagerId("test-client", "test-client", 1)).when(blockManager).blockManagerId
100+
101+
val blIds = Array[BlockId](
102+
ShuffleBlockId(0,0,0),
103+
ShuffleBlockId(0,1,0),
104+
ShuffleBlockId(0,2,0),
105+
ShuffleBlockId(0,3,0),
106+
ShuffleBlockId(0,4,0))
107+
108+
val optItr = mock(classOf[Option[Iterator[Any]]])
109+
110+
// All blocks should be fetched successfully
111+
doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(0)), any())
112+
doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(1)), any())
113+
doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(2)), any())
114+
doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(3)), any())
115+
doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(4)), any())
116+
117+
val bmId = BlockManagerId("test-client", "test-client", 1)
118+
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
119+
(bmId, blIds.map(blId => (blId, 1.asInstanceOf[Long])).toSeq)
120+
)
121+
122+
val iterator = new ShuffleBlockFetcherIterator(
123+
new TaskContext(0, 0, 0),
124+
transfer,
125+
blockManager,
126+
blocksByAddress,
127+
null,
128+
48 * 1024 * 1024)
129+
130+
// Without exhausting the iterator, the iterator should be lazy and not call getLocalShuffleFromDisk.
131+
verify(blockManager, times(0)).getLocalShuffleFromDisk(any(), any())
132+
133+
assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements")
134+
assert(iterator.next()._2.isDefined,
135+
"All elements should be defined but 1st element is not actually defined")
136+
assert(iterator.hasNext, "iterator should have 5 elements but actually has 1 element")
137+
assert(iterator.next()._2.isDefined,
138+
"All elements should be defined but 2nd element is not actually defined")
139+
assert(iterator.hasNext, "iterator should have 5 elements but actually has 2 elements")
140+
assert(iterator.next()._2.isDefined,
141+
"All elements should be defined but 3rd element is not actually defined")
142+
assert(iterator.hasNext, "iterator should have 5 elements but actually has 3 elements")
143+
assert(iterator.next()._2.isDefined,
144+
"All elements should be defined but 4th element is not actually defined")
145+
assert(iterator.hasNext, "iterator should have 5 elements but actually has 4 elements")
146+
assert(iterator.next()._2.isDefined,
147+
"All elements should be defined but 5th element is not actually defined")
148+
149+
verify(blockManager, times(5)).getLocalShuffleFromDisk(any(), any())
150+
}
151+
152+
test("handle remote fetch failures in BlockTransferService") {
153+
val transfer = mock(classOf[BlockTransferService])
154+
when(transfer.fetchBlocks(any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
155+
override def answer(invocation: InvocationOnMock): Unit = {
156+
val listener = invocation.getArguments()(3).asInstanceOf[BlockFetchingListener]
157+
listener.onBlockFetchFailure(new Exception("blah"))
158+
}
159+
})
160+
161+
val blockManager = mock(classOf[BlockManager])
162+
163+
when(blockManager.blockManagerId).thenReturn(BlockManagerId("test-client", "test-client", 1))
164+
165+
val blId1 = ShuffleBlockId(0, 0, 0)
166+
val blId2 = ShuffleBlockId(0, 1, 0)
167+
val bmId = BlockManagerId("test-server", "test-server", 1)
168+
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
169+
(bmId, Seq((blId1, 1L), (blId2, 1L))))
170+
171+
val iterator = new ShuffleBlockFetcherIterator(
172+
new TaskContext(0, 0, 0),
173+
transfer,
174+
blockManager,
175+
blocksByAddress,
176+
null,
177+
48 * 1024 * 1024)
178+
179+
iterator.foreach { case (_, iterOption) =>
180+
assert(!iterOption.isDefined)
181+
}
182+
}
183+
}

0 commit comments

Comments
 (0)