Skip to content

Commit

Permalink
Merge pull request apache#102 from hlin09/hlin09
Browse files Browse the repository at this point in the history
Adds function flatMapValues.
  • Loading branch information
shivaram committed Nov 11, 2014
2 parents 32eb619 + a17f135 commit a21f146
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 1 deletion.
1 change: 1 addition & 0 deletions pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ exportMethods(
"Filter",
"filter",
"flatMap",
"flatMapValues",
"groupByKey",
"keys",
"length",
Expand Down
33 changes: 32 additions & 1 deletion pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,8 @@ setMethod("flatMap",
function(X, FUN) {
partitionFunc <- function(part) {
unlist(
lapply(part, FUN)
lapply(part, FUN),
recursive = F
)
}
lapplyPartition(X, partitionFunc)
Expand Down Expand Up @@ -934,6 +935,36 @@ setMethod("mapValues",
lapply(X, func)
})

#' Pass each value in the key-value pair RDD through a flatMap function without
#' changing the keys; this also retains the original RDD's partitioning.
#'
#' The same as 'flatMapValues()' in Spark.
#'
#' @param X The RDD to apply the transformation.
#' @param FUN the transformation to apply on the value of each element.
#' @return a new RDD created by the transformation.
#' @rdname flatMapValues
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(list(1, c(1,2)), list(2, c(3,4))))
#' collect(flatMapValues(rdd, function(x) { x }))
#' # Output: list(list(1,1), list(1,2), list(2,3), list(2,4))
#'}
setGeneric("flatMapValues", function(X, FUN) {
  # Dispatch to the S4 method registered for the classes of X and FUN.
  standardGeneric("flatMapValues")
})

#' @rdname flatMapValues
#' @aliases flatMapValues,RDD,function-method
setMethod("flatMapValues",
          signature(X = "RDD", FUN = "function"),
          function(X, FUN) {
            # Turn one (key, value) element into a list of (key, newValue)
            # pairs: FUN is applied to the value (second slot) and every item
            # it yields is paired with the unchanged key (first slot).
            expandPair <- function(pair) {
              newValues <- FUN(pair[[2]])
              key <- pair[[1]]
              lapply(newValues, function(val) list(key, val))
            }
            # flatMap concatenates the per-element lists into the result RDD.
            flatMap(X, expandPair)
          })

############ Shuffle Functions ############

#' Partition an RDD by key
Expand Down
18 changes: 18 additions & 0 deletions pkg/inst/tests/test_rdd.R
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ test_that("mapPartitions on RDD", {
expect_equal(actual, list(15, 40))
})

test_that("flatMap() on RDDs", {
  # The mapped function returns every element twice, so the expected result
  # is intPairs with each entry repeated twice in place.
  doubled <- flatMap(intRdd, function(elem) list(elem, elem))
  expect_equal(collect(doubled), rep(intPairs, each = 2))
})

test_that("Filter on RDD", {
filtered.rdd <- Filter(function(x) { x %% 2 == 0 }, rdd)
actual <- collect(filtered.rdd)
Expand Down Expand Up @@ -186,6 +192,18 @@ test_that("mapValues() on pairwise RDDs", {
}))
})

test_that("flatMapValues() on pairwise RDDs", {
  # Identity function: each vector value is split into one pair per item,
  # with the key carried over unchanged.
  pairRdd <- parallelize(sc, list(list(1, c(1, 2)), list(2, c(3, 4))))
  splitPairs <- collect(flatMapValues(pairRdd, function(v) v))
  expect_equal(
    splitPairs,
    list(list(1, 1), list(1, 2), list(2, 3), list(2, 4))
  )

  # Expand every value v into the two values v and v + 1.
  expandedPairs <- collect(flatMapValues(intRdd, function(v) v:(v + 1)))
  expect_equal(
    expandedPairs,
    list(
      list(1L, -1), list(1L, 0), list(2L, 100), list(2L, 101),
      list(2L, 1), list(2L, 2), list(1L, 200), list(1L, 201)
    )
  )
})

test_that("distinct() on RDDs", {
nums.rep2 <- rep(1:10, 2)
rdd.rep2 <- parallelize(sc, nums.rep2, 2L)
Expand Down
32 changes: 32 additions & 0 deletions pkg/man/flatMapValues.Rd
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
% Generated by roxygen2 (4.0.2): do not edit by hand
\docType{methods}
\name{flatMapValues}
\alias{flatMapValues}
\alias{flatMapValues,RDD,function-method}
\title{Pass each value in the key-value pair RDD through a flatMap function without
changing the keys; this also retains the original RDD's partitioning.}
\usage{
flatMapValues(X, FUN)
\S4method{flatMapValues}{RDD,`function`}(X, FUN)
}
\arguments{
\item{X}{The RDD to apply the transformation.}
\item{FUN}{the transformation to apply on the value of each element.}
}
\value{
a new RDD created by the transformation.
}
\description{
The same as 'flatMapValues()' in Spark.
}
\examples{
\dontrun{
sc <- sparkR.init()
rdd <- parallelize(sc, list(list(1, c(1,2)), list(2, c(3,4))))
collect(flatMapValues(rdd, function(x) { x }))
# Output: list(list(1,1), list(1,2), list(2,3), list(2,4))
}
}

0 comments on commit a21f146

Please sign in to comment.