Skip to content

Commit ae05fcd

Browse files
committed
Updated tests, although DistributedSuite is hanging.
1 parent d8d595c commit ae05fcd

File tree

8 files changed

+85
-363
lines changed

8 files changed

+85
-363
lines changed

core/src/main/scala/org/apache/spark/network/BlockTransferService.scala

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,39 @@ abstract class BlockTransferService {
6363
* }}}
6464
*/
6565
def fetchBlock(hostName: String, port: Int, blockId: String): ManagedBuffer = {
66-
//fetchBlocks(hostName, port, Seq(blockId)).iterator().next()._2
67-
null
66+
// TODO(rxin): Add timeout?
67+
val lock = new Object
68+
@volatile var result: Either[ManagedBuffer, Exception] = null
69+
fetchBlocks(hostName, port, Seq(blockId), new BlockFetchingListener {
70+
override def onBlockFetchFailure(exception: Exception): Unit = {
71+
lock.synchronized {
72+
result = Right(exception)
73+
lock.notify()
74+
}
75+
}
76+
override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
77+
lock.synchronized {
78+
result = Left(data)
79+
lock.notify()
80+
}
81+
}
82+
})
83+
84+
// Sleep until result is no longer null
85+
lock.synchronized {
86+
while (result == null) {
87+
try {
88+
lock.wait()
89+
} catch {
90+
case e: InterruptedException =>
91+
}
92+
}
93+
}
94+
95+
result match {
96+
case Left(data: ManagedBuffer) => data
97+
case Right(e: Exception) => throw e
98+
}
6899
}
69100

