Merge branch 'master' into SPARK-33287
gaborgsomogyi committed Nov 21, 2020
2 parents 9a87255 + cf74901 commit 864062f
Showing 37 changed files with 996 additions and 270 deletions.
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/Dependency.scala
@@ -23,6 +23,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{ShuffleHandle, ShuffleWriteProcessor}
import org.apache.spark.storage.BlockManagerId

/**
* :: DeveloperApi ::
@@ -95,6 +96,20 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, this)

/**
* Stores the locations of the external shuffle services chosen to handle the
* shuffle merge requests from mappers in this shuffle map stage.
*/
private[spark] var mergerLocs: Seq[BlockManagerId] = Nil

def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = {
if (mergerLocs != null) {
this.mergerLocs = mergerLocs
}
}

def getMergerLocs: Seq[BlockManagerId] = mergerLocs

_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
_rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
}
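
The setter above deliberately ignores a null argument, so callers can forward a possibly-null list without guarding. A minimal usage sketch (hypothetical: `dep` is an existing ShuffleDependency and `backend` a SchedulerBackend; ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID stands in for the stage's resource profile):

// Hypothetical usage; `dep` and `backend` are assumed to exist.
val locs: Seq[BlockManagerId] = backend.getShufflePushMergerLocations(
  dep.partitioner.numPartitions, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
dep.setMergerLocs(locs)   // a null argument would be silently ignored
assert(dep.getMergerLocs == locs)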
47 changes: 47 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1945,4 +1945,51 @@ package object config {
.version("3.0.1")
.booleanConf
.createWithDefault(false)

private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
ConfigBuilder("spark.shuffle.push.enabled")
.doc("Set to 'true' to enable push-based shuffle on the client side and this works in " +
"conjunction with the server side flag spark.shuffle.server.mergedShuffleFileManagerImpl " +
"which needs to be set with the appropriate " +
"org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for push-based " +
"shuffle to be enabled")
.version("3.1.0")
.booleanConf
.createWithDefault(false)

private[spark] val SHUFFLE_MERGER_MAX_RETAINED_LOCATIONS =
ConfigBuilder("spark.shuffle.push.maxRetainedMergerLocations")
.doc("Maximum number of shuffle push merger locations cached for push based shuffle. " +
"Currently, shuffle push merger locations are nothing but external shuffle services " +
"which are responsible for handling pushed blocks and merging them and serving " +
"merged blocks for later shuffle fetch.")
.version("3.1.0")
.intConf
.createWithDefault(500)

private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO =
ConfigBuilder("spark.shuffle.push.mergersMinThresholdRatio")
.doc("The minimum number of shuffle merger locations required to enable push based " +
"shuffle for a stage. This is specified as a ratio of the number of partitions in " +
"the child stage. For example, a reduce stage which has 100 partitions and uses the " +
"default value 0.05 requires at least 5 unique merger locations to enable push based " +
"shuffle. Merger locations are currently defined as external shuffle services.")
.version("3.1.0")
.doubleConf
.createWithDefault(0.05)

private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD =
ConfigBuilder("spark.shuffle.push.mergersMinStaticThreshold")
.doc(s"The static threshold for number of shuffle push merger locations should be " +
"available in order to enable push based shuffle for a stage. Note this config " +
s"works in conjunction with ${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key}. " +
"Maximum of spark.shuffle.push.mergersMinStaticThreshold and " +
s"${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key} ratio number of mergers needed to " +
"enable push based shuffle for a stage. For eg: with 1000 partitions for the child " +
"stage with spark.shuffle.push.mergersMinStaticThreshold as 5 and " +
s"${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key} set to 0.05, we would need " +
"at least 50 mergers to enable push based shuffle for that stage.")
.version("3.1.0")
.doubleConf
.createWithDefault(5)
}
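
Taken together, the last two configs gate push-based shuffle on max(static threshold, ratio * partitions) merger locations. A back-of-the-envelope sketch of that arithmetic (the helper below is illustrative, not part of this commit; the defaults mirror the configs above):

// Illustrative helper: the effective minimum number of mergers implied by
// spark.shuffle.push.mergersMinStaticThreshold (default 5) and
// spark.shuffle.push.mergersMinThresholdRatio (default 0.05).
def minMergersNeeded(
    numPartitions: Int,
    staticThreshold: Int = 5,
    thresholdRatio: Double = 0.05): Int =
  math.max(staticThreshold, math.floor(numPartitions * thresholdRatio).toInt)

minMergersNeeded(100)   // 5:  ratio gives 5, static floor gives 5
minMergersNeeded(1000)  // 50: the ratio dominates, matching the doc example
minMergersNeeded(40)    // 5:  the static floor dominates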
40 changes: 40 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -249,6 +249,8 @@ private[spark] class DAGScheduler(
private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)

private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(sc.getConf)

/**
* Called by the TaskSetManager to report task's starting.
*/
@@ -1252,6 +1254,33 @@ private[spark] class DAGScheduler(
execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
}

/**
* If push-based shuffle is enabled, set the shuffle services to be used for the given
* shuffle map stage for block push/merge.
*
* Even when dynamic resource allocation kicks in and significantly reduces the number
* of available active executors, we can still get sufficient shuffle service
* locations for block push/merge by using the historical locations of past executors.
*/
private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = {
// TODO(SPARK-32920) Handle stage reuse/retry cases separately as without finalize
// TODO changes we cannot disable shuffle merge for the retry/reuse cases
val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)

if (mergerLocs.nonEmpty) {
stage.shuffleDep.setMergerLocs(mergerLocs)
logInfo(s"Push-based shuffle enabled for $stage (${stage.name}) with" +
s" ${stage.shuffleDep.getMergerLocs.size} merger locations")

logDebug("List of shuffle push merger locations " +
s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
} else {
logInfo("No available merger locations." +
s" Push-based shuffle disabled for $stage (${stage.name})")
}
}

/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
logDebug("submitMissingTasks(" + stage + ")")
@@ -1281,6 +1310,12 @@ private[spark] class DAGScheduler(
stage match {
case s: ShuffleMapStage =>
outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
// Only generate merger locations for a given shuffle dependency once. This way, even if
// this stage gets retried, it would still be merging blocks using the same set of
// shuffle services.
if (pushBasedShuffleEnabled) {
prepareShuffleServicesForShuffleMapStage(s)
}
case s: ResultStage =>
outputCommitCoordinator.stageStart(
stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
@@ -2027,6 +2062,11 @@ private[spark] class DAGScheduler(
if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch) {
executorFailureEpoch(execId) = currentEpoch
logInfo(s"Executor lost: $execId (epoch $currentEpoch)")
if (pushBasedShuffleEnabled) {
// Remove the fetch-failed host from the shuffle push merger list for push-based shuffle
hostToUnregisterOutputs.foreach(
host => blockManagerMaster.removeShufflePushMergerLocation(host))
}
blockManagerMaster.removeExecutor(execId)
clearCacheLocs()
}
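
DAGScheduler gates all of this behind Utils.isPushBasedShuffleEnabled(sc.getConf). As a hedged sketch, such a gate presumably requires at least the client flag plus the external shuffle service, which hosts the merger endpoints; the real helper may impose further conditions (e.g. on the cluster manager), so this is an assumption, not the commit's code:

// Sketch only: gate push-based shuffle on the client flag and the
// external shuffle service (assumed conditions, not the real helper).
def isPushBasedShuffleEnabled(conf: SparkConf): Boolean =
  conf.get(config.PUSH_BASED_SHUFFLE_ENABLED) &&
    conf.get(config.SHUFFLE_SERVICE_ENABLED)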
core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler

import org.apache.spark.resource.ResourceProfile
import org.apache.spark.storage.BlockManagerId

/**
* A backend interface for scheduling systems that allows plugging in different ones under
@@ -92,4 +93,16 @@ private[spark] trait SchedulerBackend {
*/
def maxNumConcurrentTasks(rp: ResourceProfile): Int

/**
* Get the list of host locations for push-based shuffle.
*
* Currently push-based shuffle is disabled for both the stage retry and stage reuse cases
* (e.g., when a few partitions are lost due to a failure), so this method should be
* invoked only once per ShuffleDependency.
* @return list of external shuffle service locations
*/
def getShufflePushMergerLocations(
numPartitions: Int,
resourceProfileId: Int): Seq[BlockManagerId] = Nil

}
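
Since the default returns Nil, push-based shuffle stays off unless a cluster-manager backend overrides this method. A hedged sketch of what an override might look like, combining the BlockManagerMaster lookup with the threshold configs above (everything here, including `conf`, `blockManagerMaster`, and `excludedHosts`, is an illustrative assumption, not code from this commit):

// Hypothetical override in a concrete SchedulerBackend.
override def getShufflePushMergerLocations(
    numPartitions: Int,
    resourceProfileId: Int): Seq[BlockManagerId] = {
  // Effective floor: max of the static threshold and the ratio-based count.
  val minMergers = math.max(
    conf.get(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD).toInt,
    math.floor(numPartitions *
      conf.get(config.SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO)).toInt)
  val mergers = blockManagerMaster.getShufflePushMergerLocations(
    numPartitions, excludedHosts)
  // Returning Nil keeps push-based shuffle disabled for this stage.
  if (mergers.size < minMergers) Nil else mergers
}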
core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -145,4 +145,6 @@ private[spark] object BlockManagerId {
def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
blockManagerIdCache.get(id)
}

private[spark] val SHUFFLE_MERGER_IDENTIFIER = "shuffle-push-merger"
}
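
Merger locations are ordinary BlockManagerIds whose executor id is the shared constant above, so they flow through the existing BlockManagerId plumbing. A sketch (the host is made up, and 7337 is only the conventional default external shuffle service port):

// Illustrative merger location id; host and port values are hypothetical.
val merger = BlockManagerId(
  BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, "host-1.example.com", 7337)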
core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -125,6 +125,26 @@ class BlockManagerMaster(
driverEndpoint.askSync[Seq[BlockManagerId]](GetPeers(blockManagerId))
}

/**
* Get a list of unique shuffle service locations where an executor has been successfully
* registered in the past, for block push/merge with push-based shuffle.
*/
def getShufflePushMergerLocations(
numMergersNeeded: Int,
hostsToFilter: Set[String]): Seq[BlockManagerId] = {
driverEndpoint.askSync[Seq[BlockManagerId]](
GetShufflePushMergerLocations(numMergersNeeded, hostsToFilter))
}

/**
* Remove the given host from the candidate list of shuffle push mergers. This can be
* triggered if there is a FetchFailedException on the host.
* @param host the host to be removed
*/
def removeShufflePushMergerLocation(host: String): Unit = {
driverEndpoint.askSync[Seq[BlockManagerId]](RemoveShufflePushMergerLocation(host))
}

def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
driverEndpoint.askSync[Option[RpcEndpointRef]](GetExecutorEndpointRef(executorId))
}
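
Driver-side callers use these two methods like any other BlockManagerMaster RPC. A hypothetical usage sketch (host names and counts are illustrative):

// Ask for up to 50 merger locations, excluding a known-bad host...
val mergers = blockManagerMaster.getShufflePushMergerLocations(
  numMergersNeeded = 50, hostsToFilter = Set("bad-host.example.com"))
// ...and drop a host after a FetchFailedException against it.
blockManagerMaster.removeShufflePushMergerLocation("bad-host.example.com")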
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -74,6 +74,14 @@ class BlockManagerMasterEndpoint(
// Mapping from block id to the set of block managers that have the block.
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

// Mapping from host name to the external shuffle (merger) service where the current
// app registered an executor in the past. The oldest hosts are evicted when the
// maxRetainedMergerLocations limit is reached, in favor of newer locations.
private val shuffleMergerLocations = new mutable.LinkedHashMap[String, BlockManagerId]()

// Maximum number of merger locations to cache
private val maxRetainedMergerLocations = conf.get(config.SHUFFLE_MERGER_MAX_RETAINED_LOCATIONS)

private val askThreadPool =
ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool", 100)
private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool)
@@ -92,6 +100,8 @@

val defaultRpcTimeout = RpcUtils.askRpcTimeout(conf)

private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(conf)

logInfo("BlockManagerMasterEndpoint up")
// same as `conf.get(config.SHUFFLE_SERVICE_ENABLED)
// && conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)`
@@ -139,6 +149,12 @@
case GetBlockStatus(blockId, askStorageEndpoints) =>
context.reply(blockStatus(blockId, askStorageEndpoints))

case GetShufflePushMergerLocations(numMergersNeeded, hostsToFilter) =>
context.reply(getShufflePushMergerLocations(numMergersNeeded, hostsToFilter))

case RemoveShufflePushMergerLocation(host) =>
context.reply(removeShufflePushMergerLocation(host))

case IsExecutorAlive(executorId) =>
context.reply(blockManagerIdByExecutor.contains(executorId))

@@ -360,6 +376,17 @@

}

private def addMergerLocation(blockManagerId: BlockManagerId): Unit = {
if (!blockManagerId.isDriver && !shuffleMergerLocations.contains(blockManagerId.host)) {
val shuffleServerId = BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER,
blockManagerId.host, externalShuffleServicePort)
if (shuffleMergerLocations.size >= maxRetainedMergerLocations) {
shuffleMergerLocations -= shuffleMergerLocations.head._1
}
shuffleMergerLocations(shuffleServerId.host) = shuffleServerId
}
}

private def removeExecutor(execId: String): Unit = {
logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
@@ -526,6 +553,10 @@

blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(),
maxOnHeapMemSize, maxOffHeapMemSize, storageEndpoint, externalShuffleServiceBlockStatus)

if (pushBasedShuffleEnabled) {
addMergerLocation(id)
}
}
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
@@ -657,6 +688,40 @@
}
}

private def getShufflePushMergerLocations(
numMergersNeeded: Int,
hostsToFilter: Set[String]): Seq[BlockManagerId] = {
val blockManagerHosts = blockManagerIdByExecutor.values.map(_.host).toSet
val filteredBlockManagerHosts = blockManagerHosts.filterNot(hostsToFilter.contains(_))
val filteredMergersWithExecutors = filteredBlockManagerHosts.map(
BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, _, externalShuffleServicePort))
// Enough mergers are available among the hosts with active executors
if (filteredMergersWithExecutors.size >= numMergersNeeded) {
filteredMergersWithExecutors.toSeq
} else {
// Top up the mergers on active-executor hosts with mergers from the historical list
val filteredMergersWithExecutorsHosts = filteredMergersWithExecutors.map(_.host)
val filteredMergersWithoutExecutors = shuffleMergerLocations.values
.filterNot(x => hostsToFilter.contains(x.host))
.filterNot(x => filteredMergersWithExecutorsHosts.contains(x.host))
val randomFilteredMergersLocations =
if (filteredMergersWithoutExecutors.size >
numMergersNeeded - filteredMergersWithExecutors.size) {
Utils.randomize(filteredMergersWithoutExecutors)
.take(numMergersNeeded - filteredMergersWithExecutors.size)
} else {
filteredMergersWithoutExecutors
}
filteredMergersWithExecutors.toSeq ++ randomFilteredMergersLocations
}
}

private def removeShufflePushMergerLocation(host: String): Unit = {
if (shuffleMergerLocations.contains(host)) {
shuffleMergerLocations.remove(host)
}
}

/**
* Returns an [[RpcEndpointRef]] of the [[BlockManagerReplicaEndpoint]] for sending RPC messages.
*/
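
The selection above prefers hosts that currently run executors and only tops up from the historical cache, choosing the top-up hosts at random. A standalone sketch of that policy (simplified: plain strings stand in for BlockManagerIds, with no port or identifier bookkeeping):

import scala.util.Random

// Simplified model of getShufflePushMergerLocations: prefer active hosts,
// then randomly top up from the cached (historical) merger hosts.
def pickMergers(
    activeHosts: Set[String],
    cachedHosts: Seq[String],
    needed: Int,
    filtered: Set[String]): Seq[String] = {
  val fromActive = (activeHosts -- filtered).toSeq
  if (fromActive.size >= needed) {
    fromActive
  } else {
    val fallback = cachedHosts
      .filterNot(filtered.contains)
      .filterNot(fromActive.contains)
    fromActive ++ Random.shuffle(fallback).take(needed - fromActive.size)
  }
}

// Active executors on a and b, five cached hosts, four mergers needed:
// returns a and b plus two random picks from c, d, e.
pickMergers(Set("a", "b"), Seq("a", "b", "c", "d", "e"), 4, Set.empty)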
core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -141,4 +141,10 @@ private[spark] object BlockManagerMessages {
case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

case class IsExecutorAlive(executorId: String) extends ToBlockManagerMaster

case class GetShufflePushMergerLocations(numMergersNeeded: Int, hostsToFilter: Set[String])
extends ToBlockManagerMaster

case class RemoveShufflePushMergerLocation(host: String) extends ToBlockManagerMaster

}