Spark 1095: Adding explicit return types to all public methods #168

Closed · wants to merge 4 commits
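This pull request (JIRA ticket SPARK-1095) adds explicit result types to public methods and fields across Spark's modules instead of leaving them to type inference. As a motivating sketch (illustrative code, not from this patch): Scala infers the most specific type of a method body, and whatever it infers silently becomes part of the public API.

```scala
object InferenceExample {
  // Without an annotation, the inferred result type is Some[Int]; that
  // narrower type, not Option[Int], becomes the public signature.
  def lookup(key: String) = Some(42)

  // Annotated, the contract is Option[Int], so the body can later return
  // None without changing the API.
  def lookupTyped(key: String): Option[Int] =
    if (key.nonEmpty) Some(42) else None
}
```

The diff below applies that pattern file by file.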
31 changes: 19 additions & 12 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHad
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.mesos.MesosNativeLibrary

+ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
@@ -209,7 +210,7 @@ class SparkContext(
ui.start()

/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
- val hadoopConfiguration = {
+ val hadoopConfiguration: Configuration = {
val env = SparkEnv.get
val hadoopConf = SparkHadoopUtil.get.newConfiguration()
// Explicitly check for S3 environment variables
@@ -610,7 +611,7 @@ class SparkContext(
* standard mutable collections. So you can use this with mutable Map, Set, etc.
*/
def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
-     (initialValue: R) = {
+     (initialValue: R): Accumulable[R, T] = {
val param = new GrowableAccumulableParam[R,T]
new Accumulable(initialValue, param)
}
@@ -620,7 +621,7 @@
* [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
* The variable will be sent to each cluster only once.
*/
- def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T](value, isLocal)
+ def broadcast[T](value: T): Broadcast[T] = env.broadcastManager.newBroadcast[T](value, isLocal)

/**
* Add a file to be downloaded with this Spark job on every node.
@@ -1072,7 +1073,7 @@ object SparkContext extends Logging {
implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)

implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
    rdd: RDD[(K, V)]) =
new SequenceFileRDDFunctions(rdd)

implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag](
@@ -1109,27 +1110,33 @@
}

// Helper objects for converting common types to Writable
- private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T) = {
+ private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T)
+     : WritableConverter[T] = {
val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
}

- implicit def intWritableConverter() = simpleWritableConverter[Int, IntWritable](_.get)
+ implicit def intWritableConverter(): WritableConverter[Int] =
+     simpleWritableConverter[Int, IntWritable](_.get)

- implicit def longWritableConverter() = simpleWritableConverter[Long, LongWritable](_.get)
+ implicit def longWritableConverter(): WritableConverter[Long] =
+     simpleWritableConverter[Long, LongWritable](_.get)

- implicit def doubleWritableConverter() = simpleWritableConverter[Double, DoubleWritable](_.get)
+ implicit def doubleWritableConverter(): WritableConverter[Double] =
+     simpleWritableConverter[Double, DoubleWritable](_.get)

- implicit def floatWritableConverter() = simpleWritableConverter[Float, FloatWritable](_.get)
+ implicit def floatWritableConverter(): WritableConverter[Float] =
+     simpleWritableConverter[Float, FloatWritable](_.get)

- implicit def booleanWritableConverter() =
+ implicit def booleanWritableConverter(): WritableConverter[Boolean] =
simpleWritableConverter[Boolean, BooleanWritable](_.get)

- implicit def bytesWritableConverter() = {
+ implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
}

- implicit def stringWritableConverter() = simpleWritableConverter[String, Text](_.toString)
+ implicit def stringWritableConverter(): WritableConverter[String] =
+     simpleWritableConverter[String, Text](_.toString)

implicit def writableWritableConverter[T <: Writable]() =
new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
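Many of the SparkContext changes above annotate `implicit def`s in the companion object. A minimal sketch of that pattern with made-up types (Celsius and Fahrenheit are invented, not Spark API): the annotation makes the conversion's target type readable at the definition site, which also lets style checkers enforce explicit types on public implicits.

```scala
import scala.language.implicitConversions

class Celsius(val deg: Double)
class Fahrenheit(val deg: Double)

object TemperatureConversions {
  // The target type Fahrenheit is stated up front rather than inferred
  // from the body.
  implicit def celsiusToFahrenheit(c: Celsius): Fahrenheit =
    new Fahrenheit(c.deg * 9.0 / 5.0 + 32.0)
}
```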
15 changes: 11 additions & 4 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -391,19 +391,24 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Save this RDD as a text file, using string representations of elements.
*/
- def saveAsTextFile(path: String) = rdd.saveAsTextFile(path)
+ def saveAsTextFile(path: String): Unit = {
+   rdd.saveAsTextFile(path)
+ }


/**
* Save this RDD as a compressed text file, using string representations of elements.
*/
- def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) =
+ def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = {
    rdd.saveAsTextFile(path, codec)
+ }

/**
* Save this RDD as a SequenceFile of serialized objects.
*/
- def saveAsObjectFile(path: String) = rdd.saveAsObjectFile(path)
+ def saveAsObjectFile(path: String): Unit = {
+   rdd.saveAsObjectFile(path)
+ }

/**
* Creates tuples of the elements in this RDD by applying `f`.
@@ -420,7 +425,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* executed on this RDD. It is strongly recommended that this RDD is persisted in
* memory, otherwise saving it on a file will require recomputation.
*/
- def checkpoint() = rdd.checkpoint()
+ def checkpoint(): Unit = {
+   rdd.checkpoint()
+ }

/**
* Return whether this RDD has been checkpointed or not
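The JavaRDDLike changes also rewrite one-line side-effecting wrappers into the explicit `: Unit = { ... }` form. A hypothetical sketch of what the annotation prevents (EventLog is an invented class, not from this patch):

```scala
class EventLog {
  private val entries = new StringBuilder

  // Inferred result type: StringBuilder, since append returns the builder.
  // The internal buffer escapes to callers.
  def recordInferred(msg: String) = entries.append(msg)

  // Annotated with Unit: callers get only the side effect.
  def record(msg: String): Unit = {
    entries.append(msg)
  }
}
```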
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -461,7 +461,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
sc.setCheckpointDir(dir)
}

- def getCheckpointDir = JavaUtils.optionToOptional(sc.getCheckpointDir)
+ def getCheckpointDir: Optional[String] = JavaUtils.optionToOptional(sc.getCheckpointDir)

protected def checkpointFile[T](path: String): JavaRDD[T] = {
implicit val ctag: ClassTag[T] = fakeClassTag
core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -112,5 +112,5 @@ private[spark] class ClientArguments(args: Array[String]) {
}

object ClientArguments {
- def isValidJarUrl(s: String) = s.matches("(.+):(.+)jar")
+ def isValidJarUrl(s: String): Boolean = s.matches("(.+):(.+)jar")
}
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -32,7 +32,7 @@ import scala.collection.JavaConversions._
* Contains util methods to interact with Hadoop from Spark.
*/
class SparkHadoopUtil {
- val conf = newConfiguration()
+ val conf: Configuration = newConfiguration()
UserGroupInformation.setConfiguration(conf)

def runAsUser(user: String)(func: () => Unit) {
core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
import scala.collection.JavaConversions._

import akka.serialization.Serialization
+ import org.apache.curator.framework.CuratorFramework
import org.apache.zookeeper.CreateMode

import org.apache.spark.{Logging, SparkConf}
@@ -29,7 +30,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
with Logging
{
val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
- val zk = SparkCuratorUtil.newClient(conf)
+ val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)

SparkCuratorUtil.mkdir(zk, WORKING_DIR)

core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
@@ -38,7 +38,7 @@ class ConsoleSink(val property: Properties, val registry: MetricRegistry,
case None => CONSOLE_DEFAULT_PERIOD
}

- val pollUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
+ val pollUnit: TimeUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase())
case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT)
}
core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
@@ -41,7 +41,7 @@ class CsvSink(val property: Properties, val registry: MetricRegistry,
case None => CSV_DEFAULT_PERIOD
}

- val pollUnit = Option(property.getProperty(CSV_KEY_UNIT)) match {
+ val pollUnit: TimeUnit = Option(property.getProperty(CSV_KEY_UNIT)) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase())
case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT)
}
core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
@@ -39,7 +39,7 @@ class GraphiteSink(val property: Properties, val registry: MetricRegistry,
val GRAPHITE_KEY_UNIT = "unit"
val GRAPHITE_KEY_PREFIX = "prefix"

- def propertyToOption(prop: String) = Option(property.getProperty(prop))
+ def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop))

if (!propertyToOption(GRAPHITE_KEY_HOST).isDefined) {
throw new Exception("Graphite sink requires 'host' property.")
@@ -57,7 +57,7 @@
case None => GRAPHITE_DEFAULT_PERIOD
}

- val pollUnit = propertyToOption(GRAPHITE_KEY_UNIT) match {
+ val pollUnit: TimeUnit = propertyToOption(GRAPHITE_KEY_UNIT) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase())
case None => TimeUnit.valueOf(GRAPHITE_DEFAULT_UNIT)
}
core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -103,7 +103,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
array
}

- override val partitioner = Some(part)
+ override val partitioner: Some[Partitioner] = Some(part)

override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
val sparkConf = SparkEnv.get.conf
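Note that `partitioner` above is annotated as `Some[Partitioner]` rather than `Option[Partitioner]`: a `val` override may covariantly refine the parent's type, so the annotation pins down exactly what the compiler was already inferring. A minimal model with invented names:

```scala
trait Partitioner

abstract class BaseRDD {
  // The parent exposes the general type.
  val partitioner: Option[Partitioner] = None
}

class CoGrouped(part: Partitioner) extends BaseRDD {
  // Some[Partitioner] <: Option[Partitioner], so this refinement compiles;
  // callers going through BaseRDD still see Option[Partitioner].
  override val partitioner: Some[Partitioner] = Some(part)
}
```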
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -163,7 +163,7 @@ class HadoopRDD[K, V](
array
}

- override def compute(theSplit: Partition, context: TaskContext) = {
+ override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
val iter = new NextIterator[(K, V)] {
val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -116,7 +116,7 @@ class JdbcRDD[T: ClassTag](
}

object JdbcRDD {
- def resultSetToObjectArray(rs: ResultSet) = {
+ def resultSetToObjectArray(rs: ResultSet): Array[Object] = {
Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1))
}
}
core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -80,7 +80,7 @@ class NewHadoopRDD[K, V](
result
}

- override def compute(theSplit: Partition, context: TaskContext) = {
+ override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
val iter = new Iterator[(K, V)] {
val split = theSplit.asInstanceOf[NewHadoopPartition]
logInfo("Input split: " + split.serializableHadoopSplit)
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -121,7 +121,7 @@ abstract class RDD[T: ClassTag](
@transient var name: String = null

/** Assign a name to this RDD */
- def setName(_name: String) = {
+ def setName(_name: String): RDD[T] = {
name = _name
this
}
core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -126,15 +126,16 @@ object StorageLevel {
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)

/** Create a new StorageLevel object */
- def apply(useDisk: Boolean, useMemory: Boolean, deserialized: Boolean, replication: Int = 1) =
+ def apply(useDisk: Boolean, useMemory: Boolean, deserialized: Boolean,
+     replication: Int = 1): StorageLevel =
getCachedStorageLevel(new StorageLevel(useDisk, useMemory, deserialized, replication))

/** Create a new StorageLevel object from its integer representation */
- def apply(flags: Int, replication: Int) =
+ def apply(flags: Int, replication: Int): StorageLevel =
getCachedStorageLevel(new StorageLevel(flags, replication))

/** Read StorageLevel object from ObjectInput stream */
- def apply(in: ObjectInput) = {
+ def apply(in: ObjectInput): StorageLevel = {
val obj = new StorageLevel()
obj.readExternal(in)
getCachedStorageLevel(obj)
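Each `StorageLevel.apply` overload now declares its result type, which previously had to be deduced from the last expression of the body. A rough sketch of this cached-factory shape, with invented names and flag encoding:

```scala
import scala.collection.mutable

class Level private (val flags: Int)

object Level {
  private val cache = mutable.Map.empty[Int, Level]

  private def cached(level: Level): Level =
    cache.getOrElseUpdate(level.flags, level)

  // With an explicit result type on every overload, the factory API reads
  // off the signatures alone, without tracing what cached(...) returns.
  def apply(flags: Int): Level = cached(new Level(flags))

  def apply(useDisk: Boolean, useMemory: Boolean): Level =
    cached(new Level((if (useDisk) 1 else 0) | (if (useMemory) 2 else 0)))
}
```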
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/util/Distribution.scala
@@ -19,6 +19,8 @@ package org.apache.spark.util

import java.io.PrintStream

+ import scala.collection.immutable.IndexedSeq

/**
* Util for getting some stats from a small sample of numeric values, with some handy
* summary functions.
@@ -40,15 +42,16 @@ class Distribution(val data: Array[Double], val startIdx: Int, val endIdx: Int)
* given from 0 to 1
* @param probabilities
*/
- def getQuantiles(probabilities: Traversable[Double] = defaultProbabilities) = {
+ def getQuantiles(probabilities: Traversable[Double] = defaultProbabilities)
+     : IndexedSeq[Double] = {
probabilities.toIndexedSeq.map{p:Double => data(closestIndex(p))}
}

private def closestIndex(p: Double) = {
math.min((p * length).toInt + startIdx, endIdx - 1)
}

- def showQuantiles(out: PrintStream = System.out) = {
+ def showQuantiles(out: PrintStream = System.out): Unit = {
out.println("min\t25%\t50%\t75%\tmax")
getQuantiles(defaultProbabilities).foreach{q => out.print(q + "\t")}
out.println
org/apache/spark/metrics/sink/GangliaSink.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.ganglia.GangliaReporter
import info.ganglia.gmetric4j.gmetric.GMetric
+ import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode

import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem
@@ -33,10 +34,10 @@ class GangliaSink(val property: Properties, val registry: MetricRegistry,
val GANGLIA_DEFAULT_PERIOD = 10

val GANGLIA_KEY_UNIT = "unit"
- val GANGLIA_DEFAULT_UNIT = TimeUnit.SECONDS
+ val GANGLIA_DEFAULT_UNIT: TimeUnit = TimeUnit.SECONDS

val GANGLIA_KEY_MODE = "mode"
- val GANGLIA_DEFAULT_MODE = GMetric.UDPAddressingMode.MULTICAST
+ val GANGLIA_DEFAULT_MODE: UDPAddressingMode = GMetric.UDPAddressingMode.MULTICAST

// TTL for multicast messages. If listeners are X hops away in network, must be at least X.
val GANGLIA_KEY_TTL = "ttl"
@@ -45,7 +46,7 @@
val GANGLIA_KEY_HOST = "host"
val GANGLIA_KEY_PORT = "port"

- def propertyToOption(prop: String) = Option(property.getProperty(prop))
+ def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop))

if (!propertyToOption(GANGLIA_KEY_HOST).isDefined) {
throw new Exception("Ganglia sink requires 'host' property.")
Expand All @@ -58,11 +59,12 @@ class GangliaSink(val property: Properties, val registry: MetricRegistry,
val host = propertyToOption(GANGLIA_KEY_HOST).get
val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt
val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL)
- val mode = propertyToOption(GANGLIA_KEY_MODE)
+ val mode: UDPAddressingMode = propertyToOption(GANGLIA_KEY_MODE)
.map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE)
val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt)
.getOrElse(GANGLIA_DEFAULT_PERIOD)
- val pollUnit = propertyToOption(GANGLIA_KEY_UNIT).map(u => TimeUnit.valueOf(u.toUpperCase))
+ val pollUnit: TimeUnit = propertyToOption(GANGLIA_KEY_UNIT)
+     .map(u => TimeUnit.valueOf(u.toUpperCase))
.getOrElse(GANGLIA_DEFAULT_UNIT)

MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
3 changes: 2 additions & 1 deletion graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -419,5 +419,6 @@ object Graph {
* All the convenience operations are defined in the [[GraphOps]] class which may be
* shared across multiple graph implementations.
*/
- implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED]) = g.ops
+ implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag]
+     (g: Graph[VD, ED]): GraphOps[VD, ED] = g.ops
} // end of Graph object
graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -197,7 +197,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
override def mapReduceTriplets[A: ClassTag](
mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
reduceFunc: (A, A) => A,
-     activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) = {
+     activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None): VertexRDD[A] = {

ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
@@ -26,7 +26,7 @@ import org.apache.spark.graphx.PartitionStrategy._
*/
object Analytics extends Logging {

- def main(args: Array[String]) = {
+ def main(args: Array[String]): Unit = {
val host = args(0)
val taskType = args(1)
val fname = args(2)
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -431,7 +431,7 @@ class StreamingContext private[streaming] (
* Stop the execution of the streams.
* @param stopSparkContext Stop the associated SparkContext or not
*/
- def stop(stopSparkContext: Boolean = true) = synchronized {
+ def stop(stopSparkContext: Boolean = true): Unit = synchronized {
scheduler.stop()
logInfo("StreamingContext stopped successfully")
waiter.notifyStop()
@@ -489,7 +489,7 @@ object StreamingContext extends Logging {
* Find the JAR from which a given class was loaded, to make it easy for users to pass
* their JARs to StreamingContext.
*/
- def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls)
+ def jarOfClass(cls: Class[_]): Seq[String] = SparkContext.jarOfClass(cls)

private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
// Set the default cleaner delay to an hour if not already set.