Skip to content

Commit 1552665

Browse files
ConeyLiu and cloud-fan
authored and committed
[SPARK-19956][CORE] Optimize a location order of blocks with topology information
## What changes were proposed in this pull request? When the method getLocations of BlockManager is called, we only compare the hosts of the data blocks. Non-local data blocks are selected at random, which may cause the selected data block to be on a different rack. This patch therefore also sorts the locations by rack, so that locations on the same rack are preferred over locations on other racks. ## How was this patch tested? New test case. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Xianyang Liu <xianyang.liu@intel.com> Closes apache#17300 from ConeyLiu/blockmanager.
1 parent 0f820e2 commit 1552665

File tree

2 files changed

+37
-5
lines changed

2 files changed

+37
-5
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -612,12 +612,19 @@ private[spark] class BlockManager(
612612

613613
/**
614614
* Return a list of locations for the given block, prioritizing the local machine since
615-
* multiple block managers can share the same host.
615+
* multiple block managers can share the same host, followed by hosts on the same rack.
616616
*/
617617
private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
618618
val locs = Random.shuffle(master.getLocations(blockId))
619619
val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host }
620-
preferredLocs ++ otherLocs
620+
blockManagerId.topologyInfo match {
621+
case None => preferredLocs ++ otherLocs
622+
case Some(_) =>
623+
val (sameRackLocs, differentRackLocs) = otherLocs.partition {
624+
loc => blockManagerId.topologyInfo == loc.topologyInfo
625+
}
626+
preferredLocs ++ sameRackLocs ++ differentRackLocs
627+
}
621628
}
622629

623630
/**

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -496,8 +496,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
496496
assert(list2DiskGet.get.readMethod === DataReadMethod.Disk)
497497
}
498498

499-
test("optimize a location order of blocks") {
500-
val localHost = Utils.localHostName()
499+
test("optimize a location order of blocks without topology information") {
500+
val localHost = "localhost"
501501
val otherHost = "otherHost"
502502
val bmMaster = mock(classOf[BlockManagerMaster])
503503
val bmId1 = BlockManagerId("id1", localHost, 1)
@@ -508,7 +508,32 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
508508
val blockManager = makeBlockManager(128, "exec", bmMaster)
509509
val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations)
510510
val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0))
511-
assert(locations.map(_.host).toSet === Set(localHost, localHost, otherHost))
511+
assert(locations.map(_.host) === Seq(localHost, localHost, otherHost))
512+
}
513+
514+
test("optimize a location order of blocks with topology information") {
515+
val localHost = "localhost"
516+
val otherHost = "otherHost"
517+
val localRack = "localRack"
518+
val otherRack = "otherRack"
519+
520+
val bmMaster = mock(classOf[BlockManagerMaster])
521+
val bmId1 = BlockManagerId("id1", localHost, 1, Some(localRack))
522+
val bmId2 = BlockManagerId("id2", localHost, 2, Some(localRack))
523+
val bmId3 = BlockManagerId("id3", otherHost, 3, Some(otherRack))
524+
val bmId4 = BlockManagerId("id4", otherHost, 4, Some(otherRack))
525+
val bmId5 = BlockManagerId("id5", otherHost, 5, Some(localRack))
526+
when(bmMaster.getLocations(mc.any[BlockId]))
527+
.thenReturn(Seq(bmId1, bmId2, bmId5, bmId3, bmId4))
528+
529+
val blockManager = makeBlockManager(128, "exec", bmMaster)
530+
blockManager.blockManagerId =
531+
BlockManagerId(SparkContext.DRIVER_IDENTIFIER, localHost, 1, Some(localRack))
532+
val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations)
533+
val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0))
534+
assert(locations.map(_.host) === Seq(localHost, localHost, otherHost, otherHost, otherHost))
535+
assert(locations.flatMap(_.topologyInfo)
536+
=== Seq(localRack, localRack, localRack, otherRack, otherRack))
512537
}
513538

514539
test("SPARK-9591: getRemoteBytes from another location when Exception throw") {

0 commit comments

Comments
 (0)