Skip to content

Commit c233ab3

Browse files
zsxwingJoshRosen
authored andcommitted
[SPARK-4818][Core] Add 'iterator' to reduce memory consumed by join
In Scala, `map` and `flatMap` of `Iterable` will copy the contents of `Iterable` to a new `Seq`. Such as, ```Scala val iterable = Seq(1, 2, 3).map(v => { println(v) v }) println("Iterable map done") val iterator = Seq(1, 2, 3).iterator.map(v => { println(v) v }) println("Iterator map done") ``` outputed ``` 1 2 3 Iterable map done Iterator map done ``` So we should use 'iterator' to reduce memory consumed by join. Found by Johannes Simon in http://mail-archives.apache.org/mod_mbox/spark-user/201412.mbox/%3C5BE70814-9D03-4F61-AE2C-0D63F2DE4446%40mail.de%3E Author: zsxwing <zsxwing@gmail.com> Closes #3671 from zsxwing/SPARK-4824 and squashes the following commits: 48ee7b9 [zsxwing] Remove the explicit types 95d59d6 [zsxwing] Add 'iterator' to reduce memory consumed by join
1 parent de9d7d2 commit c233ab3

File tree

1 file changed

+8
-8
lines changed

1 file changed

+8
-8
lines changed

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
483483
*/
484484
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
485485
this.cogroup(other, partitioner).flatMapValues( pair =>
486-
for (v <- pair._1; w <- pair._2) yield (v, w)
486+
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
487487
)
488488
}
489489

@@ -496,9 +496,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
496496
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
497497
this.cogroup(other, partitioner).flatMapValues { pair =>
498498
if (pair._2.isEmpty) {
499-
pair._1.map(v => (v, None))
499+
pair._1.iterator.map(v => (v, None))
500500
} else {
501-
for (v <- pair._1; w <- pair._2) yield (v, Some(w))
501+
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
502502
}
503503
}
504504
}
@@ -513,9 +513,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
513513
: RDD[(K, (Option[V], W))] = {
514514
this.cogroup(other, partitioner).flatMapValues { pair =>
515515
if (pair._1.isEmpty) {
516-
pair._2.map(w => (None, w))
516+
pair._2.iterator.map(w => (None, w))
517517
} else {
518-
for (v <- pair._1; w <- pair._2) yield (Some(v), w)
518+
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
519519
}
520520
}
521521
}
@@ -531,9 +531,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
531531
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
532532
: RDD[(K, (Option[V], Option[W]))] = {
533533
this.cogroup(other, partitioner).flatMapValues {
534-
case (vs, Seq()) => vs.map(v => (Some(v), None))
535-
case (Seq(), ws) => ws.map(w => (None, Some(w)))
536-
case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w))
534+
case (vs, Seq()) => vs.iterator.map(v => (Some(v), None))
535+
case (Seq(), ws) => ws.iterator.map(w => (None, Some(w)))
536+
case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w))
537537
}
538538
}
539539

0 commit comments

Comments
 (0)