
Commit 8ca4ecb

staple authored and pwendell committed
[SPARK-546] Add full outer join to RDD and DStream.
leftOuterJoin and rightOuterJoin are already implemented. This patch adds fullOuterJoin.

Author: Aaron Staple <aaron.staple@gmail.com>

Closes #1395 from staple/SPARK-546 and squashes the following commits:

1f5595c [Aaron Staple] Fix python style
7ac0aa9 [Aaron Staple] [SPARK-546] Add full outer join to RDD and DStream.
3b5d137 [Aaron Staple] In JavaPairDStream, make class tag specification in rightOuterJoin consistent with other functions.
31f2956 [Aaron Staple] Fix left outer join documentation comments.
1 parent 74fb2ec commit 8ca4ecb

11 files changed: +250 −7 lines changed


core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 48 additions & 0 deletions
@@ -469,6 +469,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
   }
 
+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Uses the given Partitioner to partition the output RDD.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
+      : JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other, partitioner)
+    fromRDD(joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    })
+  }
+
   /**
    * Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
    * partitioner/parallelism level.
@@ -563,6 +579,38 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
   }
 
+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
+   * parallelism level.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other)
+    fromRDD(joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    })
+  }
+
+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
+      : JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other, numPartitions)
+    fromRDD(joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    })
+  }
+
   /**
    * Return the key-value pairs in this RDD to the master as a Map.
    */
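These wrappers reuse the Scala-side join and convert each Option in the result to the Guava Optional exposed to Java callers. A minimal sketch of that bridge, assuming Guava on the classpath (the real helper is JavaUtils.optionToOptional in org.apache.spark.api.java):

import com.google.common.base.Optional

// Some(v) surfaces to Java as Optional.of(v); None as Optional.absent().
def optionToOptional[T](option: Option[T]): Optional[T] = option match {
  case Some(value) => Optional.of(value)
  case None => Optional.absent()
}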

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 42 additions & 0 deletions
@@ -506,6 +506,23 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     }
   }
 
+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Uses the given Partitioner to partition the output RDD.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
+      : RDD[(K, (Option[V], Option[W]))] = {
+    this.cogroup(other, partitioner).flatMapValues {
+      case (vs, Seq()) => vs.map(v => (Some(v), None))
+      case (Seq(), ws) => ws.map(w => (None, Some(w)))
+      case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w))
+    }
+  }
+
   /**
    * Simplified version of combineByKey that hash-partitions the resulting RDD using the
    * existing partitioner/parallelism level.
@@ -585,6 +602,31 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     rightOuterJoin(other, new HashPartitioner(numPartitions))
   }
 
+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
+   * parallelism level.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin(other, defaultPartitioner(self, other))
+  }
+
+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin(other, new HashPartitioner(numPartitions))
+  }
+
   /**
    * Return the key-value pairs in this RDD to the master as a Map.
    *
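The cogroup-based implementation above covers all three cases in one pass: keys seen only in `this` pad the right side with None, keys seen only in `other` pad the left side, and keys on both sides expand to the cross product of their values. A minimal local sketch of the resulting semantics (the object name, master URL, and sample data are illustrative, not from the patch):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // brings PairRDDFunctions into scope implicitly

object FullOuterJoinSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("foj-sketch"))
    val left = sc.parallelize(Seq((1, "a"), (2, "b")))
    val right = sc.parallelize(Seq((2, "x"), (3, "y")))
    // Matched keys pair up; unmatched keys survive with None on the missing side.
    left.fullOuterJoin(right).collect().sortBy(_._1).foreach(println)
    // (1,(Some(a),None))
    // (2,(Some(b),Some(x)))
    // (3,(None,Some(y)))
    sc.stop()
  }
}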

core/src/test/scala/org/apache/spark/PartitioningSuite.scala

Lines changed: 3 additions & 0 deletions
@@ -193,11 +193,13 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
     assert(grouped2.join(grouped4).partitioner === grouped4.partitioner)
     assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped4.partitioner)
     assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped4.partitioner)
+    assert(grouped2.fullOuterJoin(grouped4).partitioner === grouped4.partitioner)
     assert(grouped2.cogroup(grouped4).partitioner === grouped4.partitioner)
 
     assert(grouped2.join(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner)
+    assert(grouped2.fullOuterJoin(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner)
 
     assert(grouped2.map(_ => 1).partitioner === None)
@@ -218,6 +220,7 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
     assert(intercept[SparkException]{ arrPairs.join(arrPairs) }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.rightOuterJoin(arrPairs) }.getMessage.contains("array"))
+    assert(intercept[SparkException]{ arrPairs.fullOuterJoin(arrPairs) }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.groupByKey() }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.countByKey() }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.countByKeyApprox(1) }.getMessage.contains("array"))

core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala

Lines changed: 15 additions & 0 deletions
@@ -298,6 +298,21 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext
     ))
   }
 
+  test("fullOuterJoin") {
+    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val joined = rdd1.fullOuterJoin(rdd2).collect()
+    assert(joined.size === 6)
+    assert(joined.toSet === Set(
+      (1, (Some(1), Some('x'))),
+      (1, (Some(2), Some('x'))),
+      (2, (Some(1), Some('y'))),
+      (2, (Some(1), Some('z'))),
+      (3, (Some(1), None)),
+      (4, (None, Some('w')))
+    ))
+  }
+
   test("join with no matches") {
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))

core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala

Lines changed: 1 addition & 0 deletions
@@ -193,6 +193,7 @@ class RDDSuite extends FunSuite with SharedSparkContext
     assert(rdd.join(emptyKv).collect().size === 0)
     assert(rdd.rightOuterJoin(emptyKv).collect().size === 0)
     assert(rdd.leftOuterJoin(emptyKv).collect().size === 2)
+    assert(rdd.fullOuterJoin(emptyKv).collect().size === 2)
     assert(rdd.cogroup(emptyKv).collect().size === 2)
     assert(rdd.union(emptyKv).collect().size === 2)
   }

docs/programming-guide.md

Lines changed: 1 addition & 1 deletion
@@ -906,7 +906,7 @@ for details.
 <tr>
   <td> <b>join</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td>
   <td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
-    Outer joins are also supported through <code>leftOuterJoin</code> and <code>rightOuterJoin</code>.
+    Outer joins are supported through <code>leftOuterJoin</code>, <code>rightOuterJoin</code>, and <code>fullOuterJoin</code>.
   </td>
 </tr>
 <tr>

python/pyspark/join.py

Lines changed: 16 additions & 0 deletions
@@ -80,6 +80,22 @@ def dispatch(seq):
     return _do_python_join(rdd, other, numPartitions, dispatch)
 
 
+def python_full_outer_join(rdd, other, numPartitions):
+    def dispatch(seq):
+        vbuf, wbuf = [], []
+        for (n, v) in seq:
+            if n == 1:
+                vbuf.append(v)
+            elif n == 2:
+                wbuf.append(v)
+        if not vbuf:
+            vbuf.append(None)
+        if not wbuf:
+            wbuf.append(None)
+        return [(v, w) for v in vbuf for w in wbuf]
+    return _do_python_join(rdd, other, numPartitions, dispatch)
+
+
 def python_cogroup(rdds, numPartitions):
     def make_mapper(i):
         return lambda (k, v): (k, (i, v))
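dispatch buffers the left-side values (tagged 1) and right-side values (tagged 2) collected for one key, pads an empty buffer with a single None, and emits the cross product, so one-sided keys still produce output rows. The same padding logic, sketched as a standalone Scala function for comparison with the Scala side (illustrative only, not Spark code):

// Left elements arrive tagged 1, right elements tagged 2; padding an empty
// buffer with None keeps the cross product non-empty for one-sided keys.
def dispatch[V, W](seq: Seq[(Int, Any)]): Seq[(Option[V], Option[W])] = {
  val vbuf: Seq[V] = seq.collect { case (1, v) => v.asInstanceOf[V] }
  val wbuf: Seq[W] = seq.collect { case (2, w) => w.asInstanceOf[W] }
  val vs: Seq[Option[V]] = if (vbuf.isEmpty) Seq(None) else vbuf.map(Some(_))
  val ws: Seq[Option[W]] = if (wbuf.isEmpty) Seq(None) else wbuf.map(Some(_))
  for (v <- vs; w <- ws) yield (v, w)
}
// dispatch[Int, String](Seq((1, 1), (2, "x"))) == Seq((Some(1), Some("x")))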

python/pyspark/rdd.py

Lines changed: 23 additions & 2 deletions
@@ -36,7 +36,7 @@
     BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
     PickleSerializer, pack_long, AutoBatchedSerializer
 from pyspark.join import python_join, python_left_outer_join, \
-    python_right_outer_join, python_cogroup
+    python_right_outer_join, python_full_outer_join, python_cogroup
 from pyspark.statcounter import StatCounter
 from pyspark.rddsampler import RDDSampler, RDDStratifiedSampler
 from pyspark.storagelevel import StorageLevel
@@ -1375,7 +1375,7 @@ def leftOuterJoin(self, other, numPartitions=None):
 
         For each element (k, v) in C{self}, the resulting RDD will either
         contain all pairs (k, (v, w)) for w in C{other}, or the pair
-        (k, (v, None)) if no elements in other have key k.
+        (k, (v, None)) if no elements in C{other} have key k.
 
         Hash-partitions the resulting RDD into the given number of partitions.
 
@@ -1403,6 +1403,27 @@ def rightOuterJoin(self, other, numPartitions=None):
         """
         return python_right_outer_join(self, other, numPartitions)
 
+    def fullOuterJoin(self, other, numPartitions=None):
+        """
+        Perform a full outer join of C{self} and C{other}.
+
+        For each element (k, v) in C{self}, the resulting RDD will either
+        contain all pairs (k, (v, w)) for w in C{other}, or the pair
+        (k, (v, None)) if no elements in C{other} have key k.
+
+        Similarly, for each element (k, w) in C{other}, the resulting RDD will
+        either contain all pairs (k, (v, w)) for v in C{self}, or the pair
+        (k, (None, w)) if no elements in C{self} have key k.
+
+        Hash-partitions the resulting RDD into the given number of partitions.
+
+        >>> x = sc.parallelize([("a", 1), ("b", 4)])
+        >>> y = sc.parallelize([("a", 2), ("c", 8)])
+        >>> sorted(x.fullOuterJoin(y).collect())
+        [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]
+        """
+        return python_full_outer_join(self, other, numPartitions)
+
     # TODO: add option to control map-side combining
     # portable_hash is used as default, because builtin hash of None is different
     # cross machines.

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala

Lines changed: 50 additions & 4 deletions
@@ -606,8 +606,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   }
 
   /**
-   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
-   * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD.
+   * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+   * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
+   * the partitioning of each RDD.
    */
   def leftOuterJoin[W](
       other: JavaPairDStream[K, W],
@@ -624,8 +625,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * number of partitions.
    */
   def rightOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], W)] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     val joinResult = dstream.rightOuterJoin(other.dstream)
     joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
   }
@@ -658,6 +658,52 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
   }
 
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+   * number of partitions.
+   */
+  def fullOuterJoin[W](other: JavaPairDStream[K, W])
+      : JavaPairDStream[K, (Optional[V], Optional[W])] = {
+    implicit val cm: ClassTag[W] = fakeClassTag
+    val joinResult = dstream.fullOuterJoin(other.dstream)
+    joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    }
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+   * partitions.
+   */
+  def fullOuterJoin[W](
+      other: JavaPairDStream[K, W],
+      numPartitions: Int
+    ): JavaPairDStream[K, (Optional[V], Optional[W])] = {
+    implicit val cm: ClassTag[W] = fakeClassTag
+    val joinResult = dstream.fullOuterJoin(other.dstream, numPartitions)
+    joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    }
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
+   * the partitioning of each RDD.
+   */
+  def fullOuterJoin[W](
+      other: JavaPairDStream[K, W],
+      partitioner: Partitioner
+    ): JavaPairDStream[K, (Optional[V], Optional[W])] = {
+    implicit val cm: ClassTag[W] = fakeClassTag
+    val joinResult = dstream.fullOuterJoin(other.dstream, partitioner)
+    joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    }
+  }
+
   /**
    * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
    * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".

streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala

Lines changed: 36 additions & 0 deletions
@@ -568,6 +568,42 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
     )
   }
 
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+   * number of partitions.
+   */
+  def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin[W](other, defaultPartitioner())
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+   * partitions.
+   */
+  def fullOuterJoin[W: ClassTag](
+      other: DStream[(K, W)],
+      numPartitions: Int
+    ): DStream[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin[W](other, defaultPartitioner(numPartitions))
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
+   * the partitioning of each RDD.
+   */
+  def fullOuterJoin[W: ClassTag](
+      other: DStream[(K, W)],
+      partitioner: Partitioner
+    ): DStream[(K, (Option[V], Option[W]))] = {
+    self.transformWith(
+      other,
+      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.fullOuterJoin(rdd2, partitioner)
+    )
+  }
+
   /**
    * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
    * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
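Because the partitioner overload delegates to transformWith, the stream-level join is simply the RDD-level fullOuterJoin applied to the pair of RDDs each batch interval produces. A minimal usage sketch (stream names and value types are illustrative):

import org.apache.spark.streaming.StreamingContext._  // brings PairDStreamFunctions into scope implicitly
import org.apache.spark.streaming.dstream.DStream

// Per batch: pair up click and view counts by user id, keeping users that
// appear in only one of the two streams for that interval.
def joinPerBatch(
    clicks: DStream[(String, Int)],
    views: DStream[(String, Int)]): DStream[(String, (Option[Int], Option[Int]))] = {
  clicks.fullOuterJoin(views)
}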

streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala

Lines changed: 15 additions & 0 deletions
@@ -303,6 +303,21 @@ class BasicOperationsSuite extends TestSuiteBase
     testOperation(inputData1, inputData2, operation, outputData, true)
   }
 
+  test("fullOuterJoin") {
+    val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
+    val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") )
+    val outputData = Seq(
+      Seq( ("a", (Some(1), Some("x"))), ("b", (Some(1), Some("x"))) ),
+      Seq( ("", (Some(1), Some("x"))), ("a", (Some(1), None)), ("b", (None, Some("x"))) ),
+      Seq( ("", (Some(1), None)) ),
+      Seq( ("", (None, Some("x"))) )
+    )
+    val operation = (s1: DStream[String], s2: DStream[String]) => {
+      s1.map(x => (x, 1)).fullOuterJoin(s2.map(x => (x, "x")))
+    }
+    testOperation(inputData1, inputData2, operation, outputData, true)
+  }
+
   test("updateStateByKey") {
     val inputData =
       Seq(
