Skip to content

Commit 384e6e2

Browse files
committed
Merge pull request apache#171 from hlin09/hlin09
[SPARKR-159] Adds support of pipeRDD().
2 parents 2271030 + 1f5a6ac commit 384e6e2

File tree

4 files changed

+89
-0
lines changed

4 files changed

+89
-0
lines changed

pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ exportMethods(
4141
"numPartitions",
4242
"partitionBy",
4343
"persist",
44+
"pipeRDD",
4445
"reduce",
4546
"reduceByKey",
4647
"reduceByKeyLocally",

pkg/R/RDD.R

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1275,6 +1275,43 @@ setMethod("aggregateRDD",
12751275
Reduce(combOp, partitionList, zeroValue)
12761276
})
12771277

1278+
#' Pipes elements to a forked external process.
1279+
#'
1280+
#' The same as 'pipe()' in Spark.
1281+
#'
1282+
#' @param rdd The RDD whose elements are piped to the forked external process.
1283+
#' @param command The command to fork an external process.
1284+
#' @param env A named list to set environment variables of the external process.
1285+
#' @return A new RDD created by piping all elements to a forked external process.
1286+
#' @rdname pipeRDD
1287+
#' @export
1288+
#' @examples
1289+
#'\dontrun{
1290+
#' sc <- sparkR.init()
1291+
#' rdd <- parallelize(sc, 1:10)
1292+
#' collect(pipeRDD(rdd, "more")
1293+
#' Output: c("1", "2", ..., "10")
1294+
#'}
1295+
setGeneric("pipeRDD", function(rdd, command, env = list()) {
1296+
standardGeneric("pipeRDD")
1297+
})
1298+
1299+
#' @rdname pipeRDD
1300+
#' @aliases pipeRDD,RDD,character-method
1301+
setMethod("pipeRDD",
1302+
signature(rdd = "RDD", command = "character"),
1303+
function(rdd, command, env = list()) {
1304+
func <- function(part) {
1305+
trim.trailing.func <- function(x) {
1306+
sub("[\r\n]*$", "", toString(x))
1307+
}
1308+
input <- unlist(lapply(part, trim.trailing.func))
1309+
res <- system2(command, stdout = TRUE, input = input, env = env)
1310+
lapply(res, trim.trailing.func)
1311+
}
1312+
lapplyPartition(rdd, func)
1313+
})
1314+
12781315
# TODO: Consider caching the name in the RDD's environment
12791316
#' Return an RDD's name.
12801317
#'

pkg/inst/tests/test_rdd.R

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,23 @@ test_that("values() on RDDs", {
336336
expect_equal(actual, lapply(intPairs, function(x) { x[[2]] }))
337337
})
338338

339+
test_that("pipeRDD() on RDDs", {
340+
actual <- collect(pipeRDD(rdd, "more"))
341+
expected <- as.list(as.character(1:10))
342+
expect_equal(actual, expected)
343+
344+
trailed.rdd <- parallelize(sc, c("1", "", "2\n", "3\n\r\n"))
345+
actual <- collect(pipeRDD(trailed.rdd, "sort"))
346+
expected <- list("", "1", "2", "3")
347+
expect_equal(actual, expected)
348+
349+
rev.nums <- 9:0
350+
rev.rdd <- parallelize(sc, rev.nums, 2L)
351+
actual <- collect(pipeRDD(rev.rdd, "sort"))
352+
expected <- as.list(as.character(c(5:9, 0:4)))
353+
expect_equal(actual, expected)
354+
})
355+
339356
test_that("join() on pairwise RDDs", {
340357
rdd1 <- parallelize(sc, list(list(1,1), list(2,4)))
341358
rdd2 <- parallelize(sc, list(list(1,2), list(1,3)))

pkg/man/pipeRDD.Rd

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
% Generated by roxygen2 (4.1.0): do not edit by hand
2+
% Please edit documentation in R/RDD.R
3+
\docType{methods}
4+
\name{pipeRDD}
5+
\alias{pipeRDD}
6+
\alias{pipeRDD,RDD,character-method}
7+
\title{Pipes elements to a forked external process.}
8+
\usage{
9+
pipeRDD(rdd, command, env = list())
10+
11+
\S4method{pipeRDD}{RDD,character}(rdd, command, env = list())
12+
}
13+
\arguments{
14+
\item{rdd}{The RDD whose elements are piped to the forked external process.}
15+
16+
\item{command}{The command to fork an external process.}
17+
18+
\item{env}{A named list to set environment variables of the external process.}
19+
}
20+
\value{
21+
A new RDD created by piping all elements to a forked external process.
22+
}
23+
\description{
24+
The same as 'pipe()' in Spark.
25+
}
26+
\examples{
27+
\dontrun{
28+
sc <- sparkR.init()
29+
rdd <- parallelize(sc, 1:10)
30+
collect(pipeRDD(rdd, "more")
31+
Output: c("1", "2", ..., "10")
32+
}
33+
}
34+

0 commit comments

Comments
 (0)