Commit 02b4a0d

Author: Stephen Haberman

Merge branches 'subtract' and 'bettersplits' into bizo

* subtract:
  Add RDD.subtract.

* bettersplits:
  Update more javadocs.
  Tweak test names.
  Remove fileServerSuite.txt.
  Update default.parallelism docs, have StandaloneSchedulerBackend use it.
  Change defaultPartitioner to use upstream split size.

Conflicts:
  core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
  core/src/test/scala/spark/ShuffleSuite.scala

3 parents: 9d979fb + 924f47d + 4281e57

10 files changed: +236 additions, -43 deletions

core/src/main/scala/spark/PairRDDFunctions.scala

Lines changed: 8 additions & 17 deletions
@@ -23,6 +23,7 @@ import spark.partial.BoundedDouble
 import spark.partial.PartialResult
 import spark.rdd._
 import spark.SparkContext._
+import spark.Partitioner._
 
 /**
  * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -248,8 +249,8 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   }
 
   /**
-   * Simplified version of combineByKey that hash-partitions the resulting RDD using the default
-   * parallelism level.
+   * Simplified version of combineByKey that hash-partitions the resulting RDD using the
+   * existing partitioner/parallelism level.
    */
   def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
       : RDD[(K, C)] = {
@@ -259,15 +260,16 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   /**
    * Merge the values for each key using an associative reduce function. This will also perform
    * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce. Output will be hash-partitioned with the default parallelism level.
+   * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
+   * parallelism level.
    */
   def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
     reduceByKey(defaultPartitioner(self), func)
   }
 
   /**
    * Group the values for each key in the RDD into a single sequence. Hash-partitions the
-   * resulting RDD with the default parallelism level.
+   * resulting RDD with the existing partitioner/parallelism level.
    */
   def groupByKey(): RDD[(K, Seq[V])] = {
     groupByKey(defaultPartitioner(self))
@@ -295,7 +297,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
    * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
    * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
    * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
-   * using the default level of parallelism.
+   * using the existing partitioner/parallelism level.
    */
   def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
     leftOuterJoin(other, defaultPartitioner(self, other))
@@ -315,7 +317,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
    * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
    * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
    * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
-   * RDD using the default parallelism level.
+   * RDD using the existing partitioner/parallelism level.
    */
   def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
     rightOuterJoin(other, defaultPartitioner(self, other))
@@ -438,17 +440,6 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     cogroup(other1, other2, defaultPartitioner(self, other1, other2))
   }
 
-  /**
-   * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. If any of
-   * the RDDs already has a partitioner, choose that one, otherwise use a default HashPartitioner.
-   */
-  def defaultPartitioner(rdds: RDD[_]*): Partitioner = {
-    for (r <- rdds if r.partitioner != None) {
-      return r.partitioner.get
-    }
-    return new HashPartitioner(self.context.defaultParallelism)
-  }
-
   /**
    * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
    * RDD has a known partitioner by only searching the partition that the key maps to.
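Note: a minimal usage sketch of the behavior change, not code from this commit; it assumes a SparkContext named `sc` and uses the existing `partitionBy`/`HashPartitioner` API. The no-argument shuffle operations now reuse an existing upstream partitioner instead of always hash-partitioning to `sc.defaultParallelism`.

import spark.HashPartitioner
import spark.SparkContext._

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
  .partitionBy(new HashPartitioner(7))
val counts = pairs.reduceByKey(_ + _)
// defaultPartitioner(pairs) finds the upstream HashPartitioner(7) and
// reuses it, so no re-shuffle to sc.defaultParallelism occurs.
assert(counts.partitioner == pairs.partitioner)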

core/src/main/scala/spark/Partitioner.scala

Lines changed: 19 additions & 0 deletions
@@ -9,6 +9,25 @@ abstract class Partitioner extends Serializable {
   def getPartition(key: Any): Int
 }
 
+object Partitioner {
+  /**
+   * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. If any of
+   * the RDDs already has a partitioner, choose that one, otherwise use a default HashPartitioner.
+   *
+   * The number of partitions will be the same as the number of partitions in the largest upstream
+   * RDD, as this should be least likely to cause out-of-memory errors.
+   *
+   * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
+   */
+  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
+    val bySize = (Seq(rdd) ++ others).sortBy(_.splits.size).reverse
+    for (r <- bySize if r.partitioner != None) {
+      return r.partitioner.get
+    }
+    return new HashPartitioner(bySize.head.splits.size)
+  }
+}
+
 /**
  * A [[spark.Partitioner]] that implements hash-based partitioning using Java's `Object.hashCode`.
  *
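Note: a minimal sketch of the new sizing rule, illustrative only (assumes a SparkContext named `sc`). When none of the inputs has a partitioner, the fallback HashPartitioner is now sized to the largest upstream RDD's split count rather than to `sc.defaultParallelism`.

import spark.Partitioner.defaultPartitioner

val small = sc.parallelize(Seq((1, "a"), (2, "b")), 2)  // 2 splits
val large = sc.parallelize(Seq((1, "x"), (3, "y")), 8)  // 8 splits

// Neither input has a partitioner, so the result is a HashPartitioner
// sized to the largest upstream RDD (8 splits).
val p = defaultPartitioner(small, large)
assert(p.numPartitions == 8)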

core/src/main/scala/spark/RDD.scala

Lines changed: 35 additions & 6 deletions
@@ -16,6 +16,7 @@ import org.apache.hadoop.mapred.TextOutputFormat
 
 import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
 
+import spark.Partitioner._
 import spark.partial.BoundedDouble
 import spark.partial.CountEvaluator
 import spark.partial.GroupedCountEvaluator
@@ -30,6 +31,7 @@ import spark.rdd.MapPartitionsRDD
 import spark.rdd.MapPartitionsWithSplitRDD
 import spark.rdd.PipedRDD
 import spark.rdd.SampledRDD
+import spark.rdd.SubtractedRDD
 import spark.rdd.UnionRDD
 import spark.rdd.ZippedRDD
 import spark.storage.StorageLevel
@@ -299,19 +301,26 @@ abstract class RDD[T: ClassManifest](
    */
   def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
 
+  /**
+   * Return an RDD of grouped items.
+   */
+  def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] =
+    groupBy[K](f, defaultPartitioner(this))
+
   /**
    * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
    * mapping to that key.
    */
-  def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] = {
-    val cleanF = sc.clean(f)
-    this.map(t => (cleanF(t), t)).groupByKey(numSplits)
-  }
-
+  def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] =
+    groupBy(f, new HashPartitioner(numSplits))
+
   /**
    * Return an RDD of grouped items.
    */
-  def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] = groupBy[K](f, sc.defaultParallelism)
+  def groupBy[K: ClassManifest](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = {
+    val cleanF = sc.clean(f)
+    this.map(t => (cleanF(t), t)).groupByKey(p)
+  }
 
   /**
    * Return an RDD created by piping elements to a forked external process.
@@ -383,6 +392,26 @@ abstract class RDD[T: ClassManifest](
     filter(f.isDefinedAt).map(f)
   }
 
+  /**
+   * Return an RDD with the elements from `this` that are not in `other`.
+   *
+   * Uses `this` partitioner/split size, because even if `other` is huge, the resulting
+   * RDD will be <= us.
+   */
+  def subtract(other: RDD[T]): RDD[T] =
+    subtract(other, partitioner.getOrElse(new HashPartitioner(splits.size)))
+
+  /**
+   * Return an RDD with the elements from `this` that are not in `other`.
+   */
+  def subtract(other: RDD[T], numSplits: Int): RDD[T] =
+    subtract(other, new HashPartitioner(numSplits))
+
+  /**
+   * Return an RDD with the elements from `this` that are not in `other`.
+   */
+  def subtract(other: RDD[T], p: Partitioner): RDD[T] = new SubtractedRDD[T](this, other, p)
+
   /**
    * Reduces the elements of this RDD using the specified commutative and associative binary operator.
    */
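Note: a minimal sketch of the new operation, illustrative only (assumes a SparkContext named `sc`). The no-argument overload keeps `this` RDD's split count, since the result can be no larger than `this` regardless of the size of `other`.

val a = sc.parallelize(1 to 10, 4)
val b = sc.parallelize(5 to 20, 16)

val diff = a.subtract(b)  // => elements 1, 2, 3, 4, in some order
// `a` has no partitioner, so subtract falls back to HashPartitioner(a.splits.size).
assert(diff.splits.size == 4)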

core/src/main/scala/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -693,7 +693,7 @@ class SparkContext(
     checkpointDir = Some(dir)
   }
 
-  /** Default level of parallelism to use when not given by user (e.g. for reduce tasks) */
+  /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
   def defaultParallelism: Int = taskScheduler.defaultParallelism
 
   /** Default min number of splits for Hadoop RDDs when not given by user */

core/src/main/scala/spark/api/java/JavaPairRDD.scala

Lines changed: 10 additions & 9 deletions
@@ -19,6 +19,7 @@ import spark.OrderedRDDFunctions
 import spark.storage.StorageLevel
 import spark.HashPartitioner
 import spark.Partitioner
+import spark.Partitioner._
 import spark.RDD
 import spark.SparkContext.rddToPairRDDFunctions
 
@@ -220,30 +221,30 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
     fromRDD(rdd.rightOuterJoin(other, partitioner))
 
   /**
-   * Simplified version of combineByKey that hash-partitions the resulting RDD using the default
-   * parallelism level.
+   * Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
+   * partitioner/parallelism level.
    */
   def combineByKey[C](createCombiner: JFunction[V, C],
       mergeValue: JFunction2[C, V, C],
       mergeCombiners: JFunction2[C, C, C]): JavaPairRDD[K, C] = {
     implicit val cm: ClassManifest[C] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]]
-    fromRDD(combineByKey(createCombiner, mergeValue, mergeCombiners))
+    fromRDD(combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(rdd)))
   }
 
   /**
    * Merge the values for each key using an associative reduce function. This will also perform
    * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce. Output will be hash-partitioned with the default parallelism level.
+   * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
+   * parallelism level.
    */
   def reduceByKey(func: JFunction2[V, V, V]): JavaPairRDD[K, V] = {
-    val partitioner = rdd.defaultPartitioner(rdd)
-    fromRDD(reduceByKey(partitioner, func))
+    fromRDD(reduceByKey(defaultPartitioner(rdd), func))
   }
 
   /**
    * Group the values for each key in the RDD into a single sequence. Hash-partitions the
-   * resulting RDD with the default parallelism level.
+   * resulting RDD with the existing partitioner/parallelism level.
    */
   def groupByKey(): JavaPairRDD[K, JList[V]] =
     fromRDD(groupByResultToJava(rdd.groupByKey()))
@@ -268,7 +269,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
    * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
    * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
    * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
-   * using the default level of parallelism.
+   * using the existing partitioner/parallelism level.
    */
   def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Option[W])] =
     fromRDD(rdd.leftOuterJoin(other))
@@ -286,7 +287,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
    * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
    * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
    * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
-   * RDD using the default parallelism level.
+   * RDD using the existing partitioner/parallelism level.
    */
   def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Option[V], W)] =
     fromRDD(rdd.rightOuterJoin(other))
core/src/main/scala/spark/rdd/SubtractedRDD.scala (new file)

Lines changed: 108 additions & 0 deletions
@@ -0,0 +1,108 @@
+package spark.rdd
+
+import java.util.{HashSet => JHashSet}
+import scala.collection.JavaConversions._
+import spark.RDD
+import spark.Partitioner
+import spark.Dependency
+import spark.TaskContext
+import spark.Split
+import spark.SparkEnv
+import spark.ShuffleDependency
+import spark.OneToOneDependency
+
+/**
+ * An optimized version of cogroup for set difference/subtraction.
+ *
+ * It is possible to implement this operation with just `cogroup`, but
+ * that is less efficient because all of the entries from `rdd2`, for
+ * both matching and non-matching values in `rdd1`, are kept in the
+ * JHashMap until the end.
+ *
+ * With this implementation, only the entries from `rdd1` are kept in-memory,
+ * and the entries from `rdd2` are essentially streamed, as we only need to
+ * touch each once to decide if the value needs to be removed.
+ *
+ * This is particularly helpful when `rdd1` is much smaller than `rdd2`, as
+ * you can use `rdd1`'s partitioner/split size and not worry about running
+ * out of memory because of the size of `rdd2`.
+ */
+private[spark] class SubtractedRDD[T: ClassManifest](
+    @transient var rdd1: RDD[T],
+    @transient var rdd2: RDD[T],
+    part: Partitioner) extends RDD[T](rdd1.context, Nil) {
+
+  override def getDependencies: Seq[Dependency[_]] = {
+    Seq(rdd1, rdd2).map { rdd =>
+      if (rdd.partitioner == Some(part)) {
+        logInfo("Adding one-to-one dependency with " + rdd)
+        new OneToOneDependency(rdd)
+      } else {
+        logInfo("Adding shuffle dependency with " + rdd)
+        val mapSideCombinedRDD = rdd.mapPartitions(i => {
+          val set = new JHashSet[T]()
+          while (i.hasNext) {
+            set.add(i.next)
+          }
+          set.iterator
+        }, true)
+        // ShuffleDependency requires a tuple (k, v), which it will partition by k.
+        // We need this to partition to map to the same place as the k for
+        // OneToOneDependency, which means:
+        // - for already-tupled RDD[(A, B)], into getPartition(a)
+        // - for non-tupled RDD[C], into getPartition(c)
+        val part2 = new Partitioner() {
+          def numPartitions = part.numPartitions
+          def getPartition(key: Any) = key match {
+            case (k, v) => part.getPartition(k)
+            case k => part.getPartition(k)
+          }
+        }
+        new ShuffleDependency(mapSideCombinedRDD.map((_, null)), part2)
+      }
+    }
+  }
+
+  override def getSplits: Array[Split] = {
+    val array = new Array[Split](part.numPartitions)
+    for (i <- 0 until array.size) {
+      // Each CoGroupSplit will depend on rdd1 and rdd2
+      array(i) = new CoGroupSplit(i, Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) =>
+        dependencies(j) match {
+          case s: ShuffleDependency[_, _] =>
+            new ShuffleCoGroupSplitDep(s.shuffleId)
+          case _ =>
+            new NarrowCoGroupSplitDep(rdd, i, rdd.splits(i))
+        }
+      }.toList)
+    }
+    array
+  }
+
+  override val partitioner = Some(part)
+
+  override def compute(s: Split, context: TaskContext): Iterator[T] = {
+    val split = s.asInstanceOf[CoGroupSplit]
+    val set = new JHashSet[T]
+    def integrate(dep: CoGroupSplitDep, op: T => Unit) = dep match {
+      case NarrowCoGroupSplitDep(rdd, _, itsSplit) =>
+        for (k <- rdd.iterator(itsSplit, context))
+          op(k.asInstanceOf[T])
+      case ShuffleCoGroupSplitDep(shuffleId) =>
+        for ((k, _) <- SparkEnv.get.shuffleFetcher.fetch(shuffleId, split.index))
+          op(k.asInstanceOf[T])
+    }
+    // the first dep is rdd1; add all keys to the set
+    integrate(split.deps(0), set.add)
+    // the second dep is rdd2; remove all of its keys from the set
+    integrate(split.deps(1), set.remove)
+    set.iterator
+  }
+
+  override def clearDependencies() {
+    super.clearDependencies()
+    rdd1 = null
+    rdd2 = null
+  }
+
+}
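Note: a sketch of the shuffle-avoidance path, illustrative only (assumes a SparkContext named `sc`). When both sides already share the subtraction's partitioner, getDependencies builds two OneToOneDependencies and no shuffle happens at all.

import spark.HashPartitioner
import spark.SparkContext._

val p = new HashPartitioner(4)
val x = sc.parallelize(Seq((1, 1), (2, 2))).partitionBy(p)
val y = sc.parallelize(Seq((2, 9), (3, 9))).partitionBy(p)

// Both inputs report partitioner Some(p), so both dependencies are
// one-to-one; no ShuffleDependency is created.
val d = x.subtract(y)
assert(d.partitioner == Some(p))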

core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala

Lines changed: 3 additions & 2 deletions
@@ -153,8 +153,6 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
     driverActor ! ReviveOffers
   }
 
-  override def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2)
-
   // Called by subclasses when notified of a lost worker
   def removeExecutor(executorId: String, reason: String) {
     try {
@@ -166,6 +164,9 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
       throw new SparkException("Error notifying standalone scheduler's driver actor", e)
     }
   }
+
+  override def defaultParallelism() = Option(System.getProperty("spark.default.parallelism"))
+      .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2))
 }
 
 private[spark] object StandaloneSchedulerBackend {
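Note: a minimal illustration, not code from this commit. The spark.default.parallelism system property, when set, now takes precedence over the core-count heuristic.

// With this set before jobs run, defaultParallelism() returns 48;
// with the property unset it falls back to math.max(totalCoreCount.get(), 2).
System.setProperty("spark.default.parallelism", "48")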

core/src/test/scala/spark/PartitioningSuite.scala

Lines changed: 4 additions & 4 deletions
@@ -84,10 +84,10 @@ class PartitioningSuite extends FunSuite with LocalSparkContext {
     assert(grouped4.groupByKey(3).partitioner != grouped4.partitioner)
     assert(grouped4.groupByKey(4).partitioner === grouped4.partitioner)
 
-    assert(grouped2.join(grouped4).partitioner === grouped2.partitioner)
-    assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped2.partitioner)
-    assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped2.partitioner)
-    assert(grouped2.cogroup(grouped4).partitioner === grouped2.partitioner)
+    assert(grouped2.join(grouped4).partitioner === grouped4.partitioner)
+    assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped4.partitioner)
+    assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped4.partitioner)
+    assert(grouped2.cogroup(grouped4).partitioner === grouped4.partitioner)
 
     assert(grouped2.join(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
