Commit 238cfc6

Merge remote-tracking branch 'upstream/master' into topic/streaming-bp/rate-controller
2 parents: 34a389d + fb36397

593 files changed: +25415, -6608 lines


R/install-dev.bat

Lines changed: 5 additions & 0 deletions
@@ -25,3 +25,8 @@ set SPARK_HOME=%~dp0..
 MKDIR %SPARK_HOME%\R\lib
 
 R.exe CMD INSTALL --library="%SPARK_HOME%\R\lib" %SPARK_HOME%\R\pkg\
+
+rem Zip the SparkR package so that it can be distributed to worker nodes on YARN
+pushd %SPARK_HOME%\R\lib
+%JAVA_HOME%\bin\jar.exe cfM "%SPARK_HOME%\R\lib\sparkr.zip" SparkR
+popd

R/install-dev.sh

Lines changed: 6 additions & 2 deletions
@@ -34,12 +34,16 @@ LIB_DIR="$FWDIR/lib"
 
 mkdir -p $LIB_DIR
 
-pushd $FWDIR
+pushd $FWDIR > /dev/null
 
 # Generate Rd files if devtools is installed
 Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtools); devtools::document(pkg="./pkg", roclets=c("rd")) }'
 
 # Install SparkR to $LIB_DIR
 R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/
 
-popd
+# Zip the SparkR package so that it can be distributed to worker nodes on YARN
+cd $LIB_DIR
+jar cfM "$LIB_DIR/sparkr.zip" SparkR
+
+popd > /dev/null

R/pkg/DESCRIPTION

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ Collate:
     'client.R'
     'context.R'
     'deserialize.R'
+    'mllib.R'
     'serialize.R'
     'sparkR.R'
     'utils.R'
-    'zzz.R'

R/pkg/NAMESPACE

Lines changed: 6 additions & 0 deletions
@@ -10,6 +10,10 @@ export("sparkR.init")
 export("sparkR.stop")
 export("print.jobj")
 
+# MLlib integration
+exportMethods("glm",
+              "predict")
+
 # Job group lifecycle management methods
 export("setJobGroup",
        "clearJobGroup",
@@ -22,6 +26,7 @@ exportMethods("arrange",
               "collect",
               "columns",
               "count",
+              "crosstab",
               "describe",
               "distinct",
               "dropna",
@@ -77,6 +82,7 @@ exportMethods("abs",
               "atan",
               "atan2",
               "avg",
+              "between",
               "cast",
               "cbrt",
               "ceiling",

R/pkg/R/DataFrame.R

Lines changed: 33 additions & 5 deletions
@@ -1314,7 +1314,7 @@ setMethod("except",
 #' write.df(df, "myfile", "parquet", "overwrite")
 #' }
 setMethod("write.df",
-          signature(df = "DataFrame", path = 'character'),
+          signature(df = "DataFrame", path = "character"),
           function(df, path, source = NULL, mode = "append", ...){
             if (is.null(source)) {
               sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
@@ -1328,7 +1328,7 @@ setMethod("write.df",
             jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
             options <- varargsToEnv(...)
             if (!is.null(path)) {
-              options[['path']] = path
+              options[["path"]] <- path
             }
             callJMethod(df@sdf, "save", source, jmode, options)
           })
@@ -1337,7 +1337,7 @@ setMethod("write.df",
 #' @aliases saveDF
 #' @export
 setMethod("saveDF",
-          signature(df = "DataFrame", path = 'character'),
+          signature(df = "DataFrame", path = "character"),
           function(df, path, source = NULL, mode = "append", ...){
             write.df(df, path, source, mode, ...)
           })
@@ -1375,8 +1375,8 @@ setMethod("saveDF",
 #' saveAsTable(df, "myfile")
 #' }
 setMethod("saveAsTable",
-          signature(df = "DataFrame", tableName = 'character', source = 'character',
-                    mode = 'character'),
+          signature(df = "DataFrame", tableName = "character", source = "character",
+                    mode = "character"),
           function(df, tableName, source = NULL, mode="append", ...){
             if (is.null(source)) {
               sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
@@ -1554,3 +1554,31 @@ setMethod("fillna",
             }
             dataFrame(sdf)
           })
+
+#' crosstab
+#'
+#' Computes a pair-wise frequency table of the given columns. Also known as a contingency
+#' table. The number of distinct values for each column should be less than 1e4. At most 1e6
+#' non-zero pair frequencies will be returned.
+#'
+#' @param col1 name of the first column. Distinct items will make the first item of each row.
+#' @param col2 name of the second column. Distinct items will make the column names of the output.
+#' @return a local R data.frame representing the contingency table. The first column of each row
+#'         will be the distinct values of `col1` and the column names will be the distinct values
+#'         of `col2`. The name of the first column will be `$col1_$col2`. Pairs that have no
+#'         occurrences will have `null` as their counts.
+#'
+#' @rdname statfunctions
+#' @export
+#' @examples
+#' \dontrun{
+#' df <- jsonFile(sqlCtx, "/path/to/file.json")
+#' ct = crosstab(df, "title", "gender")
+#' }
+setMethod("crosstab",
+          signature(x = "DataFrame", col1 = "character", col2 = "character"),
+          function(x, col1, col2) {
+            statFunctions <- callJMethod(x@sdf, "stat")
+            sct <- callJMethod(statFunctions, "crosstab", col1, col2)
+            collect(dataFrame(sct))
+          })
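For orientation, here is a minimal usage sketch of the new crosstab method, adapted from the roxygen example above; the session setup and the JSON path are illustrative assumptions, not part of this commit:

    # Assumes a running Spark backend reachable from this R session.
    library(SparkR)
    sc <- sparkR.init(master = "local")
    sqlContext <- sparkRSQL.init(sc)

    # Hypothetical input: a DataFrame with "title" and "gender" columns.
    df <- jsonFile(sqlContext, "/path/to/file.json")

    # Returns a local R data.frame, so both columns should have few distinct
    # values (< 1e4 each, at most 1e6 non-zero pairs, per the docs above).
    ct <- crosstab(df, "title", "gender")
    print(ct)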

R/pkg/R/RDD.R

Lines changed: 0 additions & 2 deletions
@@ -165,7 +165,6 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
                                  serializedFuncArr,
                                  rdd@env$prev_serializedMode,
                                  packageNamesArr,
-                                 as.character(.sparkREnv[["libname"]]),
                                  broadcastArr,
                                  callJMethod(prev_jrdd, "classTag"))
     } else {
@@ -175,7 +174,6 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
                                  rdd@env$prev_serializedMode,
                                  serializedMode,
                                  packageNamesArr,
-                                 as.character(.sparkREnv[["libname"]]),
                                  broadcastArr,
                                  callJMethod(prev_jrdd, "classTag"))
     }

R/pkg/R/SQLContext.R

Lines changed: 2 additions & 2 deletions
@@ -457,7 +457,7 @@ dropTempTable <- function(sqlContext, tableName) {
 read.df <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) {
   options <- varargsToEnv(...)
   if (!is.null(path)) {
-    options[['path']] <- path
+    options[["path"]] <- path
   }
   if (is.null(source)) {
     sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
@@ -506,7 +506,7 @@ loadDF <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) {
 createExternalTable <- function(sqlContext, tableName, path = NULL, source = NULL, ...) {
   options <- varargsToEnv(...)
   if (!is.null(path)) {
-    options[['path']] <- path
+    options[["path"]] <- path
   }
   sdf <- callJMethod(sqlContext, "createExternalTable", tableName, source, options)
   dataFrame(sdf)

R/pkg/R/client.R

Lines changed: 2 additions & 2 deletions
@@ -36,9 +36,9 @@ connectBackend <- function(hostname, port, timeout = 6000) {
 
 determineSparkSubmitBin <- function() {
   if (.Platform$OS.type == "unix") {
-    sparkSubmitBinName = "spark-submit"
+    sparkSubmitBinName <- "spark-submit"
   } else {
-    sparkSubmitBinName = "spark-submit.cmd"
+    sparkSubmitBinName <- "spark-submit.cmd"
   }
   sparkSubmitBinName
 }

R/pkg/R/column.R

Lines changed: 17 additions & 0 deletions
@@ -187,6 +187,23 @@ setMethod("substr", signature(x = "Column"),
             column(jc)
           })
 
+#' between
+#'
+#' Test if the column is between the lower bound and upper bound, inclusive.
+#'
+#' @rdname column
+#'
+#' @param bounds lower and upper bounds
+setMethod("between", signature(x = "Column"),
+          function(x, bounds) {
+            if (is.vector(bounds) && length(bounds) == 2) {
+              jc <- callJMethod(x@jc, "between", bounds[1], bounds[2])
+              column(jc)
+            } else {
+              stop("bounds should be a vector of lower and upper bounds")
+            }
+          })
+
 #' Casts the column to a different data type.
 #'
 #' @rdname column
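A short, hedged sketch of calling the new between method; the DataFrame df and its age column are assumptions for illustration:

    # Assumes an existing SparkR DataFrame `df` with a numeric "age" column.
    # The bounds are inclusive and must be a length-2 vector: c(lower, upper).
    adults <- filter(df, between(df$age, c(18, 65)))

    # Anything other than a two-element vector hits the stop() branch above:
    # "bounds should be a vector of lower and upper bounds".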

R/pkg/R/deserialize.R

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@
 # Int -> integer
 # String -> character
 # Boolean -> logical
+# Float -> double
 # Double -> double
 # Long -> double
 # Array[Byte] -> raw

R/pkg/R/generics.R

Lines changed: 12 additions & 0 deletions
@@ -59,6 +59,10 @@ setGeneric("count", function(x) { standardGeneric("count") })
 # @export
 setGeneric("countByValue", function(x) { standardGeneric("countByValue") })
 
+# @rdname statfunctions
+# @export
+setGeneric("crosstab", function(x, col1, col2) { standardGeneric("crosstab") })
+
 # @rdname distinct
 # @export
 setGeneric("distinct", function(x, numPartitions = 1) { standardGeneric("distinct") })
@@ -567,6 +571,10 @@ setGeneric("asc", function(x) { standardGeneric("asc") })
 #' @export
 setGeneric("avg", function(x, ...) { standardGeneric("avg") })
 
+#' @rdname column
+#' @export
+setGeneric("between", function(x, bounds) { standardGeneric("between") })
+
 #' @rdname column
 #' @export
 setGeneric("cast", function(x, dataType) { standardGeneric("cast") })
@@ -657,3 +665,7 @@ setGeneric("toRadians", function(x) { standardGeneric("toRadians") })
 #' @rdname column
 #' @export
 setGeneric("upper", function(x) { standardGeneric("upper") })
+
+#' @rdname glm
+#' @export
+setGeneric("glm")

R/pkg/R/group.R

Lines changed: 2 additions & 2 deletions
@@ -87,7 +87,7 @@ setMethod("count",
 setMethod("agg",
           signature(x = "GroupedData"),
           function(x, ...) {
-            cols = list(...)
+            cols <- list(...)
             stopifnot(length(cols) > 0)
             if (is.character(cols[[1]])) {
               cols <- varargsToEnv(...)
@@ -97,7 +97,7 @@ setMethod("agg",
             if (!is.null(ns)) {
               for (n in ns) {
                 if (n != "") {
-                  cols[[n]] = alias(cols[[n]], n)
+                  cols[[n]] <- alias(cols[[n]], n)
                 }
               }
             }

R/pkg/R/mllib.R

Lines changed: 73 additions & 0 deletions
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# mllib.R: Provides methods for MLlib integration
+
+#' @title S4 class that represents a PipelineModel
+#' @param model A Java object reference to the backing Scala PipelineModel
+#' @export
setClass("PipelineModel", representation(model = "jobj"))
+
+#' Fits a generalized linear model
+#'
+#' Fits a generalized linear model, similarly to R's glm(). Also see the glmnet package.
+#'
+#' @param formula A symbolic description of the model to be fitted. Currently only a few formula
+#'                operators are supported, including '~' and '+'.
+#' @param data DataFrame for training
+#' @param family Error distribution. "gaussian" -> linear regression, "binomial" -> logistic reg.
+#' @param lambda Regularization parameter
+#' @param alpha Elastic-net mixing parameter (see glmnet's documentation for details)
+#' @return a fitted MLlib model
+#' @rdname glm
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' data(iris)
+#' df <- createDataFrame(sqlContext, iris)
+#' model <- glm(Sepal_Length ~ Sepal_Width, df)
+#'}
+setMethod("glm", signature(formula = "formula", family = "ANY", data = "DataFrame"),
+          function(formula, family = c("gaussian", "binomial"), data, lambda = 0, alpha = 0) {
+            family <- match.arg(family)
+            model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
+                                 "fitRModelFormula", deparse(formula), data@sdf, family, lambda,
+                                 alpha)
+            return(new("PipelineModel", model = model))
+          })
+
+#' Make predictions from a model
+#'
+#' Makes predictions from a model produced by glm(), similarly to R's predict().
+#'
+#' @param model A fitted MLlib model
+#' @param newData DataFrame for testing
+#' @return DataFrame containing predicted values
+#' @rdname glm
+#' @export
+#' @examples
+#'\dontrun{
+#' model <- glm(y ~ x, trainingData)
+#' predicted <- predict(model, testData)
+#' showDF(predicted)
+#'}
+setMethod("predict", signature(object = "PipelineModel"),
+          function(object, newData) {
+            return(dataFrame(callJMethod(object@model, "transform", newData@sdf)))
+          })

R/pkg/R/pairRDD.R

Lines changed: 0 additions & 1 deletion
@@ -215,7 +215,6 @@ setMethod("partitionBy",
                                  serializedHashFuncBytes,
                                  getSerializedMode(x),
                                  packageNamesArr,
-                                 as.character(.sparkREnv$libname),
                                  broadcastArr,
                                  callJMethod(jrdd, "classTag"))
 
R/pkg/R/schema.R

Lines changed: 9 additions & 5 deletions
@@ -69,11 +69,14 @@ structType.structField <- function(x, ...) {
 #' @param ... further arguments passed to or from other methods
 print.structType <- function(x, ...) {
   cat("StructType\n",
-      sapply(x$fields(), function(field) { paste("|-", "name = \"", field$name(),
-                                                 "\", type = \"", field$dataType.toString(),
-                                                 "\", nullable = ", field$nullable(), "\n",
-                                                 sep = "") })
-      , sep = "")
+      sapply(x$fields(),
+             function(field) {
+               paste("|-", "name = \"", field$name(),
+                     "\", type = \"", field$dataType.toString(),
+                     "\", nullable = ", field$nullable(), "\n",
+                     sep = "")
+             }),
+      sep = "")
 }
 
 #' structField
@@ -123,6 +126,7 @@ structField.character <- function(x, type, nullable = TRUE) {
   }
   options <- c("byte",
                "integer",
+               "float",
                "double",
                "numeric",
                "character",

R/pkg/R/serialize.R

Lines changed: 2 additions & 2 deletions
@@ -140,8 +140,8 @@ writeType <- function(con, class) {
                  jobj = "j",
                  environment = "e",
                  Date = "D",
-                 POSIXlt = 't',
-                 POSIXct = 't',
+                 POSIXlt = "t",
+                 POSIXct = "t",
                  stop(paste("Unsupported type for serialization", class)))
   writeBin(charToRaw(type), con)
 }
