From 4bcf59b1b4c330b6e158aa161d1dfafb6f7c6033 Mon Sep 17 00:00:00 2001 From: hlin09 Date: Fri, 7 Nov 2014 20:23:16 -0500 Subject: [PATCH 1/2] Adds function flatMapValues. --- pkg/NAMESPACE | 1 + pkg/R/RDD.R | 33 ++++++++++++++++++++++++++++++++- pkg/inst/tests/test_rdd.R | 6 ++++++ pkg/man/flatMapValues.Rd | 32 ++++++++++++++++++++++++++++++++ 4 files changed, 71 insertions(+), 1 deletion(-) create mode 100644 pkg/man/flatMapValues.Rd diff --git a/pkg/NAMESPACE b/pkg/NAMESPACE index 9e825b6d3d4cd..21b0356681712 100644 --- a/pkg/NAMESPACE +++ b/pkg/NAMESPACE @@ -13,6 +13,7 @@ exportMethods( "Filter", "filter", "flatMap", + "flatMapValues", "groupByKey", "length", "lapply", diff --git a/pkg/R/RDD.R b/pkg/R/RDD.R index e508cc0708d39..e040763f35023 100644 --- a/pkg/R/RDD.R +++ b/pkg/R/RDD.R @@ -426,7 +426,8 @@ setMethod("flatMap", function(X, FUN) { partitionFunc <- function(part) { unlist( - lapply(part, FUN) + lapply(part, FUN), + recursive = FALSE ) } lapplyPartition(X, partitionFunc) @@ -886,6 +887,36 @@ setMethod("mapValues", lapply(X, func) }) +#' Pass each value in the key-value pair RDD through a flatMap function without +#' changing the keys; this also retains the original RDD's partitioning. +#' +#' The same as 'flatMapValues()' in Spark. +#' +#' @param X The RDD to apply the transformation. +#' @param FUN the transformation to apply on the value of each element. +#' @return a new RDD created by the transformation. 
+#' @rdname flatMapValues +#' @export +#' @examples +#'\dontrun{ +#' sc <- sparkR.init() +#' rdd <- parallelize(sc, list(list(1, c(1,2)), list(2, c(3,4)))) +#' collect(flatMapValues(rdd, function(x) { x })) +#' # Output: list(list(1,1), list(1,2), list(2,3), list(2,4)) +#'} +setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues") }) + +#' @rdname flatMapValues +#' @aliases flatMapValues,RDD,function-method +setMethod("flatMapValues", + signature(X = "RDD", FUN = "function"), + function(X, FUN) { + flatMapFunc <- function(x) { + lapply(FUN(x[[2]]), function(v) { list(x[[1]], v) }) + } + flatMap(X, flatMapFunc) + }) + ############ Shuffle Functions ############ #' Partition an RDD by key diff --git a/pkg/inst/tests/test_rdd.R b/pkg/inst/tests/test_rdd.R index 0c48d14e24f66..79f40dfa70d89 100644 --- a/pkg/inst/tests/test_rdd.R +++ b/pkg/inst/tests/test_rdd.R @@ -186,6 +186,12 @@ test_that("mapValues() on pairwise RDDs", { })) }) +test_that("flatMapValues() on pairwise RDDs", { + l <- parallelize(sc, list(list(1, c(1,2)), list(2, c(3,4)))) + actual <- collect(flatMapValues(l, function(x) { x })) + expect_equal(actual, list(list(1,1), list(1,2), list(2,3), list(2,4))) +}) + test_that("distinct() on RDDs", { nums.rep2 <- rep(1:10, 2) rdd.rep2 <- parallelize(sc, nums.rep2, 2L) diff --git a/pkg/man/flatMapValues.Rd b/pkg/man/flatMapValues.Rd new file mode 100644 index 0000000000000..03112204a9231 --- /dev/null +++ b/pkg/man/flatMapValues.Rd @@ -0,0 +1,32 @@ +% Generated by roxygen2 (4.0.2): do not edit by hand +\docType{methods} +\name{flatMapValues} +\alias{flatMapValues} +\alias{flatMapValues,RDD,function-method} +\title{Pass each value in the key-value pair RDD through a flatMap function without +changing the keys; this also retains the original RDD's partitioning.} +\usage{ +flatMapValues(X, FUN) + +\S4method{flatMapValues}{RDD,`function`}(X, FUN) +} +\arguments{ +\item{X}{The RDD to apply the transformation.} + +\item{FUN}{the transformation to 
apply on the value of each element.} +} +\value{ +a new RDD created by the transformation. +} +\description{ +The same as 'flatMapValues()' in Spark. +} +\examples{ +\dontrun{ +sc <- sparkR.init() +rdd <- parallelize(sc, list(list(1, c(1,2)), list(2, c(3,4)))) +collect(flatMapValues(rdd, function(x) { x })) +# Output: list(list(1,1), list(1,2), list(2,3), list(2,4)) +} +} + From a17f135ce62416329db0ed88f919cdbd1fe0b010 Mon Sep 17 00:00:00 2001 From: hlin09 Date: Sun, 9 Nov 2014 10:01:56 -0500 Subject: [PATCH 2/2] Adds tests for flatMap and flatMapValues. --- pkg/inst/tests/test_rdd.R | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pkg/inst/tests/test_rdd.R b/pkg/inst/tests/test_rdd.R index 79f40dfa70d89..cf3051875cd03 100644 --- a/pkg/inst/tests/test_rdd.R +++ b/pkg/inst/tests/test_rdd.R @@ -33,6 +33,12 @@ test_that("mapPartitions on RDD", { expect_equal(actual, list(15, 40)) }) +test_that("flatMap() on RDDs", { + flat <- flatMap(intRdd, function(x) { list(x, x) }) + actual <- collect(flat) + expect_equal(actual, rep(intPairs, each=2)) +}) + test_that("Filter on RDD", { filtered.rdd <- Filter(function(x) { x %% 2 == 0 }, rdd) actual <- collect(filtered.rdd) @@ -190,6 +196,12 @@ test_that("flatMapValues() on pairwise RDDs", { l <- parallelize(sc, list(list(1, c(1,2)), list(2, c(3,4)))) actual <- collect(flatMapValues(l, function(x) { x })) expect_equal(actual, list(list(1,1), list(1,2), list(2,3), list(2,4))) + + # Generate x to x+1 for every value + actual <- collect(flatMapValues(intRdd, function(x) { x:(x + 1) })) + expect_equal(actual, + list(list(1L, -1), list(1L, 0), list(2L, 100), list(2L, 101), + list(2L, 1), list(2L, 2), list(1L, 200), list(1L, 201))) }) test_that("distinct() on RDDs", {