Commit 0d17060

Import, comments, and style fixes (minor)
1 parent c92e4d9 commit 0d17060

20 files changed: 42 additions & 48 deletions

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 5 additions & 6 deletions
@@ -112,8 +112,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
   }
 
   /**
-   * Called from executors to get the server URIs and
-   * output sizes of the map outputs of a given shuffle
+   * Called from executors to get the server URIs and output sizes of the map outputs of
+   * a given shuffle.
    */
   def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
     val statuses = mapStatuses.get(shuffleId).orNull
@@ -218,10 +218,9 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   private var cacheEpoch = epoch
 
   /**
-   * Timestamp based HashMap for storing mapStatuses and cached serialized statuses
-   * in the master, so that statuses are dropped only by explicit deregistering or
-   * by TTL-based cleaning (if set). Other than these two
-   * scenarios, nothing should be dropped from this HashMap.
+   * Timestamp based HashMap for storing mapStatuses and cached serialized statuses in the master,
+   * so that statuses are dropped only by explicit deregistering or by TTL-based cleaning (if set).
+   * Other than these two scenarios, nothing should be dropped from this HashMap.
    */
   protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
   private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]()
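
Note: the reworded comment describes the TimeStampedHashMap contract. A rough sketch of the idea (not Spark's TimeStampedHashMap implementation):

  // Each insert records a timestamp; a cleaner can later drop entries older
  // than a TTL threshold. Entries otherwise stay until removed explicitly.
  import scala.collection.concurrent.TrieMap

  class TimestampedMap[K, V] {
    private val map = new TrieMap[K, (Long, V)]()
    def put(key: K, value: V): Unit = map.put(key, (System.currentTimeMillis, value))
    def get(key: K): Option[V] = map.get(key).map(_._2)
    // Drop every entry inserted before threshTime (what TTL-based cleaning does).
    def clearOldValues(threshTime: Long): Unit =
      for ((key, (time, _)) <- map if time < threshTime) map.remove(key)
  }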

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 1 deletion
@@ -230,6 +230,7 @@ class SparkContext(
 
   private[spark] val cleaner = new ContextCleaner(this)
   cleaner.start()
+
   postEnvironmentUpdate()
 
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
@@ -773,7 +774,7 @@ class SparkContext(
   * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
   */
  def addJar(path: String) {
-    if (path == null) {
+    if (path == null) {
      logWarning("null specified as parameter to addJar")
    } else {
      var key = ""

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 1 addition & 0 deletions
@@ -185,6 +185,7 @@ object SparkEnv extends Logging {
     } else {
       new MapOutputTrackerWorker(conf)
     }
+
     // Have to assign trackerActor after initialization as MapOutputTrackerActor
     // requires the MapOutputTracker itself
     mapOutputTracker.trackerActor = registerOrLookup(

core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala

Lines changed: 2 additions & 2 deletions
@@ -27,8 +27,8 @@ import org.apache.spark.SparkConf
  * entire Spark job.
  */
 trait BroadcastFactory {
-  def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit
+  def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager)
   def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T]
   def unbroadcast(id: Long, removeFromDriver: Boolean)
-  def stop(): Unit
+  def stop()
 }
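
Note: dropping the explicit ": Unit" annotations switches these declarations to Scala's procedure syntax, matching the other methods in the trait. A minimal sketch of the two equivalent forms (Worker is a hypothetical trait, not Spark code):

  trait Worker {
    def start(): Unit  // explicit result type annotation
    def stop()         // procedure syntax; the result type is still Unit
  }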

core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala

Lines changed: 4 additions & 5 deletions
@@ -90,7 +90,7 @@ private[spark] object HttpBroadcast extends Logging {
   private var securityManager: SecurityManager = null
 
   // TODO: This shouldn't be a global variable so that multiple SparkContexts can coexist
-  val files = new TimeStampedHashSet[String]
+  private val files = new TimeStampedHashSet[String]
   private var cleaner: MetadataCleaner = null
 
   private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES).toInt
@@ -195,7 +195,7 @@ private[spark] object HttpBroadcast extends Logging {
   def unpersist(id: Long, removeFromDriver: Boolean) = synchronized {
     SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver)
     if (removeFromDriver) {
-      val file = new File(broadcastDir, BroadcastBlockId(id).name)
+      val file = getFile(id)
       files.remove(file.toString)
       deleteBroadcastFile(file)
     }
@@ -217,10 +217,9 @@ private[spark] object HttpBroadcast extends Logging {
     }
   }
 
-  /** Delete the given broadcast file. */
   private def deleteBroadcastFile(file: File) {
     try {
-      if (!file.exists()) {
+      if (!file.exists) {
         logWarning("Broadcast file to be deleted does not exist: %s".format(file))
       } else if (file.delete()) {
         logInfo("Deleted broadcast file: %s".format(file))
@@ -229,7 +228,7 @@ private[spark] object HttpBroadcast extends Logging {
       }
     } catch {
       case e: Exception =>
-        logWarning("Exception while deleting broadcast file: %s".format(file), e)
+        logError("Exception while deleting broadcast file: %s".format(file), e)
     }
   }
 
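Note: the unpersist change reuses an existing helper instead of building the path by hand. Judging from the removed line, the helper presumably looks like the following (an assumption -- its body is not shown in this diff):

  // Hypothetical reconstruction of HttpBroadcast.getFile, inferred from the removed line above.
  def getFile(id: Long) = new File(broadcastDir, BroadcastBlockId(id).name)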

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 3 additions & 4 deletions
@@ -72,7 +72,7 @@ private[spark] class TorrentBroadcast[T](@transient var value_ : T, isLocal: Boo
   }
 
   /**
-   * Remove all persisted state associated with this HTTP broadcast.
+   * Remove all persisted state associated with this Torrent broadcast.
    * @param removeFromDriver Whether to remove state from the driver.
    */
   override def unpersist(removeFromDriver: Boolean) {
@@ -177,13 +177,12 @@ private[spark] class TorrentBroadcast[T](@transient var value_ : T, isLocal: Boo
 }
 
 private[spark] object TorrentBroadcast extends Logging {
+  private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
   private var initialized = false
   private var conf: SparkConf = null
 
-  lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
-
   def initialize(_isDriver: Boolean, conf: SparkConf) {
-    TorrentBroadcast.conf = conf //TODO: we might have to fix it in tests
+    TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
     synchronized {
       if (!initialized) {
         initialized = true
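
Note: declaring BLOCK_SIZE above conf is safe only because it is a lazy val: the right-hand side is evaluated on first access, not at object construction, so conf just needs to be assigned (via initialize) before BLOCK_SIZE is first read. A minimal sketch of the pattern (hypothetical names, not Spark code):

  object Settings {
    // Evaluated on first access; safe to declare before conf is assigned,
    // provided initialize() runs before the first read.
    private lazy val blockSize = conf.getOrElse("blockSize", 4096) * 1024
    private var conf: Map[String, Int] = null
    def initialize(c: Map[String, Int]) { conf = c }
  }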

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 2 additions & 1 deletion
@@ -158,7 +158,7 @@ abstract class RDD[T: ClassTag](
    */
   def unpersist(blocking: Boolean = true): RDD[T] = {
     logInfo("Removing RDD " + id + " from persistence list")
-    sc.unpersistRDD(this.id, blocking)
+    sc.unpersistRDD(id, blocking)
     storageLevel = StorageLevel.NONE
     this
   }
@@ -1128,4 +1128,5 @@ abstract class RDD[T: ClassTag](
   def toJavaRDD() : JavaRDD[T] = {
     new JavaRDD(this)(elementClassTag)
   }
+
 }

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 0 additions & 1 deletion
@@ -1090,7 +1090,6 @@ class DAGScheduler(
       eventProcessActor ! StopDAGScheduler
     }
     taskScheduler.stop()
-    listenerBus.stop()
   }
 }

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 2 additions & 2 deletions
@@ -49,8 +49,8 @@ private[spark] class BlockManager(
     maxMemory: Long,
     val conf: SparkConf,
     securityManager: SecurityManager,
-    mapOutputTracker: MapOutputTracker
-  ) extends Logging {
+    mapOutputTracker: MapOutputTracker)
+  extends Logging {
 
   val shuffleBlockManager = new ShuffleBlockManager(this)
   val diskBlockManager = new DiskBlockManager(shuffleBlockManager,

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 1 addition & 1 deletion
@@ -82,7 +82,7 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
 
   /**
    * Check if block manager master has a block. Note that this can be used to check for only
-   * those blocks that are expected to be reported to block manager master.
+   * those blocks that are reported to block manager master.
    */
   def contains(blockId: BlockId) = {
     !getLocations(blockId).isEmpty

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala

Lines changed: 1 addition & 2 deletions
@@ -167,7 +167,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
    * from the executors, but not from the driver.
    */
   private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean) {
-    // TODO(aor): Consolidate usages of <driver>
+    // TODO: Consolidate usages of <driver>
     val removeMsg = RemoveBroadcast(broadcastId)
     blockManagerInfo.values
       .filter { info => removeFromDriver || info.blockManagerId.executorId != "<driver>" }
@@ -350,7 +350,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     // Note that this logic will select the same node multiple times if there aren't enough peers
     Array.tabulate[BlockManagerId](size) { i => peers((selfIndex + i + 1) % peers.length) }.toSeq
   }
-
 }
 
core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala

Lines changed: 0 additions & 4 deletions
@@ -22,11 +22,9 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import akka.actor.ActorRef
 
 private[storage] object BlockManagerMessages {
-
   //////////////////////////////////////////////////////////////////////////////////
   // Messages from the master to slaves.
   //////////////////////////////////////////////////////////////////////////////////
-
   sealed trait ToBlockManagerSlave
 
   // Remove a block from the slaves that have it. This can only be used to remove
@@ -50,7 +48,6 @@ private[storage] object BlockManagerMessages {
   //////////////////////////////////////////////////////////////////////////////////
   // Messages from slaves to the master.
   //////////////////////////////////////////////////////////////////////////////////
-
   sealed trait ToBlockManagerMaster
 
   case class RegisterBlockManager(
@@ -122,5 +119,4 @@ private[storage] object BlockManagerMessages {
 
   // For testing. Have the master ask all slaves for the given block's storage level.
   case class AskForStorageLevels(blockId: BlockId) extends ToBlockManagerMaster
-
 }

core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala

Lines changed: 1 addition & 1 deletion
@@ -90,7 +90,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
 
   def getFile(blockId: BlockId): File = getFile(blockId.name)
 
-  /** Check if disk block manager has a block */
+  /** Check if disk block manager has a block. */
   def containsBlock(blockId: BlockId): Boolean = {
     getBlockLocation(blockId).file.exists()
   }

core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala

Lines changed: 2 additions & 2 deletions
@@ -169,13 +169,13 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
       throw new IllegalStateException("Failed to find shuffle block: " + id)
   }
 
-  /** Remove all the blocks / files and metadata related to a particular shuffle */
+  /** Remove all the blocks / files and metadata related to a particular shuffle. */
   def removeShuffle(shuffleId: ShuffleId) {
     removeShuffleBlocks(shuffleId)
     shuffleStates.remove(shuffleId)
   }
 
-  /** Remove all the blocks / files related to a particular shuffle */
+  /** Remove all the blocks / files related to a particular shuffle. */
   private def removeShuffleBlocks(shuffleId: ShuffleId) {
     shuffleStates.get(shuffleId) match {
       case Some(state) =>

core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala

Lines changed: 1 addition & 1 deletion
@@ -48,7 +48,7 @@ private[spark] object ThreadingTest {
         val block = (1 to blockSize).map(_ => Random.nextInt())
         val level = randomLevel()
         val startTime = System.currentTimeMillis()
-        manager.put(blockId, block.iterator, level, true)
+        manager.put(blockId, block.iterator, level, tellMaster = true)
         println("Pushed block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
         queue.add((blockId, block))
       }
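
Note: naming the boolean argument documents the call site; a bare "true" tells the reader nothing. A sketch with a hypothetical signature (BlockManager.put's real parameter list is not shown in this diff):

  def put(id: String, values: Iterator[Any], level: Int, tellMaster: Boolean): Unit = ???
  put("block-1", Iterator(1, 2, 3), level = 0, tellMaster = true)  // intent is explicit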

core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala

Lines changed: 8 additions & 7 deletions
@@ -78,15 +78,16 @@ private[spark] object MetadataCleaner {
     conf.getInt("spark.cleaner.ttl", -1)
   }
 
-  def getDelaySeconds(conf: SparkConf, cleanerType: MetadataCleanerType.MetadataCleanerType): Int =
-  {
-    conf.get(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds(conf).toString)
-      .toInt
+  def getDelaySeconds(
+      conf: SparkConf,
+      cleanerType: MetadataCleanerType.MetadataCleanerType): Int = {
+    conf.get(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds(conf).toString).toInt
   }
 
-  def setDelaySeconds(conf: SparkConf, cleanerType: MetadataCleanerType.MetadataCleanerType,
-    delay: Int)
-  {
+  def setDelaySeconds(
+      conf: SparkConf,
+      cleanerType: MetadataCleanerType.MetadataCleanerType,
+      delay: Int) {
     conf.set(MetadataCleanerType.systemProperty(cleanerType), delay.toString)
   }
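
Note: both signatures are reflowed to the multi-line declaration style used elsewhere in the codebase: one parameter per line at a double (4-space) indent, with the opening brace kept on the line that closes the signature. A generic sketch of the convention (hypothetical method, not Spark code):

  def configure(
      name: String,
      timeoutSeconds: Int,
      verbose: Boolean): Unit = {
    // body resumes at the normal 2-space indent
  }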

core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala

Lines changed: 4 additions & 4 deletions
@@ -17,14 +17,14 @@
 
 package org.apache.spark.util
 
-import scala.collection.{JavaConversions, immutable}
-
-import java.util
 import java.lang.ref.WeakReference
+import java.util
 import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConversions
 
 import org.apache.spark.Logging
-import java.util.concurrent.atomic.AtomicInteger
 
 private[util] case class TimeStampedWeakValue[T](timestamp: Long, weakValue: WeakReference[T]) {
   def this(timestamp: Long, value: T) = this(timestamp, new WeakReference[T](value))
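
Note: this reordering (and the same fix in the test files below) follows the standard import grouping: java.* first, then scala.*, then third-party and project imports, with each group alphabetized and separated by a blank line; the unused immutable import is dropped along the way. The resulting header, assembled from the diff for reference:

  import java.lang.ref.WeakReference
  import java.util
  import java.util.concurrent.ConcurrentHashMap
  import java.util.concurrent.atomic.AtomicInteger

  import scala.collection.JavaConversions

  import org.apache.spark.Logging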

core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -96,7 +96,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     assert(tracker.getServerStatuses(10, 0).isEmpty)
   }
 
-  test("master register shuffle and unregister mapoutput and fetch") {
+  test("master register shuffle and unregister map output and fetch") {
     val actorSystem = ActorSystem("test")
     val tracker = new MapOutputTrackerMaster(conf)
     tracker.trackerActor =

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 1 addition & 2 deletions
@@ -28,8 +28,7 @@ import org.scalatest.concurrent.Timeouts._
 import org.scalatest.matchers.ShouldMatchers._
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf, SparkContext}
-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}

core/src/test/scala/org/apache/spark/util/WrappedJavaHashMapSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -17,8 +17,8 @@
 
 package org.apache.spark.util
 
-import java.util
 import java.lang.ref.WeakReference
+import java.util
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 import scala.util.Random