70101
/**

core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717

1818
package org.apache.spark.network
1919

20-
import java.io.{File, FileInputStream, InputStream}
20+
import java.io.{RandomAccessFile, File, FileInputStream, InputStream}
2121
import java.nio.ByteBuffer
22+
import java.nio.channels.FileChannel.MapMode
2223

2324
import io.netty.buffer.{ByteBufInputStream, ByteBuf, Unpooled}
2425
import io.netty.channel.DefaultFileRegion
@@ -34,7 +35,7 @@ abstract class ManagedBuffer {
3435
// Note that all the methods are defined with parenthesis because their implementations can
3536
// have side effects (io operations).
3637

37-
def byteBuffer(): ByteBuffer = throw new UnsupportedOperationException
38+
def byteBuffer(): ByteBuffer
3839

3940
def fileSegment(): Option[FileSegment] = None
4041

@@ -56,6 +57,11 @@ final class FileSegmentManagedBuffer(file: File, offset: Long, length: Long)
5657

5758
override def size: Long = length
5859

60+
override def byteBuffer(): ByteBuffer = {
61+
val channel = new RandomAccessFile(file, "r").getChannel
62+
channel.map(MapMode.READ_ONLY, offset, length)
63+
}
64+
5965
override private[network] def toNetty(): AnyRef = {
6066
val fileChannel = new FileInputStream(file).getChannel
6167
new DefaultFileRegion(fileChannel, offset, length)

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,9 @@ private[spark] class BlockManager(
6363
mapOutputTracker: MapOutputTracker,
6464
shuffleManager: ShuffleManager,
6565
blockTransferService: BlockTransferService)
66-
extends BlockDataProvider with Logging {
66+
extends BlockDataManager with Logging {
6767

68-
//blockTransferService.init(this)
68+
blockTransferService.init(this)
6969

7070
private val port = conf.getInt("spark.blockManager.port", 0)
7171
val shuffleBlockManager = new ShuffleBlockManager(this, shuffleManager)
@@ -207,20 +207,34 @@ private[spark] class BlockManager(
207207
}
208208
}
209209

210-
override def getBlockData(blockId: String): Either[FileSegment, ByteBuffer] = {
210+
/**
211+
* Interface to get local block data.
212+
*
213+
* @return Some(buffer) if the block exists locally, and None if it doesn't.
214+
*/
215+
override def getBlockData(blockId: String): Option[ManagedBuffer] = {
211216
val bid = BlockId(blockId)
212217
if (bid.isShuffle) {
213-
Left(diskBlockManager.getBlockLocation(bid))
218+
val fileSegment = diskBlockManager.getBlockLocation(bid)
219+
Some(new FileSegmentManagedBuffer(fileSegment.file, fileSegment.offset, fileSegment.length))
214220
} else {
215221
val blockBytesOpt = doGetLocal(bid, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
216222
if (blockBytesOpt.isDefined) {
217-
Right(blockBytesOpt.get)
223+
val buffer = blockBytesOpt.get
224+
Some(new NioByteBufferManagedBuffer(buffer))
218225
} else {
219-
throw new BlockNotFoundException(blockId)
226+
None
220227
}
221228
}
222229
}
223230

231+
/**
232+
* Put the block locally, using the given storage level.
233+
*/
234+
override def putBlockData(blockId: String, data: ManagedBuffer, level: StorageLevel): Unit = {
235+
putBytes(BlockId(blockId), data.byteBuffer(), level)
236+
}
237+
224238
/**
225239
* Get the BlockStatus for the block identified by the given ID, if it exists.
226240
* NOTE: This is mainly for testing, and it doesn't fetch information from Tachyon.

core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,8 @@ final class ShuffleBlockFetcherIterator(
231231
if (!result.failed) {
232232
bytesInFlight -= result.size
233233
}
234-
while (!fetchRequests.isEmpty &&
234+
// Send fetch requests up to maxBytesInFlight
235+
while (fetchRequests.nonEmpty &&
235236
(bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
236237
sendRequest(fetchRequests.dequeue())
237238
}

core/src/test/scala/org/apache/spark/DistributedSuite.scala

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.spark
1919

20-
import org.apache.spark.network.cm.{GetBlock, BlockManagerWorker, ConnectionManagerId}
2120
import org.scalatest.BeforeAndAfter
2221
import org.scalatest.FunSuite
2322
import org.scalatest.concurrent.Timeouts._
@@ -202,12 +201,13 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
202201
val blockIds = data.partitions.indices.map(index => RDDBlockId(data.id, index)).toArray
203202
val blockId = blockIds(0)
204203
val blockManager = SparkEnv.get.blockManager
205-
blockManager.master.getLocations(blockId).foreach(id => {
206-
val bytes = BlockManagerWorker.syncGetBlock(
207-
GetBlock(blockId), ConnectionManagerId(id.host, id.port))
208-
val deserialized = blockManager.dataDeserialize(blockId, bytes).asInstanceOf[Iterator[Int]].toList
204+
val blockTransfer = SparkEnv.get.blockTransferService
205+
blockManager.master.getLocations(blockId).foreach { cmId =>
206+
val bytes = blockTransfer.fetchBlock(cmId.host, cmId.port, blockId.toString)
207+
val deserialized = blockManager.dataDeserialize(blockId, bytes.byteBuffer())
208+
.asInstanceOf[Iterator[Int]].toList
209209
assert(deserialized === (1 to 100).toList)
210-
})
210+
}
211211
}
212212

213213
test("compute without caching when no partitions fit in memory") {
@@ -268,6 +268,8 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
268268
DistributedSuite.amMaster = true
269269
sc = new SparkContext(clusterUrl, "test")
270270
for (i <- 1 to 3) {
271+
println("i = " + i)
272+
Console.out.flush()
271273
val data = sc.parallelize(Seq(true, true), 2)
272274
assert(data.count === 2)
273275
assert(data.map(markNodeIfIdentity).collect.size === 2)
@@ -339,6 +341,7 @@ object DistributedSuite {
339341
// Act like an identity function, but if the argument is true, set mark to true.
340342
def markNodeIfIdentity(item: Boolean): Boolean = {
341343
if (item) {
344+
println("marking node!!!!!!!!!!!!!!!")
342345
assert(!amMaster)
343346
mark = true
344347
}
@@ -349,6 +352,7 @@ object DistributedSuite {
349352
// crashing the entire JVM.
350353
def failOnMarkedIdentity(item: Boolean): Boolean = {
351354
if (mark) {
355+
println("failing node !!!!!!!!!!!!!!!")
352356
System.exit(42)
353357
}
354358
item

0 commit comments

Comments
 (0)