SPARK 1084.1 (resubmitted) #31

Closed · wants to merge 6 commits
14 changes: 7 additions & 7 deletions bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -27,7 +27,7 @@ object Bagel extends Logging {

/**
* Runs a Bagel program.
- * @param sc [[org.apache.spark.SparkContext]] to use for the program.
+ * @param sc org.apache.spark.SparkContext to use for the program.
* @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the
* Key will be the vertex id.
* @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often
@@ -38,10 +38,10 @@ object Bagel extends Logging {
* @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices
* after each superstep and provides the result to each vertex in the next
* superstep.
- * @param partitioner [[org.apache.spark.Partitioner]] partitions values by key
+ * @param partitioner org.apache.spark.Partitioner partitions values by key
* @param numPartitions number of partitions across which to split the graph.
* Default is the default parallelism of the SparkContext
- * @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of
+ * @param storageLevel org.apache.spark.storage.StorageLevel to use for caching of
* intermediate RDDs in each superstep. Defaults to caching in memory.
* @param compute function that takes a Vertex, optional set of (possibly combined) messages to
* the Vertex, optional Aggregator and the current superstep,
@@ -131,7 +131,7 @@ object Bagel extends Logging {

/**
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default
- * [[org.apache.spark.HashPartitioner]] and default storage level
+ * org.apache.spark.HashPartitioner and default storage level
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
@@ -146,7 +146,7 @@ object Bagel extends Logging {

/**
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the
- * default [[org.apache.spark.HashPartitioner]]
+ * default org.apache.spark.HashPartitioner
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
@@ -166,7 +166,7 @@ object Bagel extends Logging {

/**
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
- * default [[org.apache.spark.HashPartitioner]],
+ * default org.apache.spark.HashPartitioner,
* [[org.apache.spark.bagel.DefaultCombiner]] and the default storage level
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
@@ -180,7 +180,7 @@ object Bagel extends Logging {

/**
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
- * the default [[org.apache.spark.HashPartitioner]]
+ * the default org.apache.spark.HashPartitioner
* and [[org.apache.spark.bagel.DefaultCombiner]]
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
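For context, the simplest run overload documented above (no Aggregator, default HashPartitioner, DefaultCombiner, default storage level) is invoked as in this minimal sketch; the CountVertex/CountMessage types and all values are hypothetical, not code from this PR:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.bagel.{Bagel, Message, Vertex}

// Hypothetical vertex/message types, for illustration only.
class CountVertex(val value: Int, val active: Boolean) extends Vertex with Serializable
class CountMessage(val targetId: String, val delta: Int) extends Message[String] with Serializable

object BagelRunSketch {
  def runExample(sc: SparkContext) = {
    val vertices = sc.parallelize(Seq(("a", new CountVertex(0, active = true))))
    val messages = sc.parallelize(Seq(("a", new CountMessage("a", 1))))
    // Simplest overload: (sc, vertices, messages, numPartitions)(compute).
    Bagel.run(sc, vertices, messages, 2) {
      (v: CountVertex, msgs: Option[Array[CountMessage]], superstep: Int) =>
        val sum = msgs.map(_.map(_.delta).sum).getOrElse(0)
        // Deactivate after two supersteps and emit no further messages.
        (new CountVertex(v.value + sum, superstep < 2), Array.empty[CountMessage])
    }
  }
}
```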
4 changes: 2 additions & 2 deletions core/pom.xml
@@ -225,7 +225,7 @@
</goals>
<configuration>
<exportAntProperties>true</exportAntProperties>
- <tasks>
+ <target>
<property name="spark.classpath" refid="maven.test.classpath" />
<property environment="env" />
<fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
@@ -238,7 +238,7 @@
</not>
</condition>
</fail>
- </tasks>
+ </target>
</configuration>
</execution>
</executions>
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -351,7 +351,7 @@ class SparkContext(
* using the older MapReduce API (`org.apache.hadoop.mapred`).
*
* @param conf JobConf for setting up the dataset
- * @param inputFormatClass Class of the [[InputFormat]]
+ * @param inputFormatClass Class of the InputFormat
* @param keyClass Class of the keys
* @param valueClass Class of the values
* @param minSplits Minimum number of Hadoop Splits to generate.
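For context, the parameters documented above fit together as in this hedged sketch; the input path and split count are illustrative:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
import org.apache.spark.SparkContext

object HadoopRDDSketch {
  def readExample(sc: SparkContext) = {
    val conf = new JobConf()
    FileInputFormat.setInputPaths(conf, "hdfs:///tmp/input") // illustrative path
    // TextInputFormat is an old-API (org.apache.hadoop.mapred) InputFormat.
    sc.hadoopRDD(conf, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2)
  }
}
```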
@@ -81,7 +81,7 @@ class JobLogger(val user: String, val logDirName: String)
/**
* Create a log file for one job
* @param jobID ID of the job
- * @exception FileNotFoundException Fail to create log file
+ * @throws FileNotFoundException Fail to create log file
*/
protected def createLogWriter(jobID: Int) {
try {
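The tag swap above standardizes on @throws, scaladoc's usual form (presumably over the javadoc-style @exception alias); a minimal sketch of the corrected doc comment on a hypothetical stub, not JobLogger's actual body:

```scala
object LogWriterSketch {
  /**
   * Create a log file for one job.
   * @param jobID ID of the job
   * @throws java.io.FileNotFoundException if the log file cannot be created
   */
  @throws(classOf[java.io.FileNotFoundException])
  def createLogWriter(jobID: Int): Unit = ??? // body elided in this sketch
}
```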
@@ -23,9 +23,9 @@ import scala.util.control.{ControlThrowable, NonFatal}
import com.typesafe.config.Config

/**
- * An [[akka.actor.ActorSystem]] which refuses to shut down in the event of a fatal exception.
+ * An akka.actor.ActorSystem which refuses to shut down in the event of a fatal exception
* This is necessary as Spark Executors are allowed to recover from fatal exceptions
- * (see [[org.apache.spark.executor.Executor]]).
+ * (see org.apache.spark.executor.Executor)
*/
object IndestructibleActorSystem {
def apply(name: String, config: Config): ActorSystem =
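For context, the apply shown above is used roughly like so; a hedged sketch, where the system name and config are illustrative and the object's placement in the akka.actor package is assumed from Spark's source, not from this hunk:

```scala
import akka.actor.{ActorSystem, IndestructibleActorSystem} // package placement assumed
import com.typesafe.config.ConfigFactory

object ExecutorSystemSketch {
  // Returns a plain ActorSystem whose guardian survives fatal exceptions,
  // per the scaladoc above; name and config here are made up.
  val system: ActorSystem =
    IndestructibleActorSystem("sparkExecutor", ConfigFactory.load())
}
```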
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -19,9 +19,9 @@ package org.apache.spark.util

/**
* A class for tracking the statistics of a set of numbers (count, mean and variance) in a
- * numerically robust way. Includes support for merging two StatCounters. Based on
- * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
- * Welford and Chan's algorithms for running variance]].
+ * numerically robust way. Includes support for merging two StatCounters. Based on Welford
+ * and Chan's [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance algorithms]]
+ * for running variance.
*
* @constructor Initialize the StatCounter with the given values.
*/
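The Welford/Chan reference cited above boils down to a single-pass, numerically stable update rule; a minimal standalone sketch with illustrative names, not StatCounter's actual fields:

```scala
// Welford's online algorithm: one pass over the data, stable variance.
final class RunningStats {
  private var n = 0L
  private var mean = 0.0
  private var m2 = 0.0 // running sum of squared deviations from the mean

  def merge(x: Double): this.type = {
    n += 1
    val delta = x - mean
    mean += delta / n
    m2 += delta * (x - mean) // second factor uses the updated mean, per Welford
    this
  }

  def count: Long = n
  def variance: Double = if (n == 0) Double.NaN else m2 / n
}
```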
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/Vector.scala
@@ -136,7 +136,7 @@ object Vector {

/**
* Creates this [[org.apache.spark.util.Vector]] of given length containing random numbers
- * between 0.0 and 1.0. Optional [[scala.util.Random]] number generator can be provided.
+ * between 0.0 and 1.0. Optional scala.util.Random number generator can be provided.
*/
def random(length: Int, random: Random = new XORShiftRandom()) =
Vector(length, _ => random.nextDouble())
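Usage, per the default-argument signature shown above; the seed is illustrative:

```scala
import scala.util.Random
import org.apache.spark.util.Vector

object VectorRandomSketch extends App {
  val v1 = Vector.random(5)                 // default XORShiftRandom generator
  val v2 = Vector.random(5, new Random(42)) // caller-supplied, seeded generator
  println(v1.length + " " + v2.length)
}
```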
35 changes: 25 additions & 10 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -75,8 +75,9 @@ public int compare(Integer a, Integer b) {
else if (a < b) return 1;
else return 0;
}
- };
+ }

+ @SuppressWarnings("unchecked")
@Test
public void sparkContextUnion() {
// Union of non-specialized JavaRDDs
@@ -148,6 +149,7 @@ public void call(String s) {
Assert.assertEquals(2, foreachCalls);
}

+ @SuppressWarnings("unchecked")
@Test
public void lookup() {
JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
@@ -179,6 +181,7 @@ public Boolean call(Integer x) {
Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
}

+ @SuppressWarnings("unchecked")
@Test
public void cogroup() {
JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
@@ -197,6 +200,7 @@ public void cogroup() {
cogrouped.collect();
}

+ @SuppressWarnings("unchecked")
@Test
public void leftOuterJoin() {
JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
@@ -243,6 +247,7 @@ public Integer call(Integer a, Integer b) {
Assert.assertEquals(33, sum);
}

+ @SuppressWarnings("unchecked")
@Test
public void foldByKey() {
List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
@@ -265,6 +270,7 @@ public Integer call(Integer a, Integer b) {
Assert.assertEquals(3, sums.lookup(3).get(0).intValue());
}

+ @SuppressWarnings("unchecked")
@Test
public void reduceByKey() {
List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
@@ -320,8 +326,8 @@ public void approximateResults() {
public void take() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
Assert.assertEquals(1, rdd.first().intValue());
- List<Integer> firstTwo = rdd.take(2);
- List<Integer> sample = rdd.takeSample(false, 2, 42);
+ rdd.take(2);
+ rdd.takeSample(false, 2, 42);
}

@Test
@@ -359,8 +365,8 @@ public Boolean call(Double x) {
Assert.assertEquals(2.49444, rdd.stdev(), 0.01);
Assert.assertEquals(2.73252, rdd.sampleStdev(), 0.01);

- Double first = rdd.first();
- List<Double> take = rdd.take(5);
+ rdd.first();
+ rdd.take(5);
}

@Test
@@ -438,11 +444,11 @@ public Iterable<Double> call(String s) {
return lengths;
}
});
- Double x = doubles.first();
- Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01);
+ Assert.assertEquals(5.0, doubles.first(), 0.01);
Assert.assertEquals(11, pairs.count());
}

+ @SuppressWarnings("unchecked")
@Test
public void mapsFromPairsToPairs() {
List<Tuple2<Integer, String>> pairs = Arrays.asList(
@@ -509,6 +515,7 @@ public void repartition() {
}
}

+ @SuppressWarnings("unchecked")
@Test
public void persist() {
JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
@@ -573,6 +580,7 @@ public void textFilesCompressed() throws IOException {
Assert.assertEquals(expected, readRDD.collect());
}

+ @SuppressWarnings("unchecked")
@Test
public void sequenceFile() {
File tempDir = Files.createTempDir();
@@ -602,6 +610,7 @@ public Tuple2<Integer, String> call(Tuple2<IntWritable, Text> pair) {
Assert.assertEquals(pairs, readRDD.collect());
}

+ @SuppressWarnings("unchecked")
@Test
public void writeWithNewAPIHadoopFile() {
File tempDir = Files.createTempDir();
@@ -632,6 +641,7 @@ public String call(Tuple2<IntWritable, Text> x) {
}).collect().toString());
}

+ @SuppressWarnings("unchecked")
@Test
public void readWithNewAPIHadoopFile() throws IOException {
File tempDir = Files.createTempDir();
@@ -674,6 +684,7 @@ public void objectFilesOfInts() {
Assert.assertEquals(expected, readRDD.collect());
}

+ @SuppressWarnings("unchecked")
@Test
public void objectFilesOfComplexTypes() {
File tempDir = Files.createTempDir();
@@ -690,6 +701,7 @@ public void objectFilesOfComplexTypes() {
Assert.assertEquals(pairs, readRDD.collect());
}

+ @SuppressWarnings("unchecked")
@Test
public void hadoopFile() {
File tempDir = Files.createTempDir();
@@ -719,6 +731,7 @@ public String call(Tuple2<IntWritable, Text> x) {
}).collect().toString());
}

+ @SuppressWarnings("unchecked")
@Test
public void hadoopFileCompressed() {
File tempDir = Files.createTempDir();
@@ -824,7 +837,7 @@ public Float zero(Float initialValue) {
}
};

- final Accumulator<Float> floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam);
+ final Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
rdd.foreach(new VoidFunction<Integer>() {
public void call(Integer x) {
floatAccum.add((float) x);
@@ -876,6 +889,7 @@ public void checkpointAndRestore() {
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect());
}

+ @SuppressWarnings("unchecked")
@Test
public void mapOnPairRDD() {
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4));
@@ -900,6 +914,7 @@ public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) throws Exception

}

+ @SuppressWarnings("unchecked")
@Test
public void collectPartitions() {
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
@@ -968,14 +983,14 @@ public void countApproxDistinctByKey() {
@Test
public void collectAsMapWithIntArrayValues() {
// Regression test for SPARK-1040
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[] { 1 }));
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
JavaPairRDD<Integer, int[]> pairRDD = rdd.map(new PairFunction<Integer, Integer, int[]>() {
@Override
public Tuple2<Integer, int[]> call(Integer x) throws Exception {
return new Tuple2<Integer, int[]>(x, new int[] { x });
}
});
pairRDD.collect(); // Works fine
- Map<Integer, int[]> map = pairRDD.collectAsMap(); // Used to crash with ClassCastException
+ pairRDD.collectAsMap(); // Used to crash with ClassCastException
}
}
5 changes: 3 additions & 2 deletions pom.xml
@@ -613,12 +613,13 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
- <version>2.5.1</version>
+ <version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>UTF-8</encoding>
<maxmem>1024m</maxmem>
+ <fork>true</fork>
</configuration>
</plugin>
<plugin>
@@ -633,7 +634,7 @@
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
- <version>1.0-M2</version>
+ <version>1.0-RC2</version>
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
4 changes: 2 additions & 2 deletions repl/pom.xml
@@ -98,7 +98,7 @@
</goals>
<configuration>
<exportAntProperties>true</exportAntProperties>
- <tasks>
+ <target>
<property name="spark.classpath" refid="maven.test.classpath" />
<property environment="env" />
<fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
@@ -111,7 +111,7 @@
</not>
</condition>
</fail>
- </tasks>
+ </target>
</configuration>
</execution>
</executions>