
[SPARK-34942][API][CORE] Abstract Location in MapStatus to enable support for custom storage #31876


Closed · wants to merge 27 commits
31 changes: 31 additions & 0 deletions core/src/main/java/org/apache/spark/shuffle/api/ExecutorLocation.java
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.api;

import org.apache.spark.annotation.Private;

/**
* :: Private ::
* A type of {@link Location} that is based on the executor.
*
* @since 3.2.0
*/
@Private
public interface ExecutorLocation extends HostLocation {
String executorId();
}
33 changes: 33 additions & 0 deletions core/src/main/java/org/apache/spark/shuffle/api/HostLocation.java
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.api;

import org.apache.spark.annotation.Private;

/**
* :: Private ::
* A type of {@link Location} that is based on the host.
*
* @since 3.2.0
*/
@Private
public interface HostLocation extends Location {
String host();

int port();
}
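Together with the base `Location` interface introduced below, these two interfaces form a small hierarchy (`Location` <- `HostLocation` <- `ExecutorLocation`). As a rough illustration only (a hypothetical helper, not part of this diff), calling code can narrow a plugged-in location to the most specific interface it implements:

```scala
import org.apache.spark.shuffle.api.{ExecutorLocation, HostLocation, Location}

// Hypothetical helper, not part of this PR: narrow a plugged-in Location to
// the most specific interface it implements.
object LocationDescriber {
  def describe(loc: Location): String = loc match {
    // ExecutorLocation is matched first because it extends HostLocation.
    case e: ExecutorLocation => s"executor ${e.executorId()} on ${e.host()}:${e.port()}"
    case h: HostLocation => s"host ${h.host()}:${h.port()}"
    case other => s"custom location $other"
  }
}
```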
61 changes: 61 additions & 0 deletions core/src/main/java/org/apache/spark/shuffle/api/Location.java
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.api;

import org.apache.spark.annotation.Private;

import java.io.Externalizable;
import java.io.ObjectInput;
import java.io.ObjectOutput;


/**
* :: Private ::
* An interface for plugging in the location of shuffle files, in order to support storing shuffle
* data in different storage systems, e.g., BlockManager, HDFS, S3. It is generated by
* {@link ShuffleMapOutputWriter} after writing a shuffle data file and used by ShuffleMapOutputReader
* to read the shuffle data.
*
* Since the location is returned by {@link ShuffleMapOutputWriter#commitAllPartitions()} at the executor

A reviewer commented:

I think we need to add the Location to MapOutputCommitMessage and slightly modify the ShuffleWriters to propagate the Location to the driver. Could that be a follow-up PR? Or are there any other design choices for propagating the Location information to the driver (right now it is hardcoded to blockManager.blockManagerId)?

* and would then be sent to the driver, users must ensure the location is serializable by:
*
* - implementing a 0-arg constructor
* - implementing {@link java.io.Externalizable#readExternal(ObjectInput)} for deserialization
* - implementing {@link java.io.Externalizable#writeExternal(ObjectOutput)} for serialization
*
* Since locations will be used as keys in maps or compared with one another, users must ensure that
@dongjoon-hyun (Member) commented on Apr 22, 2021:

nit. This is annotated with Private. "users" looks a little too broad in this context.

The author (Member) replied:

> This is annotated with Private

This follows the other shuffle APIs (e.g., ShuffleDataIO) intentionally. I guess this is for safety before the complete feature is shipped.

And "users" means the end users here.

* invoking {@link java.lang.Object#equals(Object)} or {@link java.lang.Object#hashCode()} on
* {@link Location} instances distinguishes different locations.
*
* Spark ships its own default implementation of {@link Location},
* {@link org.apache.spark.storage.BlockManagerId}, which implements {@link ExecutorLocation}
* since each {@link org.apache.spark.storage.BlockManager} must belong to a certain executor.
* {@link ExecutorLocation} in turn extends {@link HostLocation}, since each executor must
* belong to a certain host. Users should choose the appropriate location interface for their
* own use cases.
*
* :: Caution ::
* Spark may reuse the same location instance for locations that are equal, for performance
* reasons. Thus, users must also guarantee that the implemented {@link Location}
* is IMMUTABLE.
*
* @since 3.2.0
*/
@Private
public interface Location extends Externalizable {

A reviewer commented:

This looks good if we consider only one Location for a map task. However, it does not address the concern with hybrid storage as originally proposed in the design doc, where a few of the blocks are written to local disk and a few are written to HDFS or S3, etc.

A Contributor replied:

Technically, a single abstract Location interface, which can be implemented as you like, can handle multiple real locations. IMHO it is just naming: Location => Locations, and since mostly single real locations are represented, I would even keep the current name.

}
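For readers implementing the contract above, here is a minimal sketch of a custom Location backed by a shared file system. The class name `HdfsLocation` and its `rootDir` field are hypothetical and not part of this PR; the point is the 0-arg constructor, the Externalizable methods, value-based equals/hashCode, and effective immutability.

```scala
import java.io.{ObjectInput, ObjectOutput}

import org.apache.spark.shuffle.api.Location

// Hypothetical sketch, not part of this PR: a Location pointing to a directory
// on a shared file system (e.g. HDFS) rather than to an executor.
class HdfsLocation(private var rootDir: String) extends Location {

  // 0-arg constructor required for Externalizable deserialization.
  def this() = this(null)

  // The field is a var only so readExternal can populate it; it is never
  // mutated afterwards, keeping the instance effectively immutable.
  def dir: String = rootDir

  override def writeExternal(out: ObjectOutput): Unit = out.writeUTF(rootDir)

  override def readExternal(in: ObjectInput): Unit = { rootDir = in.readUTF() }

  // Value-based equality and hash code so that equal locations behave as the
  // same key in maps on the driver.
  override def equals(other: Any): Boolean = other match {
    case o: HdfsLocation => o.dir == rootDir
    case _ => false
  }

  override def hashCode(): Int = if (rootDir == null) 0 else rootDir.hashCode

  override def toString: String = s"HdfsLocation($rootDir)"
}
```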
63 changes: 37 additions & 26 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -37,7 +37,8 @@ import org.apache.spark.io.CompressionCodec
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId}
import org.apache.spark.shuffle.api.{ExecutorLocation, HostLocation, Location}
import org.apache.spark.storage.{BlockId, ShuffleBlockId}
import org.apache.spark.util._

/**
@@ -124,13 +125,13 @@ private class ShuffleStatus(numPartitions: Int) extends Logging {
/**
* Update the map output location (e.g. during migration).
*/
def updateMapOutput(mapId: Long, bmAddress: BlockManagerId): Unit = withWriteLock {
def updateMapOutput(mapId: Long, loc: Location): Unit = withWriteLock {
try {
val mapStatusOpt = mapStatuses.find(_.mapId == mapId)
mapStatusOpt match {
case Some(mapStatus) =>
logInfo(s"Updating map output for ${mapId} to ${bmAddress}")
mapStatus.updateLocation(bmAddress)
logInfo(s"Updating map output for $mapId to $loc")
mapStatus.updateLocation(loc)
invalidateSerializedMapOutputStatusCache()
case None =>
logWarning(s"Asked to update map output ${mapId} for untracked map status.")
@@ -146,9 +147,9 @@
* This is a no-op if there is no registered map output or if the registered output is from a
* different block manager.
*/
def removeMapOutput(mapIndex: Int, bmAddress: BlockManagerId): Unit = withWriteLock {
logDebug(s"Removing existing map output ${mapIndex} ${bmAddress}")
if (mapStatuses(mapIndex) != null && mapStatuses(mapIndex).location == bmAddress) {
def removeMapOutput(mapIndex: Int, loc: Location): Unit = withWriteLock {
logDebug(s"Removing existing map output $mapIndex $loc")
if (mapStatuses(mapIndex) != null && mapStatuses(mapIndex).location == loc) {
_numAvailableOutputs -= 1
mapStatuses(mapIndex) = null
invalidateSerializedMapOutputStatusCache()
@@ -161,7 +162,10 @@
*/
def removeOutputsOnHost(host: String): Unit = withWriteLock {
logDebug(s"Removing outputs for host ${host}")
removeOutputsByFilter(x => x.host == host)
removeOutputsByFilter { x =>
assert(x.isInstanceOf[HostLocation], s"Required HostLocation, but got $x")
x.asInstanceOf[HostLocation].host == host
}
}

/**
@@ -171,14 +175,17 @@
*/
def removeOutputsOnExecutor(execId: String): Unit = withWriteLock {
logDebug(s"Removing outputs for execId ${execId}")
removeOutputsByFilter(x => x.executorId == execId)
removeOutputsByFilter { x =>
assert(x.isInstanceOf[ExecutorLocation], s"Required ExecutorLocation, but got $x")
x.asInstanceOf[ExecutorLocation].executorId == execId
}
}

/**
* Removes all shuffle outputs which satisfies the filter. Note that this will also
* remove outputs which are served by an external shuffle server (if one exists).
*/
def removeOutputsByFilter(f: BlockManagerId => Boolean): Unit = withWriteLock {
def removeOutputsByFilter(f: Location => Boolean): Unit = withWriteLock {
for (mapIndex <- mapStatuses.indices) {
if (mapStatuses(mapIndex) != null && f(mapStatuses(mapIndex).location)) {
_numAvailableOutputs -= 1
@@ -344,7 +351,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging

// For testing
def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
: Iterator[(Location, Seq[(BlockId, Long, Int)])] = {
getMapSizesByExecutorId(shuffleId, 0, Int.MaxValue, reduceId, reduceId + 1)
}

@@ -365,7 +372,7 @@
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
endPartition: Int): Iterator[(Location, Seq[(BlockId, Long, Int)])]

/**
* Deletes map output status information for the specified shuffle stage.
@@ -488,10 +495,10 @@ private[spark] class MapOutputTrackerMaster(
}
}

def updateMapOutput(shuffleId: Int, mapId: Long, bmAddress: BlockManagerId): Unit = {
def updateMapOutput(shuffleId: Int, mapId: Long, loc: Location): Unit = {
shuffleStatuses.get(shuffleId) match {
case Some(shuffleStatus) =>
shuffleStatus.updateMapOutput(mapId, bmAddress)
shuffleStatus.updateMapOutput(mapId, loc)
case None =>
logError(s"Asked to update map output for unknown shuffle ${shuffleId}")
}
@@ -502,10 +509,10 @@
}

/** Unregister map output information of the given shuffle, mapper and block manager */
def unregisterMapOutput(shuffleId: Int, mapIndex: Int, bmAddress: BlockManagerId): Unit = {
def unregisterMapOutput(shuffleId: Int, mapIndex: Int, loc: Location): Unit = {
shuffleStatuses.get(shuffleId) match {
case Some(shuffleStatus) =>
shuffleStatus.removeMapOutput(mapIndex, bmAddress)
shuffleStatus.removeMapOutput(mapIndex, loc)
incrementEpoch()
case None =>
throw new SparkException("unregisterMapOutput called for nonexistent shuffle ID")
@@ -643,10 +650,12 @@ private[spark] class MapOutputTrackerMaster(
: Seq[String] = {
if (shuffleLocalityEnabled && dep.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD &&
dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId, partitionId,
val locations = getLocationsWithLargestOutputs(dep.shuffleId, partitionId,
dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION)
if (blockManagerIds.nonEmpty) {
blockManagerIds.get.map(_.host)
if (locations.nonEmpty) {
locations.get
.filter(_.isInstanceOf[HostLocation])
.map(_.asInstanceOf[HostLocation].host)
} else {
Nil
}
@@ -670,14 +679,14 @@
reducerId: Int,
numReducers: Int,
fractionThreshold: Double)
: Option[Array[BlockManagerId]] = {
: Option[Array[Location]] = {

val shuffleStatus = shuffleStatuses.get(shuffleId).orNull
if (shuffleStatus != null) {
shuffleStatus.withMapStatuses { statuses =>
if (statuses.nonEmpty) {
// HashMap to add up sizes of all blocks at the same location
val locs = new HashMap[BlockManagerId, Long]
val locs = new HashMap[Location, Long]
var totalOutputSize = 0L
var mapIdx = 0
while (mapIdx < statuses.length) {
@@ -728,7 +737,9 @@
if (startMapIndex < endMapIndex &&
(startMapIndex >= 0 && endMapIndex <= statuses.length)) {
val statusesPicked = statuses.slice(startMapIndex, endMapIndex).filter(_ != null)
statusesPicked.map(_.location.host).toSeq
statusesPicked
.filter(_.location.isInstanceOf[HostLocation])
.map(_.location.asInstanceOf[HostLocation].host).toSeq
} else {
Nil
}
@@ -758,7 +769,7 @@
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
endPartition: Int): Iterator[(Location, Seq[(BlockId, Long, Int)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId")
shuffleStatuses.get(shuffleId) match {
case Some(shuffleStatus) =>
@@ -810,7 +821,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
endPartition: Int): Iterator[(Location, Seq[(BlockId, Long, Int)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId")
val statuses = getStatuses(shuffleId, conf)
try {
@@ -989,9 +1000,9 @@ private[spark] object MapOutputTracker extends Logging {
endPartition: Int,
statuses: Array[MapStatus],
startMapIndex : Int,
endMapIndex: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
endMapIndex: Int): Iterator[(Location, Seq[(BlockId, Long, Int)])] = {
assert (statuses != null)
val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]]
val splitsByAddress = new HashMap[Location, ListBuffer[(BlockId, Long, Int)]]
val iter = statuses.iterator.zipWithIndex
for ((status, mapIndex) <- iter.slice(startMapIndex, endMapIndex)) {
if (status == null) {
18 changes: 18 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1217,6 +1217,24 @@ package object config {
.stringConf
.createWithDefault(classOf[LocalDiskShuffleDataIO].getName)

private[spark] val SHUFFLE_LOCATION_PLUGIN_CLASS =
ConfigBuilder("spark.shuffle.sort.location.plugin.class")
.doc("Qualified name of the class that used to initiate plugin location instance. " +
"If not specified, Spark will use its native location (a.k.a BlockManagerId) by default.")
.version("3.2.0")
.stringConf
.createOptional
A Member commented:

Do we need to use createOptional here? Instead of saying "If not specified, Spark will use its native location (a.k.a. BlockManagerId) by default.", can we specify the class here explicitly?

The author (Member) replied:

I used the optional config intentionally.

I previously tried using a default value (BlockManagerId), and I realized that we wouldn't want to load the BlockManager via reflection, which is inefficient; we would create it directly since it is a built-in class in Spark. As a result, the default value would never actually be used (or it would only be used to check whether the configured value differs from the default, which is pointless).
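To make the discussion above concrete, here is a rough sketch (assumed object, method, and parameter names, not the PR's actual code) of why an optional config avoids reflection in the default case: only a user-supplied class name is instantiated reflectively, while the built-in path keeps using the executor's BlockManagerId directly.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.SHUFFLE_LOCATION_PLUGIN_CLASS
import org.apache.spark.shuffle.api.Location
import org.apache.spark.storage.BlockManagerId

// Hypothetical sketch: pick the Location implementation for a map output.
object LocationResolver {
  def resolveLocation(conf: SparkConf, defaultLoc: BlockManagerId): Location = {
    conf.get(SHUFFLE_LOCATION_PLUGIN_CLASS) match {
      case Some(className) =>
        // Custom plugin: instantiate reflectively via the required 0-arg constructor.
        Class.forName(className).getDeclaredConstructor().newInstance().asInstanceOf[Location]
      case None =>
        // Default: BlockManagerId already implements Location under this PR's design,
        // so no reflection (and hence no default class name) is needed at all.
        defaultLoc
    }
  }
}
```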


private[spark] val SHUFFLE_LOCATION_CACHE_SIZE =
ConfigBuilder("spark.shuffle.sort.location.cacheSize")
.doc("The cache size for the location instances. Bigger size means that Spark will have " +
"more chances to reuse the location instance for the same location but takes more memory.")
.version("3.2.0")
.intConf
// In the case of `BlockManagerId`, which takes about 48 bytes each, the total memory cost
// should stay below 1 MB, which is acceptable.
.createWithDefault(10000)
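The cache this config sizes backs the reuse behavior described in the Location Javadoc (Spark may reuse the same instance for locations that are equal). As a rough sketch only, with assumed class and method names modeled on the existing BlockManagerId cache, such an interning cache could look like:

```scala
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

import org.apache.spark.shuffle.api.Location

// Hypothetical sketch of an interning cache for Location instances.
class LocationCache(maxSize: Int) {
  private val cache: LoadingCache[Location, Location] =
    CacheBuilder.newBuilder()
      .maximumSize(maxSize.toLong)
      .build(new CacheLoader[Location, Location] {
        // The first instance seen for a given value becomes the canonical one.
        override def load(loc: Location): Location = loc
      })

  // Returns a shared instance equal to `loc`, so equal locations from many map
  // statuses share one object (roughly 48 bytes * 10000 entries < 1 MB for
  // BlockManagerId).
  def getCached(loc: Location): Location = cache.get(loc)
}
```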

private[spark] val SHUFFLE_FILE_BUFFER_SIZE =
ConfigBuilder("spark.shuffle.file.buffer")
.doc("Size of the in-memory buffer for each shuffle file output stream, in KiB unless " +
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1658,7 +1658,7 @@ private[spark] class DAGScheduler(
val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
shuffleStage.pendingPartitions -= task.partitionId
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
val execId = event.taskInfo.executorId
logDebug("ShuffleMapTask finished on " + execId)
if (executorFailureEpoch.contains(execId) &&
smt.epoch <= executorFailureEpoch(execId)) {