
Commit

Merge branch 'master' of github.com:apache/spark into profiler
davies committed Sep 27, 2014
2 parents 858e74c + f872e4f commit e68df5a
Showing 60 changed files with 931 additions and 184 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -23,6 +23,7 @@ conf/*.cmd
conf/*.properties
conf/*.conf
conf/*.xml
conf/slaves
docs/_site
docs/api
target/
1 change: 1 addition & 0 deletions .rat-excludes
@@ -19,6 +19,7 @@ log4j.properties
log4j.properties.template
metrics.properties.template
slaves
slaves.template
spark-env.sh
spark-env.cmd
spark-env.sh.template
File renamed without changes.
8 changes: 7 additions & 1 deletion core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -18,10 +18,12 @@
package org.apache.spark

import java.io._
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import scala.collection.mutable.{HashSet, HashMap, Map}
import scala.concurrent.Await
import scala.collection.JavaConversions._

import akka.actor._
import akka.pattern.ask
@@ -84,6 +86,9 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
* On the master, it serves as the source of map outputs recorded from ShuffleMapTasks.
* On the workers, it simply serves as a cache, in which a miss triggers a fetch from the
* master's corresponding HashMap.
*
* Note: because mapStatuses is accessed concurrently, subclasses should make sure it's a
* thread-safe map.
*/
protected val mapStatuses: Map[Int, Array[MapStatus]]

@@ -339,7 +344,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
* MapOutputTrackerMaster.
*/
private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
protected val mapStatuses = new HashMap[Int, Array[MapStatus]]
protected val mapStatuses: Map[Int, Array[MapStatus]] =
new ConcurrentHashMap[Int, Array[MapStatus]]
}

private[spark] object MapOutputTracker {
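The core of this hunk is swapping the worker-side HashMap for a ConcurrentHashMap, since mapStatuses is read by fetcher threads while it is being updated. A minimal standalone sketch of the pattern, with a hypothetical FakeMapStatus standing in for Spark's real MapStatus class:

```scala
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConversions._
import scala.collection.mutable.Map

object ThreadSafeStatusMap {
  // Hypothetical stand-in for org.apache.spark.scheduler.MapStatus, for illustration only.
  final case class FakeMapStatus(host: String)

  def main(args: Array[String]): Unit = {
    // The JavaConversions import (also added in this hunk) implicitly wraps the
    // ConcurrentHashMap as a scala.collection.mutable.Map, so it satisfies the abstract
    // `mapStatuses: Map[Int, Array[MapStatus]]` declaration while remaining safe for
    // concurrent readers and writers.
    val mapStatuses: Map[Int, Array[FakeMapStatus]] =
      new ConcurrentHashMap[Int, Array[FakeMapStatus]]()

    mapStatuses(0) = Array(FakeMapStatus("host-a"), FakeMapStatus("host-b"))
    println(mapStatuses.get(0).map(_.length)) // Some(2)
  }
}
```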
48 changes: 48 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -469,6 +469,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
}

/**
* Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
* resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
* the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
* element (k, w) in `other`, the resulting RDD will either contain all pairs
* (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
* in `this` have key k. Uses the given Partitioner to partition the output RDD.
*/
def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
: JavaPairRDD[K, (Optional[V], Optional[W])] = {
val joinResult = rdd.fullOuterJoin(other, partitioner)
fromRDD(joinResult.mapValues{ case (v, w) =>
(JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
})
}

/**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
* partitioner/parallelism level.
@@ -563,6 +579,38 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
}

/**
* Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
* resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
* the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
* element (k, w) in `other`, the resulting RDD will either contain all pairs
* (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
* in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
* parallelism level.
*/
def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])] = {
val joinResult = rdd.fullOuterJoin(other)
fromRDD(joinResult.mapValues{ case (v, w) =>
(JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
})
}

/**
* Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
* resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
* the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
* element (k, w) in `other`, the resulting RDD will either contain all pairs
* (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
* in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
*/
def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
: JavaPairRDD[K, (Optional[V], Optional[W])] = {
val joinResult = rdd.fullOuterJoin(other, numPartitions)
fromRDD(joinResult.mapValues{ case (v, w) =>
(JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
})
}

/**
* Return the key-value pairs in this RDD to the master as a Map.
*/
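Each Java wrapper above converts the Scala Option pair into Guava Optionals via JavaUtils.optionToOptional. A rough, illustrative re-implementation of that conversion (not Spark's actual code), assuming Guava is on the classpath:

```scala
import com.google.common.base.Optional

object OptionToOptionalSketch {
  // Approximation of the conversion the Java fullOuterJoin wrappers apply to each side of the pair.
  def optionToOptional[T](option: Option[T]): Optional[T] = option match {
    case Some(value) => Optional.of(value)
    case None        => Optional.absent()
  }

  def main(args: Array[String]): Unit = {
    println(optionToOptional(Some(42)))          // Optional.of(42)
    println(optionToOptional(None: Option[Int])) // Optional.absent()
  }
}
```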
42 changes: 42 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -506,6 +506,23 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
}

/**
* Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
* resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
* the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
* element (k, w) in `other`, the resulting RDD will either contain all pairs
* (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
* in `this` have key k. Uses the given Partitioner to partition the output RDD.
*/
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Option[V], Option[W]))] = {
this.cogroup(other, partitioner).flatMapValues {
case (vs, Seq()) => vs.map(v => (Some(v), None))
case (Seq(), ws) => ws.map(w => (None, Some(w)))
case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w))
}
}

/**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the
* existing partitioner/parallelism level.
@@ -585,6 +602,31 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
rightOuterJoin(other, new HashPartitioner(numPartitions))
}

/**
* Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
* resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
* the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
* element (k, w) in `other`, the resulting RDD will either contain all pairs
* (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
* in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
* parallelism level.
*/
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = {
fullOuterJoin(other, defaultPartitioner(self, other))
}

/**
* Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
* resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
* the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
* element (k, w) in `other`, the resulting RDD will either contain all pairs
* (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
* in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
*/
def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = {
fullOuterJoin(other, new HashPartitioner(numPartitions))
}

/**
* Return the key-value pairs in this RDD to the master as a Map.
*
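As a usage sketch (not part of the diff), here is how the new Scala fullOuterJoin behaves on small sample data; the local master and inputs are assumptions chosen to mirror the test added in PairRDDFunctionsSuite below:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // brings PairRDDFunctions into scope (Spark 1.x)

object FullOuterJoinExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("fullOuterJoinExample").setMaster("local[2]"))

    val left  = sc.parallelize(Seq((1, 1), (2, 1), (3, 1)))
    val right = sc.parallelize(Seq((1, 'x'), (2, 'y'), (4, 'w')))

    // Keys present on only one side are padded with None on the other side.
    left.fullOuterJoin(right).collect().sortBy(_._1).foreach(println)
    // (1,(Some(1),Some(x)))
    // (2,(Some(1),Some(y)))
    // (3,(Some(1),None))
    // (4,(None,Some(w)))

    sc.stop()
  }
}
```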
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -69,8 +69,13 @@ private[spark] class ShuffleMapTask(
return writer.stop(success = true).get
} catch {
case e: Exception =>
if (writer != null) {
writer.stop(success = false)
try {
if (writer != null) {
writer.stop(success = false)
}
} catch {
case e: Exception =>
log.debug("Could not stop writer", e)
}
throw e
} finally {
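The change above wraps the writer cleanup in its own try/catch so a failure while stopping the writer cannot mask the task's original exception. A self-contained sketch of that pattern with illustrative names (not Spark's API):

```scala
object CleanupOnFailureSketch {
  // If the task body fails, attempt best-effort cleanup, but always rethrow the original
  // exception -- a secondary failure during cleanup is only logged, never propagated.
  def runTask[T](body: () => T, stopWriter: Boolean => Unit): T = {
    try {
      body()
    } catch {
      case e: Exception =>
        try {
          stopWriter(false) // success = false: discard any partial output
        } catch {
          case cleanupError: Exception =>
            System.err.println(s"Could not stop writer: $cleanupError")
        }
        throw e
    }
  }

  def main(args: Array[String]): Unit = {
    try {
      runTask(() => throw new RuntimeException("task failed"),
              _ => throw new IllegalStateException("writer already closed"))
    } catch {
      case e: Exception => println(s"Caller sees the original failure: ${e.getMessage}")
    }
  }
}
```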
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -121,7 +121,7 @@ final class ShuffleBlockFetcherIterator(
}

override def onBlockFetchFailure(e: Throwable): Unit = {
logError("Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
// Note that there is a chance that some blocks have been fetched successfully, but we
// still add them to the failed queue. This is fine because when the caller see a
// FetchFailedException, it is going to fail the entire task anyway.
3 changes: 3 additions & 0 deletions core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -193,11 +193,13 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
assert(grouped2.join(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.fullOuterJoin(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.cogroup(grouped4).partitioner === grouped4.partitioner)

assert(grouped2.join(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.fullOuterJoin(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner)

assert(grouped2.map(_ => 1).partitioner === None)
@@ -218,6 +220,7 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
assert(intercept[SparkException]{ arrPairs.join(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.rightOuterJoin(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.fullOuterJoin(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.groupByKey() }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.countByKey() }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.countByKeyApprox(1) }.getMessage.contains("array"))
core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -298,6 +298,21 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
))
}

test("fullOuterJoin") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.fullOuterJoin(rdd2).collect()
assert(joined.size === 6)
assert(joined.toSet === Set(
(1, (Some(1), Some('x'))),
(1, (Some(2), Some('x'))),
(2, (Some(1), Some('y'))),
(2, (Some(1), Some('z'))),
(3, (Some(1), None)),
(4, (None, Some('w')))
))
}

test("join with no matches") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
1 change: 1 addition & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -193,6 +193,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(rdd.join(emptyKv).collect().size === 0)
assert(rdd.rightOuterJoin(emptyKv).collect().size === 0)
assert(rdd.leftOuterJoin(emptyKv).collect().size === 2)
assert(rdd.fullOuterJoin(emptyKv).collect().size === 2)
assert(rdd.cogroup(emptyKv).collect().size === 2)
assert(rdd.union(emptyKv).collect().size === 2)
}
4 changes: 2 additions & 2 deletions dev/run-tests-jenkins
@@ -92,13 +92,13 @@ function post_message () {
merge_note=" * This patch merges cleanly."

source_files=$(
git diff master --name-only \
git diff master... --name-only `# diff patch against master from branch point` \
| grep -v -e "\/test" `# ignore files in test directories` \
| grep -e "\.py$" -e "\.java$" -e "\.scala$" `# include only code files` \
| tr "\n" " "
)
new_public_classes=$(
git diff master ${source_files} `# diff this patch against master and...` \
git diff master... ${source_files} `# diff patch against master from branch point` \
| grep "^\+" `# filter in only added lines` \
| sed -r -e "s/^\+//g" `# remove the leading +` \
| grep -e "trait " -e "class " `# filter in lines with these key words` \
3 changes: 2 additions & 1 deletion docs/mllib-clustering.md
@@ -52,7 +52,7 @@ import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()

// Cluster the data into two classes using KMeans
val numClusters = 2
@@ -100,6 +100,7 @@ public class KMeansExample {
}
}
);
parsedData.cache();

// Cluster the data into two classes using KMeans
int numClusters = 2;
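The .cache() added in these docs matters because k-means is iterative: without it, the input would be re-read and re-parsed on every pass. A runnable sketch of the cached pipeline, assuming a local master and the sample data path used in the docs:

```scala
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.{SparkConf, SparkContext}

object KMeansCacheSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("kmeans-cache").setMaster("local[2]"))

    // Caching the parsed vectors keeps them in memory across k-means iterations.
    val data = sc.textFile("data/mllib/kmeans_data.txt")
    val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()

    val clusters = KMeans.train(parsedData, 2, 20) // k = 2, maxIterations = 20
    println(clusters.clusterCenters.mkString("\n"))

    sc.stop()
  }
}
```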
15 changes: 15 additions & 0 deletions docs/mllib-feature-extraction.md
@@ -82,6 +82,21 @@ tf.cache()
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)
{% endhighlight %}

MLlib's IDF implementation provides an option for ignoring terms that occur in fewer than a
minimum number of documents. In such cases, the IDF for these terms is set to 0. This feature
can be used by passing the `minDocFreq` value to the IDF constructor.

{% highlight scala %}
import org.apache.spark.mllib.feature.IDF

// ... continue from the previous example
tf.cache()
val idf = new IDF(minDocFreq = 2).fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)
{% endhighlight %}


</div>
</div>

9 changes: 5 additions & 4 deletions docs/mllib-linear-methods.md
@@ -396,7 +396,7 @@ val data = sc.textFile("data/mllib/ridge-data/lpsa.data")
val parsedData = data.map { line =>
val parts = line.split(',')
LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}
}.cache()

// Building the model
val numIterations = 100
@@ -455,6 +455,7 @@ public class LinearRegression {
}
}
);
parsedData.cache();

// Building the model
int numIterations = 100;
Expand All @@ -470,7 +471,7 @@ public class LinearRegression {
}
}
);
JavaRDD<Object> MSE = new JavaDoubleRDD(valuesAndPreds.map(
double MSE = new JavaDoubleRDD(valuesAndPreds.map(
new Function<Tuple2<Double, Double>, Object>() {
public Object call(Tuple2<Double, Double> pair) {
return Math.pow(pair._1() - pair._2(), 2.0);
@@ -553,8 +554,8 @@ but in practice you will likely want to use unlabeled vectors for test data.

{% highlight scala %}

val trainingData = ssc.textFileStream('/training/data/dir').map(LabeledPoint.parse)
val testData = ssc.textFileStream('/testing/data/dir').map(LabeledPoint.parse)
val trainingData = ssc.textFileStream("/training/data/dir").map(LabeledPoint.parse).cache()
val testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse)

{% endhighlight %}

1 change: 1 addition & 0 deletions docs/mllib-optimization.md
@@ -217,6 +217,7 @@ import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val numFeatures = data.take(1)(0).features.size
2 changes: 1 addition & 1 deletion docs/programming-guide.md
@@ -906,7 +906,7 @@ for details.
<tr>
<td> <b>join</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td>
<td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
Outer joins are also supported through <code>leftOuterJoin</code> and <code>rightOuterJoin</code>.
Outer joins are supported through <code>leftOuterJoin</code>, <code>rightOuterJoin</code>, and <code>fullOuterJoin</code>.
</td>
</tr>
<tr>
7 changes: 6 additions & 1 deletion docs/spark-standalone.md
@@ -62,7 +62,12 @@ Finally, the following configuration options can be passed to the master and wor

# Cluster Launch Scripts

To launch a Spark standalone cluster with the launch scripts, you need to create a file called `conf/slaves` in your Spark directory, which should contain the hostnames of all the machines where you would like to start Spark workers, one per line. The master machine must be able to access each of the slave machines via password-less `ssh` (using a private key). For testing, you can just put `localhost` in this file.
To launch a Spark standalone cluster with the launch scripts, you should create a file called conf/slaves in your Spark directory,
which must contain the hostnames of all the machines where you intend to start Spark workers, one per line.
If conf/slaves does not exist, the launch scripts default to a single machine (localhost), which is useful for testing.
Note that the master machine accesses each of the worker machines via ssh. By default, ssh is run in parallel and requires password-less access (using a private key) to be set up.
If you do not have a password-less setup, you can set the environment variable SPARK_SSH_FOREGROUND and serially provide a password for each worker.


Once you've set up this file, you can launch or stop your cluster with the following shell scripts, based on Hadoop's deploy scripts, and available in `SPARK_HOME/bin`:

4 changes: 2 additions & 2 deletions ec2/spark_ec2.py
@@ -38,7 +38,7 @@
from boto.ec2.blockdevicemapping import BlockDeviceMapping, BlockDeviceType, EBSBlockDeviceType
from boto import ec2

DEFAULT_SPARK_VERSION = "1.0.0"
DEFAULT_SPARK_VERSION = "1.1.0"

# A URL prefix from which to fetch AMI information
AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/v2/ami-list"
@@ -218,7 +218,7 @@ def is_active(instance):
def get_spark_shark_version(opts):
spark_shark_map = {
"0.7.3": "0.7.1", "0.8.0": "0.8.0", "0.8.1": "0.8.1", "0.9.0": "0.9.0", "0.9.1": "0.9.1",
"1.0.0": "1.0.0"
"1.0.0": "1.0.0", "1.0.1": "1.0.1", "1.0.2": "1.0.2", "1.1.0": "1.1.0"
}
version = opts.spark_version.replace("v", "")
if version not in spark_shark_map: