Skip to content

Commit db6ecc5

Browse files
author
liguoqiang
committed
Merge branch 'master' into SPARK-1149
2 parents 1e3331e + 923dba5 commit db6ecc5

File tree

8 files changed: +28 additions, −19 deletions

core/src/main/scala/org/apache/spark/BlockStoreShuffleFetcher.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
7979
val completionIter = CompletionIterator[T, Iterator[T]](itr, {
8080
val shuffleMetrics = new ShuffleReadMetrics
8181
shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
82-
shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
8382
shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
8483
shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
8584
shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -103,13 +103,6 @@ class ShuffleReadMetrics extends Serializable {
103103
*/
104104
var fetchWaitTime: Long = _
105105

106-
/**
107-
* Total time spent fetching remote shuffle blocks. This aggregates the time spent fetching all
108-
* input blocks. Since block fetches are both pipelined and parallelized, this can
109-
* exceed fetchWaitTime and executorRunTime.
110-
*/
111-
var remoteFetchTime: Long = _
112-
113106
/**
114107
* Total number of remote bytes read from the shuffle by this task
115108
*/

core/src/main/scala/org/apache/spark/network/ConnectionManager.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -550,9 +550,6 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
550550

551551
newConnection
552552
}
553-
// I removed the lookupKey stuff as part of merge ... should I re-add it ? We did not find it
554-
// useful in our test-env ... If we do re-add it, we should consistently use it everywhere I
555-
// guess ?
556553
val connection = connectionsById.getOrElseUpdate(connectionManagerId, startNewConnection())
557554
message.senderAddress = id.toSocketAddress()
558555
logDebug("Sending [" + message + "] to [" + connectionManagerId + "]")

core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,6 @@ class JobLogger(val user: String, val logDirName: String)
275275
" BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
276276
" BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
277277
" REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
278-
" REMOTE_FETCH_TIME=" + metrics.remoteFetchTime +
279278
" REMOTE_BYTES_READ=" + metrics.remoteBytesRead
280279
case None => ""
281280
}

core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] wi
4949
def totalBlocks: Int
5050
def numLocalBlocks: Int
5151
def numRemoteBlocks: Int
52-
def remoteFetchTime: Long
5352
def fetchWaitTime: Long
5453
def remoteBytesRead: Long
5554
}
@@ -79,7 +78,6 @@ object BlockFetcherIterator {
7978
import blockManager._
8079

8180
private var _remoteBytesRead = 0L
82-
private var _remoteFetchTime = 0L
8381
private var _fetchWaitTime = 0L
8482

8583
if (blocksByAddress == null) {
@@ -125,7 +123,6 @@ object BlockFetcherIterator {
125123
future.onSuccess {
126124
case Some(message) => {
127125
val fetchDone = System.currentTimeMillis()
128-
_remoteFetchTime += fetchDone - fetchStart
129126
val bufferMessage = message.asInstanceOf[BufferMessage]
130127
val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
131128
for (blockMessage <- blockMessageArray) {
@@ -241,7 +238,6 @@ object BlockFetcherIterator {
241238
override def totalBlocks: Int = numLocal + numRemote
242239
override def numLocalBlocks: Int = numLocal
243240
override def numRemoteBlocks: Int = numRemote
244-
override def remoteFetchTime: Long = _remoteFetchTime
245241
override def fetchWaitTime: Long = _fetchWaitTime
246242
override def remoteBytesRead: Long = _remoteBytesRead
247243

core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,32 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
347347
*/
348348
pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
349349
}
350+
351+
test("lookup") {
352+
val pairs = sc.parallelize(Array((1,2), (3,4), (5,6), (5,7)))
353+
354+
assert(pairs.partitioner === None)
355+
assert(pairs.lookup(1) === Seq(2))
356+
assert(pairs.lookup(5) === Seq(6,7))
357+
assert(pairs.lookup(-1) === Seq())
358+
359+
}
360+
361+
test("lookup with partitioner") {
362+
val pairs = sc.parallelize(Array((1,2), (3,4), (5,6), (5,7)))
363+
364+
val p = new Partitioner {
365+
def numPartitions: Int = 2
366+
367+
def getPartition(key: Any): Int = Math.abs(key.hashCode() % 2)
368+
}
369+
val shuffled = pairs.partitionBy(p)
370+
371+
assert(shuffled.partitioner === Some(p))
372+
assert(shuffled.lookup(1) === Seq(2))
373+
assert(shuffled.lookup(5) === Seq(6,7))
374+
assert(shuffled.lookup(-1) === Seq())
375+
}
350376
}
351377

352378
/*

core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
129129
sm.localBlocksFetched should be > (0)
130130
sm.remoteBlocksFetched should be (0)
131131
sm.remoteBytesRead should be (0l)
132-
sm.remoteFetchTime should be (0l)
133132
}
134133
}
135134
}

docs/tuning.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,8 @@ their work directories), *not* on your driver program.
163163
**Cache Size Tuning**
164164

165165
One important configuration parameter for GC is the amount of memory that should be used for caching RDDs.
166-
By default, Spark uses 66% of the configured executor memory (`spark.executor.memory` or `SPARK_MEM`) to
167-
cache RDDs. This means that 33% of memory is available for any objects created during task execution.
166+
By default, Spark uses 60% of the configured executor memory (`spark.executor.memory` or `SPARK_MEM`) to
167+
cache RDDs. This means that 40% of memory is available for any objects created during task execution.
168168

169169
In case your tasks slow down and you find that your JVM is garbage-collecting frequently or running out of
170170
memory, lowering this value will help reduce the memory consumption. To change this to say 50%, you can call

0 commit comments

Comments (0)