
init commit for hiding Mapoutputtracker behind shuffle manager #2


Closed
wants to merge 74 commits
74 commits
d484dde
[SPARK-2163] class LBFGS optimize with Double tolerance instead of Int
BaiGang Jun 20, 2014
6a224c3
SPARK-1868: Users should be allowed to cogroup at least 4 RDDs
douglaz Jun 20, 2014
171ebb3
SPARK-2180: support HAVING clauses in Hive queries
willb Jun 20, 2014
0ac71d1
[SPARK-2225] Turn HAVING without GROUP BY into WHERE.
rxin Jun 20, 2014
01125a1
Clean up CacheManager et al.
andrewor14 Jun 21, 2014
d4c7572
Move ScriptTransformation into the appropriate place.
rxin Jun 21, 2014
2044784
[SQL] Use hive.SessionState, not the thread local SessionState
aarondav Jun 21, 2014
08d0aca
SPARK-1902 Silence stacktrace from logs when doing port failover to p…
ash211 Jun 21, 2014
e99903b
[SPARK-1970] Update unit test in XORShiftRandomSuite to use ChiSquare…
dorx Jun 21, 2014
a678642
HOTFIX: Fixing style error introduced by 08d0ac
pwendell Jun 21, 2014
010c460
[SPARK-2061] Made splits deprecated in JavaRDDLike
AtlasPilotPuppy Jun 21, 2014
648553d
Fix some tests.
Jun 21, 2014
ca5d8b5
[SQL] Pass SQLContext instead of SparkContext into physical operators.
rxin Jun 21, 2014
ec935ab
[SQL] Break hiveOperators.scala into multiple files.
rxin Jun 21, 2014
0a432d6
HOTFIX: Fix missing MIMA ignore
pwendell Jun 21, 2014
3e0b078
HOTFIX: Add excludes for new MIMA files
pwendell Jun 21, 2014
1db9cbc
SPARK-1996. Remove use of special Maven repo for Akka
srowen Jun 22, 2014
58b32f3
SPARK-2231: dev/run-tests should include YARN and use a recent Hadoop…
pwendell Jun 22, 2014
476581e
SPARK-2034. KafkaInputDStream doesn't close resources and may prevent…
srowen Jun 22, 2014
9fe28c3
SPARK-1316. Remove use of Commons IO
srowen Jun 22, 2014
409d24e
SPARK-2229: FileAppender throw an llegalArgumentException in jdk6
witgo Jun 23, 2014
9fc373e
SPARK-2241: quote command line args in ec2 script
orikremer Jun 23, 2014
9cb64b2
SPARK-2166 - Listing of instances to be terminated before the prompt
Jun 23, 2014
e380767
[SPARK-1395] Fix "local:" URI support in Yarn mode (again).
Jun 23, 2014
b88238f
Fixed small running on YARN docs typo
frol Jun 23, 2014
853a2b9
Fix mvn detection
Jun 23, 2014
a4bc442
[SPARK-1669][SQL] Made cacheTable idempotent
liancheng Jun 23, 2014
6dc6722
[SPARK-2118] spark class should complain if tools jar is missing.
ScrapCodes Jun 23, 2014
21ddd7d
[SPARK-1768] History server enhancements.
Jun 23, 2014
383bf72
Cleanup on Connection, ConnectionManagerId, ConnectionManager classes…
hsaputra Jun 24, 2014
51c8168
[SPARK-2227] Support dfs command in SQL.
rxin Jun 24, 2014
56eb8af
[SPARK-2124] Move aggregation into shuffle implementations
jerryshao Jun 24, 2014
420c1c3
[SPARK-2252] Fix MathJax for HTTPs.
rxin Jun 24, 2014
924b708
SPARK-1937: fix issue with task locality
Jun 24, 2014
221909e
HOTFIX: Disabling tests per SPARK-2264
pwendell Jun 24, 2014
1978a90
Fix broken Json tests.
kayousterhout Jun 24, 2014
a162c9b
[SPARK-2264][SQL] Fix failing CachedTableSuite
marmbrus Jun 25, 2014
8ca4176
[SPARK-1112, 2156] Bootstrap to fetch the driver's Spark properties.
mengxr Jun 25, 2014
133495d
[SQL]Add base row updating methods for JoinedRow
chenghao-intel Jun 25, 2014
54055fb
Autodetect JAVA_HOME on RPM-based systems
Jun 25, 2014
2714968
Fix possible null pointer in acumulator toString
marmbrus Jun 25, 2014
b6b4485
SPARK-2248: spark.default.parallelism does not apply in local mode
witgo Jun 25, 2014
8fade89
[SPARK-2263][SQL] Support inserting MAP<K, V> to Hive tables
liancheng Jun 25, 2014
22036ae
[BUGFIX][SQL] Should match java.math.BigDecimal when wnrapping Hive o…
liancheng Jun 25, 2014
acc01ab
SPARK-2038: rename "conf" parameters in the saveAsHadoop functions wi…
CodingCat Jun 25, 2014
ac06a85
Replace doc reference to Shark with Spark SQL.
rxin Jun 25, 2014
5603e4c
[SPARK-2242] HOTFIX: pyspark shell hangs on simple job
andrewor14 Jun 25, 2014
9aa6032
[SPARK-2258 / 2266] Fix a few worker UI bugs
andrewor14 Jun 25, 2014
7ff2c75
[SPARK-2270] Kryo cannot serialize results returned by asJavaIterable
rxin Jun 25, 2014
1132e47
[SPARK-2204] Launch tasks on the proper executors in mesos fine-grain…
sebastienrainville Jun 25, 2014
9d824fe
[SQL] SPARK-1800 Add broadcast hash join operator & associated hints.
concretevitamin Jun 26, 2014
7f196b0
[SPARK-2283][SQL] Reset test environment before running PruningSuite
liancheng Jun 26, 2014
b88a59a
[SPARK-1749] Job cancellation when SchedulerBackend does not implemen…
markhamstra Jun 26, 2014
4a346e2
[SPARK-2284][UI] Mark all failed tasks as failures.
rxin Jun 26, 2014
441cdcc
[SPARK-2172] PySpark cannot import mllib modules in YARN-client mode
piotrszul Jun 26, 2014
e4899a2
[SPARK-2254] [SQL] ScalaRefection should mark primitive types as non-…
ueshin Jun 26, 2014
48a82a8
Remove use of spark.worker.instances
kayousterhout Jun 26, 2014
32a1ad7
[SPARK-2295] [SQL] Make JavaBeans nullability stricter.
ueshin Jun 26, 2014
6587ef7
[SPARK-2286][UI] Report exception/errors for failed tasks that are no…
rxin Jun 26, 2014
62d4a0f
Fixing AWS instance type information based upon current EC2 data
Jun 26, 2014
f1f7385
Strip '@' symbols when merging pull requests.
pwendell Jun 27, 2014
981bde9
[SQL]Extract the joinkeys from join condition
chenghao-intel Jun 27, 2014
bf578de
Removed throwable field from FetchFailedException and added MetadataF…
rxin Jun 27, 2014
d1636dd
[SPARK-2297][UI] Make task attempt and speculation more explicit in UI.
rxin Jun 27, 2014
b9b2f8f
init commit for hiding Mapoutputtracker behind shuffle manager
CodingCat Jun 20, 2014
b7514b5
style fix
CodingCat Jun 20, 2014
07f8f24
avoid type checking
CodingCat Jun 20, 2014
982252f
fix test cases
CodingCat Jun 20, 2014
235a7d7
fix BlockManagerSuite
CodingCat Jun 20, 2014
a073a3a
customize MapOutputTracker by passing arguments in SparkConf
CodingCat Jun 21, 2014
31b7be1
disassociate epoch with shuffleManager
CodingCat Jun 21, 2014
573ca34
add docs and move MapStatus to shuffle package
CodingCat Jun 27, 2014
71f3633
fix trackerActor timeout issue
CodingCat Jun 27, 2014
d684705
fix npe in DAGSchedulerSuite
CodingCat Jun 27, 2014
2 changes: 2 additions & 0 deletions .rat-excludes
@@ -4,6 +4,8 @@ target
.classpath
.mima-excludes
.generated-mima-excludes
.generated-mima-class-excludes
.generated-mima-member-excludes
.rat-excludes
.*md
derby.log
5 changes: 5 additions & 0 deletions bin/spark-class
@@ -130,6 +130,11 @@ else
fi

if [[ "$1" =~ org.apache.spark.tools.* ]]; then
if test -z "$SPARK_TOOLS_JAR"; then
echo "Failed to find Spark Tools Jar in $FWDIR/tools/target/scala-$SCALA_VERSION/" 1>&2
echo "You need to build spark before running $1." 1>&2
exit 1
fi
CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR"
fi

1 change: 1 addition & 0 deletions conf/log4j.properties.template
@@ -7,5 +7,6 @@ log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}:

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
5 changes: 5 additions & 0 deletions core/pom.xml
@@ -244,6 +244,11 @@
<artifactId>easymockclassextension</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.novocode</groupId>
<artifactId>junit-interface</artifactId>
Expand Up @@ -7,5 +7,6 @@ log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}:

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -127,7 +127,7 @@ class Accumulable[R, T] (
Accumulators.register(this, false)
}

override def toString = value_.toString
override def toString = if (value_ == null) "null" else value_.toString
}

/**
163 changes: 92 additions & 71 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -20,105 +20,54 @@ package org.apache.spark
import scala.collection.mutable.{ArrayBuffer, HashSet}

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{BlockId, BlockManager, BlockStatus, RDDBlockId, StorageLevel}
import org.apache.spark.storage._

/**
* Spark class responsible for passing RDDs split contents to the BlockManager and making
* Spark class responsible for passing RDDs partition contents to the BlockManager and making
* sure a node doesn't load two copies of an RDD at once.
*/
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {

/** Keys of RDD splits that are being computed/loaded. */
/** Keys of RDD partitions that are being computed/loaded. */
private val loading = new HashSet[RDDBlockId]()

/** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
/** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](
rdd: RDD[T],
split: Partition,
partition: Partition,
context: TaskContext,
storageLevel: StorageLevel): Iterator[T] = {

val key = RDDBlockId(rdd.id, split.index)
val key = RDDBlockId(rdd.id, partition.index)
logDebug(s"Looking for partition $key")
blockManager.get(key) match {
case Some(values) =>
// Partition is already materialized, so just return its values
new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])

case None =>
// Mark the split as loading (unless someone else marks it first)
loading.synchronized {
if (loading.contains(key)) {
logInfo(s"Another thread is loading $key, waiting for it to finish...")
while (loading.contains(key)) {
try {
loading.wait()
} catch {
case e: Exception =>
logWarning(s"Got an exception while waiting for another thread to load $key", e)
}
}
logInfo(s"Finished waiting for $key")
/* See whether someone else has successfully loaded it. The main way this would fail
* is for the RDD-level cache eviction policy if someone else has loaded the same RDD
* partition but we didn't want to make space for it. However, that case is unlikely
* because it's unlikely that two threads would work on the same RDD partition. One
* downside of the current code is that threads wait serially if this does happen. */
blockManager.get(key) match {
case Some(values) =>
return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
case None =>
logInfo(s"Whoever was loading $key failed; we'll try it ourselves")
loading.add(key)
}
} else {
loading.add(key)
}
// Acquire a lock for loading this partition
// If another thread already holds the lock, wait for it to finish return its results
val storedValues = acquireLockForPartition[T](key)
if (storedValues.isDefined) {
return new InterruptibleIterator[T](context, storedValues.get)
}

// Otherwise, we have to load the partition ourselves
try {
// If we got here, we have to load the split
logInfo(s"Partition $key not found, computing it")
val computedValues = rdd.computeOrReadCheckpoint(split, context)
val computedValues = rdd.computeOrReadCheckpoint(partition, context)

// Persist the result, so long as the task is not running locally
// If the task is running locally, do not persist the result
if (context.runningLocally) {
return computedValues
}

// Keep track of blocks with updated statuses
var updatedBlocks = Seq[(BlockId, BlockStatus)]()
val returnValue: Iterator[T] = {
if (storageLevel.useDisk && !storageLevel.useMemory) {
/* In the case that this RDD is to be persisted using DISK_ONLY
* the iterator will be passed directly to the blockManager (rather then
* caching it to an ArrayBuffer first), then the resulting block data iterator
* will be passed back to the user. If the iterator generates a lot of data,
* this means that it doesn't all have to be held in memory at one time.
* This could also apply to MEMORY_ONLY_SER storage, but we need to make sure
* blocks aren't dropped by the block store before enabling that. */
updatedBlocks = blockManager.put(key, computedValues, storageLevel, tellMaster = true)
blockManager.get(key) match {
case Some(values) =>
values.asInstanceOf[Iterator[T]]
case None =>
logInfo(s"Failure to store $key")
throw new SparkException("Block manager failed to return persisted value")
}
} else {
// In this case the RDD is cached to an array buffer. This will save the results
// if we're dealing with a 'one-time' iterator
val elements = new ArrayBuffer[Any]
elements ++= computedValues
updatedBlocks = blockManager.put(key, elements, storageLevel, tellMaster = true)
elements.iterator.asInstanceOf[Iterator[T]]
}
}

// Update task metrics to include any blocks whose storage status is updated
val metrics = context.taskMetrics
metrics.updatedBlocks = Some(updatedBlocks)

new InterruptibleIterator(context, returnValue)
// Otherwise, cache the values and keep track of any updates in block statuses
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
context.taskMetrics.updatedBlocks = Some(updatedBlocks)
new InterruptibleIterator(context, cachedValues)

} finally {
loading.synchronized {
@@ -128,4 +77,76 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
}
}
}

/**
* Acquire a loading lock for the partition identified by the given block ID.
*
* If the lock is free, just acquire it and return None. Otherwise, another thread is already
* loading the partition, so we wait for it to finish and return the values loaded by the thread.
*/
private def acquireLockForPartition[T](id: RDDBlockId): Option[Iterator[T]] = {
loading.synchronized {
if (!loading.contains(id)) {
// If the partition is free, acquire its lock to compute its value
loading.add(id)
None
} else {
// Otherwise, wait for another thread to finish and return its result
logInfo(s"Another thread is loading $id, waiting for it to finish...")
while (loading.contains(id)) {
try {
loading.wait()
} catch {
case e: Exception =>
logWarning(s"Exception while waiting for another thread to load $id", e)
}
}
logInfo(s"Finished waiting for $id")
val values = blockManager.get(id)
if (!values.isDefined) {
/* The block is not guaranteed to exist even after the other thread has finished.
* For instance, the block could be evicted after it was put, but before our get.
* In this case, we still need to load the partition ourselves. */
logInfo(s"Whoever was loading $id failed; we'll try it ourselves")
loading.add(id)
}
values.map(_.asInstanceOf[Iterator[T]])
}
}
}

/**
* Cache the values of a partition, keeping track of any updates in the storage statuses
* of other blocks along the way.
*/
private def putInBlockManager[T](
key: BlockId,
values: Iterator[T],
storageLevel: StorageLevel,
updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Iterator[T] = {

if (!storageLevel.useMemory) {
/* This RDD is not to be cached in memory, so we can just pass the computed values
* as an iterator directly to the BlockManager, rather than first fully unrolling
* it in memory. The latter option potentially uses much more memory and risks OOM
* exceptions that can be avoided. */
updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
blockManager.get(key) match {
case Some(v) => v.asInstanceOf[Iterator[T]]
case None =>
logInfo(s"Failure to store $key")
throw new BlockException(key, s"Block manager failed to return cached value for $key!")
}
} else {
/* This RDD is to be cached in memory. In this case we cannot pass the computed values
* to the BlockManager as an iterator and expect to read it back later. This is because
* we may end up dropping a partition from memory store before getting it back, e.g.
* when the entirety of the RDD does not fit in memory. */
val elements = new ArrayBuffer[Any]
elements ++= values
updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
elements.iterator.asInstanceOf[Iterator[T]]
}
}

}
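
The refactoring above pulls the wait/notify handshake on the loading set into acquireLockForPartition. The standalone sketch below is illustrative only: LoadingLockSketch, getOrCompute, and the plain in-memory map standing in for the BlockManager are all invented for this example. It shows the same per-key loading-lock pattern, where one thread computes a key while any others wait on the shared set and re-check the cache once they are woken.

import scala.collection.mutable.{HashMap, HashSet}

// Illustrative sketch of the per-key loading-lock pattern used by
// acquireLockForPartition. Not part of the patch; the cache below is a plain
// in-memory map standing in for the BlockManager.
object LoadingLockSketch {
  private val loading = new HashSet[String]()
  private val cache = new HashMap[String, Int]()

  /** Computes the value for `key` at most once across threads. */
  def getOrCompute(key: String)(compute: => Int): Int = {
    val existing: Option[Int] = loading.synchronized {
      while (loading.contains(key)) {
        loading.wait()                      // another thread is loading this key
      }
      val cached = cache.get(key)
      if (cached.isEmpty) {
        loading.add(key)                    // acquire the per-key loading "lock"
      }
      cached
    }
    existing.getOrElse {
      try {
        val value = compute                 // expensive work happens outside the lock
        loading.synchronized { cache(key) = value }
        value
      } finally {
        loading.synchronized {
          loading.remove(key)
          loading.notifyAll()               // wake up any waiting threads
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val threads = (1 to 4).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = println(getOrCompute("rdd_0_0") { Thread.sleep(100); 42 })
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
  }
}

As in the patched CacheManager, a waiter that wakes up and still finds nothing cached simply takes the lock and computes the value itself, which corresponds to the 'Whoever was loading ... failed; we'll try it ourselves' path.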
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -150,7 +150,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
def doCleanupShuffle(shuffleId: Int, blocking: Boolean) {
try {
logDebug("Cleaning shuffle " + shuffleId)
mapOutputTrackerMaster.unregisterShuffle(shuffleId)
shuffleManager.unregisterShuffle(shuffleId)
blockManagerMaster.removeShuffle(shuffleId, blocking)
listeners.foreach(_.shuffleCleaned(shuffleId))
logInfo("Cleaned shuffle " + shuffleId)
@@ -173,7 +173,7 @@

private def blockManagerMaster = sc.env.blockManager.master
private def broadcastManager = sc.env.broadcastManager
private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
private def shuffleManager = sc.env.shuffleManager

// Used for testing. These methods explicitly blocks until cleanup is completed
// to ensure that more reliable testing.
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/Dependency.scala
@@ -61,7 +61,8 @@ class ShuffleDependency[K, V, C](
val partitioner: Partitioner,
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None)
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {

val shuffleId: Int = rdd.context.newShuffleId()
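
The new mapSideCombine flag on ShuffleDependency (default false) lets a shuffle implementation decide whether to combine values per key before map output is written, which is what the "[SPARK-2124] Move aggregation into shuffle implementations" commit pushes into the shuffle layer. The sketch below is a rough illustration of that decision, not Spark code: AggFns, combineByKey, and writeSide are invented stand-ins for Spark's Aggregator and shuffle writer.

object MapSideCombineSketch {
  // Stand-in for the (createCombiner, mergeValue) half of an aggregator.
  type AggFns[V, C] = (V => C, (C, V) => C)

  def combineByKey[K, V, C](records: Iterator[(K, V)],
                            createCombiner: V => C,
                            mergeValue: (C, V) => C): Iterator[(K, C)] = {
    val combiners = scala.collection.mutable.Map[K, C]()
    records.foreach { case (k, v) =>
      combiners(k) = combiners.get(k) match {
        case Some(c) => mergeValue(c, v)
        case None    => createCombiner(v)
      }
    }
    combiners.iterator
  }

  // What a shuffle writer might do with the flag: combine per key on the
  // map side when asked to, otherwise pass the raw records through.
  def writeSide[K, V, C](records: Iterator[(K, V)],
                         mapSideCombine: Boolean,
                         aggregator: Option[AggFns[V, C]]): Iterator[(K, Any)] = {
    if (mapSideCombine && aggregator.isDefined) {
      val (createCombiner, mergeValue) = aggregator.get
      combineByKey(records, createCombiner, mergeValue)   // fewer records leave the map task
    } else {
      records                                             // shuffle raw (K, V) pairs
    }
  }

  def main(args: Array[String]): Unit = {
    val data = Iterator(("a", 1), ("b", 2), ("a", 3))
    val agg: Option[AggFns[Int, Int]] = Some(((v: Int) => v, (c: Int, v: Int) => c + v))
    // Prints something like List((a,4), (b,2)); map ordering is not guaranteed.
    println(writeSide(data, mapSideCombine = true, agg).toList)
  }
}

Combining on the map side reduces the volume of data shuffled across the network, at the cost of buffering one combiner per key in each map task.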
21 changes: 4 additions & 17 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -56,7 +56,6 @@ class SparkEnv (
val serializer: Serializer,
val closureSerializer: Serializer,
val cacheManager: CacheManager,
val mapOutputTracker: MapOutputTracker,
val shuffleManager: ShuffleManager,
val broadcastManager: BroadcastManager,
val blockManager: BlockManager,
Expand All @@ -80,7 +79,6 @@ class SparkEnv (
private[spark] def stop() {
pythonWorkers.foreach { case(key, worker) => worker.stop() }
httpFileServer.stop()
mapOutputTracker.stop()
shuffleManager.stop()
broadcastManager.stop()
blockManager.stop()
@@ -202,24 +200,17 @@ object SparkEnv extends Logging {
}
}

val mapOutputTracker = if (isDriver) {
new MapOutputTrackerMaster(conf)
} else {
new MapOutputTrackerWorker(conf)
}
val shuffleManager = instantiateClass[ShuffleManager](
"spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")

// Have to assign trackerActor after initialization as MapOutputTrackerActor
// requires the MapOutputTracker itself
mapOutputTracker.trackerActor = registerOrLookup(
"MapOutputTracker",
new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
shuffleManager.initMapOutputTracker(conf, isDriver, actorSystem)

val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
"BlockManagerMaster",
new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)

val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
serializer, conf, securityManager, mapOutputTracker)
serializer, conf, securityManager, shuffleManager)

val connectionManager = blockManager.connectionManager

@@ -247,9 +238,6 @@
"."
}

val shuffleManager = instantiateClass[ShuffleManager](
"spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")

// Warn about deprecated spark.cache.class property
if (conf.contains("spark.cache.class")) {
logWarning("The spark.cache.class property is no longer being used! Specify storage " +
@@ -262,7 +250,6 @@
serializer,
closureSerializer,
cacheManager,
mapOutputTracker,
shuffleManager,
broadcastManager,
blockManager,
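
With this change SparkEnv no longer wires up a MapOutputTracker directly; it instantiates the class named by spark.shuffle.manager (defaulting to org.apache.spark.shuffle.hash.HashShuffleManager) and asks it to set up its own tracker via initMapOutputTracker. The sketch below shows only the underlying configuration-driven instantiation pattern and is not the actual SparkEnv code; Config, ShuffleManagerLike, and NoopShuffleManager are invented stand-ins.

trait ShuffleManagerLike {
  def unregisterShuffle(shuffleId: Int): Unit
  def stop(): Unit
}

// Minimal stand-in for SparkConf: a map of string settings with defaults.
class Config(settings: Map[String, String]) {
  def get(key: String, default: String): String = settings.getOrElse(key, default)
}

object PluggableInstantiation {
  // Look up a class name in the configuration (falling back to a default) and
  // build it through a constructor that takes the configuration object,
  // mirroring how SparkEnv's instantiateClass passes the conf along.
  def instantiate[T](conf: Config, key: String, defaultClassName: String): T = {
    val className = conf.get(key, defaultClassName)
    val cls = Class.forName(className)
    val ctor = cls.getConstructor(classOf[Config])
    ctor.newInstance(conf).asInstanceOf[T]
  }
}

// Example plug-in selected by setting "shuffle.manager.sketch" in the config.
class NoopShuffleManager(conf: Config) extends ShuffleManagerLike {
  def unregisterShuffle(shuffleId: Int): Unit = ()
  def stop(): Unit = ()
}

object PluggableInstantiationExample {
  def main(args: Array[String]): Unit = {
    val conf = new Config(Map("shuffle.manager.sketch" -> classOf[NoopShuffleManager].getName))
    val mgr = PluggableInstantiation.instantiate[ShuffleManagerLike](
      conf, "shuffle.manager.sketch", classOf[NoopShuffleManager].getName)
    mgr.unregisterShuffle(0)
    mgr.stop()
    println("Instantiated " + mgr.getClass.getName)
  }
}

Selecting the implementation purely through configuration is what lets the tracker become an internal detail of the shuffle manager rather than a separately managed SparkEnv component.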