Skip to content

Commit ba54e34

Browse files
shivaram authored and Davies Liu committed
Merge pull request #238 from sun-rui/SPARKR-154_4
[SPARKR-154] Phase 3: implement intersection().
1 parent c9497a3 commit ba54e34

File tree

5 files changed

+97
-1
lines changed

5 files changed

+97
-1
lines changed

R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ exportMethods(
2828
"fullOuterJoin",
2929
"glom",
3030
"groupByKey",
31+
"intersection",
3132
"join",
3233
"keyBy",
3334
"keys",

R/pkg/R/RDD.R

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1560,3 +1560,40 @@ setMethod("zipRDD",
15601560

15611561
PipelinedRDD(zippedRDD, partitionFunc)
15621562
})
1563+
1564+
#' Intersection of this RDD and another one.
1565+
#'
1566+
#' Return the intersection of this RDD and another one.
1567+
#' The output will not contain any duplicate elements,
1568+
#' even if the input RDDs did. Performs a hash partition
1569+
#' across the cluster.
1570+
#' Note that this method performs a shuffle internally.
1571+
#'
1572+
#' @param x An RDD.
1573+
#' @param other An RDD.
1574+
#' @param numPartitions The number of partitions in the result RDD.
1575+
#' @return An RDD which is the intersection of these two RDDs.
1576+
#' @examples
1577+
#'\dontrun{
1578+
#' sc <- sparkR.init()
1579+
#' rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
1580+
#' rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
1581+
#' collect(sortBy(intersection(rdd1, rdd2), function(x) { x }))
1582+
#' # list(1, 2, 3)
1583+
#'}
1584+
#' @rdname intersection
1585+
#' @aliases intersection,RDD
1586+
setMethod("intersection",
          signature(x = "RDD", other = "RDD"),
          function(x, other, numPartitions = SparkR::numPartitions(x)) {
            # Wrap every element as list(element, NA) so that the element
            # itself acts as the key handed to cogroup(); the NA is only a
            # placeholder value.
            toPair <- function(v) { list(v, NA) }
            left <- map(x, toPair)
            right <- map(other, toPair)

            # Keep a cogrouped entry only when every collection stored in
            # its second slot is non-empty.
            # NOTE(review): presumably cogroup() yields one collection per
            # input RDD for each key -- confirm against cogroup().
            presentInAll <- function(elem) {
              groups <- elem[[2]]
              all(vapply(groups, function(g) { length(g) > 0 }, logical(1)))
            }

            grouped <- cogroup(left, right, numPartitions = numPartitions)

            # The surviving keys are the result.
            keys(filterRDD(grouped, presentInAll))
          })

R/pkg/R/generics.R

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,6 @@ setGeneric("zipWithUniqueId", function(x) { standardGeneric("zipWithUniqueId") }
233233

234234

235235
############ Binary Functions #############
236-
237236
#' @rdname countByKey
238237
#' @export
239238
setGeneric("countByKey", function(x) { standardGeneric("countByKey") })
@@ -242,6 +241,11 @@ setGeneric("countByKey", function(x) { standardGeneric("countByKey") })
242241
#' @export
243242
setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues") })
244243

244+
#' @rdname intersection
245+
#' @export
246+
setGeneric("intersection",
           function(x, other, numPartitions = 1L) {
             standardGeneric("intersection")
           })
248+
245249
#' @rdname keys
246250
#' @export
247251
setGeneric("keys", function(x) { standardGeneric("keys") })

R/pkg/inst/tests/test_rdd.R

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,22 @@ test_that("zipRDD() on RDDs", {
468468
unlink(fileName)
469469
})
470470

471+
test_that("intersection() on RDDs", {
  # Intersecting the shared test RDD with itself is expected to give back
  # exactly the values in `nums`.
  selfResult <- collect(intersection(rdd, rdd))
  expect_equal(sort(as.integer(selfResult)), nums)

  # Intersecting with an RDD built from an empty list is expected to
  # produce an empty list.
  noElements <- parallelize(sc, list())
  emptyResult <- collect(intersection(rdd, noElements))
  expect_equal(emptyResult, list())

  # Only 1, 2 and 3 occur in both inputs below.
  lhs <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
  rhs <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
  commonResult <- collect(intersection(lhs, rhs))
  expect_equal(sort(as.integer(commonResult)), 1:3)
})
486+
471487
test_that("join() on pairwise RDDs", {
472488
rdd1 <- parallelize(sc, list(list(1,1), list(2,4)))
473489
rdd2 <- parallelize(sc, list(list(1,2), list(1,3)))

pkg/man/intersection.Rd

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
% Generated by roxygen2 (4.0.2): do not edit by hand
2+
\docType{methods}
3+
\name{intersection,RDD,RDD-method}
4+
\alias{intersection}
5+
\alias{intersection,RDD}
6+
\alias{intersection,RDD,RDD-method}
7+
\title{Intersection of this RDD and another one.}
8+
\usage{
9+
\S4method{intersection}{RDD,RDD}(x, other,
10+
numPartitions = SparkR::numPartitions(x))
11+
12+
intersection(x, other, numPartitions = 1L)
13+
}
14+
\arguments{
15+
\item{x}{An RDD.}
16+
17+
\item{other}{An RDD.}

\item{numPartitions}{The number of partitions in the result RDD.}
18+
}
19+
\value{
20+
An RDD which is the intersection of these two RDDs.
21+
}
22+
\description{
23+
Return the intersection of this RDD and another one.
24+
The output will not contain any duplicate elements,
25+
even if the input RDDs did. Performs a hash partition
26+
across the cluster.
27+
Note that this method performs a shuffle internally.
28+
}
29+
\examples{
30+
\dontrun{
31+
sc <- sparkR.init()
32+
rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
33+
rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
34+
collect(sortBy(intersection(rdd1, rdd2), function(x) { x }))
35+
# list(1, 2, 3)
36+
}
37+
}
38+

0 commit comments

Comments
 (0)